当前位置:首页 > 科技  > 软件

解密 SSE,像 ChatGPT 一样返回流式响应

来源: 责编: 时间:2023-11-20 08:57:07 187观看
导读我们知道目前的 HTTP/1.1 采用的是标准的请求-响应模型,客户端主动发请求,服务端被动地返回响应。这种模型在客户端需要实时获取结果的场景下是不合适的,因为这意味着客户端需要不断地轮询,所以最好的做法是服务端生成结

我们知道目前的 HTTP/1.1 采用的是标准的请求-响应模型,客户端主动发请求,服务端被动地返回响应。这种模型在客户端需要实时获取结果的场景下是不合适的,因为这意味着客户端需要不断地轮询,所以最好的做法是服务端生成结果之后,主动推送给客户端。YcZ28资讯网——每日最新资讯28at.com

比如 ChatGPT,它在生成内容时,也是生成一部分,就主动向客户端推送一部分。而在这个过程中,客户端不需要做任何事情,只需等待 ChatGPT 服务端返回内容即可。YcZ28资讯网——每日最新资讯28at.com

说到这儿,你肯定想到了 WebSocket,没错这是一种解决方案。但 WebSocket 太重了,它和 HTTP 都是基于 TCP 的应用层传输协议,只不过在握手的时候搭了 HTTP 的便车,利用 HTTP 本身的协议升级特性,伪装成 HTTP,这样就能绕过浏览器沙箱、网络防火墙等限制。YcZ28资讯网——每日最新资讯28at.com

当完成握手之后,后续传输的数据就不再是 HTTP 报文,而是 WebSocket 格式的二进制帧。所以这两者完全是不同的协议,那有没有一种办法,我们仍然使用 HTTP 协议,同时还能让服务端主动推送数据呢?YcZ28资讯网——每日最新资讯28at.com

答案是有的,也就是本文将要介绍的 SSE 技术,它的英文全称是 Server-Sent Events(服务端推送事件)。通过 SSE 可以让服务端即时推送数据到客户端,而不需要客户端轮询服务端以获取更新。YcZ28资讯网——每日最新资讯28at.com

图片图片YcZ28资讯网——每日最新资讯28at.com

到这你可能会问,那 WebSocket 和 SSE 有什么区别呢?YcZ28资讯网——每日最新资讯28at.com

1)通信方式

WebSocket 提供全双工通信,服务端和客户端都可以在同一个连接上同时发送和接收数据。最重要的是,WebSocket 独立于 HTTP 协议,尽管它开始于一个 HTTP 握手。YcZ28资讯网——每日最新资讯28at.com

SSE 仅提供服务端到客户端的单向通信,客户端不能通过 SSE 给服务端发信息。YcZ28资讯网——每日最新资讯28at.com

2)协议和实现

WebSocket 使用自己的协议(ws:// 或 wss://),需要服务端和客户端都支持,并且协议比较复杂。YcZ28资讯网——每日最新资讯28at.com

SSE 则是使用标准的 HTTP 协议,实现起来更简单,尤其是在服务端。YcZ28资讯网——每日最新资讯28at.com

3)适用场景

WebSocket 适用于服务端和客户端之间双向实时通信的场景,如在线游戏、聊天应用等。YcZ28资讯网——每日最新资讯28at.com

SSE 适用于服务端向客户端单向推送数据的场景,如消息通知、数据更新。并且 SSE 自动支持断线重连,而 WebSocket 则需要额外部署。YcZ28资讯网——每日最新资讯28at.com

4)复杂性和资源使用

WebSocket 由于其双向通信的能力,通常比 SSE 更复杂,可能需要更多的资源来维护和管理连接。YcZ28资讯网——每日最新资讯28at.com

SSE 因为其单向性和基于 HTTP 的特性,它可以利用现有的网络基础设施,如代理服务器、负载均衡器和防火墙等等,通常更容易实现和维护。YcZ28资讯网——每日最新资讯28at.com


YcZ28资讯网——每日最新资讯28at.com

