summaryrefslogtreecommitdiff
path: root/src/pubsub.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pubsub.c')
-rw-r--r--src/pubsub.c32
1 files changed, 15 insertions, 17 deletions
diff --git a/src/pubsub.c b/src/pubsub.c
index e805b16ef..a630afc8f 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -499,16 +499,10 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
}
/* Publish a message to all the subscribers. */
-int pubsubPublishMessage(robj *channel, robj *message) {
- return pubsubPublishMessageInternal(channel,message,pubSubType);
+int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
+ return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType);
}
-/* Publish a shard message to all the subscribers. */
-int pubsubPublishMessageShard(robj *channel, robj *message) {
- return pubsubPublishMessageInternal(channel, message, pubSubShardType);
-}
-
-
/*-----------------------------------------------------------------------------
* Pubsub commands implementation
*----------------------------------------------------------------------------*/
@@ -578,6 +572,15 @@ void punsubscribeCommand(client *c) {
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
+/* This function wraps pubsubPublishMessage and also propagates the message to cluster.
+ * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/
+int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) {
+ int receivers = pubsubPublishMessage(channel, message, sharded);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message, sharded);
+ return receivers;
+}
+
/* PUBLISH <channel> <message> */
void publishCommand(client *c) {
if (server.sentinel_mode) {
@@ -585,10 +588,8 @@ void publishCommand(client *c) {
return;
}
- int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
- if (server.cluster_enabled)
- clusterPropagatePublish(c->argv[1],c->argv[2]);
- else
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0);
+ if (!server.cluster_enabled)
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
@@ -677,12 +678,9 @@ void channelList(client *c, sds pat, dict *pubsub_channels) {
/* SPUBLISH <channel> <message> */
void spublishCommand(client *c) {
- int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
- if (server.cluster_enabled) {
- clusterPropagatePublishShard(c->argv[1], c->argv[2]);
- } else {
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1);
+ if (!server.cluster_enabled)
forceCommandPropagation(c,PROPAGATE_REPL);
- }
addReplyLongLong(c,receivers);
}