summaryrefslogtreecommitdiff
path: root/src/pubsub.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-12-18 12:33:51 +0100
committerantirez <antirez@gmail.com>2019-01-09 17:00:30 +0100
commitbc75a94e2d00ff0cccde475de0980a89bbc641e9 (patch)
treeab711d0d0a2a37d1b62684536b565773c07a764b /src/pubsub.c
parent798a32919291968d9ea44e6266ba5f8d505f4263 (diff)
downloadredis-bc75a94e2d00ff0cccde475de0980a89bbc641e9.tar.gz
RESP3: pubsub messages API completely refactored.
Diffstat (limited to 'src/pubsub.c')
-rw-r--r--src/pubsub.c132
1 files changed, 75 insertions, 57 deletions
diff --git a/src/pubsub.c b/src/pubsub.c
index 12b252cfb..0bf615eb1 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -29,6 +29,75 @@
#include "server.h"
+int clientSubscriptionsCount(client *c);
+
+/*-----------------------------------------------------------------------------
+ * Pubsub client replies API
+ *----------------------------------------------------------------------------*/
+
+/* Send a pubsub message of type "message" to the client. */
+void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
+ addReply(c,shared.mbulkhdr[3]);
+ addReply(c,shared.messagebulk);
+ addReplyBulk(c,channel);
+ addReplyBulk(c,msg);
+}
+
+/* Send a pubsub message of type "pmessage" to the client. The difference
+ * with the "message" type delivered by addReplyPubsubMessage() is that
+ * this message format also includes the pattern that matched the message. */
+void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
+ addReply(c,shared.mbulkhdr[4]);
+ addReply(c,shared.pmessagebulk);
+ addReplyBulk(c,pat);
+ addReplyBulk(c,channel);
+ addReplyBulk(c,msg);
+}
+
+/* Send the pubsub subscription notification to the client. */
+void addReplyPubsubSubscribed(client *c, robj *channel) {
+ addReply(c,shared.mbulkhdr[3]);
+ addReply(c,shared.subscribebulk);
+ addReplyBulk(c,channel);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+}
+
+/* Send the pubsub unsubscription notification to the client.
+ * Channel can be NULL: this is useful when the client sends a mass
+ * unsubscribe command but there are no channels to unsubscribe from: we
+ * still send a notification. */
+void addReplyPubsubUnsubscribed(client *c, robj *channel) {
+ addReply(c,shared.mbulkhdr[3]);
+ addReply(c,shared.unsubscribebulk);
+ if (channel)
+ addReplyBulk(c,channel);
+ else
+ addReplyNull(c);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+}
+
+/* Send the pubsub pattern subscription notification to the client. */
+void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
+ addReply(c,shared.mbulkhdr[3]);
+ addReply(c,shared.psubscribebulk);
+ addReplyBulk(c,pattern);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+}
+
+/* Send the pubsub pattern unsubscription notification to the client.
+ * Pattern can be NULL: this is useful when the client sends a mass
+ * punsubscribe command but there are no pattern to unsubscribe from: we
+ * still send a notification. */
+void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
+ addReply(c,shared.mbulkhdr[3]);
+ addReply(c,shared.punsubscribebulk);
+ if (pattern)
+ addReplyBulk(c,pattern);
+ else
+ addReplyNull(c);
+ addReplyLongLong(c,clientSubscriptionsCount(c));
+}
+
/*-----------------------------------------------------------------------------
* Pubsub low level API
*----------------------------------------------------------------------------*/
@@ -76,10 +145,7 @@ int pubsubSubscribeChannel(client *c, robj *channel) {
listAddNodeTail(clients,c);
}
/* Notify the client */
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.subscribebulk);
- addReplyBulk(c,channel);
- addReplyLongLong(c,clientSubscriptionsCount(c));
+ addReplyPubsubSubscribed(c,channel);
return retval;
}
@@ -111,14 +177,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
}
}
/* Notify the client */
- if (notify) {
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.unsubscribebulk);
- addReplyBulk(c,channel);
- addReplyLongLong(c,dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns));
-
- }
+ if (notify) addReplyPubsubUnsubscribed(c,channel);
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
@@ -138,10 +197,7 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
listAddNodeTail(server.pubsub_patterns,pat);
}
/* Notify the client */
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.psubscribebulk);
- addReplyBulk(c,pattern);
- addReplyLongLong(c,clientSubscriptionsCount(c));
+ addReplyPubsubPatSubscribed(c,pattern);
return retval;
}
@@ -162,13 +218,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
listDelNode(server.pubsub_patterns,ln);
}
/* Notify the client */
- if (notify) {
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.punsubscribebulk);
- addReplyBulk(c,pattern);
- addReplyLongLong(c,dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns));
- }
+ if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
decrRefCount(pattern);
return retval;
}
@@ -186,13 +236,7 @@ int pubsubUnsubscribeAllChannels(client *c, int notify) {
count += pubsubUnsubscribeChannel(c,channel,notify);
}
/* We were subscribed to nothing? Still reply to the client. */
- if (notify && count == 0) {
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.unsubscribebulk);
- addReplyNull(c);
- addReplyLongLong(c,dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns));
- }
+ if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
dictReleaseIterator(di);
return count;
}
@@ -210,36 +254,10 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
count += pubsubUnsubscribePattern(c,pattern,notify);
}
- if (notify && count == 0) {
- /* We were subscribed to nothing? Still reply to the client. */
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.punsubscribebulk);
- addReplyNull(c);
- addReplyLongLong(c,dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns));
- }
+ if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
return count;
}
-/* Send a pubsub message of type "message" to the client. */
-void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
- addReply(c,shared.mbulkhdr[3]);
- addReply(c,shared.messagebulk);
- addReplyBulk(c,channel);
- addReplyBulk(c,msg);
-}
-
-/* Send a pubsub message of type "pmessage" to the client. The difference
- * with the "message" type delivered by addReplyPubsubMessage() is that
- * this message format also includes the pattern that matched the message. */
-void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
- addReply(c,shared.mbulkhdr[4]);
- addReply(c,shared.pmessagebulk);
- addReplyBulk(c,pat);
- addReplyBulk(c,channel);
- addReplyBulk(c,msg);
-}
-
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;