一、什么是SSE
SSE(Server-Sent Events)是一种用于实现服务器主动向客户端推送数据的技术,也被称为“事件流”(Event Stream)。它基于 HTTP 协议,利用了其长连接特性,在客户端与服务器之间建立一条持久化连接,并通过这条连接实现服务器向客户端的实时数据推送,目前业界内的AI大模型的前后端调用方法就是通过SSE接口的方式吐出流式数据;
二、如何通过python调用SSE接口
可以使用requests库,或者直接使用aiohttp库进行异步处理。
1、使用aiohttp的示例代码:
import asyncio
from aiohttp import ClientSession
async def main():
async with ClientSession() as session:
async with session.get('your url', timeout=60) as response:
assert response.content_type == 'text/event-stream'
async for line in response.content:
decoded_line = line.decode().strip()
if decoded_line:
print(decoded_line)
asyncio.run(main())
以上代码会建立一个与服务器的连接,并且接收服务器发送的事件。每个接收到的行将会打印出来,所以在接收到每一行的数据流信息的时候,可以针对收到的信息进行预处理,然后判断数据流中的输出信息,进行业务逻辑处理;
2、使用requests库
使用request库,且同时请求方法中设置stream为true,则表示输出为流式
import json
import requests
import time
def listen_sse(url):
# 发送GET请求到SSE端点
while True:
with requests.get(your url, stream=True, timeout=20) as response:
try:
# 确保请求成功
response.raise_for_status()
# 逐行读取响应内容
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
print(line)
except requests.exceptions.HTTPError as err:
print(f"HTTP error: {err}")
except Exception as err:
print(f"An error occurred: {err}")
return
if __name__ == "__main__":
sse_url = 'your url'
listen_sse(sse_url)