searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

使用 redis stream 作为消息队列

2023-07-24 02:32:52
66
0

通常使用 redis 用 pub / sub 可以实现消息队列,这里介绍使用 redis stream 实现消息队列的方式。

Redis stream 是一种数据结构,类似于 append only log,不过 redis 对于 redis stream 实现了丰富操作,以克服典型 append only log 的一些限制。其中包括O(1)时间的随机访问和复杂的消费策略,比如消费者组。您可以使用流来记录并实时地传播事件。Redis stream的使用案例包括:

  • 事件溯源(例如跟踪用户操作,点击等)
  • 传感器监控(例如从现场设备中读取数据)
  • 通知(例如在单独的流中存储每个用户的通知记录)


Redis 为每个 stream 入口生成唯一ID。您可以使用这些 ID 以后检索它们关联的记录,或者读取和处理流中的所有后续记录。

Redis stream 支持多种修剪策略(以防止流无限增长),以及多个消费策略(参见XREAD,XREADGROUP和XRANGE)。

Redis stream 常用的操作有:

  • XADD:往 stream 添加数据
  • XRANGE:从指定的开始和结束的时间点,读取该区间的数据
  • XREAD:从 stream 获取 1 条 / 多条数据
  • XLEN:获取 stream 的队列长度

一个 python 使用 stream 的例子

producer.py

import redis
from time import time

r = redis.Redis()
r.ping()

stream_key = "skey"

for i in range(0,10):
    r.xadd( stream_key, { 'ts': time(), 'v': i } )
print( f"stream length: {r.xlen( stream_key )}")

consumer.py

import redis
import time

r = redis.Redis()
stream_key = "skey"

while True:
    l = r.xread(count=2, streams={stream_key: 0})
    print(l)
    time.sleep(10)
0条评论
0 / 1000
fatelei
6文章数
0粉丝数
fatelei
6 文章 | 0 粉丝
fatelei
6文章数
0粉丝数
fatelei
6 文章 | 0 粉丝
原创

使用 redis stream 作为消息队列

2023-07-24 02:32:52
66
0

通常使用 redis 用 pub / sub 可以实现消息队列,这里介绍使用 redis stream 实现消息队列的方式。

Redis stream 是一种数据结构,类似于 append only log,不过 redis 对于 redis stream 实现了丰富操作,以克服典型 append only log 的一些限制。其中包括O(1)时间的随机访问和复杂的消费策略,比如消费者组。您可以使用流来记录并实时地传播事件。Redis stream的使用案例包括:

  • 事件溯源(例如跟踪用户操作,点击等)
  • 传感器监控(例如从现场设备中读取数据)
  • 通知(例如在单独的流中存储每个用户的通知记录)


Redis 为每个 stream 入口生成唯一ID。您可以使用这些 ID 以后检索它们关联的记录,或者读取和处理流中的所有后续记录。

Redis stream 支持多种修剪策略(以防止流无限增长),以及多个消费策略(参见XREAD,XREADGROUP和XRANGE)。

Redis stream 常用的操作有:

  • XADD:往 stream 添加数据
  • XRANGE:从指定的开始和结束的时间点,读取该区间的数据
  • XREAD:从 stream 获取 1 条 / 多条数据
  • XLEN:获取 stream 的队列长度

一个 python 使用 stream 的例子

producer.py

import redis
from time import time

r = redis.Redis()
r.ping()

stream_key = "skey"

for i in range(0,10):
    r.xadd( stream_key, { 'ts': time(), 'v': i } )
print( f"stream length: {r.xlen( stream_key )}")

consumer.py

import redis
import time

r = redis.Redis()
stream_key = "skey"

while True:
    l = r.xread(count=2, streams={stream_key: 0})
    print(l)
    time.sleep(10)
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0