通常使用 redis 用 pub / sub 可以实现消息队列,这里介绍使用 redis stream 实现消息队列的方式。
Redis stream 是一种数据结构,类似于 append only log,不过 redis 对于 redis stream 实现了丰富操作,以克服典型 append only log 的一些限制。其中包括O(1)时间的随机访问和复杂的消费策略,比如消费者组。您可以使用流来记录并实时地传播事件。Redis stream的使用案例包括:
- 事件溯源(例如跟踪用户操作,点击等)
- 传感器监控(例如从现场设备中读取数据)
- 通知(例如在单独的流中存储每个用户的通知记录)
Redis 为每个 stream 入口生成唯一ID。您可以使用这些 ID 以后检索它们关联的记录,或者读取和处理流中的所有后续记录。
Redis stream 支持多种修剪策略(以防止流无限增长),以及多个消费策略(参见XREAD,XREADGROUP和XRANGE)。
Redis stream 常用的操作有:
- XADD:往 stream 添加数据
- XRANGE:从指定的开始和结束的时间点,读取该区间的数据
- XREAD:从 stream 获取 1 条 / 多条数据
- XLEN:获取 stream 的队列长度
一个 python 使用 stream 的例子
producer.py
import redis
from time import time
r = redis.Redis()
r.ping()
stream_key = "skey"
for i in range(0,10):
r.xadd( stream_key, { 'ts': time(), 'v': i } )
print( f"stream length: {r.xlen( stream_key )}")
consumer.py
import redis
import time
r = redis.Redis()
stream_key = "skey"
while True:
l = r.xread(count=2, streams={stream_key: 0})
print(l)
time.sleep(10)