相信现在你已经明白 SSE 是做什么的了,它的目的就是让服务端能够主动推送数据给客户端。如果不需要和服务端动态交互,只是希望服务端在有数据的时候推过来,那么 WebSocket 就有些太重了,因为这意味着要替换 HTTP 协议,而使用 SSE 无疑是更好的选择。YcZ28资讯网——每日最新资讯28at.com

SSE 是什么我们已经知道了,那它是怎么实现的呢?原理是什么呢?YcZ28资讯网——每日最新资讯28at.com

1)建立连接

客户端发起一个标准的 HTTP 请求来开启 SSE 会话,这个请求的特殊之处在于它包含一个头字段。YcZ28资讯网——每日最新资讯28at.com

Accept: text/event-stream

相当于客户端告诉服务端,期望接收 SSE 消息流。而服务端在看到该字段时,也知道这是一个 SSE 请求,于是立即向客户端返回响应头,注意:返回的只有响应头,里面会包含如下头字段。YcZ28资讯网——每日最新资讯28at.com

Content-Type: text/event-stream

响应头返回之后标志着 SSE 连接成功建立,并且连接会保持开放状态,服务端后续可以随时通过此连接向客户端发送数据。此外当连接不小心断开时,客户端也会自动进行重连。YcZ28资讯网——每日最新资讯28at.com

所以在普通的 HTTP 请求中,一旦服务端返回,那么请求结束了。虽然可以将 Connection 头字段设置为 keep-alive 保证连接不断开,但每次访问都包含了 HTTP 请求/响应的完整过程。YcZ28资讯网——每日最新资讯28at.com

而在 SSE 中,服务端会保持一个开放的连接,只要有新数据可用,就会直接发送给客户端。所以服务端会将响应以流的形式发送给客户端,每次发送的消息都是响应流的一部分,而不是独立的 HTTP 响应。YcZ28资讯网——每日最新资讯28at.com

因此 SSE 的服务端在发送数据时,并不遵循传统的一次请求,一次响应模式。它在建立连接之后会保持连接开放,并通过这个持续的连接流式地发送数据,这种方式就使得 SSE 非常适合实时数据推送的场景。YcZ28资讯网——每日最新资讯28at.com

2)发送消息

客户端发送请求,服务端返回响应头之后,SSE 连接就建立成功了。此时客户端只需要躺平,安静地等待服务端的输出即可。所以现在的关键就在于服务端要返回什么格式的数据呢?很简单,一个基本的消息由以下几部分组成:YcZ28资讯网——每日最新资讯28at.com

  • data:实际的消息数据;
  • id:可选,消息的唯一标识符,用于在连接重新建立时同步消息;
  • event:可选,定义事件类型,用于客户端区分消息的类型;
  • retry:可选,自动重连的时间(毫秒),如果连接中断,客户端在自动重新连接之前,需要等待多长时间;

注意:每个消息要以两个换行符(/n/n)结束,举个例子,我们发送一个 Hello World。YcZ28资讯网——每日最新资讯28at.com

data: Hello World/n/n

也可以发送带有事件类型的消息:YcZ28资讯网——每日最新资讯28at.com

event: userUpdatedata: {"username": "Serpen", "age": 18}/n/n

还是比较简单的,服务端可以保持连接并随时发送更多数据。然后客户端在收到时会进行处理,但不需要(也不能)对服务端作出任何回应,它只需要被动地接收来自服务端的数据即可。当服务端认为数据已经全部发送完毕、无需再发时,那么便可以主动断开连接。YcZ28资讯网——每日最新资讯28at.com

关于 SSE 的原理我们就解释清楚了,下面来实际编程实现它,这里我们先使用原生的 asyncio 实现 SSE。YcZ28资讯网——每日最新资讯28at.com

