searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

如何通过python调用sse接口

2024-07-01 03:27:03
667
0

一、什么是SSE

SSE(Server-Sent Events)是一种用于实现服务器主动向客户端推送数据的技术,也被称为“事件流”(Event Stream)。它基于 HTTP 协议,利用了其长连接特性,在客户端与服务器之间建立一条持久化连接,并通过这条连接实现服务器向客户端的实时数据推送,目前业界内的AI大模型的前后端调用方法就是通过SSE接口的方式吐出流式数据;

二、如何通过python调用SSE接口

可以使用requests库,或者直接使用aiohttp库进行异步处理。

1、使用aiohttp的示例代码:

import asyncio
from aiohttp import ClientSession
 
async def main():
    async with ClientSession() as session:
        async with session.get('your url', timeout=60) as response:
            assert response.content_type == 'text/event-stream'
 
            async for line in response.content:
                decoded_line = line.decode().strip()
                if decoded_line:
                    print(decoded_line)
 
asyncio.run(main())

以上代码会建立一个与服务器的连接,并且接收服务器发送的事件。每个接收到的行将会打印出来,所以在接收到每一行的数据流信息的时候,可以针对收到的信息进行预处理,然后判断数据流中的输出信息,进行业务逻辑处理;

2、使用requests库

使用request库,且同时请求方法中设置stream为true,则表示输出为流式

import json
import requests
import time

def listen_sse(url):
    # 发送GET请求到SSE端点
    while True:
        with requests.get(your url, stream=True, timeout=20) as response:
            try:
                # 确保请求成功
                response.raise_for_status()

                # 逐行读取响应内容
                for line in response.iter_lines():
                    if line:
                        line = line.decode('utf-8')
                        print(line)
            except requests.exceptions.HTTPError as err:
                print(f"HTTP error: {err}")
            except Exception as err:
                print(f"An error occurred: {err}")
                return

if __name__ == "__main__":

     sse_url = 'your url'
     listen_sse(sse_url)

 

0条评论
作者已关闭评论
汪****翠
6文章数
0粉丝数
汪****翠
6 文章 | 0 粉丝
原创

如何通过python调用sse接口

2024-07-01 03:27:03
667
0

一、什么是SSE

SSE(Server-Sent Events)是一种用于实现服务器主动向客户端推送数据的技术,也被称为“事件流”(Event Stream)。它基于 HTTP 协议,利用了其长连接特性,在客户端与服务器之间建立一条持久化连接,并通过这条连接实现服务器向客户端的实时数据推送,目前业界内的AI大模型的前后端调用方法就是通过SSE接口的方式吐出流式数据;

二、如何通过python调用SSE接口

可以使用requests库,或者直接使用aiohttp库进行异步处理。

1、使用aiohttp的示例代码:

import asyncio
from aiohttp import ClientSession
 
async def main():
    async with ClientSession() as session:
        async with session.get('your url', timeout=60) as response:
            assert response.content_type == 'text/event-stream'
 
            async for line in response.content:
                decoded_line = line.decode().strip()
                if decoded_line:
                    print(decoded_line)
 
asyncio.run(main())

以上代码会建立一个与服务器的连接,并且接收服务器发送的事件。每个接收到的行将会打印出来,所以在接收到每一行的数据流信息的时候,可以针对收到的信息进行预处理,然后判断数据流中的输出信息,进行业务逻辑处理;

2、使用requests库

使用request库,且同时请求方法中设置stream为true,则表示输出为流式

import json
import requests
import time

def listen_sse(url):
    # 发送GET请求到SSE端点
    while True:
        with requests.get(your url, stream=True, timeout=20) as response:
            try:
                # 确保请求成功
                response.raise_for_status()

                # 逐行读取响应内容
                for line in response.iter_lines():
                    if line:
                        line = line.decode('utf-8')
                        print(line)
            except requests.exceptions.HTTPError as err:
                print(f"HTTP error: {err}")
            except Exception as err:
                print(f"An error occurred: {err}")
                return

if __name__ == "__main__":

     sse_url = 'your url'
     listen_sse(sse_url)

 

文章来自个人专栏
测试技术分享
6 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0