summaryrefslogtreecommitdiff
path: root/src/pubsub.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-03-31 12:40:08 +0200
committerantirez <antirez@gmail.com>2020-03-31 12:40:08 +0200
commit1b4bc60999f618cd171e7a390790470e0ec80244 (patch)
tree324b5c4b752f2969d1cee1bdf97fa91bdd32132b /src/pubsub.c
parent4379b8b411e4940f137623984e2931f0448b4e89 (diff)
parentdfb12f06283f22c157d99830de21700a7f86c139 (diff)
downloadredis-1b4bc60999f618cd171e7a390790470e0ec80244.tar.gz
Merge branch 'pubsub_patterns_boost' of https://github.com/leeyiw/redis into leeyiw-pubsub_patterns_boost
Diffstat (limited to 'src/pubsub.c')
-rw-r--r--src/pubsub.c52
1 files changed, 41 insertions, 11 deletions
diff --git a/src/pubsub.c b/src/pubsub.c
index 5cb4298e0..6fa397704 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -206,6 +206,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
+ dictEntry *de;
+ list *clients;
int retval = 0;
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
@@ -217,6 +219,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
+ /* Add the client to the pattern -> list of clients hash table */
+ de = dictFind(server.pubsub_patterns_dict,pattern);
+ if (de == NULL) {
+ clients = listCreate();
+ dictAdd(server.pubsub_patterns_dict,pattern,clients);
+ incrRefCount(pattern);
+ } else {
+ clients = dictGetVal(de);
+ }
+ listAddNodeTail(clients,c);
}
/* Notify the client */
addReplyPubsubPatSubscribed(c,pattern);
@@ -226,6 +238,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
+ dictEntry *de;
+ list *clients;
listNode *ln;
pubsubPattern pat;
int retval = 0;
@@ -238,6 +252,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
pat.pattern = pattern;
ln = listSearchKey(server.pubsub_patterns,&pat);
listDelNode(server.pubsub_patterns,ln);
+ /* Remove the client from the pattern -> clients list hash table */
+ de = dictFind(server.pubsub_patterns_dict,pattern);
+ serverAssertWithInfo(c,NULL,de != NULL);
+ clients = dictGetVal(de);
+ ln = listSearchKey(clients,c);
+ serverAssertWithInfo(c,NULL,ln != NULL);
+ listDelNode(clients,ln);
+ if (listLength(clients) == 0) {
+ /* Free the list and associated hash entry at all if this was
+ * the latest client. */
+ dictDelete(server.pubsub_patterns_dict,pattern);
+ }
}
/* Notify the client */
if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
@@ -284,6 +310,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
+ dictIterator *di;
listNode *ln;
listIter li;
@@ -302,23 +329,26 @@ int pubsubPublishMessage(robj *channel, robj *message) {
}
}
/* Send to clients listening to matching channels */
- if (listLength(server.pubsub_patterns)) {
- listRewind(server.pubsub_patterns,&li);
+ di = dictGetIterator(server.pubsub_patterns_dict);
+ if (di) {
channel = getDecodedObject(channel);
- while ((ln = listNext(&li)) != NULL) {
- pubsubPattern *pat = ln->value;
-
- if (stringmatchlen((char*)pat->pattern->ptr,
- sdslen(pat->pattern->ptr),
+ while((de = dictNext(di)) != NULL) {
+ robj *pattern = dictGetKey(de);
+ list *clients = dictGetVal(de);
+ if (!stringmatchlen((char*)pattern->ptr,
+ sdslen(pattern->ptr),
(char*)channel->ptr,
- sdslen(channel->ptr),0))
- {
- addReplyPubsubPatMessage(pat->client,
- pat->pattern,channel,message);
+ sdslen(channel->ptr),0)) continue;
+
+ listRewind(clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ addReplyPubsubPatMessage(c,pattern,channel,message);
receivers++;
}
}
decrRefCount(channel);
+ dictReleaseIterator(di);
}
return receivers;
}