import asynciofrom asyncio import StreamReader, StreamWriterclass SSE:    def __init__(self, host="0.0.0.0", port=9999):        self.host = host        self.port = port    @staticmethod    def parse_request_headers(data: bytes) -> dict:        """        此函数负责从原始字节流中解析出请求头        """        headers = data.split(b"/r/n/r/n")[0].split(b"/r/n")        header_dict = {}        for header in headers[1:]:            key, val = header.decode("utf-8").split(":", 1)            header_dict[key.lower()] = val.strip()        return header_dict    async def handler_requests(self,                               reader: StreamReader,                               writer: StreamWriter):        """        负责处理来自客户端的请求        每来一个客户端连接,就会基于此函数创建一个协程        并且自动传递两个参数:reader 和 writer        reader.read  负责读取数据,等价于 socket.recv        writer.write 负责发送数据,等价于 socket.send        """        # 获取客户端的请求报文,这里对请求方法、请求地址不做限制        data = await reader.readuntil(b"/r/n/r/n")        # 解析出请求头        request_headers = self.parse_request_headers(data)        # 简单检测一下 accept 字段,如果不是建立 SSE,那么直接关闭连接        if request_headers.get("accept") != "text/event-stream":            writer.close()            return await writer.wait_closed()        # 如果是 SSE 连接,那么返回响应头        response_header = (            b"HTTP/1.1 200 OK/r/n"            b"Content-Type: text/event-stream/r/n"            b"Cache-Control: no-cache/r/n"            b"Connection: keep-alive/r/n"            b'Access-Control-Allow-Origin: */r/n'            b"/r/n"        )        writer.write(response_header)        await writer.drain()        # 然后便可以不断地向客户端返回数据了        for _ in range(5):            # 每隔 1 秒返回数据            data = "data: 高老师总能分享出好东西/r/n/r/n".encode("utf-8")            writer.write(data)            await writer.drain()            await asyncio.sleep(1)        # 数据传输完毕        writer.close()        await writer.wait_closed()    async def __create_server(self):        # 创建服务,第一个参数是一个回调函数        # 当连接过来的时候就会根据此函数创建一个协程        # 后面是绑定的 ip 和 端口        server = await asyncio.start_server(self.handler_requests,                                            self.host,                                            self.port)        # 然后开启无限循环        async with server:            await server.serve_forever()    def run_server(self):        loop = asyncio.get_event_loop()        loop.run_until_complete(self.__create_server())if __name__ == '__main__':    sse = SSE()    sse.run_server()

服务端代码编写完毕,下面编写前端代码。YcZ28资讯网——每日最新资讯28at.com

<!DOCTYPE html><html lang="en"><head>    <meta charset="UTF-8">    <title>Title</title>    <style>        #data {            font-weight: bold;            color: cadetblue;            font-size: large;        }    </style></head><body>    <h1>SSE Test</h1>    <div id="data"></div>    <script>        document.addEventListener("DOMContentLoaded", function () {            // 和服务端建立 SSE 连接            var eventSource = new EventSource("http://localhost:9999");            eventSource.onmessage = function (e) {                // 将数据渲染在 <div id="data"></div> 的内部                var data = e.data + "/n";                document.getElementById('data').innerText += data;            };            eventSource.onerror = function (e) {                console.error('Error occurred:', e);                eventSource.close();            };        });    </script></body></html>

代码编写完毕,我们用浏览器打开 HTML 文件,便可看到如下效果。YcZ28资讯网——每日最新资讯28at.com

图片图片YcZ28资讯网——每日最新资讯28at.com

以上我们就简单实现了 SSE,当然为了加深印象,这里的后端是使用原生的 asyncio 编写的,但在工作中,我们会使用现成的 Web 框架,比如 FastAPI,Blacksheep 等等。YcZ28资讯网——每日最新资讯28at.com

需要说明的是,虽然通过 SSE 技术可以实现类似 ChatGPT 的效果,但 ChatGPT 内部并没有用到 SSE,它内部是基于 HTTP 的分块传输实现的。因为 SSE 只能通过 GET 请求发出,并且无法自定义请求头。YcZ28资讯网——每日最新资讯28at.com

