From f49ff156ecd62aee104cff9f88fb62948575e6b0 Mon Sep 17 00:00:00 2001 From: guybe7 Date: Sun, 17 Apr 2022 14:43:22 +0200 Subject: Add RM_PublishMessageShard (#10543) since PUBLISH and SPUBLISH use different dictionaries for channels and clients, and we already have an API for PUBLISH, it only makes sense to have one for SPUBLISH Add test coverage and unifying some test infrastructure. --- src/pubsub.c | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) (limited to 'src/pubsub.c') diff --git a/src/pubsub.c b/src/pubsub.c index e805b16ef..a630afc8f 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -499,16 +499,10 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) } /* Publish a message to all the subscribers. */ -int pubsubPublishMessage(robj *channel, robj *message) { - return pubsubPublishMessageInternal(channel,message,pubSubType); +int pubsubPublishMessage(robj *channel, robj *message, int sharded) { + return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType); } -/* Publish a shard message to all the subscribers. */ -int pubsubPublishMessageShard(robj *channel, robj *message) { - return pubsubPublishMessageInternal(channel, message, pubSubShardType); -} - - /*----------------------------------------------------------------------------- * Pubsub commands implementation *----------------------------------------------------------------------------*/ @@ -578,6 +572,15 @@ void punsubscribeCommand(client *c) { if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; } +/* This function wraps pubsubPublishMessage and also propagates the message to cluster. + * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/ +int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) { + int receivers = pubsubPublishMessage(channel, message, sharded); + if (server.cluster_enabled) + clusterPropagatePublish(channel, message, sharded); + return receivers; +} + /* PUBLISH */ void publishCommand(client *c) { if (server.sentinel_mode) { @@ -585,10 +588,8 @@ void publishCommand(client *c) { return; } - int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); - if (server.cluster_enabled) - clusterPropagatePublish(c->argv[1],c->argv[2]); - else + int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0); + if (!server.cluster_enabled) forceCommandPropagation(c,PROPAGATE_REPL); addReplyLongLong(c,receivers); } @@ -677,12 +678,9 @@ void channelList(client *c, sds pat, dict *pubsub_channels) { /* SPUBLISH */ void spublishCommand(client *c) { - int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType); - if (server.cluster_enabled) { - clusterPropagatePublishShard(c->argv[1], c->argv[2]); - } else { + int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1); + if (!server.cluster_enabled) forceCommandPropagation(c,PROPAGATE_REPL); - } addReplyLongLong(c,receivers); } -- cgit v1.2.1