diff options
-rw-r--r-- | src/pubsub.c | 52 | ||||
-rw-r--r-- | src/server.c | 1 | ||||
-rw-r--r-- | src/server.h | 1 |
3 files changed, 43 insertions, 11 deletions
diff --git a/src/pubsub.c b/src/pubsub.c index 5cb4298e0..6fa397704 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -206,6 +206,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(client *c, robj *pattern) { + dictEntry *de; + list *clients; int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { @@ -217,6 +219,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) { pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); + /* Add the client to the pattern -> list of clients hash table */ + de = dictFind(server.pubsub_patterns_dict,pattern); + if (de == NULL) { + clients = listCreate(); + dictAdd(server.pubsub_patterns_dict,pattern,clients); + incrRefCount(pattern); + } else { + clients = dictGetVal(de); + } + listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubPatSubscribed(c,pattern); @@ -226,6 +238,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) { /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { + dictEntry *de; + list *clients; listNode *ln; pubsubPattern pat; int retval = 0; @@ -238,6 +252,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { pat.pattern = pattern; ln = listSearchKey(server.pubsub_patterns,&pat); listDelNode(server.pubsub_patterns,ln); + /* Remove the client from the pattern -> clients list hash table */ + de = dictFind(server.pubsub_patterns_dict,pattern); + serverAssertWithInfo(c,NULL,de != NULL); + clients = dictGetVal(de); + ln = listSearchKey(clients,c); + serverAssertWithInfo(c,NULL,ln != NULL); + listDelNode(clients,ln); + if (listLength(clients) == 0) { + /* Free the list and associated hash entry at all if this was + * the latest client. */ + dictDelete(server.pubsub_patterns_dict,pattern); + } } /* Notify the client */ if (notify) addReplyPubsubPatUnsubscribed(c,pattern); @@ -284,6 +310,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; + dictIterator *di; listNode *ln; listIter li; @@ -302,23 +329,26 @@ int pubsubPublishMessage(robj *channel, robj *message) { } } /* Send to clients listening to matching channels */ - if (listLength(server.pubsub_patterns)) { - listRewind(server.pubsub_patterns,&li); + di = dictGetIterator(server.pubsub_patterns_dict); + if (di) { channel = getDecodedObject(channel); - while ((ln = listNext(&li)) != NULL) { - pubsubPattern *pat = ln->value; - - if (stringmatchlen((char*)pat->pattern->ptr, - sdslen(pat->pattern->ptr), + while((de = dictNext(di)) != NULL) { + robj *pattern = dictGetKey(de); + list *clients = dictGetVal(de); + if (!stringmatchlen((char*)pattern->ptr, + sdslen(pattern->ptr), (char*)channel->ptr, - sdslen(channel->ptr),0)) - { - addReplyPubsubPatMessage(pat->client, - pat->pattern,channel,message); + sdslen(channel->ptr),0)) continue; + + listRewind(clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + addReplyPubsubPatMessage(c,pattern,channel,message); receivers++; } } decrRefCount(channel); + dictReleaseIterator(di); } return receivers; } diff --git a/src/server.c b/src/server.c index 93bafbae8..852fc4ff9 100644 --- a/src/server.c +++ b/src/server.c @@ -2756,6 +2756,7 @@ void initServer(void) { evictionPoolAlloc(); /* Initialize the LRU keys pool. */ server.pubsub_channels = dictCreate(&keylistDictType,NULL); server.pubsub_patterns = listCreate(); + server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); server.cronloops = 0; diff --git a/src/server.h b/src/server.h index d5be5ed29..f4bd4039f 100644 --- a/src/server.h +++ b/src/server.h @@ -1348,6 +1348,7 @@ struct redisServer { /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ + dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ /* Cluster */ |