如果想实现 ChatGPT 的效果,需要使用 HTTP 的分块传输。而像 FastAPI、BlackSheep 等框架提供的流式响应,便是基于 HTTP 的分块传输实现的,比如 FastAPI:YcZ28资讯网——每日最新资讯28at.com

import asynciofrom fastapi import FastAPIfrom fastapi.responses import StreamingResponsefrom fastapi.middleware.cors import CORSMiddlewareimport uvicornapp = FastAPI()app.add_middleware(    CORSMiddleware,    allow_origins=["*"],    allow_methods=["*"],    allow_headers=["*"],)async def event_generator():    for _ in range(5):        # 每隔 1 秒返回数据        data = "data: 高老师总能分享出好东西/r/n/r/n".encode("utf-8")        yield data        await asyncio.sleep(1)@app.get("/")async def sse():    return StreamingResponse(event_generator(),                              media_type="text/event-stream")if __name__ == '__main__':    uvicorn.run(app, host="0.0.0.0", port=9999)

首先之前的前端代码依旧可以正常访问,通过修改数据格式和 Content-Type 可以让其支持 SSE。但最正确的做法是直接访问 localhost:9999,效果如下:YcZ28资讯网——每日最新资讯28at.com

图片图片YcZ28资讯网——每日最新资讯28at.com

所以基于 StreamingResponse 可以实现 SSE,也可以直接访问。而直接访问的话,此时里面的 data: 和 /r/n 就是实体数据的一部分。并且这种方式和 ChatGPT 的工作机制是相似的,都使用了 HTTP 的分块传输,支持所有的请求方法,而 SSE 只支持 GET 请求。YcZ28资讯网——每日最新资讯28at.com

BlackSheep 也是类似的,它同样也支持流式响应。YcZ28资讯网——每日最新资讯28at.com

import asynciofrom blacksheep import Application, Response, StreamedContentimport uvicornapp = Application()app.use_cors(    allow_origins=["*"],    allow_methods=["*"],    allow_headers=["*"],)async def event_generator():    for _ in range(5):        # 每隔 1 秒返回数据        data = "data: 高老师总能分享出好东西/r/n/r/n".encode("utf-8")        yield data        await asyncio.sleep(1)@app.router.get("/")async def sse():    return Response(        200,        cnotallow=StreamedContent(b"text/event-stream", event_generator),    )if __name__ == '__main__':    uvicorn.run(app, host="0.0.0.0", port=9999)

可以测试一下,效果是一样的。如果你不想实现 SSE,只是希望固定的数据以流的形式一点一点返回,那么记得将数据中多余的 data: 和 /r/n 给去掉,并最好修改 Content-Type 为合适的类型。YcZ28资讯网——每日最新资讯28at.com

所以 SSE 一般用于需要服务端推数据,但数据不知道什么时候会过来,于是通过 SSE 保持连接开放。后续当服务端有数据了,直接通过连接发送给客户端即可。YcZ28资讯网——每日最新资讯28at.com

而 FastAPI 和 BlackSheep 提供的流式响应更像是,返回的数据比较庞大,如果全部准备好再一次性返回,会让用户陷入长时间的等待,造成不好的体验。于是通过分块传输,准备好一部分就返回一部分。虽然整体时间没变,但可以让用户立刻获取到数据,从而提升用户体验。YcZ28资讯网——每日最新资讯28at.com

比如 ChatGPT,当它回答的内容比较多的时候,那么整个过程耗费几十秒钟是常有的事情,假设 30 秒。相比让用户等待 30 秒,然后内容一下子刷出来,显然生成一部分返回一部分这种方式更让人喜欢。YcZ28资讯网——每日最新资讯28at.com

因此使用 SSE 还是流式响应,则取决于你当前的业务。如果你返回的数据是确定的,只是准备的时间比较长,或者数据量比较大,那么推荐使用流式响应。YcZ28资讯网——每日最新资讯28at.com

