summaryrefslogtreecommitdiff
path: root/src/pubsub.c
diff options
context:
space:
mode:
authorguybe7 <guy.benoish@redislabs.com>2022-04-17 14:43:22 +0200
committerGitHub <noreply@github.com>2022-04-17 15:43:22 +0300
commitf49ff156ecd62aee104cff9f88fb62948575e6b0 (patch)
treed7c5b09264550a2c5f17177b59182788318b24ec /src/pubsub.c
parent789c94feceb7cb0b618dcb912c0151625d913887 (diff)
downloadredis-f49ff156ecd62aee104cff9f88fb62948575e6b0.tar.gz
Add RM_PublishMessageShard (#10543)
since PUBLISH and SPUBLISH use different dictionaries for channels and clients, and we already have an API for PUBLISH, it only makes sense to have one for SPUBLISH Add test coverage and unifying some test infrastructure.
Diffstat (limited to 'src/pubsub.c')
-rw-r--r--src/pubsub.c32
1 files changed, 15 insertions, 17 deletions
diff --git a/src/pubsub.c b/src/pubsub.c
index e805b16ef..a630afc8f 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -499,16 +499,10 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
}
/* Publish a message to all the subscribers. */
-int pubsubPublishMessage(robj *channel, robj *message) {
- return pubsubPublishMessageInternal(channel,message,pubSubType);
+int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
+ return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType);
}
-/* Publish a shard message to all the subscribers. */
-int pubsubPublishMessageShard(robj *channel, robj *message) {
- return pubsubPublishMessageInternal(channel, message, pubSubShardType);
-}
-
-
/*-----------------------------------------------------------------------------
* Pubsub commands implementation
*----------------------------------------------------------------------------*/
@@ -578,6 +572,15 @@ void punsubscribeCommand(client *c) {
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
+/* This function wraps pubsubPublishMessage and also propagates the message to cluster.
+ * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/
+int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) {
+ int receivers = pubsubPublishMessage(channel, message, sharded);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message, sharded);
+ return receivers;
+}
+
/* PUBLISH <channel> <message> */
void publishCommand(client *c) {
if (server.sentinel_mode) {
@@ -585,10 +588,8 @@ void publishCommand(client *c) {
return;
}
- int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
- if (server.cluster_enabled)
- clusterPropagatePublish(c->argv[1],c->argv[2]);
- else
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0);
+ if (!server.cluster_enabled)
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
@@ -677,12 +678,9 @@ void channelList(client *c, sds pat, dict *pubsub_channels) {
/* SPUBLISH <channel> <message> */
void spublishCommand(client *c) {
- int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
- if (server.cluster_enabled) {
- clusterPropagatePublishShard(c->argv[1], c->argv[2]);
- } else {
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1);
+ if (!server.cluster_enabled)
forceCommandPropagation(c,PROPAGATE_REPL);
- }
addReplyLongLong(c,receivers);
}