summaryrefslogtreecommitdiff
path: root/src/pubsub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pubsub.c')
-rw-r--r--src/pubsub.c168
1 files changed, 108 insertions, 60 deletions
diff --git a/src/pubsub.c b/src/pubsub.c
index bbcfc1f43..6fa397704 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -29,6 +29,97 @@
#include "server.h"
+int clientSubscriptionsCount(client *c);
+
+/*-----------------------------------------------------------------------------
+ * Pubsub client replies API
+ *----------------------------------------------------------------------------*/
+
+/* Send a pubsub message of type "message" to the client.
+ * Normally 'msg' is a Redis object containing the string to send as
+ * message. However if the caller sets 'msg' as NULL, it will be able
+ * to send a special message (for instance an Array type) by using the
+ * addReply*() API family. */
+void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.messagebulk);
+ addReplyBulk(c,channel);
+ if (msg) addReplyBulk(c,msg);
+}
+
+/* Send a pubsub message of type "pmessage" to the client. The difference
+ * with the "message" type delivered by addReplyPubsubMessage() is that
+ * this message format also includes the pattern that matched the message. */
+void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[4]);
+ else
+ addReplyPushLen(c,4);
+ addReply(c,shared.pmessagebulk);
+ addReplyBulk(c,pat);
+ addReplyBulk(c,channel);
+ addReplyBulk(c,msg);
+}
+
+/* Send the pubsub subscription notification to the client. */
+void addReplyPubsubSubscribed(client *c, robj *channel) {
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.subscribebulk);
+ addReplyBulk(c,channel);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+}
+
+/* Send the pubsub unsubscription notification to the client.
+ * Channel can be NULL: this is useful when the client sends a mass
+ * unsubscribe command but there are no channels to unsubscribe from: we
+ * still send a notification. */
+void addReplyPubsubUnsubscribed(client *c, robj *channel) {
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.unsubscribebulk);
+ if (channel)
+ addReplyBulk(c,channel);
+ else
+ addReplyNull(c);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+}
+
+/* Send the pubsub pattern subscription notification to the client. */
+void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.psubscribebulk);
+ addReplyBulk(c,pattern);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+}
+
+/* Send the pubsub pattern unsubscription notification to the client.
+ * Pattern can be NULL: this is useful when the client sends a mass
+ * punsubscribe command but there are no pattern to unsubscribe from: we
+ * still send a notification. */
+void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
+ if (c->resp == 2)
+ addReply(c,shared.mbulkhdr[3]);
+ else
+ addReplyPushLen(c,3);
+ addReply(c,shared.punsubscribebulk);
+ if (pattern)
+ addReplyBulk(c,pattern);
+ else
+ addReplyNull(c);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+}
+
/*-----------------------------------------------------------------------------
* Pubsub low level API
*----------------------------------------------------------------------------*/
@@ -76,10 +167,7 @@ int pubsubSubscribeChannel(client *c, robj *channel) {
listAddNodeTail(clients,c);
}
/* Notify the client */
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.subscribebulk);
- addReplyBulk(c,channel);
- addReplyLongLong(c,clientSubscriptionsCount(c));
+ addReplyPubsubSubscribed(c,channel);
return retval;
}
@@ -111,14 +199,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
}
}
/* Notify the client */
- if (notify) {
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.unsubscribebulk);
- addReplyBulk(c,channel);
- addReplyLongLong(c,dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns));
-
- }
+ if (notify) addReplyPubsubUnsubscribed(c,channel);
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
@@ -150,10 +231,7 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
listAddNodeTail(clients,c);
}
/* Notify the client */
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.psubscribebulk);
- addReplyBulk(c,pattern);
- addReplyLongLong(c,clientSubscriptionsCount(c));
+ addReplyPubsubPatSubscribed(c,pattern);
return retval;
}
@@ -188,13 +266,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
}
}
/* Notify the client */
- if (notify) {
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.punsubscribebulk);
- addReplyBulk(c,pattern);
- addReplyLongLong(c,dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns));
- }
+ if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
decrRefCount(pattern);
return retval;
}
@@ -212,13 +284,7 @@ int pubsubUnsubscribeAllChannels(client *c, int notify) {
count += pubsubUnsubscribeChannel(c,channel,notify);
}
/* We were subscribed to nothing? Still reply to the client. */
- if (notify && count == 0) {
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.unsubscribebulk);
- addReply(c,shared.nullbulk);
- addReplyLongLong(c,dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns));
- }
+ if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
dictReleaseIterator(di);
return count;
}
@@ -236,14 +302,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
count += pubsubUnsubscribePattern(c,pattern,notify);
}
- if (notify && count == 0) {
- /* We were subscribed to nothing? Still reply to the client. */
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.punsubscribebulk);
- addReply(c,shared.nullbulk);
- addReplyLongLong(c,dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns));
- }
+ if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
return count;
}
@@ -265,11 +324,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
-
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.messagebulk);
- addReplyBulk(c,channel);
- addReplyBulk(c,message);
+ addReplyPubsubMessage(c,channel,message);
receivers++;
}
}
@@ -283,18 +338,12 @@ int pubsubPublishMessage(robj *channel, robj *message) {
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
- sdslen(channel->ptr),0)) {
- continue;
- }
+ sdslen(channel->ptr),0)) continue;
+
listRewind(clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
-
- addReply(c,shared.mbulkhdr[4]);
- addReply(c,shared.pmessagebulk);
- addReplyBulk(c,pattern);
- addReplyBulk(c,channel);
- addReplyBulk(c,message);
+ addReplyPubsubPatMessage(c,pattern,channel,message);
receivers++;
}
}
@@ -361,9 +410,9 @@ void publishCommand(client *c) {
void pubsubCommand(client *c) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
-"channels [<pattern>] -- Return the currently active channels matching a pattern (default: all).",
-"numpat -- Return number of subscriptions to patterns.",
-"numsub [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).",
+"CHANNELS [<pattern>] -- Return the currently active channels matching a pattern (default: all).",
+"NUMPAT -- Return number of subscriptions to patterns.",
+"NUMSUB [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).",
NULL
};
addReplyHelp(c, help);
@@ -377,7 +426,7 @@ NULL
long mblen = 0;
void *replylen;
- replylen = addDeferredMultiBulkLength(c);
+ replylen = addReplyDeferredLen(c);
while((de = dictNext(di)) != NULL) {
robj *cobj = dictGetKey(de);
sds channel = cobj->ptr;
@@ -390,12 +439,12 @@ NULL
}
}
dictReleaseIterator(di);
- setDeferredMultiBulkLength(c,replylen,mblen);
+ setDeferredArrayLen(c,replylen,mblen);
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
int j;
- addReplyMultiBulkLen(c,(c->argc-2)*2);
+ addReplyArrayLen(c,(c->argc-2)*2);
for (j = 2; j < c->argc; j++) {
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
@@ -406,7 +455,6 @@ NULL
/* PUBSUB NUMPAT */
addReplyLongLong(c,listLength(server.pubsub_patterns));
} else {
- addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try PUBSUB HELP",
- (char*)c->argv[1]->ptr);
+ addReplySubcommandSyntaxError(c);
}
}