至于 SSE,在这些现成的 Web 框架里面,也可以通过流式响应来实现,只需要将 Content-Type 设置为 text/event-stream,并将数据加上前缀 data: 和后缀 /r/n/r/n。YcZ28资讯网——每日最新资讯28at.com

但说实话,如果想实现 SSE,不建议通过流式响应来实现,而是使用专门的库。以 FastAPI 为例:YcZ28资讯网——每日最新资讯28at.com

from sse_starlette.sse import EventSourceResponse

FastAPI 其实就是在 starlette 的基础上套了一层壳,通过安装 sse_starlette 可以让 FastAPI 更好地支持 SSE。YcZ28资讯网——每日最新资讯28at.com

以上就是本文的内容,如果对你有帮助,就点个赞吧。YcZ28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-30985-0.html解密 SSE,像 ChatGPT 一样返回流式响应

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: Spring Boot中实现订单30分钟自动取消的策略思路及源代码

下一篇: Vue3问题:如何实现微信扫码授权登录?

标签:
  • 热门焦点
  • 线程通讯的三种方法!通俗易懂

    线程通讯的三种方法!通俗易懂

    线程通信是指多个线程之间通过某种机制进行协调和交互,例如,线程等待和通知机制就是线程通讯的主要手段之一。 在 Java 中,线程等待和通知的实现手段有以下几种方式:Object 类下
  • Rust中的高吞吐量流处理

    Rust中的高吞吐量流处理

    作者 | Noz编译 | 王瑞平本篇文章主要介绍了Rust中流处理的概念、方法和优化。作者不仅介绍了流处理的基本概念以及Rust中常用的流处理库,还使用这些库实现了一个流处理程序
  • CSS单标签实现转转logo

    CSS单标签实现转转logo

    转转品牌升级后更新了全新的Logo,今天我们用纯CSS来实现转转的新Logo,为了有一定的挑战性,这里我们只使用一个标签实现,将最大化的使用CSS能力完成Logo的绘制与动画效果。新logo
  • Java NIO内存映射文件:提高文件读写效率的优秀实践!

    Java NIO内存映射文件:提高文件读写效率的优秀实践!

    Java的NIO库提供了内存映射文件的支持,它可以将文件映射到内存中,从而可以更快地读取和写入文件数据。本文将对Java内存映射文件进行详细的介绍和演示。内存映射文件概述内存
  • 微博大门常打开,迎接海外画师漂洋东渡

    微博大门常打开,迎接海外画师漂洋东渡

    作者:互联网那些事&ldquo;起猛了,我能看得懂日语了&rdquo;。&ldquo;为什么日本人说话我能听懂?&rdquo;&ldquo;中文不像中文,日语不像日语,但是我竟然看懂了&rdquo;&hellip;&hell
  • 网传小米汽车开始筛选交付中心 建筑面积不低于3000平方米

    网传小米汽车开始筛选交付中心 建筑面积不低于3000平方米

    7月7日消息,近日有微博网友@长三角行健者爆料称,据经销商集团反馈,小米汽车目前已经开始了交付中心的筛选工作,要求候选场地至少有120个车位,建筑不能低
  • iQOO Neo8系列新品发布会

    iQOO Neo8系列新品发布会

    旗舰双芯 更强更Pro
  • 荣耀Magicbook V 14 2021曙光蓝版本正式开售,拥有触摸屏

    荣耀Magicbook V 14 2021曙光蓝版本正式开售,拥有触摸屏

    荣耀 Magicbook V 14 2021 曙光蓝版本正式开售,搭载 i7-11390H 处理器与 MX450 显卡,配备 16GB 内存与 512GB SSD,重 1.48kg,厚 14.5mm,具有 1.5mm 键盘键程、
  • 苹果MacBook Pro 2021测试:仍不支持平滑滚动

    苹果MacBook Pro 2021测试:仍不支持平滑滚动

    据10月30日9to5 Mac 消息报道,苹果新的 14 英寸和 16 英寸 MacBook Pro 2021 上市后获得了不错的评价,亮点包括行业领先的性能,令人印象深刻的电池续航,精美丰
Top