From bc75a94e2d00ff0cccde475de0980a89bbc641e9 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 18 Dec 2018 12:33:51 +0100 Subject: RESP3: pubsub messages API completely refactored. --- src/pubsub.c | 132 +++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 75 insertions(+), 57 deletions(-) (limited to 'src/pubsub.c') diff --git a/src/pubsub.c b/src/pubsub.c index 12b252cfb..0bf615eb1 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -29,6 +29,75 @@ #include "server.h" +int clientSubscriptionsCount(client *c); + +/*----------------------------------------------------------------------------- + * Pubsub client replies API + *----------------------------------------------------------------------------*/ + +/* Send a pubsub message of type "message" to the client. */ +void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { + addReply(c,shared.mbulkhdr[3]); + addReply(c,shared.messagebulk); + addReplyBulk(c,channel); + addReplyBulk(c,msg); +} + +/* Send a pubsub message of type "pmessage" to the client. The difference + * with the "message" type delivered by addReplyPubsubMessage() is that + * this message format also includes the pattern that matched the message. */ +void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { + addReply(c,shared.mbulkhdr[4]); + addReply(c,shared.pmessagebulk); + addReplyBulk(c,pat); + addReplyBulk(c,channel); + addReplyBulk(c,msg); +} + +/* Send the pubsub subscription notification to the client. */ +void addReplyPubsubSubscribed(client *c, robj *channel) { + addReply(c,shared.mbulkhdr[3]); + addReply(c,shared.subscribebulk); + addReplyBulk(c,channel); + addReplyLongLong(c,clientSubscriptionsCount(c)); +} + +/* Send the pubsub unsubscription notification to the client. + * Channel can be NULL: this is useful when the client sends a mass + * unsubscribe command but there are no channels to unsubscribe from: we + * still send a notification. */ +void addReplyPubsubUnsubscribed(client *c, robj *channel) { + addReply(c,shared.mbulkhdr[3]); + addReply(c,shared.unsubscribebulk); + if (channel) + addReplyBulk(c,channel); + else + addReplyNull(c); + addReplyLongLong(c,clientSubscriptionsCount(c)); +} + +/* Send the pubsub pattern subscription notification to the client. */ +void addReplyPubsubPatSubscribed(client *c, robj *pattern) { + addReply(c,shared.mbulkhdr[3]); + addReply(c,shared.psubscribebulk); + addReplyBulk(c,pattern); + addReplyLongLong(c,clientSubscriptionsCount(c)); +} + +/* Send the pubsub pattern unsubscription notification to the client. + * Pattern can be NULL: this is useful when the client sends a mass + * punsubscribe command but there are no pattern to unsubscribe from: we + * still send a notification. */ +void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { + addReply(c,shared.mbulkhdr[3]); + addReply(c,shared.punsubscribebulk); + if (pattern) + addReplyBulk(c,pattern); + else + addReplyNull(c); + addReplyLongLong(c,clientSubscriptionsCount(c)); +} + /*----------------------------------------------------------------------------- * Pubsub low level API *----------------------------------------------------------------------------*/ @@ -76,10 +145,7 @@ int pubsubSubscribeChannel(client *c, robj *channel) { listAddNodeTail(clients,c); } /* Notify the client */ - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.subscribebulk); - addReplyBulk(c,channel); - addReplyLongLong(c,clientSubscriptionsCount(c)); + addReplyPubsubSubscribed(c,channel); return retval; } @@ -111,14 +177,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { } } /* Notify the client */ - if (notify) { - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.unsubscribebulk); - addReplyBulk(c,channel); - addReplyLongLong(c,dictSize(c->pubsub_channels)+ - listLength(c->pubsub_patterns)); - - } + if (notify) addReplyPubsubUnsubscribed(c,channel); decrRefCount(channel); /* it is finally safe to release it */ return retval; } @@ -138,10 +197,7 @@ int pubsubSubscribePattern(client *c, robj *pattern) { listAddNodeTail(server.pubsub_patterns,pat); } /* Notify the client */ - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.psubscribebulk); - addReplyBulk(c,pattern); - addReplyLongLong(c,clientSubscriptionsCount(c)); + addReplyPubsubPatSubscribed(c,pattern); return retval; } @@ -162,13 +218,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { listDelNode(server.pubsub_patterns,ln); } /* Notify the client */ - if (notify) { - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.punsubscribebulk); - addReplyBulk(c,pattern); - addReplyLongLong(c,dictSize(c->pubsub_channels)+ - listLength(c->pubsub_patterns)); - } + if (notify) addReplyPubsubPatUnsubscribed(c,pattern); decrRefCount(pattern); return retval; } @@ -186,13 +236,7 @@ int pubsubUnsubscribeAllChannels(client *c, int notify) { count += pubsubUnsubscribeChannel(c,channel,notify); } /* We were subscribed to nothing? Still reply to the client. */ - if (notify && count == 0) { - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.unsubscribebulk); - addReplyNull(c); - addReplyLongLong(c,dictSize(c->pubsub_channels)+ - listLength(c->pubsub_patterns)); - } + if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL); dictReleaseIterator(di); return count; } @@ -210,36 +254,10 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { count += pubsubUnsubscribePattern(c,pattern,notify); } - if (notify && count == 0) { - /* We were subscribed to nothing? Still reply to the client. */ - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.punsubscribebulk); - addReplyNull(c); - addReplyLongLong(c,dictSize(c->pubsub_channels)+ - listLength(c->pubsub_patterns)); - } + if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL); return count; } -/* Send a pubsub message of type "message" to the client. */ -void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { - addReply(c,shared.mbulkhdr[3]); - addReply(c,shared.messagebulk); - addReplyBulk(c,channel); - addReplyBulk(c,msg); -} - -/* Send a pubsub message of type "pmessage" to the client. The difference - * with the "message" type delivered by addReplyPubsubMessage() is that - * this message format also includes the pattern that matched the message. */ -void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { - addReply(c,shared.mbulkhdr[4]); - addReply(c,shared.pmessagebulk); - addReplyBulk(c,pat); - addReplyBulk(c,channel); - addReplyBulk(c,msg); -} - /* Publish a message */ int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; -- cgit v1.2.1