diff options
Diffstat (limited to 'src/pubsub.c')
-rw-r--r-- | src/pubsub.c | 168 |
1 files changed, 108 insertions, 60 deletions
diff --git a/src/pubsub.c b/src/pubsub.c index bbcfc1f43..6fa397704 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -29,6 +29,97 @@ #include "server.h" +int clientSubscriptionsCount(client *c); + +/*----------------------------------------------------------------------------- + * Pubsub client replies API + *----------------------------------------------------------------------------*/ + +/* Send a pubsub message of type "message" to the client. + * Normally 'msg' is a Redis object containing the string to send as + * message. However if the caller sets 'msg' as NULL, it will be able + * to send a special message (for instance an Array type) by using the + * addReply*() API family. */ +void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { + if (c->resp == 2) + addReply(c,shared.mbulkhdr[3]); + else + addReplyPushLen(c,3); + addReply(c,shared.messagebulk); + addReplyBulk(c,channel); + if (msg) addReplyBulk(c,msg); +} + +/* Send a pubsub message of type "pmessage" to the client. The difference + * with the "message" type delivered by addReplyPubsubMessage() is that + * this message format also includes the pattern that matched the message. */ +void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { + if (c->resp == 2) + addReply(c,shared.mbulkhdr[4]); + else + addReplyPushLen(c,4); + addReply(c,shared.pmessagebulk); + addReplyBulk(c,pat); + addReplyBulk(c,channel); + addReplyBulk(c,msg); +} + +/* Send the pubsub subscription notification to the client. */ +void addReplyPubsubSubscribed(client *c, robj *channel) { + if (c->resp == 2) + addReply(c,shared.mbulkhdr[3]); + else + addReplyPushLen(c,3); + addReply(c,shared.subscribebulk); + addReplyBulk(c,channel); + addReplyLongLong(c,clientSubscriptionsCount(c)); +} + +/* Send the pubsub unsubscription notification to the client. + * Channel can be NULL: this is useful when the client sends a mass + * unsubscribe command but there are no channels to unsubscribe from: we + * still send a notification. */ +void addReplyPubsubUnsubscribed(client *c, robj *channel) { + if (c->resp == 2) + addReply(c,shared.mbulkhdr[3]); + else + addReplyPushLen(c,3); + addReply(c,shared.unsubscribebulk); + if (channel) + addReplyBulk(c,channel); + else + addReplyNull(c); + addReplyLongLong(c,clientSubscriptionsCount(c)); +} + +/* Send the pubsub pattern subscription notification to the client. */ +void addReplyPubsubPatSubscribed(client *c, robj *pattern) { + if (c->resp == 2) + addReply(c,shared.mbulkhdr[3]); + else + addReplyPushLen(c,3); + addReply(c,shared.psubscribebulk); + addReplyBulk(c,pattern); + addReplyLongLong(c,clientSubscriptionsCount(c)); +} + +/* Send the pubsub pattern unsubscription notification to the client. + * Pattern can be NULL: this is useful when the client sends a mass + * punsubscribe command but there are no pattern to unsubscribe from: we + * still send a notification. */ +void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { + if (c->resp == 2) + addReply(c,shared.mbulkhdr[3]); + else + addReplyPushLen(c,3); + addReply(c,shared.punsubscribebulk); + if (pattern) + addReplyBulk(c,pattern); + else + addReplyNull(c); + addReplyLongLong(c,clientSubscriptionsCount(c)); +} + /*----------------------------------------------------------------------------- * Pubsub low level API *----------------------------------------------------------------------------*/ @@ -76,10 +167,7 @@ int pubsubSubscribeChannel(client *c, robj *channel) { listAddNodeTail(clients,c); } /* Notify the client */ - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.subscribebulk); - addReplyBulk(c,channel); - addReplyLongLong(c,clientSubscriptionsCount(c)); + addReplyPubsubSubscribed(c,channel); return retval; } @@ -111,14 +199,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { } } /* Notify the client */ - if (notify) { - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.unsubscribebulk); - addReplyBulk(c,channel); - addReplyLongLong(c,dictSize(c->pubsub_channels)+ - listLength(c->pubsub_patterns)); - - } + if (notify) addReplyPubsubUnsubscribed(c,channel); decrRefCount(channel); /* it is finally safe to release it */ return retval; } @@ -150,10 +231,7 @@ int pubsubSubscribePattern(client *c, robj *pattern) { listAddNodeTail(clients,c); } /* Notify the client */ - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.psubscribebulk); - addReplyBulk(c,pattern); - addReplyLongLong(c,clientSubscriptionsCount(c)); + addReplyPubsubPatSubscribed(c,pattern); return retval; } @@ -188,13 +266,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { } } /* Notify the client */ - if (notify) { - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.punsubscribebulk); - addReplyBulk(c,pattern); - addReplyLongLong(c,dictSize(c->pubsub_channels)+ - listLength(c->pubsub_patterns)); - } + if (notify) addReplyPubsubPatUnsubscribed(c,pattern); decrRefCount(pattern); return retval; } @@ -212,13 +284,7 @@ int pubsubUnsubscribeAllChannels(client *c, int notify) { count += pubsubUnsubscribeChannel(c,channel,notify); } /* We were subscribed to nothing? Still reply to the client. */ - if (notify && count == 0) { - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.unsubscribebulk); - addReply(c,shared.nullbulk); - addReplyLongLong(c,dictSize(c->pubsub_channels)+ - listLength(c->pubsub_patterns)); - } + if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL); dictReleaseIterator(di); return count; } @@ -236,14 +302,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { count += pubsubUnsubscribePattern(c,pattern,notify); } - if (notify && count == 0) { - /* We were subscribed to nothing? Still reply to the client. */ - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.punsubscribebulk); - addReply(c,shared.nullbulk); - addReplyLongLong(c,dictSize(c->pubsub_channels)+ - listLength(c->pubsub_patterns)); - } + if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL); return count; } @@ -265,11 +324,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; - - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.messagebulk); - addReplyBulk(c,channel); - addReplyBulk(c,message); + addReplyPubsubMessage(c,channel,message); receivers++; } } @@ -283,18 +338,12 @@ int pubsubPublishMessage(robj *channel, robj *message) { if (!stringmatchlen((char*)pattern->ptr, sdslen(pattern->ptr), (char*)channel->ptr, - sdslen(channel->ptr),0)) { - continue; - } + sdslen(channel->ptr),0)) continue; + listRewind(clients,&li); while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); - - addReply(c,shared.mbulkhdr[4]); - addReply(c,shared.pmessagebulk); - addReplyBulk(c,pattern); - addReplyBulk(c,channel); - addReplyBulk(c,message); + addReplyPubsubPatMessage(c,pattern,channel,message); receivers++; } } @@ -361,9 +410,9 @@ void publishCommand(client *c) { void pubsubCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"channels [<pattern>] -- Return the currently active channels matching a pattern (default: all).", -"numpat -- Return number of subscriptions to patterns.", -"numsub [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).", +"CHANNELS [<pattern>] -- Return the currently active channels matching a pattern (default: all).", +"NUMPAT -- Return number of subscriptions to patterns.", +"NUMSUB [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).", NULL }; addReplyHelp(c, help); @@ -377,7 +426,7 @@ NULL long mblen = 0; void *replylen; - replylen = addDeferredMultiBulkLength(c); + replylen = addReplyDeferredLen(c); while((de = dictNext(di)) != NULL) { robj *cobj = dictGetKey(de); sds channel = cobj->ptr; @@ -390,12 +439,12 @@ NULL } } dictReleaseIterator(di); - setDeferredMultiBulkLength(c,replylen,mblen); + setDeferredArrayLen(c,replylen,mblen); } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) { /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */ int j; - addReplyMultiBulkLen(c,(c->argc-2)*2); + addReplyArrayLen(c,(c->argc-2)*2); for (j = 2; j < c->argc; j++) { list *l = dictFetchValue(server.pubsub_channels,c->argv[j]); @@ -406,7 +455,6 @@ NULL /* PUBSUB NUMPAT */ addReplyLongLong(c,listLength(server.pubsub_patterns)); } else { - addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try PUBSUB HELP", - (char*)c->argv[1]->ptr); + addReplySubcommandSyntaxError(c); } } |