我有一个相对简单的FastAPI应用程序,它接受一个查询并从ChatGPT的API流回响应。ChatGPT正在流回结果,我可以看到它在输入时被打印到控制台。
不起作用的是通过FastAPI返回的StreamingResponse
。响应被一起发送。我真的不知道为什么这不起作用。
这是FastAPI应用程序代码:
import os
import time
import openai
import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
auth_scheme = HTTPBearer()
app = fastapi.FastAPI()
openai.api_key = os.environ["OPENAI_API_KEY"]
def ask_statesman(query: str):
#prompt = router(query)
completion_reason = None
response = ""
while not completion_reason or completion_reason == "length":
openai_stream = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": query}],
temperature=0.0,
stream=True,
)
for line in openai_stream:
completion_reason = line["choices"][0]["finish_reason"]
if "content" in line["choices"][0].delta:
current_response = line["choices"][0].delta.content
print(current_response)
yield current_response
time.sleep(0.25)
@app.post("/")
async def request_handler(auth_key: str, query: str):
if auth_key != "123":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": auth_scheme.scheme_name},
)
else:
stream_response = ask_statesman(query)
return StreamingResponse(stream_response, media_type="text/plain")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")
这是一个非常简单的test.py
文件来测试这个:
import requests
query = "How tall is the Eiffel tower?"
url = "http://localhost:8000"
params = {"auth_key": "123", "query": query}
response = requests.post(url, params=params, stream=True)
for chunk in response.iter_lines():
if chunk:
print(chunk.decode("utf-8"))
首先,使用POST
请求从服务器请求数据不是好的做法。使用GET
请求更适合您的情况。除此之外,您不应该发送凭据,例如auth_key
作为URL的一部分(即使用查询字符串),而是应该使用Headers
和/或Cookies
(使用HTTPS
)。请查看此答案,了解有关标头和cookie概念的更多详细信息和示例,以及使用查询参数时涉及的风险。关于这个主题的有用帖子也可以在这里和这里找到,也可以在这里、这里和这里找到。
其次,如果您在StreamingResponse
的生成器函数中执行阻塞操作(即I/O绑定或CPU绑定任务),您应该使用def
(就像您现在所做的那样)而不是async def
来定义函数,否则阻塞操作以及生成器中使用的time.睡眠()
函数将阻止事件循环。正如这里所解释的,如果用于流式传输响应体的函数是普通的生成器/迭代器(即def
)而不是异步def,FastAPI将使用iterate_in_threadpool()
在单独的线程中运行迭代器/生成器,然后await
ed。有关更多详细信息,请查看此详细答案。
第三,您正在使用请求
'iter_lines()
函数,该函数一次遍历响应数据一行。但是,如果响应数据不包含任何换行符(即\n
),您将不会看到客户端控制台上的数据在到达时被打印出来,直到客户端接收到整个响应并作为一个整体打印出来。在这种情况下,您应该使用iter_content()
并根据需要指定chunk_size
(两种情况都在下面的示例中演示)。
最后,如果您希望StreamingResponse
在每个浏览器中工作(包括Chrome)——从能够在数据流入时看到数据的意义上来说——您应该将media_type
指定为与text/普通
不同的类型(例如,text/event-stream
),或者禁用MIME嗅探。如本文所述,浏览器将开始缓冲一定数量的text/普通
响应(大约1445字节,如本文所述),以检查接收到的内容是否实际上是普通文本。为了避免这种情况,您可以将media_type
设置为text/event-stream
(用于服务器发送的事件),或者继续使用text/普通
,但将X-Content-Type-Options
响应标头设置为nosmff
,这将禁用MIME嗅探(这两个选项都在下面的示例中演示)。
app.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time
app = FastAPI()
def fake_data_streamer():
for i in range(10):
yield b'some fake data\n\n'
time.sleep(0.5)
@app.get('/')
async def main():
return StreamingResponse(fake_data_streamer(), media_type='text/event-stream')
# or, use:
#headers = {'X-Content-Type-Options': 'nosniff'}
#return StreamingResponse(fake_data_streamer(), headers=headers, media_type='text/plain')
test.py(使用Python请求
)
import requests
url = "http://localhost:8000/"
with requests.get(url, stream=True) as r:
for chunk in r.iter_content(1024): # or, for line in r.iter_lines():
if chunk:
print(chunk)
test.py(使用httpx
-查看这个,以及这个和这个,了解使用httpx
而不是请求
的好处)
import httpx
url = 'http://127.0.0.1:8000/'
with httpx.stream('GET', url) as r:
for chunk in r.iter_raw(): # or, for line in r.iter_lines():
print(chunk)
ask_statesman
函数中,将收益current_response
语句更改为收益{"data":current_response}
。这将使用"data"
键将每个响应行包装在字典中。request_handler
函数中,不是直接返回stream_response
,而是返回一个生成器表达式,该表达式生成包含在字典中的ask_statesman
的每个响应行,如上所示。这是修改后的代码:import os
import time
import openai
import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
auth_scheme = HTTPBearer()
app = fastapi.FastAPI()
openai.api_key = os.environ["OPENAI_API_KEY"]
def ask_statesman(query: str):
#prompt = router(query)
completion_reason = None
response = ""
while not completion_reason or completion_reason == "length":
openai_stream = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": query}],
temperature=0.0,
stream=True,
)
for line in openai_stream:
completion_reason = line["choices"][0]["finish_reason"]
if "content" in line["choices"][0].delta:
current_response = line["choices"][0].delta.content
print(current_response)
yield {"data": current_response}
time.sleep(0.25)
@app.post("/")
async def request_handler(auth_key: str, query: str):
if auth_key != "123":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": auth_scheme.scheme_name},
)
else:
return StreamingResponse((line for line in ask_statesman(query)), media_type="text/plain")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")
如果您选择使用Langchain与OpenAI交互(我强烈推荐),它提供了流方法,它有效地返回了一个生成器。
对上面Chris的代码稍作修改,
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.llms import OpenAI
llm = OpenAI(
streaming=True,
verbose=True,
temperature=0,
)
app = FastAPI()
def chat_gpt_streamer(query: str):
for resp in llm.stream(query):
yield resp["choices"][0]["text"]
@app.get('/streaming/ask')
async def main(query: str):
return StreamingResponse(chat_gpt_streamer(query), media_type='text/event-stream')
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
同样,您可以使用httpx或请求进行测试(再次从Chris的代码中复制粘贴):
import httpx
url = 'http://127.0.0.1:8000/streaming/ask?query=How are you, write in 10 sentences'
with httpx.stream('GET', url) as r:
for chunk in r.iter_raw(): # or, for line in r.iter_lines():
print(chunk)