summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis.conf9
-rw-r--r--src/acl.c20
-rw-r--r--src/cluster.c169
-rw-r--r--src/cluster.h5
-rw-r--r--src/commands.c70
-rw-r--r--src/commands/pubsub-shardchannels.json23
-rw-r--r--src/commands/pubsub-shardnumsub.json16
-rw-r--r--src/commands/spublish.json46
-rw-r--r--src/commands/ssubscribe.json42
-rw-r--r--src/commands/sunsubscribe.json43
-rw-r--r--src/config.c1
-rw-r--r--src/module.c1
-rw-r--r--src/networking.c4
-rw-r--r--src/pubsub.c350
-rw-r--r--src/redis-cli.c3
-rw-r--r--src/server.c10
-rw-r--r--src/server.h23
-rw-r--r--src/tracking.c6
-rw-r--r--tests/cluster/cluster.tcl30
-rw-r--r--tests/cluster/tests/19-cluster-nodes-slots.tcl21
-rw-r--r--tests/cluster/tests/23-multiple-slot-operations.tcl8
-rw-r--r--tests/cluster/tests/25-pubsubshard-slot-migration.tcl171
-rw-r--r--tests/cluster/tests/26-pubsubshard.tcl94
-rw-r--r--tests/instances.tcl10
-rw-r--r--tests/support/cluster.tcl6
-rw-r--r--tests/test_helper.tcl1
-rw-r--r--tests/unit/acl.tcl54
-rw-r--r--tests/unit/pubsubshard.tcl213
28 files changed, 1343 insertions, 106 deletions
diff --git a/redis.conf b/redis.conf
index 7289277cf..8804aac37 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1613,6 +1613,15 @@ lua-time-limit 5000
#
# cluster-allow-reads-when-down no
+# This option, when set to yes, allows nodes to serve pubsub shard traffic while the
+# the cluster is in a down state, as long as it believes it owns the slots.
+#
+# This is useful if the application would like to use the pubsub feature even when
+# the cluster global stable state is not OK. If the application wants to make sure only
+# one shard is serving a given channel, this feature should be kept as yes.
+#
+# cluster-allow-pubsubshard-when-down yes
+
# Cluster link send buffer limit is the limit on the memory usage of an individual
# cluster bus link's send buffer in bytes. Cluster links would be freed if they exceed
# this limit. This is to primarily prevent send buffers from growing unbounded on links
diff --git a/src/acl.c b/src/acl.c
index 19e2a58f3..19b357c9e 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -1307,8 +1307,11 @@ int ACLCheckCommandPerm(const user *u, struct redisCommand *cmd, robj **argv, in
}
/* Check if the user can execute commands explicitly touching the keys
- * mentioned in the command arguments. */
+ * mentioned in the command arguments. Shard channels are treated as
+ * special keys for client library to rely on `COMMAND` command
+ * to discover the node to connect to. These don't need acl key check. */
if (!(u->flags & USER_FLAG_ALLKEYS) &&
+ !(cmd->flags & CMD_PUBSUB) &&
(cmd->getkeys_proc || cmd->key_specs_num))
{
getKeysResult result = GETKEYS_RESULT_INIT;
@@ -1392,6 +1395,7 @@ void ACLKillPubsubClientsIfNeeded(user *u, list *upcoming) {
}
/* Check for channel violations. */
if (!kill) {
+ /* Check for global channels violation. */
dictIterator *di = dictGetIterator(c->pubsub_channels);
dictEntry *de;
while (!kill && ((de = dictNext(di)) != NULL)) {
@@ -1400,6 +1404,16 @@ void ACLKillPubsubClientsIfNeeded(user *u, list *upcoming) {
ACL_DENIED_CHANNEL);
}
dictReleaseIterator(di);
+
+ /* Check for shard channels violation. */
+ di = dictGetIterator(c->pubsubshard_channels);
+ while (!kill && ((de = dictNext(di)) != NULL)) {
+ o = dictGetKey(de);
+ kill = (ACLCheckPubsubChannelPerm(o->ptr,upcoming,0) ==
+ ACL_DENIED_CHANNEL);
+ }
+
+ dictReleaseIterator(di);
}
/* Kill it. */
@@ -1448,9 +1462,9 @@ int ACLCheckAllUserCommandPerm(const user *u, struct redisCommand *cmd, robj **a
int acl_retval = ACLCheckCommandPerm(u,cmd,argv,argc,idxptr);
if (acl_retval != ACL_OK)
return acl_retval;
- if (cmd->proc == publishCommand)
+ if (cmd->proc == publishCommand || cmd->proc == spublishCommand)
acl_retval = ACLCheckPubsubPerm(u,argv,1,1,0,idxptr);
- else if (cmd->proc == subscribeCommand)
+ else if (cmd->proc == subscribeCommand || cmd->proc == ssubscribeCommand)
acl_retval = ACLCheckPubsubPerm(u,argv,1,argc-1,0,idxptr);
else if (cmd->proc == psubscribeCommand)
acl_retval = ACLCheckPubsubPerm(u,argv,1,argc-1,1,idxptr);
diff --git a/src/cluster.c b/src/cluster.c
index 78e273f34..81322a8aa 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -57,6 +57,7 @@ void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter, int use_pport);
clusterNode *clusterLookupNode(const char *name);
+list *clusterGetNodesServingMySlots(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
@@ -77,7 +78,9 @@ uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
const char *clusterGetMessageTypeString(int type);
+void removeChannelsInSlot(unsigned int slot);
unsigned int countKeysInSlot(unsigned int hashslot);
+unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
/* Links to the next and previous entries for keys in the same slot are stored
@@ -631,6 +634,9 @@ void clusterInit(void) {
/* Initialize data for the Slot to key API. */
slotToKeyInit(server.db);
+ /* The slots -> channels map is a radix tree. Initialize it here. */
+ server.cluster->slots_to_channels = raxNew();
+
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
@@ -1146,6 +1152,17 @@ clusterNode *clusterLookupNode(const char *name) {
return dictGetVal(de);
}
+/* Get all the nodes serving the same slots as myself. */
+list *clusterGetNodesServingMySlots(clusterNode *node) {
+ list *nodes_for_slot = listCreate();
+ clusterNode *my_primary = nodeIsMaster(node) ? node : node->slaveof;
+ listAddNodeTail(nodes_for_slot, my_primary);
+ for (int i=0; i < my_primary->numslaves; i++) {
+ listAddNodeTail(nodes_for_slot, my_primary->slaves[i]);
+ }
+ return nodes_for_slot;
+}
+
/* This is only used after the handshake. When we connect a given IP/PORT
* as a result of CLUSTER MEET we don't have the node name yet, so we
* pick a random one, and will fix it when we receive the PONG request using
@@ -1921,7 +1938,7 @@ int clusterProcessPacket(clusterLink *link) {
explen += sizeof(clusterMsgDataFail);
if (totlen != explen) return 1;
- } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
+ } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) -
@@ -2278,7 +2295,7 @@ int clusterProcessPacket(clusterLink *link) {
"Ignoring FAIL message from unknown node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}
- } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
+ } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
if (!sender) return 1; /* We don't know that node. */
robj *channel, *message;
@@ -2286,8 +2303,10 @@ int clusterProcessPacket(clusterLink *link) {
/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
- if (dictSize(server.pubsub_channels) ||
- dictSize(server.pubsub_patterns))
+ if ((type == CLUSTERMSG_TYPE_PUBLISH
+ && serverPubsubSubscriptionCount() > 0)
+ || (type == CLUSTERMSG_TYPE_PUBLISHSHARD
+ && serverPubsubShardSubscriptionCount() > 0))
{
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
@@ -2296,7 +2315,11 @@ int clusterProcessPacket(clusterLink *link) {
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
- pubsubPublishMessage(channel,message);
+ if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
+ pubsubPublishMessageShard(channel, message);
+ } else {
+ pubsubPublishMessage(channel,message);
+ }
decrRefCount(channel);
decrRefCount(message);
}
@@ -2841,7 +2864,7 @@ void clusterBroadcastPong(int target) {
* the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
* positive in this context. */
REDIS_NO_SANITIZE("bounds")
-void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
+void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
unsigned char *payload;
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
@@ -2853,7 +2876,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
channel_len = sdslen(channel->ptr);
message_len = sdslen(message->ptr);
- clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
+ clusterBuildMessageHdr(hdr,type);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
@@ -2976,7 +2999,28 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin
* messages to hosts without receives for a given channel.
* -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message) {
- clusterSendPublish(NULL, channel, message);
+ clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
+}
+
+/* -----------------------------------------------------------------------------
+ * CLUSTER Pub/Sub shard support
+ *
+ * Publish this message across the slot (primary/replica).
+ * -------------------------------------------------------------------------- */
+void clusterPropagatePublishShard(robj *channel, robj *message) {
+ list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
+ if (listLength(nodes_for_slot) != 0) {
+ listIter li;
+ listNode *ln;
+ listRewind(nodes_for_slot, &li);
+ while((ln = listNext(&li))) {
+ clusterNode *node = listNodeValue(ln);
+ if (node != myself) {
+ clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
+ }
+ }
+ }
+ listRelease(nodes_for_slot);
}
/* -----------------------------------------------------------------------------
@@ -4075,6 +4119,14 @@ int clusterDelSlot(int slot) {
clusterNode *n = server.cluster->slots[slot];
if (!n) return C_ERR;
+
+ /* Cleanup the channels in master/replica as part of slot deletion. */
+ list *nodes_for_slot = clusterGetNodesServingMySlots(n);
+ listNode *ln = listSearchKey(nodes_for_slot, myself);
+ if (ln != NULL) {
+ removeChannelsInSlot(slot);
+ }
+ listRelease(nodes_for_slot);
serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
server.cluster->slots[slot] = NULL;
return C_OK;
@@ -4574,6 +4626,7 @@ const char *clusterGetMessageTypeString(int type) {
case CLUSTERMSG_TYPE_MEET: return "meet";
case CLUSTERMSG_TYPE_FAIL: return "fail";
case CLUSTERMSG_TYPE_PUBLISH: return "publish";
+ case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
case CLUSTERMSG_TYPE_UPDATE: return "update";
@@ -5362,6 +5415,30 @@ NULL
}
}
+void removeChannelsInSlot(unsigned int slot) {
+ unsigned int channelcount = countChannelsInSlot(slot);
+ if (channelcount == 0) return;
+
+ /* Retrieve all the channels for the slot. */
+ robj **channels = zmalloc(sizeof(robj*)*channelcount);
+ raxIterator iter;
+ int j = 0;
+ unsigned char indexed[2];
+
+ indexed[0] = (slot >> 8) & 0xff;
+ indexed[1] = slot & 0xff;
+ raxStart(&iter,server.cluster->slots_to_channels);
+ raxSeek(&iter,">=",indexed,2);
+ while(raxNext(&iter)) {
+ if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
+ channels[j++] = createStringObject((char*)iter.key + 2, iter.key_len - 2);
+ }
+ raxStop(&iter);
+
+ pubsubUnsubscribeShardChannels(channels, channelcount);
+ zfree(channels);
+}
+
/* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */
@@ -6121,6 +6198,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
mc.cmd = cmd;
}
+ int is_pubsubshard = cmd->proc == ssubscribeCommand ||
+ cmd->proc == sunsubscribeCommand ||
+ cmd->proc == spublishCommand;
+
/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
@@ -6172,8 +6253,8 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
importing_slot = 1;
}
} else {
- /* If it is not the first key, make sure it is exactly
- * the same key as the first we saw. */
+ /* If it is not the first key/channel, make sure it is exactly
+ * the same key/channel as the first we saw. */
if (!equalStringObjects(firstkey,thiskey)) {
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
@@ -6183,15 +6264,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
return NULL;
} else {
/* Flag this request as one with multiple different
- * keys. */
+ * keys/channels. */
multiple_keys = 1;
}
}
}
- /* Migrating / Importing slot? Count keys we don't have. */
+ /* Migrating / Importing slot? Count keys we don't have.
+ * If it is pubsubshard command, it isn't required to check
+ * the channel being present or not in the node during the
+ * slot migration, the channel will be served from the source
+ * node until the migration completes with CLUSTER SETSLOT <slot>
+ * NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY;
- if ((migrating_slot || importing_slot) &&
+ if ((migrating_slot || importing_slot) && !is_pubsubshard &&
lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
{
missing_keys++;
@@ -6207,7 +6293,12 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
/* Cluster is globally down but we got keys? We only serve the request
* if it is a read command and when allow_reads_when_down is enabled. */
if (server.cluster->state != CLUSTER_OK) {
- if (!server.cluster_allow_reads_when_down) {
+ if (is_pubsubshard) {
+ if (!server.cluster_allow_pubsubshard_when_down) {
+ if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
+ return NULL;
+ }
+ } else if (!server.cluster_allow_reads_when_down) {
/* The cluster is configured to block commands when the
* cluster is down. */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
@@ -6259,7 +6350,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* is serving, we can reply without redirection. */
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
- if (c->flags & CLIENT_READONLY &&
+ if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
!is_write_command &&
nodeIsSlave(myself) &&
myself->slaveof == n)
@@ -6482,3 +6573,51 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int countKeysInSlot(unsigned int hashslot) {
return (*server.db->slots_to_keys).by_slot[hashslot].count;
}
+
+/* -----------------------------------------------------------------------------
+ * Operation(s) on channel rax tree.
+ * -------------------------------------------------------------------------- */
+
+void slotToChannelUpdate(sds channel, int add) {
+ size_t keylen = sdslen(channel);
+ unsigned int hashslot = keyHashSlot(channel,keylen);
+ unsigned char buf[64];
+ unsigned char *indexed = buf;
+
+ if (keylen+2 > 64) indexed = zmalloc(keylen+2);
+ indexed[0] = (hashslot >> 8) & 0xff;
+ indexed[1] = hashslot & 0xff;
+ memcpy(indexed+2,channel,keylen);
+ if (add) {
+ raxInsert(server.cluster->slots_to_channels,indexed,keylen+2,NULL,NULL);
+ } else {
+ raxRemove(server.cluster->slots_to_channels,indexed,keylen+2,NULL);
+ }
+ if (indexed != buf) zfree(indexed);
+}
+
+void slotToChannelAdd(sds channel) {
+ slotToChannelUpdate(channel,1);
+}
+
+void slotToChannelDel(sds channel) {
+ slotToChannelUpdate(channel,0);
+}
+
+/* Get the count of the channels for a given slot. */
+unsigned int countChannelsInSlot(unsigned int hashslot) {
+ raxIterator iter;
+ int j = 0;
+ unsigned char indexed[2];
+
+ indexed[0] = (hashslot >> 8) & 0xff;
+ indexed[1] = hashslot & 0xff;
+ raxStart(&iter,server.cluster->slots_to_channels);
+ raxSeek(&iter,">=",indexed,2);
+ while(raxNext(&iter)) {
+ if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
+ j++;
+ }
+ raxStop(&iter);
+ return j;
+}
diff --git a/src/cluster.h b/src/cluster.h
index d64e2a5b9..a28176e4b 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -97,6 +97,7 @@ typedef struct clusterLink {
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
#define CLUSTERMSG_TYPE_MODULE 9 /* Module cluster API message. */
#define CLUSTERMSG_TYPE_COUNT 10 /* Total number of message types. */
+#define CLUSTERMSG_TYPE_PUBLISHSHARD 11 /* Pub/Sub Publish shard propagation */
/* Flags that a module can set in order to prevent certain Redis Cluster
* features to be enabled. Useful when implementing a different distributed
@@ -173,6 +174,7 @@ typedef struct clusterState {
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
+ rax *slots_to_channels;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
@@ -320,6 +322,7 @@ int verifyClusterConfigWithData(void);
unsigned long getClusterConnectionsCount(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len);
void clusterPropagatePublish(robj *channel, robj *message);
+void clusterPropagatePublishShard(robj *channel, robj *message);
unsigned int keyHashSlot(char *key, int keylen);
void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
@@ -329,5 +332,7 @@ void slotToKeyFlush(redisDb *db);
void slotToKeyDestroy(redisDb *db);
void clusterUpdateMyselfFlags(void);
void clusterUpdateMyselfIp(void);
+void slotToChannelAdd(sds channel);
+void slotToChannelDel(sds channel);
#endif /* __CLUSTER_H */
diff --git a/src/commands.c b/src/commands.c
index bf0537b4f..6e1a511ca 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -2913,12 +2913,36 @@ struct redisCommandArg PUBSUB_NUMSUB_Args[] = {
{0}
};
+/********** PUBSUB SHARDCHANNELS ********************/
+
+/* PUBSUB SHARDCHANNELS history */
+#define PUBSUB_SHARDCHANNELS_History NULL
+
+/* PUBSUB SHARDCHANNELS hints */
+#define PUBSUB_SHARDCHANNELS_Hints NULL
+
+/* PUBSUB SHARDCHANNELS argument table */
+struct redisCommandArg PUBSUB_SHARDCHANNELS_Args[] = {
+{"pattern",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL},
+{0}
+};
+
+/********** PUBSUB SHARDNUMSUB ********************/
+
+/* PUBSUB SHARDNUMSUB history */
+#define PUBSUB_SHARDNUMSUB_History NULL
+
+/* PUBSUB SHARDNUMSUB hints */
+#define PUBSUB_SHARDNUMSUB_Hints NULL
+
/* PUBSUB command table */
struct redisCommand PUBSUB_Subcommands[] = {
{"channels","List active channels","O(N) where N is the number of active channels, and assuming constant time pattern matching (relatively short channels and patterns)","2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_CHANNELS_History,PUBSUB_CHANNELS_Hints,pubsubCommand,-2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0,.args=PUBSUB_CHANNELS_Args},
{"help","Show helpful text about the different subcommands","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_HELP_History,PUBSUB_HELP_Hints,pubsubCommand,2,CMD_LOADING|CMD_STALE,0},
{"numpat","Get the count of unique patterns pattern subscriptions","O(1)","2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_NUMPAT_History,PUBSUB_NUMPAT_Hints,pubsubCommand,2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0},
{"numsub","Get the count of subscribers for channels","O(N) for the NUMSUB subcommand, where N is the number of requested channels","2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_NUMSUB_History,PUBSUB_NUMSUB_Hints,pubsubCommand,-2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0,.args=PUBSUB_NUMSUB_Args},
+{"shardchannels","List active shard channels","O(N) where N is the number of active shard channels, and assuming constant time pattern matching (relatively short channels).","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_SHARDCHANNELS_History,PUBSUB_SHARDCHANNELS_Hints,pubsubCommand,-2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0,.args=PUBSUB_SHARDCHANNELS_Args},
+{"shardnumsub","Get the count of subscribers for shard channels","O(N) for the SHARDNUMSUB subcommand, where N is the number of requested channels","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_SHARDNUMSUB_History,PUBSUB_SHARDNUMSUB_Hints,pubsubCommand,-2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0},
{0}
};
@@ -2944,6 +2968,35 @@ struct redisCommandArg PUNSUBSCRIBE_Args[] = {
{0}
};
+/********** SPUBLISH ********************/
+
+/* SPUBLISH history */
+#define SPUBLISH_History NULL
+
+/* SPUBLISH hints */
+#define SPUBLISH_Hints NULL
+
+/* SPUBLISH argument table */
+struct redisCommandArg SPUBLISH_Args[] = {
+{"channel",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
+{"message",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
+{0}
+};
+
+/********** SSUBSCRIBE ********************/
+
+/* SSUBSCRIBE history */
+#define SSUBSCRIBE_History NULL
+
+/* SSUBSCRIBE hints */
+#define SSUBSCRIBE_Hints NULL
+
+/* SSUBSCRIBE argument table */
+struct redisCommandArg SSUBSCRIBE_Args[] = {
+{"channel",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE},
+{0}
+};
+
/********** SUBSCRIBE ********************/
/* SUBSCRIBE history */
@@ -2961,6 +3014,20 @@ struct redisCommandArg SUBSCRIBE_Args[] = {
{0}
};
+/********** SUNSUBSCRIBE ********************/
+
+/* SUNSUBSCRIBE history */
+#define SUNSUBSCRIBE_History NULL
+
+/* SUNSUBSCRIBE hints */
+#define SUNSUBSCRIBE_Hints NULL
+
+/* SUNSUBSCRIBE argument table */
+struct redisCommandArg SUNSUBSCRIBE_Args[] = {
+{"channel",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE},
+{0}
+};
+
/********** UNSUBSCRIBE ********************/
/* UNSUBSCRIBE history */
@@ -6511,7 +6578,10 @@ struct redisCommand redisCommandTable[] = {
{"publish","Post a message to a channel","O(N+M) where N is the number of clients subscribed to the receiving channel and M is the total number of subscribed patterns (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBLISH_History,PUBLISH_Hints,publishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE|CMD_SENTINEL,0,.args=PUBLISH_Args},
{"pubsub","A container for Pub/Sun commands","Depends on subcommand.","2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_History,PUBSUB_Hints,NULL,-2,0,0,.subcommands=PUBSUB_Subcommands},
{"punsubscribe","Stop listening for messages posted to channels matching the given patterns","O(N+M) where N is the number of patterns the client is already subscribed and M is the number of total patterns subscribed in the system (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUNSUBSCRIBE_History,PUNSUBSCRIBE_Hints,punsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=PUNSUBSCRIBE_Args},
+{"spublish","Post a message to a shard channel","O(N) where N is the number of clients subscribed to the receiving shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,SPUBLISH_History,SPUBLISH_Hints,spublishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE,0,{{CMD_KEY_SHARD_CHANNEL,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=SPUBLISH_Args},
+{"ssubscribe","Listen for messages published to the given shard channels","O(N) where N is the number of shard channels to subscribe to.","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,SSUBSCRIBE_History,SSUBSCRIBE_Hints,ssubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,{{CMD_KEY_SHARD_CHANNEL,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=SSUBSCRIBE_Args},
{"subscribe","Listen for messages published to the given channels","O(N) where N is the number of channels to subscribe to.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,SUBSCRIBE_History,SUBSCRIBE_Hints,subscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=SUBSCRIBE_Args},
+{"sunsubscribe","Stop listening for messages posted to the given shard channels","O(N) where N is the number of clients already subscribed to a channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,SUNSUBSCRIBE_History,SUNSUBSCRIBE_Hints,sunsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,{{CMD_KEY_SHARD_CHANNEL,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=SUNSUBSCRIBE_Args},
{"unsubscribe","Stop listening for messages posted to the given channels","O(N) where N is the number of clients already subscribed to a channel.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,UNSUBSCRIBE_History,UNSUBSCRIBE_Hints,unsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=UNSUBSCRIBE_Args},
/* scripting */
{"eval","Execute a Lua script server side","Depends on the script that is executed.","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,EVAL_History,EVAL_Hints,evalCommand,-3,CMD_NOSCRIPT|CMD_SKIP_MONITOR|CMD_MAY_REPLICATE|CMD_NO_MANDATORY_KEYS,ACL_CATEGORY_SCRIPTING,{{CMD_KEY_WRITE|CMD_KEY_READ,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}},evalGetKeys,.args=EVAL_Args},
diff --git a/src/commands/pubsub-shardchannels.json b/src/commands/pubsub-shardchannels.json
new file mode 100644
index 000000000..450cd8dcd
--- /dev/null
+++ b/src/commands/pubsub-shardchannels.json
@@ -0,0 +1,23 @@
+{
+ "SHARDCHANNELS": {
+ "summary": "List active shard channels",
+ "complexity": "O(N) where N is the number of active shard channels, and assuming constant time pattern matching (relatively short channels).",
+ "group": "pubsub",
+ "since": "7.0.0",
+ "arity": -2,
+ "container": "PUBSUB",
+ "function": "pubsubCommand",
+ "command_flags": [
+ "PUBSUB",
+ "LOADING",
+ "STALE"
+ ],
+ "arguments": [
+ {
+ "name": "pattern",
+ "type": "string",
+ "optional": true
+ }
+ ]
+ }
+}
diff --git a/src/commands/pubsub-shardnumsub.json b/src/commands/pubsub-shardnumsub.json
new file mode 100644
index 000000000..167132b31
--- /dev/null
+++ b/src/commands/pubsub-shardnumsub.json
@@ -0,0 +1,16 @@
+{
+ "SHARDNUMSUB": {
+ "summary": "Get the count of subscribers for shard channels",
+ "complexity": "O(N) for the SHARDNUMSUB subcommand, where N is the number of requested channels",
+ "group": "pubsub",
+ "since": "7.0.0",
+ "arity": -2,
+ "container": "PUBSUB",
+ "function": "pubsubCommand",
+ "command_flags": [
+ "PUBSUB",
+ "LOADING",
+ "STALE"
+ ]
+ }
+}
diff --git a/src/commands/spublish.json b/src/commands/spublish.json
new file mode 100644
index 000000000..34835b9dc
--- /dev/null
+++ b/src/commands/spublish.json
@@ -0,0 +1,46 @@
+{
+ "SPUBLISH": {
+ "summary": "Post a message to a shard channel",
+ "complexity": "O(N) where N is the number of clients subscribed to the receiving shard channel.",
+ "group": "pubsub",
+ "since": "7.0.0",
+ "arity": 3,
+ "function": "spublishCommand",
+ "command_flags": [
+ "PUBSUB",
+ "LOADING",
+ "STALE",
+ "FAST",
+ "MAY_REPLICATE"
+ ],
+ "arguments": [
+ {
+ "name": "channel",
+ "type": "string"
+ },
+ {
+ "name": "message",
+ "type": "string"
+ }
+ ],
+ "key_specs": [
+ {
+ "flags": [
+ "SHARD_CHANNEL"
+ ],
+ "begin_search": {
+ "index": {
+ "pos": 1
+ }
+ },
+ "find_keys": {
+ "range": {
+ "lastkey": 0,
+ "step": 1,
+ "limit": 0
+ }
+ }
+ }
+ ]
+ }
+}
diff --git a/src/commands/ssubscribe.json b/src/commands/ssubscribe.json
new file mode 100644
index 000000000..541e4aac0
--- /dev/null
+++ b/src/commands/ssubscribe.json
@@ -0,0 +1,42 @@
+{
+ "SSUBSCRIBE": {
+ "summary": "Listen for messages published to the given shard channels",
+ "complexity": "O(N) where N is the number of shard channels to subscribe to.",
+ "group": "pubsub",
+ "since": "7.0.0",
+ "arity": -2,
+ "function": "ssubscribeCommand",
+ "command_flags": [
+ "PUBSUB",
+ "NOSCRIPT",
+ "LOADING",
+ "STALE"
+ ],
+ "arguments": [
+ {
+ "name": "channel",
+ "type": "string",
+ "multiple": true
+ }
+ ],
+ "key_specs": [
+ {
+ "flags": [
+ "SHARD_CHANNEL"
+ ],
+ "begin_search": {
+ "index": {
+ "pos": 1
+ }
+ },
+ "find_keys": {
+ "range": {
+ "lastkey": -1,
+ "step": 1,
+ "limit": 0
+ }
+ }
+ }
+ ]
+ }
+}
diff --git a/src/commands/sunsubscribe.json b/src/commands/sunsubscribe.json
new file mode 100644
index 000000000..ba43a02b6
--- /dev/null
+++ b/src/commands/sunsubscribe.json
@@ -0,0 +1,43 @@
+{
+ "SUNSUBSCRIBE": {
+ "summary": "Stop listening for messages posted to the given shard channels",
+ "complexity": "O(N) where N is the number of clients already subscribed to a channel.",
+ "group": "pubsub",
+ "since": "7.0.0",
+ "arity": -1,
+ "function": "sunsubscribeCommand",
+ "command_flags": [
+ "PUBSUB",
+ "NOSCRIPT",
+ "LOADING",
+ "STALE"
+ ],
+ "arguments": [
+ {
+ "name": "channel",
+ "type": "string",
+ "optional": true,
+ "multiple": true
+ }
+ ],
+ "key_specs": [
+ {
+ "flags": [
+ "SHARD_CHANNEL"
+ ],
+ "begin_search": {
+ "index": {
+ "pos": 1
+ }
+ },
+ "find_keys": {
+ "range": {
+ "lastkey": -1,
+ "step": 1,
+ "limit": 0
+ }
+ }
+ }
+ ]
+ }
+}
diff --git a/src/config.c b/src/config.c
index 03b6af29a..317b92ea2 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2636,6 +2636,7 @@ standardConfig configs[] = {
createBoolConfig("cluster-enabled", NULL, IMMUTABLE_CONFIG, server.cluster_enabled, 0, NULL, NULL),
createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG | DENY_LOADING_CONFIG, server.aof_enabled, 0, NULL, updateAppendonly),
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, server.cluster_allow_reads_when_down, 0, NULL, NULL),
+ createBoolConfig("cluster-allow-pubsubshard-when-down", NULL, MODIFIABLE_CONFIG, server.cluster_allow_pubsubshard_when_down, 1, NULL, NULL),
createBoolConfig("crash-log-enabled", NULL, MODIFIABLE_CONFIG, server.crashlog_enabled, 1, NULL, updateSighandlerEnabled),
createBoolConfig("crash-memcheck-enabled", NULL, MODIFIABLE_CONFIG, server.memcheck_enabled, 1, NULL, NULL),
createBoolConfig("use-exit-on-panic", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.use_exit_on_panic, 0, NULL, NULL),
diff --git a/src/module.c b/src/module.c
index 9cae23db5..58e3bb666 100644
--- a/src/module.c
+++ b/src/module.c
@@ -790,6 +790,7 @@ int64_t commandKeySpecsFlagsFromString(const char *s) {
char *t = tokens[j];
if (!strcasecmp(t,"write")) flags |= CMD_KEY_WRITE;
else if (!strcasecmp(t,"read")) flags |= CMD_KEY_READ;
+ else if (!strcasecmp(t,"shard_channel")) flags |= CMD_KEY_SHARD_CHANNEL;
else if (!strcasecmp(t,"incomplete")) flags |= CMD_KEY_INCOMPLETE;
else break;
}
diff --git a/src/networking.c b/src/networking.c
index 20d05a9e3..50c7d99ca 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -190,6 +190,7 @@ client *createClient(connection *conn) {
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
c->pubsub_patterns = listCreate();
+ c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType);
c->peerid = NULL;
c->sockname = NULL;
c->client_list_node = NULL;
@@ -1424,9 +1425,11 @@ void freeClient(client *c) {
/* Unsubscribe from all the pubsub channels */
pubsubUnsubscribeAllChannels(c,0);
+ pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);
+ dictRelease(c->pubsubshard_channels);
/* Free data structures. */
listRelease(c->reply);
@@ -2592,6 +2595,7 @@ void resetCommand(client *c) {
discardTransaction(c);
pubsubUnsubscribeAllChannels(c,0);
+ pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
if (c->name) {
diff --git a/src/pubsub.c b/src/pubsub.c
index 6da5b18cf..e805b16ef 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -30,8 +30,68 @@
#include "server.h"
#include "cluster.h"
+/* Structure to hold the pubsub related metadata. Currently used
+ * for pubsub and pubsubshard feature. */
+typedef struct pubsubtype {
+ int shard;
+ dict *(*clientPubSubChannels)(client*);
+ int (*subscriptionCount)(client*);
+ dict **serverPubSubChannels;
+ robj **subscribeMsg;
+ robj **unsubscribeMsg;
+}pubsubtype;
+
+/*
+ * Get client's global Pub/Sub channels subscription count.
+ */
int clientSubscriptionsCount(client *c);
+/*
+ * Get client's shard level Pub/Sub channels subscription count.
+ */
+int clientShardSubscriptionsCount(client *c);
+
+/*
+ * Get client's global Pub/Sub channels dict.
+ */
+dict* getClientPubSubChannels(client *c);
+
+/*
+ * Get client's shard level Pub/Sub channels dict.
+ */
+dict* getClientPubSubShardChannels(client *c);
+
+/*
+ * Get list of channels client is subscribed to.
+ * If a pattern is provided, the subset of channels is returned
+ * matching the pattern.
+ */
+void channelList(client *c, sds pat, dict* pubsub_channels);
+
+/*
+ * Pub/Sub type for global channels.
+ */
+pubsubtype pubSubType = {
+ .shard = 0,
+ .clientPubSubChannels = getClientPubSubChannels,
+ .subscriptionCount = clientSubscriptionsCount,
+ .serverPubSubChannels = &server.pubsub_channels,
+ .subscribeMsg = &shared.subscribebulk,
+ .unsubscribeMsg = &shared.unsubscribebulk,
+};
+
+/*
+ * Pub/Sub type for shard level channels bounded to a slot.
+ */
+pubsubtype pubSubShardType = {
+ .shard = 1,
+ .clientPubSubChannels = getClientPubSubShardChannels,
+ .subscriptionCount = clientShardSubscriptionsCount,
+ .serverPubSubChannels = &server.pubsubshard_channels,
+ .subscribeMsg = &shared.ssubscribebulk,
+ .unsubscribeMsg = &shared.sunsubscribebulk
+};
+
/*-----------------------------------------------------------------------------
* Pubsub client replies API
*----------------------------------------------------------------------------*/
@@ -66,31 +126,31 @@ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
}
/* Send the pubsub subscription notification to the client. */
-void addReplyPubsubSubscribed(client *c, robj *channel) {
+void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
- addReply(c,shared.subscribebulk);
+ addReply(c,*type.subscribeMsg);
addReplyBulk(c,channel);
- addReplyLongLong(c,clientSubscriptionsCount(c));
+ addReplyLongLong(c,type.subscriptionCount(c));
}
/* Send the pubsub unsubscription notification to the client.
* Channel can be NULL: this is useful when the client sends a mass
* unsubscribe command but there are no channels to unsubscribe from: we
* still send a notification. */
-void addReplyPubsubUnsubscribed(client *c, robj *channel) {
+void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
- addReply(c,shared.unsubscribebulk);
+ addReply(c, *type.unsubscribeMsg);
if (channel)
addReplyBulk(c,channel);
else
addReplyNull(c);
- addReplyLongLong(c,clientSubscriptionsCount(c));
+ addReplyLongLong(c,type.subscriptionCount(c));
}
/* Send the pubsub pattern subscription notification to the client. */
@@ -125,28 +185,57 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
* Pubsub low level API
*----------------------------------------------------------------------------*/
+/* Return the number of pubsub channels + patterns is handled. */
+int serverPubsubSubscriptionCount() {
+ return dictSize(server.pubsub_channels) + dictSize(server.pubsub_patterns);
+}
+
+/* Return the number of pubsub shard level channels is handled. */
+int serverPubsubShardSubscriptionCount() {
+ return dictSize(server.pubsubshard_channels);
+}
+
+
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
- return dictSize(c->pubsub_channels)+
- listLength(c->pubsub_patterns);
+ return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns);
+}
+
+/* Return the number of shard level channels a client is subscribed to. */
+int clientShardSubscriptionsCount(client *c) {
+ return dictSize(c->pubsubshard_channels);
+}
+
+dict* getClientPubSubChannels(client *c) {
+ return c->pubsub_channels;
+}
+
+dict* getClientPubSubShardChannels(client *c) {
+ return c->pubsubshard_channels;
+}
+
+/* Return the number of pubsub + pubsub shard level channels
+ * a client is subscribed to. */
+int clientTotalPubSubSubscriptionCount(client *c) {
+ return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c);
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
-int pubsubSubscribeChannel(client *c, robj *channel) {
+int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
- if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
+ if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
- de = dictFind(server.pubsub_channels,channel);
+ de = dictFind(*type.serverPubSubChannels, channel);
if (de == NULL) {
clients = listCreate();
- dictAdd(server.pubsub_channels,channel,clients);
+ dictAdd(*type.serverPubSubChannels, channel, clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
@@ -154,13 +243,13 @@ int pubsubSubscribeChannel(client *c, robj *channel) {
listAddNodeTail(clients,c);
}
/* Notify the client */
- addReplyPubsubSubscribed(c,channel);
+ addReplyPubsubSubscribed(c,channel,type);
return retval;
}
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
-int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
+int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
dictEntry *de;
list *clients;
listNode *ln;
@@ -169,10 +258,10 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
/* Remove the channel from the client -> channels hash table */
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
- if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
+ if (dictDelete(type.clientPubSubChannels(c),channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel -> clients list hash table */
- de = dictFind(server.pubsub_channels,channel);
+ de = dictFind(*type.serverPubSubChannels, channel);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
@@ -182,15 +271,53 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will be possible to abuse
* Redis PUBSUB creating millions of channels. */
- dictDelete(server.pubsub_channels,channel);
+ dictDelete(*type.serverPubSubChannels, channel);
+ /* As this channel isn't subscribed by anyone, it's safe
+ * to remove the channel from the slot. */
+ if (server.cluster_enabled & type.shard) {
+ slotToChannelDel(channel->ptr);
+ }
}
}
/* Notify the client */
- if (notify) addReplyPubsubUnsubscribed(c,channel);
+ if (notify) {
+ addReplyPubsubUnsubscribed(c,channel,type);
+ }
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
+void pubsubShardUnsubscribeAllClients(robj *channel) {
+ int retval;
+ dictEntry *de = dictFind(server.pubsubshard_channels, channel);
+ serverAssertWithInfo(NULL,channel,de != NULL);
+ list *clients = dictGetVal(de);
+ if (listLength(clients) > 0) {
+ /* For each client subscribed to the channel, unsubscribe it. */
+ listIter li;
+ listNode *ln;
+ listRewind(clients, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ retval = dictDelete(c->pubsubshard_channels, channel);
+ serverAssertWithInfo(c,channel,retval == DICT_OK);
+ addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
+ /* If the client has no other pubsub subscription,
+ * move out of pubsub mode. */
+ if (clientTotalPubSubSubscriptionCount(c) == 0) {
+ c->flags &= ~CLIENT_PUBSUB;
+ }
+ }
+ }
+ /* Delete the channel from server pubsubshard channels hash table. */
+ retval = dictDelete(server.pubsubshard_channels, channel);
+ /* Delete the channel from slots_to_channel mapping. */
+ slotToChannelDel(channel->ptr);
+ serverAssertWithInfo(NULL,channel,retval == DICT_OK);
+ decrRefCount(channel); /* it is finally safe to release it */
+}
+
+
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
dictEntry *de;
@@ -250,24 +377,53 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
/* Unsubscribe from all the channels. Return the number of channels the
* client was subscribed to. */
-int pubsubUnsubscribeAllChannels(client *c, int notify) {
+int pubsubUnsubscribeAllChannelsInternal(client *c, int notify, pubsubtype type) {
int count = 0;
- if (dictSize(c->pubsub_channels) > 0) {
- dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
+ if (dictSize(type.clientPubSubChannels(c)) > 0) {
+ dictIterator *di = dictGetSafeIterator(type.clientPubSubChannels(c));
dictEntry *de;
while((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de);
- count += pubsubUnsubscribeChannel(c,channel,notify);
+ count += pubsubUnsubscribeChannel(c,channel,notify,type);
}
dictReleaseIterator(di);
}
/* We were subscribed to nothing? Still reply to the client. */
- if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
+ if (notify && count == 0) {
+ addReplyPubsubUnsubscribed(c,NULL,type);
+ }
+ return count;
+}
+
+/*
+ * Unsubscribe a client from all global channels.
+ */
+int pubsubUnsubscribeAllChannels(client *c, int notify) {
+ int count = pubsubUnsubscribeAllChannelsInternal(c,notify,pubSubType);
+ return count;
+}
+
+/*
+ * Unsubscribe a client from all shard subscribed channels.
+ */
+int pubsubUnsubscribeShardAllChannels(client *c, int notify) {
+ int count = pubsubUnsubscribeAllChannelsInternal(c, notify, pubSubShardType);
return count;
}
+/*
+ * Unsubscribe a client from provided shard subscribed channel(s).
+ */
+void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count) {
+ for (unsigned int j = 0; j < count; j++) {
+ /* Remove the channel from server and from the clients
+ * subscribed to it as well as notify them. */
+ pubsubShardUnsubscribeAllClients(channels[j]);
+ }
+}
+
/* Unsubscribe from all the patterns. Return the number of patterns the
* client was subscribed from. */
int pubsubUnsubscribeAllPatterns(client *c, int notify) {
@@ -285,8 +441,10 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
return count;
}
-/* Publish a message */
-int pubsubPublishMessage(robj *channel, robj *message) {
+/*
+ * Publish a message to all the subscribers.
+ */
+int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
@@ -294,7 +452,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
listIter li;
/* Send to clients listening for that channel */
- de = dictFind(server.pubsub_channels,channel);
+ de = dictFind(*type.serverPubSubChannels, channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
@@ -308,6 +466,12 @@ int pubsubPublishMessage(robj *channel, robj *message) {
receivers++;
}
}
+
+ if (type.shard) {
+ /* Shard pubsub ignores patterns. */
+ return receivers;
+ }
+
/* Send to clients listening to matching channels */
di = dictGetIterator(server.pubsub_patterns);
if (di) {
@@ -334,6 +498,17 @@ int pubsubPublishMessage(robj *channel, robj *message) {
return receivers;
}
+/* Publish a message to all the subscribers. */
+int pubsubPublishMessage(robj *channel, robj *message) {
+ return pubsubPublishMessageInternal(channel,message,pubSubType);
+}
+
+/* Publish a shard message to all the subscribers. */
+int pubsubPublishMessageShard(robj *channel, robj *message) {
+ return pubsubPublishMessageInternal(channel, message, pubSubShardType);
+}
+
+
/*-----------------------------------------------------------------------------
* Pubsub commands implementation
*----------------------------------------------------------------------------*/
@@ -352,13 +527,12 @@ void subscribeCommand(client *c) {
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
-
for (j = 1; j < c->argc; j++)
- pubsubSubscribeChannel(c,c->argv[j]);
+ pubsubSubscribeChannel(c,c->argv[j],pubSubType);
c->flags |= CLIENT_PUBSUB;
}
-/* UNSUBSCRIBE [channel [channel ...]] */
+/* UNSUBSCRIBE [channel ...] */
void unsubscribeCommand(client *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
@@ -366,9 +540,9 @@ void unsubscribeCommand(client *c) {
int j;
for (j = 1; j < c->argc; j++)
- pubsubUnsubscribeChannel(c,c->argv[j],1);
+ pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType);
}
- if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
+ if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
/* PSUBSCRIBE pattern [pattern ...] */
@@ -401,7 +575,7 @@ void punsubscribeCommand(client *c) {
for (j = 1; j < c->argc; j++)
pubsubUnsubscribePattern(c,c->argv[j],1);
}
- if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
+ if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
/* PUBLISH <channel> <message> */
@@ -429,7 +603,11 @@ void pubsubCommand(client *c) {
" Return number of subscriptions to patterns.",
"NUMSUB [<channel> ...]",
" Return the number of subscribers for the specified channels, excluding",
-" pattern subscriptions(default: no channels).",
+" pattern subscriptions(default: no channels)."
+"SHARDCHANNELS [<pattern>]",
+" Return the currently active shard level channels matching a <pattern> (default: '*').",
+"SHARDNUMSUB [<channel> ...]",
+" Return the number of subscribers for the specified shard level channel(s)",
NULL
};
addReplyHelp(c, help);
@@ -438,25 +616,7 @@ NULL
{
/* PUBSUB CHANNELS [<pattern>] */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
- dictIterator *di = dictGetIterator(server.pubsub_channels);
- dictEntry *de;
- long mblen = 0;
- void *replylen;
-
- replylen = addReplyDeferredLen(c);
- while((de = dictNext(di)) != NULL) {
- robj *cobj = dictGetKey(de);
- sds channel = cobj->ptr;
-
- if (!pat || stringmatchlen(pat, sdslen(pat),
- channel, sdslen(channel),0))
- {
- addReplyBulk(c,cobj);
- mblen++;
- }
- }
- dictReleaseIterator(di);
- setDeferredArrayLen(c,replylen,mblen);
+ channelList(c, pat, server.pubsub_channels);
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
int j;
@@ -471,7 +631,93 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
/* PUBSUB NUMPAT */
addReplyLongLong(c,dictSize(server.pubsub_patterns));
+ } else if (!strcasecmp(c->argv[1]->ptr,"shardchannels") &&
+ (c->argc == 2 || c->argc == 3))
+ {
+ /* PUBSUB SHARDCHANNELS */
+ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
+ channelList(c,pat,server.pubsubshard_channels);
+ } else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub") && c->argc >= 2) {
+ /* PUBSUB SHARDNUMSUB [Channel_1 ... Channel_N] */
+ int j;
+
+ addReplyArrayLen(c, (c->argc-2)*2);
+ for (j = 2; j < c->argc; j++) {
+ list *l = dictFetchValue(server.pubsubshard_channels, c->argv[j]);
+
+ addReplyBulk(c,c->argv[j]);
+ addReplyLongLong(c,l ? listLength(l) : 0);
+ }
} else {
addReplySubcommandSyntaxError(c);
}
}
+
+void channelList(client *c, sds pat, dict *pubsub_channels) {
+ dictIterator *di = dictGetIterator(pubsub_channels);
+ dictEntry *de;
+ long mblen = 0;
+ void *replylen;
+
+ replylen = addReplyDeferredLen(c);
+ while((de = dictNext(di)) != NULL) {
+ robj *cobj = dictGetKey(de);
+ sds channel = cobj->ptr;
+
+ if (!pat || stringmatchlen(pat, sdslen(pat),
+ channel, sdslen(channel),0))
+ {
+ addReplyBulk(c,cobj);
+ mblen++;
+ }
+ }
+ dictReleaseIterator(di);
+ setDeferredArrayLen(c,replylen,mblen);
+}
+
+/* SPUBLISH <channel> <message> */
+void spublishCommand(client *c) {
+ int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
+ if (server.cluster_enabled) {
+ clusterPropagatePublishShard(c->argv[1], c->argv[2]);
+ } else {
+ forceCommandPropagation(c,PROPAGATE_REPL);
+ }
+ addReplyLongLong(c,receivers);
+}
+
+/* SSUBSCRIBE channel [channel ...] */
+void ssubscribeCommand(client *c) {
+ if (c->flags & CLIENT_DENY_BLOCKING) {
+ /* A client that has CLIENT_DENY_BLOCKING flag on
+ * expect a reply per command and so can not execute subscribe. */
+ addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
+ return;
+ }
+
+ for (int j = 1; j < c->argc; j++) {
+ /* A channel is only considered to be added, if a
+ * subscriber exists for it. And if a subscriber
+ * already exists the slotToChannel doesn't needs
+ * to be incremented. */
+ if (server.cluster_enabled &
+ (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) {
+ slotToChannelAdd(c->argv[j]->ptr);
+ }
+ pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
+ }
+ c->flags |= CLIENT_PUBSUB;
+}
+
+
+/* SUNSUBSCRIBE [channel ...] */
+void sunsubscribeCommand(client *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeShardAllChannels(c, 1);
+ } else {
+ for (int j = 1; j < c->argc; j++) {
+ pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType);
+ }
+ }
+ if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
+}
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 397e3bfa8..94697f99f 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -1381,7 +1381,8 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
if (!strcasecmp(command,"subscribe") ||
- !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1;
+ !strcasecmp(command,"psubscribe") ||
+ !strcasecmp(command,"ssubscribe")) config.pubsub_mode = 1;
if (!strcasecmp(command,"sync") ||
!strcasecmp(command,"psync")) config.slave_mode = 1;
diff --git a/src/server.c b/src/server.c
index e7cf5740b..4770a5df0 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1648,6 +1648,8 @@ void createSharedObjects(void) {
shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
+ shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
+ shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
@@ -2367,6 +2369,7 @@ void initServer(void) {
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType);
server.pubsub_patterns = dictCreate(&keylistDictType);
+ server.pubsubshard_channels = dictCreate(&keylistDictType);
server.cronloops = 0;
server.in_script = 0;
server.in_exec = 0;
@@ -3499,14 +3502,16 @@ int processCommand(client *c) {
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
+ c->cmd->proc != ssubscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
+ c->cmd->proc != sunsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand) {
rejectCommandFormat(c,
- "Can't execute '%s': only (P)SUBSCRIBE / "
- "(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
+ "Can't execute '%s': only (P|S)SUBSCRIBE / "
+ "(P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
c->cmd->name);
return C_OK;
}
@@ -4001,6 +4006,7 @@ void addReplyFlagsForKeyArgs(client *c, uint64_t flags) {
void *flaglen = addReplyDeferredLen(c);
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_WRITE, "write");
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_READ, "read");
+ flagcount += addReplyCommandFlag(c,flags,CMD_KEY_SHARD_CHANNEL, "shard_channel");
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_INCOMPLETE, "incomplete");
setDeferredSetLen(c, flaglen, flagcount);
}
diff --git a/src/server.h b/src/server.h
index 792eb30a1..c1a0af355 100644
--- a/src/server.h
+++ b/src/server.h
@@ -233,9 +233,12 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
/* Key argument flags. Please check the command table defined in the server.c file
* for more information about the meaning of every flag. */
-#define CMD_KEY_WRITE (1ULL<<0)
-#define CMD_KEY_READ (1ULL<<1)
-#define CMD_KEY_INCOMPLETE (1ULL<<2) /* meaning that the keyspec might not point out to all keys it should cover */
+#define CMD_KEY_WRITE (1ULL<<0) /* "write" flag */
+#define CMD_KEY_READ (1ULL<<1) /* "read" flag */
+#define CMD_KEY_SHARD_CHANNEL (1ULL<<2) /* "shard_channel" flag */
+#define CMD_KEY_INCOMPLETE (1ULL<<3) /* "incomplete" flag (meaning that
+ * the keyspec might not point out
+ * to all keys it should cover) */
/* AOF states */
#define AOF_OFF 0 /* AOF is off */
@@ -1086,6 +1089,7 @@ typedef struct client {
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
+ dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
@@ -1174,6 +1178,7 @@ struct sharedObjectsStruct {
*time, *pxat, *absttl, *retrycount, *force, *justid,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
+ *ssubscribebulk,*sunsubscribebulk,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@@ -1751,6 +1756,7 @@ struct redisServer {
dict *pubsub_patterns; /* A dict of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
+ dict *pubsubshard_channels; /* Map channels to list of subscribed clients */
/* Cluster */
int cluster_enabled; /* Is cluster enabled? */
int cluster_port; /* Set the cluster port for a node. */
@@ -1821,6 +1827,8 @@ struct redisServer {
* failover then any replica can be used. */
int target_replica_port; /* Failover target port */
int failover_state; /* Failover state */
+ int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster
+ is down, doesn't affect pubsub global. */
};
#define MAX_KEYS_BUFFER 256
@@ -2816,9 +2824,14 @@ robj *hashTypeDup(robj *o);
/* Pub / Sub */
int pubsubUnsubscribeAllChannels(client *c, int notify);
+int pubsubUnsubscribeShardAllChannels(client *c, int notify);
+void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count);
int pubsubUnsubscribeAllPatterns(client *c, int notify);
int pubsubPublishMessage(robj *channel, robj *message);
+int pubsubPublishMessageShard(robj *channel, robj *message);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg);
+int serverPubsubSubscriptionCount();
+int serverPubsubShardSubscriptionCount();
/* Keyspace events notification */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
@@ -2902,6 +2915,7 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
/* API to get key arguments from commands */
int *getKeysPrepareResult(getKeysResult *result, int numkeys);
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
+int getChannelsFromCommand(struct redisCommand *cmd, int argc, getKeysResult *result);
void getKeysFreeResult(getKeysResult *result);
int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
@@ -3184,6 +3198,9 @@ void psubscribeCommand(client *c);
void punsubscribeCommand(client *c);
void publishCommand(client *c);
void pubsubCommand(client *c);
+void spublishCommand(client *c);
+void ssubscribeCommand(client *c);
+void sunsubscribeCommand(client *c);
void watchCommand(client *c);
void unwatchCommand(client *c);
void clusterCommand(client *c);
diff --git a/src/tracking.c b/src/tracking.c
index 11e2587e2..bb36f742d 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -228,6 +228,12 @@ void trackingRememberKeys(client *c) {
getKeysFreeResult(&result);
return;
}
+ /* Shard channels are treated as special keys for client
+ * library to rely on `COMMAND` command to discover the node
+ * to connect to. These channels doesn't need to be tracked. */
+ if (c->cmd->flags & CMD_PUBSUB) {
+ return;
+ }
int *keys = result.keys;
diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl
index 7b7ce5343..31e0c3667 100644
--- a/tests/cluster/cluster.tcl
+++ b/tests/cluster/cluster.tcl
@@ -66,7 +66,7 @@ proc s {n field} {
get_info_field [R $n info] $field
}
-# Assuming nodes are reest, this function performs slots allocation.
+# Assuming nodes are reset, this function performs slots allocation.
# Only the first 'n' nodes are used.
proc cluster_allocate_slots {n} {
set slot 16383
@@ -129,6 +129,32 @@ proc create_cluster {masters slaves} {
set ::cluster_replica_nodes $slaves
}
+proc cluster_allocate_with_continuous_slots {n} {
+ set slot 16383
+ set avg [expr ($slot+1) / $n]
+ while {$slot >= 0} {
+ set node [expr $slot/$avg >= $n ? $n-1 : $slot/$avg]
+ lappend slots_$node $slot
+ incr slot -1
+ }
+ for {set j 0} {$j < $n} {incr j} {
+ R $j cluster addslots {*}[set slots_${j}]
+ }
+}
+
+# Create a cluster composed of the specified number of masters and slaves with continuous slots.
+proc cluster_create_with_continuous_slots {masters slaves} {
+ cluster_allocate_with_continuous_slots $masters
+ if {$slaves} {
+ cluster_allocate_slaves $masters $slaves
+ }
+ assert_cluster_state ok
+
+ set ::cluster_master_nodes $masters
+ set ::cluster_replica_nodes $slaves
+}
+
+
# Set the cluster node-timeout to all the reachalbe nodes.
proc set_cluster_node_timeout {to} {
foreach_redis_id id {
@@ -243,4 +269,4 @@ proc get_link_from_peer {this_instance_id peer_nodename} {
}
}
return {}
-}
+} \ No newline at end of file
diff --git a/tests/cluster/tests/19-cluster-nodes-slots.tcl b/tests/cluster/tests/19-cluster-nodes-slots.tcl
index 80f68d5d0..77faec912 100644
--- a/tests/cluster/tests/19-cluster-nodes-slots.tcl
+++ b/tests/cluster/tests/19-cluster-nodes-slots.tcl
@@ -2,27 +2,6 @@
source "../tests/includes/init-tests.tcl"
-proc cluster_allocate_with_continuous_slots {n} {
- set slot 16383
- set avg [expr ($slot+1) / $n]
- while {$slot >= 0} {
- set node [expr $slot/$avg >= $n ? $n-1 : $slot/$avg]
- lappend slots_$node $slot
- incr slot -1
- }
- for {set j 0} {$j < $n} {incr j} {
- R $j cluster addslots {*}[set slots_${j}]
- }
-}
-
-proc cluster_create_with_continuous_slots {masters slaves} {
- cluster_allocate_with_continuous_slots $masters
- if {$slaves} {
- cluster_allocate_slaves $masters $slaves
- }
- assert_cluster_state ok
-}
-
test "Create a 2 nodes cluster" {
cluster_create_with_continuous_slots 2 2
}
diff --git a/tests/cluster/tests/23-multiple-slot-operations.tcl b/tests/cluster/tests/23-multiple-slot-operations.tcl
index 906033c72..965ecd5af 100644
--- a/tests/cluster/tests/23-multiple-slot-operations.tcl
+++ b/tests/cluster/tests/23-multiple-slot-operations.tcl
@@ -2,7 +2,7 @@
source "../tests/includes/init-tests.tcl"
-proc cluster_allocate_with_continuous_slots {n} {
+proc cluster_allocate_with_continuous_slots_local {n} {
R 0 cluster ADDSLOTSRANGE 0 3276
R 1 cluster ADDSLOTSRANGE 3277 6552
R 2 cluster ADDSLOTSRANGE 6553 9828
@@ -10,8 +10,8 @@ proc cluster_allocate_with_continuous_slots {n} {
R 4 cluster ADDSLOTSRANGE 13105 16383
}
-proc cluster_create_with_continuous_slots {masters slaves} {
- cluster_allocate_with_continuous_slots $masters
+proc cluster_create_with_continuous_slots_local {masters slaves} {
+ cluster_allocate_with_continuous_slots_local $masters
if {$slaves} {
cluster_allocate_slaves $masters $slaves
}
@@ -20,7 +20,7 @@ proc cluster_create_with_continuous_slots {masters slaves} {
test "Create a 5 nodes cluster" {
- cluster_create_with_continuous_slots 5 5
+ cluster_create_with_continuous_slots_local 5 5
}
test "Cluster should start ok" {
diff --git a/tests/cluster/tests/25-pubsubshard-slot-migration.tcl b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl
new file mode 100644
index 000000000..11b77d36a
--- /dev/null
+++ b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl
@@ -0,0 +1,171 @@
+source "../tests/includes/init-tests.tcl"
+
+test "Create a 3 nodes cluster" {
+ cluster_create_with_continuous_slots 3 3
+}
+
+test "Cluster is up" {
+ assert_cluster_state ok
+}
+
+set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
+
+test "Migrate a slot, verify client receives sunsubscribe on primary serving the slot." {
+
+ # Setup the to and from node
+ set channelname mychannel
+ set slot [$cluster cluster keyslot $channelname]
+ array set nodefrom [$cluster masternode_for_slot $slot]
+ array set nodeto [$cluster masternode_notfor_slot $slot]
+
+ set subscribeclient [redis_deferring_client_by_addr $nodefrom(host) $nodefrom(port)]
+
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
+
+ # Verify subscribe is still valid, able to receive messages.
+ $nodefrom(link) spublish $channelname hello
+ assert_equal {message mychannel hello} [$subscribeclient read]
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
+
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"0" eq [lindex $msg 2]}
+
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
+
+ $subscribeclient close
+}
+
+test "Client subscribes to multiple channels, migrate a slot, verify client receives sunsubscribe on primary serving the slot." {
+
+ # Setup the to and from node
+ set channelname ch3
+ set anotherchannelname ch7
+ set slot [$cluster cluster keyslot $channelname]
+ array set nodefrom [$cluster masternode_for_slot $slot]
+ array set nodeto [$cluster masternode_notfor_slot $slot]
+
+ set subscribeclient [redis_deferring_client_by_addr $nodefrom(host) $nodefrom(port)]
+
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ $subscribeclient ssubscribe $anotherchannelname
+ $subscribeclient read
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
+
+ # Verify subscribe is still valid, able to receive messages.
+ $nodefrom(link) spublish $channelname hello
+ assert_equal {message ch3 hello} [$subscribeclient read]
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
+
+ # Verify the client receives sunsubscribe message for the channel(slot) which got migrated.
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"1" eq [lindex $msg 2]}
+
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
+
+ $nodefrom(link) spublish $anotherchannelname hello
+
+ # Verify the client is still connected and receives message from the other channel.
+ set msg [$subscribeclient read]
+ assert {"message" eq [lindex $msg 0]}
+ assert {$anotherchannelname eq [lindex $msg 1]}
+ assert {"hello" eq [lindex $msg 2]}
+
+ $subscribeclient close
+}
+
+test "Migrate a slot, verify client receives sunsubscribe on replica serving the slot." {
+
+ # Setup the to and from node
+ set channelname mychannel1
+ set slot [$cluster cluster keyslot $channelname]
+ array set nodefrom [$cluster masternode_for_slot $slot]
+ array set nodeto [$cluster masternode_notfor_slot $slot]
+
+ # Get replica node serving slot (mychannel) to connect a client.
+ set replicanodeinfo [$cluster cluster replicas $nodefrom(id)]
+ set args [split $replicanodeinfo " "]
+ set addr [lindex [split [lindex $args 1] @] 0]
+ set replicahost [lindex [split $addr :] 0]
+ set replicaport [lindex [split $addr :] 1]
+ set subscribeclient [redis_deferring_client_by_addr $replicahost $replicaport]
+
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
+
+ # Verify subscribe is still valid, able to receive messages.
+ $nodefrom(link) spublish $channelname hello
+ assert_equal {message mychannel1 hello} [$subscribeclient read]
+
+ assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
+ assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
+
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"0" eq [lindex $msg 2]}
+
+ $subscribeclient close
+}
+
+test "Delete a slot, verify sunsubscribe message" {
+ set channelname ch2
+ set slot [$cluster cluster keyslot $channelname]
+
+ array set primary_client [$cluster masternode_for_slot $slot]
+
+ set subscribeclient [redis_deferring_client_by_addr $primary_client(host) $primary_client(port)]
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ $primary_client(link) cluster DELSLOTS $slot
+
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"0" eq [lindex $msg 2]}
+
+ $subscribeclient close
+}
+
+test "Reset cluster, verify sunsubscribe message" {
+ set channelname ch4
+ set slot [$cluster cluster keyslot $channelname]
+
+ array set primary_client [$cluster masternode_for_slot $slot]
+
+ set subscribeclient [redis_deferring_client_by_addr $primary_client(host) $primary_client(port)]
+ $subscribeclient deferred 1
+ $subscribeclient ssubscribe $channelname
+ $subscribeclient read
+
+ $cluster cluster reset HARD
+
+ set msg [$subscribeclient read]
+ assert {"sunsubscribe" eq [lindex $msg 0]}
+ assert {$channelname eq [lindex $msg 1]}
+ assert {"0" eq [lindex $msg 2]}
+
+ $cluster close
+ $subscribeclient close
+} \ No newline at end of file
diff --git a/tests/cluster/tests/26-pubsubshard.tcl b/tests/cluster/tests/26-pubsubshard.tcl
new file mode 100644
index 000000000..2619eda0a
--- /dev/null
+++ b/tests/cluster/tests/26-pubsubshard.tcl
@@ -0,0 +1,94 @@
+# Test PUBSUB shard propagation in a cluster slot.
+
+source "../tests/includes/init-tests.tcl"
+
+test "Create a 3 nodes cluster" {
+ cluster_create_with_continuous_slots 3 3
+}
+
+set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
+test "Pub/Sub shard basics" {
+
+ set slot [$cluster cluster keyslot "channel.0"]
+ array set publishnode [$cluster masternode_for_slot $slot]
+ array set notshardnode [$cluster masternode_notfor_slot $slot]
+
+ set publishclient [redis_client_by_addr $publishnode(host) $publishnode(port)]
+ set subscribeclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+ set subscribeclient2 [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+ set anotherclient [redis_deferring_client_by_addr $notshardnode(host) $notshardnode(port)]
+
+ $subscribeclient ssubscribe channel.0
+ $subscribeclient read
+
+ $subscribeclient2 ssubscribe channel.0
+ $subscribeclient2 read
+
+ $anotherclient ssubscribe channel.0
+ catch {$anotherclient read} err
+ assert_match {MOVED *} $err
+
+ set data [randomValue]
+ $publishclient spublish channel.0 $data
+
+ set msg [$subscribeclient read]
+ assert_equal $data [lindex $msg 2]
+
+ set msg [$subscribeclient2 read]
+ assert_equal $data [lindex $msg 2]
+
+ $publishclient close
+ $subscribeclient close
+ $subscribeclient2 close
+ $anotherclient close
+}
+
+test "client can't subscribe to multiple shard channels across different slots in same call" {
+ catch {$cluster ssubscribe channel.0 channel.1} err
+ assert_match {CROSSSLOT Keys*} $err
+}
+
+test "client can subscribe to multiple shard channels across different slots in separate call" {
+ $cluster ssubscribe ch3
+ $cluster ssubscribe ch7
+
+ $cluster sunsubscribe ch3
+ $cluster sunsubscribe ch7
+}
+
+
+test "Verify Pub/Sub and Pub/Sub shard no overlap" {
+ set slot [$cluster cluster keyslot "channel.0"]
+ array set publishnode [$cluster masternode_for_slot $slot]
+ array set notshardnode [$cluster masternode_notfor_slot $slot]
+
+ set publishshardclient [redis_client_by_addr $publishnode(host) $publishnode(port)]
+ set publishclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+ set subscribeshardclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+ set subscribeclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
+
+ $subscribeshardclient deferred 1
+ $subscribeshardclient ssubscribe channel.0
+ $subscribeshardclient read
+
+ $subscribeclient deferred 1
+ $subscribeclient subscribe channel.0
+ $subscribeclient read
+
+ set sharddata "testingpubsubdata"
+ $publishshardclient spublish channel.0 $sharddata
+
+ set data "somemoredata"
+ $publishclient publish channel.0 $data
+
+ set msg [$subscribeshardclient read]
+ assert_equal $sharddata [lindex $msg 2]
+
+ set msg [$subscribeclient read]
+ assert_equal $data [lindex $msg 2]
+
+ $cluster close
+ $publishclient close
+ $subscribeclient close
+ $subscribeshardclient close
+} \ No newline at end of file
diff --git a/tests/instances.tcl b/tests/instances.tcl
index 2a3336b78..52e61418e 100644
--- a/tests/instances.tcl
+++ b/tests/instances.tcl
@@ -677,9 +677,19 @@ proc redis_deferring_client {type id} {
return $client
}
+proc redis_deferring_client_by_addr {host port} {
+ set client [redis $host $port 1 $::tls]
+ return $client
+}
+
proc redis_client {type id} {
set port [get_instance_attrib $type $id port]
set host [get_instance_attrib $type $id host]
set client [redis $host $port 0 $::tls]
return $client
}
+
+proc redis_client_by_addr {host port} {
+ set client [redis $host $port 0 $::tls]
+ return $client
+} \ No newline at end of file
diff --git a/tests/support/cluster.tcl b/tests/support/cluster.tcl
index df4b7f3d0..081ef6a95 100644
--- a/tests/support/cluster.tcl
+++ b/tests/support/cluster.tcl
@@ -13,6 +13,7 @@ package require Tcl 8.5
package provide redis_cluster 0.1
namespace eval redis_cluster {}
+set ::redis_cluster::internal_id 0
set ::redis_cluster::id 0
array set ::redis_cluster::startup_nodes {}
array set ::redis_cluster::nodes {}
@@ -32,7 +33,8 @@ set ::redis_cluster::plain_commands {
hget hmset hmget hincrby hincrbyfloat hdel hlen hkeys hvals
hgetall hexists hscan incrby decrby incrbyfloat getset move
expire expireat pexpire pexpireat type ttl pttl persist restore
- dump bitcount bitpos pfadd pfcount
+ dump bitcount bitpos pfadd pfcount cluster ssubscribe spublish
+ sunsubscribe
}
# Create a cluster client. The nodes are given as a list of host:port. The TLS
@@ -118,6 +120,7 @@ proc ::redis_cluster::__method__refresh_nodes_map {id} {
# Build this node description as an hash.
set node [dict create \
id $nodeid \
+ internal_id $id \
addr $addr \
host $host \
port $port \
@@ -265,6 +268,7 @@ proc ::redis_cluster::get_keys_from_command {cmd argv} {
mget {return $argv}
eval {return [lrange $argv 2 1+[lindex $argv 1]]}
evalsha {return [lrange $argv 2 1+[lindex $argv 1]]}
+ spublish {return [list [lindex $argv 1]]}
}
# All the remaining commands are not handled.
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 722b04ed0..d47e83c56 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -62,6 +62,7 @@ set ::all_tests {
integration/redis-benchmark
integration/dismiss-mem
unit/pubsub
+ unit/pubsubshard
unit/slowlog
unit/scripting
unit/functions
diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl
index 03a3ba926..660e841fb 100644
--- a/tests/unit/acl.tcl
+++ b/tests/unit/acl.tcl
@@ -87,6 +87,10 @@ start_server {tags {"acl external:skip"}} {
r PUBLISH foo bar
} {0}
+ test {By default users are able to publish to any shard channel} {
+ r SPUBLISH foo bar
+ } {0}
+
test {By default users are able to subscribe to any channel} {
set rd [redis_deferring_client]
$rd AUTH psuser pspass
@@ -96,6 +100,15 @@ start_server {tags {"acl external:skip"}} {
$rd close
} {0}
+ test {By default users are able to subscribe to any shard channel} {
+ set rd [redis_deferring_client]
+ $rd AUTH psuser pspass
+ $rd read
+ $rd SSUBSCRIBE foo
+ assert_match {ssubscribe foo 1} [$rd read]
+ $rd close
+ } {0}
+
test {By default users are able to subscribe to any pattern} {
set rd [redis_deferring_client]
$rd AUTH psuser pspass
@@ -113,6 +126,14 @@ start_server {tags {"acl external:skip"}} {
set e
} {*NOPERM*channel*}
+ test {It's possible to allow publishing to a subset of shard channels} {
+ r ACL setuser psuser resetchannels &foo:1 &bar:*
+ assert_equal {0} [r SPUBLISH foo:1 somemessage]
+ assert_equal {0} [r SPUBLISH bar:2 anothermessage]
+ catch {r SPUBLISH zap:3 nosuchmessage} e
+ set e
+ } {*NOPERM*channel*}
+
test {Validate subset of channels is prefixed with resetchannels flag} {
r ACL setuser hpuser on nopass resetchannels &foo +@all
@@ -166,6 +187,19 @@ start_server {tags {"acl external:skip"}} {
set e
} {*NOPERM*channel*}
+ test {It's possible to allow subscribing to a subset of shard channels} {
+ set rd [redis_deferring_client]
+ $rd AUTH psuser pspass
+ $rd read
+ $rd SSUBSCRIBE foo:1
+ assert_match {ssubscribe foo:1 1} [$rd read]
+ $rd SSUBSCRIBE bar:2
+ assert_match {ssubscribe bar:2 2} [$rd read]
+ $rd SSUBSCRIBE zap:3
+ catch {$rd read} e
+ set e
+ } {*NOPERM*channel*}
+
test {It's possible to allow subscribing to a subset of channel patterns} {
set rd [redis_deferring_client]
$rd AUTH psuser pspass
@@ -193,6 +227,20 @@ start_server {tags {"acl external:skip"}} {
$rd close
} {0}
+ test {Subscribers are killed when revoked of channel permission} {
+ set rd [redis_deferring_client]
+ r ACL setuser psuser resetchannels &foo:1
+ $rd AUTH psuser pspass
+ $rd read
+ $rd CLIENT SETNAME deathrow
+ $rd read
+ $rd SSUBSCRIBE foo:1
+ $rd read
+ r ACL setuser psuser resetchannels
+ assert_no_match {*deathrow*} [r CLIENT LIST]
+ $rd close
+ } {0}
+
test {Subscribers are killed when revoked of pattern permission} {
set rd [redis_deferring_client]
r ACL setuser psuser resetchannels &bar:*
@@ -209,16 +257,18 @@ start_server {tags {"acl external:skip"}} {
test {Subscribers are pardoned if literal permissions are retained and/or gaining allchannels} {
set rd [redis_deferring_client]
- r ACL setuser psuser resetchannels &foo:1 &bar:*
+ r ACL setuser psuser resetchannels &foo:1 &bar:* &orders
$rd AUTH psuser pspass
$rd read
$rd CLIENT SETNAME pardoned
$rd read
$rd SUBSCRIBE foo:1
$rd read
+ $rd SSUBSCRIBE orders
+ $rd read
$rd PSUBSCRIBE bar:*
$rd read
- r ACL setuser psuser resetchannels &foo:1 &bar:* &baz:qaz &zoo:*
+ r ACL setuser psuser resetchannels &foo:1 &bar:* &orders &baz:qaz &zoo:*
assert_match {*pardoned*} [r CLIENT LIST]
r ACL setuser psuser allchannels
assert_match {*pardoned*} [r CLIENT LIST]
diff --git a/tests/unit/pubsubshard.tcl b/tests/unit/pubsubshard.tcl
new file mode 100644
index 000000000..5c3564afe
--- /dev/null
+++ b/tests/unit/pubsubshard.tcl
@@ -0,0 +1,213 @@
+start_server {tags {"pubsubshard external:skip"}} {
+ proc __consume_ssubscribe_messages {client type channels} {
+ set numsub -1
+ set counts {}
+
+ for {set i [llength $channels]} {$i > 0} {incr i -1} {
+ set msg [$client read]
+ assert_equal $type [lindex $msg 0]
+
+ # when receiving subscribe messages the channels names
+ # are ordered. when receiving unsubscribe messages
+ # they are unordered
+ set idx [lsearch -exact $channels [lindex $msg 1]]
+ if {[string match "sunsubscribe" $type]} {
+ assert {$idx >= 0}
+ } else {
+ assert {$idx == 0}
+ }
+ set channels [lreplace $channels $idx $idx]
+
+ # aggregate the subscription count to return to the caller
+ lappend counts [lindex $msg 2]
+ }
+
+ # we should have received messages for channels
+ assert {[llength $channels] == 0}
+ return $counts
+ }
+
+ proc __consume_subscribe_messages {client type channels} {
+ set numsub -1
+ set counts {}
+
+ for {set i [llength $channels]} {$i > 0} {incr i -1} {
+ set msg [$client read]
+ assert_equal $type [lindex $msg 0]
+
+ # when receiving subscribe messages the channels names
+ # are ordered. when receiving unsubscribe messages
+ # they are unordered
+ set idx [lsearch -exact $channels [lindex $msg 1]]
+ if {[string match "unsubscribe" $type]} {
+ assert {$idx >= 0}
+ } else {
+ assert {$idx == 0}
+ }
+ set channels [lreplace $channels $idx $idx]
+
+ # aggregate the subscription count to return to the caller
+ lappend counts [lindex $msg 2]
+ }
+
+ # we should have received messages for channels
+ assert {[llength $channels] == 0}
+ return $counts
+ }
+
+ proc ssubscribe {client channels} {
+ $client ssubscribe {*}$channels
+ __consume_ssubscribe_messages $client ssubscribe $channels
+ }
+
+ proc subscribe {client channels} {
+ $client subscribe {*}$channels
+ __consume_subscribe_messages $client subscribe $channels
+ }
+
+ proc sunsubscribe {client {channels {}}} {
+ $client sunsubscribe {*}$channels
+ __consume_subscribe_messages $client sunsubscribe $channels
+ }
+
+ proc unsubscribe {client {channels {}}} {
+ $client unsubscribe {*}$channels
+ __consume_subscribe_messages $client unsubscribe $channels
+ }
+
+ test "SPUBLISH/SSUBSCRIBE basics" {
+ set rd1 [redis_deferring_client]
+
+ # subscribe to two channels
+ assert_equal {1} [ssubscribe $rd1 {chan1}]
+ assert_equal {2} [ssubscribe $rd1 {chan2}]
+ assert_equal 1 [r SPUBLISH chan1 hello]
+ assert_equal 1 [r SPUBLISH chan2 world]
+ assert_equal {message chan1 hello} [$rd1 read]
+ assert_equal {message chan2 world} [$rd1 read]
+
+ # unsubscribe from one of the channels
+ sunsubscribe $rd1 {chan1}
+ assert_equal 0 [r SPUBLISH chan1 hello]
+ assert_equal 1 [r SPUBLISH chan2 world]
+ assert_equal {message chan2 world} [$rd1 read]
+
+ # unsubscribe from the remaining channel
+ sunsubscribe $rd1 {chan2}
+ assert_equal 0 [r SPUBLISH chan1 hello]
+ assert_equal 0 [r SPUBLISH chan2 world]
+
+ # clean up clients
+ $rd1 close
+ }
+
+ test "SPUBLISH/SSUBSCRIBE with two clients" {
+ set rd1 [redis_deferring_client]
+ set rd2 [redis_deferring_client]
+
+ assert_equal {1} [ssubscribe $rd1 {chan1}]
+ assert_equal {1} [ssubscribe $rd2 {chan1}]
+ assert_equal 2 [r SPUBLISH chan1 hello]
+ assert_equal {message chan1 hello} [$rd1 read]
+ assert_equal {message chan1 hello} [$rd2 read]
+
+ # clean up clients
+ $rd1 close
+ $rd2 close
+ }
+
+ test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" {
+ set rd1 [redis_deferring_client]
+ assert_equal {1} [ssubscribe $rd1 {chan1}]
+ assert_equal {2} [ssubscribe $rd1 {chan2}]
+ assert_equal {3} [ssubscribe $rd1 {chan3}]
+ sunsubscribe $rd1
+ assert_equal 0 [r SPUBLISH chan1 hello]
+ assert_equal 0 [r SPUBLISH chan2 hello]
+ assert_equal 0 [r SPUBLISH chan3 hello]
+
+ # clean up clients
+ $rd1 close
+ }
+
+ test "SUBSCRIBE to one channel more than once" {
+ set rd1 [redis_deferring_client]
+ assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
+ assert_equal 1 [r SPUBLISH chan1 hello]
+ assert_equal {message chan1 hello} [$rd1 read]
+
+ # clean up clients
+ $rd1 close
+ }
+
+ test "UNSUBSCRIBE from non-subscribed channels" {
+ set rd1 [redis_deferring_client]
+ assert_equal {0} [sunsubscribe $rd1 {foo}]
+ assert_equal {0} [sunsubscribe $rd1 {bar}]
+ assert_equal {0} [sunsubscribe $rd1 {quux}]
+
+ # clean up clients
+ $rd1 close
+ }
+
+ test "PUBSUB command basics" {
+ r pubsub shardnumsub abc def
+ } {abc 0 def 0}
+
+ test "SPUBLISH/SSUBSCRIBE with two clients" {
+ set rd1 [redis_deferring_client]
+ set rd2 [redis_deferring_client]
+
+ assert_equal {1} [ssubscribe $rd1 {chan1}]
+ assert_equal {1} [ssubscribe $rd2 {chan1}]
+ assert_equal 2 [r SPUBLISH chan1 hello]
+ assert_equal "chan1 2" [r pubsub shardnumsub chan1]
+ assert_equal "chan1" [r pubsub shardchannels]
+
+ # clean up clients
+ $rd1 close
+ $rd2 close
+ }
+
+ test "SPUBLISH/SSUBSCRIBE with PUBLISH/SUBSCRIBE" {
+ set rd1 [redis_deferring_client]
+ set rd2 [redis_deferring_client]
+
+ assert_equal {1} [ssubscribe $rd1 {chan1}]
+ assert_equal {1} [subscribe $rd2 {chan1}]
+ assert_equal 1 [r SPUBLISH chan1 hello]
+ assert_equal 1 [r publish chan1 hello]
+ assert_equal "chan1 1" [r pubsub shardnumsub chan1]
+ assert_equal "chan1 1" [r pubsub numsub chan1]
+ assert_equal "chan1" [r pubsub shardchannels]
+ assert_equal "chan1" [r pubsub channels]
+ }
+}
+
+start_server {tags {"pubsubshard external:skip"}} {
+start_server {tags {"pubsubshard external:skip"}} {
+ set node_0 [srv 0 client]
+ set node_0_host [srv 0 host]
+ set node_0_port [srv 0 port]
+
+ set node_1 [srv -1 client]
+ set node_1_host [srv -1 host]
+ set node_1_port [srv -1 port]
+
+ test {setup replication for following tests} {
+ $node_1 replicaof $node_0_host $node_0_port
+ wait_for_sync $node_1
+ }
+
+ test {publish message to master and receive on replica} {
+ set rd0 [redis_deferring_client node_0_host node_0_port]
+ set rd1 [redis_deferring_client node_1_host node_1_port]
+
+ assert_equal {1} [ssubscribe $rd1 {chan1}]
+ $rd0 SPUBLISH chan1 hello
+ assert_equal {message chan1 hello} [$rd1 read]
+ $rd0 SPUBLISH chan1 world
+ assert_equal {message chan1 world} [$rd1 read]
+ }
+}
+} \ No newline at end of file