提问者:小点点

FastAPI StreamingResponse不使用生成器函数流式传输


我有一个相对简单的FastAPI应用程序,它接受一个查询并从ChatGPT的API流回响应。ChatGPT正在流回结果,我可以看到它在输入时被打印到控制台。

不起作用的是通过FastAPI返回的StreamingResponse。响应被一起发送。我真的不知道为什么这不起作用。

这是FastAPI应用程序代码:

import os
import time

import openai

import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                yield current_response
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        stream_response = ask_statesman(query)
        return StreamingResponse(stream_response, media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")

这是一个非常简单的test.py文件来测试这个:

import requests

query = "How tall is the Eiffel tower?"
url = "http://localhost:8000"
params = {"auth_key": "123", "query": query}

response = requests.post(url, params=params, stream=True)

for chunk in response.iter_lines():
    if chunk:
        print(chunk.decode("utf-8"))

共3个答案

匿名用户

首先,使用POST请求从服务器请求数据不是好的做法。使用GET请求更适合您的情况。除此之外,您不应该发送凭据,例如auth_key作为URL的一部分(即使用查询字符串),而是应该使用Headers和/或Cookies(使用HTTPS)。请查看此答案,了解有关标头和cookie概念的更多详细信息和示例,以及使用查询参数时涉及的风险。关于这个主题的有用帖子也可以在这里和这里找到,也可以在这里、这里和这里找到。

其次,如果您在StreamingResponse的生成器函数中执行阻塞操作(即I/O绑定或CPU绑定任务),您应该使用def(就像您现在所做的那样)而不是async def来定义函数,否则阻塞操作以及生成器中使用的time.睡眠()函数将阻止事件循环。正如这里所解释的,如果用于流式传输响应体的函数是普通的生成器/迭代器(即def)而不是异步def,FastAPI将使用iterate_in_threadpool()在单独的线程中运行迭代器/生成器,然后awaited。有关更多详细信息,请查看此详细答案。

第三,您正在使用请求'iter_lines()函数,该函数一次遍历响应数据一行。但是,如果响应数据不包含任何换行符(即\n),您将不会看到客户端控制台上的数据在到达时被打印出来,直到客户端接收到整个响应并作为一个整体打印出来。在这种情况下,您应该使用iter_content()并根据需要指定chunk_size(两种情况都在下面的示例中演示)。

最后,如果您希望StreamingResponse在每个浏览器中工作(包括Chrome)——从能够在数据流入时看到数据的意义上来说——您应该将media_type指定为与text/普通不同的类型(例如,text/event-stream),或者禁用MIME嗅探。如本文所述,浏览器将开始缓冲一定数量的text/普通响应(大约1445字节,如本文所述),以检查接收到的内容是否实际上是普通文本。为了避免这种情况,您可以将media_type设置为text/event-stream(用于服务器发送的事件),或者继续使用text/普通,但将X-Content-Type-Options响应标头设置为nosmff,这将禁用MIME嗅探(这两个选项都在下面的示例中演示)。

app.py

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time


app = FastAPI()


def fake_data_streamer():
    for i in range(10):
        yield b'some fake data\n\n'
        time.sleep(0.5)


@app.get('/')
async def main():
    return StreamingResponse(fake_data_streamer(), media_type='text/event-stream')
    # or, use:
    #headers = {'X-Content-Type-Options': 'nosniff'}
    #return StreamingResponse(fake_data_streamer(), headers=headers, media_type='text/plain')

test.py(使用Python请求

import requests

url = "http://localhost:8000/"

with requests.get(url, stream=True) as r:
    for chunk in r.iter_content(1024):  # or, for line in r.iter_lines():
        if chunk:
            print(chunk)

test.py(使用httpx-查看这个,以及这个和这个,了解使用httpx而不是请求的好处)

import httpx

url = 'http://127.0.0.1:8000/'
with httpx.stream('GET', url) as r:
    for chunk in r.iter_raw():  # or, for line in r.iter_lines():
        print(chunk)

匿名用户

  • ask_statesman函数中,将收益current_response语句更改为收益{"data":current_response}。这将使用"data"键将每个响应行包装在字典中。
  • request_handler函数中,不是直接返回stream_response,而是返回一个生成器表达式,该表达式生成包含在字典中的ask_statesman的每个响应行,如上所示。这是修改后的代码:
import os
import time

import openai

import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                yield {"data": current_response}
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        return StreamingResponse((line for line in ask_statesman(query)), media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")

匿名用户

如果您选择使用Langchain与OpenAI交互(我强烈推荐),它提供了流方法,它有效地返回了一个生成器。

对上面Chris的代码稍作修改,

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.llms import OpenAI


llm = OpenAI(
    streaming=True,
    verbose=True,
    temperature=0,
)

app = FastAPI()


def chat_gpt_streamer(query: str):
    for resp in llm.stream(query):
        yield resp["choices"][0]["text"]


@app.get('/streaming/ask')
async def main(query: str):
    return StreamingResponse(chat_gpt_streamer(query), media_type='text/event-stream')

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")

同样,您可以使用httpx或请求进行测试(再次从Chris的代码中复制粘贴):

import httpx

url = 'http://127.0.0.1:8000/streaming/ask?query=How are you, write in 10 sentences'
with httpx.stream('GET', url) as r:
    for chunk in r.iter_raw():  # or, for line in r.iter_lines():
        print(chunk)