diff options
Diffstat (limited to 'src/pubsub.c')
-rw-r--r-- | src/pubsub.c | 32 |
1 files changed, 15 insertions, 17 deletions
diff --git a/src/pubsub.c b/src/pubsub.c index e805b16ef..a630afc8f 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -499,16 +499,10 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) } /* Publish a message to all the subscribers. */ -int pubsubPublishMessage(robj *channel, robj *message) { - return pubsubPublishMessageInternal(channel,message,pubSubType); +int pubsubPublishMessage(robj *channel, robj *message, int sharded) { + return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType); } -/* Publish a shard message to all the subscribers. */ -int pubsubPublishMessageShard(robj *channel, robj *message) { - return pubsubPublishMessageInternal(channel, message, pubSubShardType); -} - - /*----------------------------------------------------------------------------- * Pubsub commands implementation *----------------------------------------------------------------------------*/ @@ -578,6 +572,15 @@ void punsubscribeCommand(client *c) { if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; } +/* This function wraps pubsubPublishMessage and also propagates the message to cluster. + * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/ +int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) { + int receivers = pubsubPublishMessage(channel, message, sharded); + if (server.cluster_enabled) + clusterPropagatePublish(channel, message, sharded); + return receivers; +} + /* PUBLISH <channel> <message> */ void publishCommand(client *c) { if (server.sentinel_mode) { @@ -585,10 +588,8 @@ void publishCommand(client *c) { return; } - int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); - if (server.cluster_enabled) - clusterPropagatePublish(c->argv[1],c->argv[2]); - else + int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0); + if (!server.cluster_enabled) forceCommandPropagation(c,PROPAGATE_REPL); addReplyLongLong(c,receivers); } @@ -677,12 +678,9 @@ void channelList(client *c, sds pat, dict *pubsub_channels) { /* SPUBLISH <channel> <message> */ void spublishCommand(client *c) { - int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType); - if (server.cluster_enabled) { - clusterPropagatePublishShard(c->argv[1], c->argv[2]); - } else { + int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1); + if (!server.cluster_enabled) forceCommandPropagation(c,PROPAGATE_REPL); - } addReplyLongLong(c,receivers); } |