今天给大家简单介绍下redis中的发布与订阅机制,本文目录如下:
什么是发布订阅
redis中的发布订阅功能由publish、subscribe、psubscribe等命令组成,就是多个客户端可以订阅服务端一个或者多个频道,当向该频道发布消息时,订阅该频道的所有客户端就会收到消息,如下图所示:
客户端可以订阅一个或者多个频道,另外还可以进行模式的订阅,意思是当向某个频道publish消息时,该消息不仅会被发送给订阅了这个频道的所有客户端,还会被发送给订阅了所有与该频道相匹配的模式的客户端,有点绕,看下图:
当向news.it频道发送消息时,客户端A、客户端C、客户端D都会收到消息:
发布消息的命令是publish,语法是PUBLISH channel message
普通订阅命令是subscribe,语法是subcribe channel [channel ...],可以同时订阅多个频道
模式订阅命令是psubscribe,语法是psubscribe pattern [pattern ...],可以同时订阅多个模式
发布订阅的实现
频道的订阅与退订
首先讲一下频道的订阅与退订,相关的命令是subcribe和unsubscribe,大概原理是当执行subscribe命令订阅某个频道时,服务端会用一个字典来保存订阅的客户端,其中频道名作为字典key,客户端链表作为value,执行subscribe就往该链表插入一个客户端,执行unsubscribe就从该链表中删除该客户端,当执行publish向某个频道发布消息时,通过频道名找到对应的客户端链表,然后依次给客户端发送消息,这样就实现了频道的订阅和发布。
redis将订阅关系保存在服务器状态的pubsub_channel字典中,key是频道名,value是链表,保存了订阅了该频道的客户端:
struct redisServer {
...
dict *pubsub_channels; /* Map channels to list of subscribed clients */
...
}
如下例子:
client-1、client-2、client-3订阅了news.it频道,client-4订阅了news.sport频道,client-5、client-6订阅了news.business频道,下面从代码层面看一下:
subscribe命令的处理函数是subscribeCommand函数,最后会调用到pubsubSubscribeChannel函数,如下:
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
// 订阅的频道保存到c->pubsub_channels字典中,key是channel,value是NULL
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel); // 查找该频道是否存在
if (de == NULL) {
clients = listCreate();
// 添加该频道到server.pubsub_channels字典中
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// 将该客户端插入到链表尾部
listAddNodeTail(clients,c);
}
/* Notify the client */
addReplyPubsubSubscribed(c,channel);
return retval;
}
可以看出如果频道不存在则添加频道,然后添加客户端到链表中,如果该客户端已经订阅了该频道,则不会执行if中的代码。
unsubscribe命令的处理函数是unsubscribeCommand,最后会调用到pubsubUnsubscribeChannel函数:
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
/* Remove the channel from the client -> channels hash table */
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel -> clients list hash table */
de = dictFind(server.pubsub_channels,channel); // 查找该频道
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de); // 得到该频道列表
ln = listSearchKey(clients,c); // 从列表中查找该客户端c
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln); // 删除该客户端节点
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will be possible to abuse
* Redis PUBSUB creating millions of channels. */
// 该频道没有订阅者了,从字典中删除该频道
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
if (notify) addReplyPubsubUnsubscribed(c,channel);
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
如下图例子,将client-10086从news.sport和news.movie频道删除,执行完成后,下图虚线节点将会被删除,另外由于news.movie没有其他订阅者了,该频道也会从字典中删除
模式的订阅与退订
模式的订阅命令是psubscribeCommand,退订是punsubscribeCommand函数,订阅时跟频道订阅类似,将订阅关系保存在服务器状态的pubsub_patterns字典中,模式作为key,客户端链表作为value
struct redisServer {
...
dict *pubsub_patterns; /* A dict of pubsub_patterns */
...
}
订阅最后会调用psubscribeCommand函数,逻辑比较简单,如下:
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
dictEntry *de;
list *clients;
int retval = 0;
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
/* Add the client to the pattern -> list of clients hash table */
de = dictFind(server.pubsub_patterns,pattern);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_patterns,pattern,clients);
incrRefCount(pattern);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReplyPubsubPatSubscribed(c,pattern);
return retval;
}
退订处理函数是punsubscribeCommand,最后会调用到pubsubUnsubscribePattern函数:
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
incrRefCount(pattern); /* Protect the object. May be the same we remove */
if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
retval = 1;
listDelNode(c->pubsub_patterns,ln);
/* Remove the client from the pattern -> clients list hash table */
de = dictFind(server.pubsub_patterns,pattern);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de); // 得到该模式的客户端链表
ln = listSearchKey(clients,c); // 在链表中查找客户端
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln); // 找到就删除该节点
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client. */
// 如果没有其他订阅者就从字典中删除该模式key
dictDelete(server.pubsub_patterns,pattern);
}
}
/* Notify the client */
if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
decrRefCount(pattern);
return retval;
}
publish发布命令实现
publish命令发布消息时,对应的处理函数是publishCommand,最后会调用到pubsubPublishMessage函数,大概逻辑如下:
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
// 从pubsub_channels字典中查找channel频道
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
// 找到对应的频道就遍历客户端列表,给每个客户端发送消息
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
// 处理模式订阅的情况
di = dictGetIterator(server.pubsub_patterns);
if (di) {
channel = getDecodedObject(channel);
while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
// 如果模式匹配当前频道
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) continue;
listRewind(clients,&li);
// 遍历客户端链表,给客户端发送消息
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
addReplyPubsubPatMessage(c,pattern,channel,message);
receivers++;
}
}
decrRefCount(channel);
dictReleaseIterator(di);
}
return receivers;
}