summary | refs | log | tree | commit | diff
path: root/src/cluster.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cluster.c')
-rw-r--r-- src/cluster.c | 169
1 files changed, 154 insertions, 15 deletions
diff --git a/src/cluster.c b/src/cluster.c
index 78e273f34..81322a8aa 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -57,6 +57,7 @@ void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter, int use_pport);
clusterNode *clusterLookupNode(const char *name);
+list *clusterGetNodesServingMySlots(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
@@ -77,7 +78,9 @@ uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
const char *clusterGetMessageTypeString(int type);
+void removeChannelsInSlot(unsigned int slot);
unsigned int countKeysInSlot(unsigned int hashslot);
+unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
/* Links to the next and previous entries for keys in the same slot are stored
@@ -631,6 +634,9 @@ void clusterInit(void) {
/* Initialize data for the Slot to key API. */
slotToKeyInit(server.db);
+ /* The slots -> channels map is a radix tree. Initialize it here. */
+ server.cluster->slots_to_channels = raxNew();
+
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
@@ -1146,6 +1152,17 @@ clusterNode *clusterLookupNode(const char *name) {
return dictGetVal(de);
}
+/* Build the list of nodes serving the same slots as 'node': the primary of
+ * its shard first, followed by every replica attached to that primary.
+ * When 'node' is itself a primary it is the first element.
+ *
+ * The list is freshly created on every call and handed to the caller, who
+ * releases it with listRelease(). It is returned empty when 'node' is a
+ * replica whose primary pointer (node->slaveof) is NULL. */
+list *clusterGetNodesServingMySlots(clusterNode *node) {
+    list *nodes_for_slot = listCreate();
+    clusterNode *my_primary = nodeIsMaster(node) ? node : node->slaveof;
+
+    /* Without a primary there is no shard to enumerate, and reading
+     * my_primary->numslaves below would dereference NULL. */
+    if (my_primary == NULL) return nodes_for_slot;
+
+    listAddNodeTail(nodes_for_slot, my_primary);
+    for (int i = 0; i < my_primary->numslaves; i++) {
+        listAddNodeTail(nodes_for_slot, my_primary->slaves[i]);
+    }
+    return nodes_for_slot;
+}
+
/* This is only used after the handshake. When we connect a given IP/PORT
* as a result of CLUSTER MEET we don't have the node name yet, so we
* pick a random one, and will fix it when we receive the PONG request using
@@ -1921,7 +1938,7 @@ int clusterProcessPacket(clusterLink *link) {
explen += sizeof(clusterMsgDataFail);
if (totlen != explen) return 1;
- } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
+ } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) -
@@ -2278,7 +2295,7 @@ int clusterProcessPacket(clusterLink *link) {
"Ignoring FAIL message from unknown node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}
- } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
+ } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
if (!sender) return 1; /* We don't know that node. */
robj *channel, *message;
@@ -2286,8 +2303,10 @@ int clusterProcessPacket(clusterLink *link) {
/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
- if (dictSize(server.pubsub_channels) ||
- dictSize(server.pubsub_patterns))
+ if ((type == CLUSTERMSG_TYPE_PUBLISH
+ && serverPubsubSubscriptionCount() > 0)
+ || (type == CLUSTERMSG_TYPE_PUBLISHSHARD
+ && serverPubsubShardSubscriptionCount() > 0))
{
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
@@ -2296,7 +2315,11 @@ int clusterProcessPacket(clusterLink *link) {
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
- pubsubPublishMessage(channel,message);
+ if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
+ pubsubPublishMessageShard(channel, message);
+ } else {
+ pubsubPublishMessage(channel,message);
+ }
decrRefCount(channel);
decrRefCount(message);
}
@@ -2841,7 +2864,7 @@ void clusterBroadcastPong(int target) {
* the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
* positive in this context. */
REDIS_NO_SANITIZE("bounds")
-void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
+void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
unsigned char *payload;
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
@@ -2853,7 +2876,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
channel_len = sdslen(channel->ptr);
message_len = sdslen(message->ptr);
- clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
+ clusterBuildMessageHdr(hdr,type);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
@@ -2976,7 +2999,28 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin
* messages to hosts without receives for a given channel.
* -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message) {
- clusterSendPublish(NULL, channel, message);
+ clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
+}
+
+/* -----------------------------------------------------------------------------
+ * CLUSTER Pub/Sub shard support
+ *
+ * Publish this message to the other nodes serving my slots (the primary and
+ * its replicas), sending one PUBLISHSHARD packet per peer over that peer's
+ * own cluster link. This node itself is skipped.
+ * -------------------------------------------------------------------------- */
+void clusterPropagatePublishShard(robj *channel, robj *message) {
+    list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
+    listIter li;
+    listNode *ln;
+
+    listRewind(nodes_for_slot, &li);
+    while ((ln = listNext(&li)) != NULL) {
+        clusterNode *node = listNodeValue(ln);
+
+        if (node == myself) continue;
+        /* clusterPropagatePublish() passes a NULL link to reach every node
+         * of the cluster, so a peer without a link must be skipped here:
+         * forwarding its NULL link would presumably widen this shard-only
+         * message to the whole cluster. */
+        if (node->link == NULL) continue;
+        clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
+    }
+    listRelease(nodes_for_slot);
}
/* -----------------------------------------------------------------------------
@@ -4075,6 +4119,14 @@ int clusterDelSlot(int slot) {
clusterNode *n = server.cluster->slots[slot];
if (!n) return C_ERR;
+
+ /* Cleanup the channels in master/replica as part of slot deletion. */
+ list *nodes_for_slot = clusterGetNodesServingMySlots(n);
+ listNode *ln = listSearchKey(nodes_for_slot, myself);
+ if (ln != NULL) {
+ removeChannelsInSlot(slot);
+ }
+ listRelease(nodes_for_slot);
serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
server.cluster->slots[slot] = NULL;
return C_OK;
@@ -4574,6 +4626,7 @@ const char *clusterGetMessageTypeString(int type) {
case CLUSTERMSG_TYPE_MEET: return "meet";
case CLUSTERMSG_TYPE_FAIL: return "fail";
case CLUSTERMSG_TYPE_PUBLISH: return "publish";
+ case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
case CLUSTERMSG_TYPE_UPDATE: return "update";
@@ -5362,6 +5415,30 @@ NULL
}
}
+/* Unsubscribe every channel recorded under 'slot' in
+ * server.cluster->slots_to_channels.
+ *
+ * Entries are looked up by a two byte prefix holding the slot number (high
+ * byte first); the bytes after the prefix are the channel name. The names
+ * are collected into a temporary array of string objects which is passed to
+ * pubsubUnsubscribeShardChannels() in one call.
+ *
+ * NOTE(review): only the array is freed here, not the string objects;
+ * presumably pubsubUnsubscribeShardChannels() releases them -- confirm. */
+void removeChannelsInSlot(unsigned int slot) {
+    unsigned int total = countChannelsInSlot(slot);
+    if (total == 0) return;
+
+    unsigned char prefix[2];
+    prefix[0] = (slot >> 8) & 0xff;
+    prefix[1] = slot & 0xff;
+
+    robj **names = zmalloc(sizeof(robj*)*total);
+    int filled = 0;
+    raxIterator it;
+
+    /* Start at the first entry >= the slot prefix and stop as soon as an
+     * entry with a different prefix shows up. */
+    raxStart(&it,server.cluster->slots_to_channels);
+    raxSeek(&it,">=",prefix,2);
+    while (raxNext(&it) && it.key[0] == prefix[0] && it.key[1] == prefix[1]) {
+        names[filled++] = createStringObject((char*)it.key + 2, it.key_len - 2);
+    }
+    raxStop(&it);
+
+    pubsubUnsubscribeShardChannels(names, total);
+    zfree(names);
+}
+
/* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */
@@ -6121,6 +6198,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
mc.cmd = cmd;
}
+ int is_pubsubshard = cmd->proc == ssubscribeCommand ||
+ cmd->proc == sunsubscribeCommand ||
+ cmd->proc == spublishCommand;
+
/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
@@ -6172,8 +6253,8 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
importing_slot = 1;
}
} else {
- /* If it is not the first key, make sure it is exactly
- * the same key as the first we saw. */
+ /* If it is not the first key/channel, make sure it is exactly
+ * the same key/channel as the first we saw. */
if (!equalStringObjects(firstkey,thiskey)) {
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
@@ -6183,15 +6264,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
return NULL;
} else {
/* Flag this request as one with multiple different
- * keys. */
+ * keys/channels. */
multiple_keys = 1;
}
}
}
- /* Migrating / Importing slot? Count keys we don't have. */
+ /* Migrating / Importing slot? Count keys we don't have.
+ * If it is pubsubshard command, it isn't required to check
+ * the channel being present or not in the node during the
+ * slot migration, the channel will be served from the source
+ * node until the migration completes with CLUSTER SETSLOT <slot>
+ * NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY;
- if ((migrating_slot || importing_slot) &&
+ if ((migrating_slot || importing_slot) && !is_pubsubshard &&
lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
{
missing_keys++;
@@ -6207,7 +6293,12 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
/* Cluster is globally down but we got keys? We only serve the request
* if it is a read command and when allow_reads_when_down is enabled. */
if (server.cluster->state != CLUSTER_OK) {
- if (!server.cluster_allow_reads_when_down) {
+ if (is_pubsubshard) {
+ if (!server.cluster_allow_pubsubshard_when_down) {
+ if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
+ return NULL;
+ }
+ } else if (!server.cluster_allow_reads_when_down) {
/* The cluster is configured to block commands when the
* cluster is down. */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
@@ -6259,7 +6350,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* is serving, we can reply without redirection. */
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
- if (c->flags & CLIENT_READONLY &&
+ if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
!is_write_command &&
nodeIsSlave(myself) &&
myself->slaveof == n)
@@ -6482,3 +6573,51 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int countKeysInSlot(unsigned int hashslot) {
return (*server.db->slots_to_keys).by_slot[hashslot].count;
}
+
+/* -----------------------------------------------------------------------------
+ * Operation(s) on channel rax tree.
+ * -------------------------------------------------------------------------- */
+
+/* Insert 'channel' into (add != 0) or remove it from (add == 0) the
+ * server.cluster->slots_to_channels radix tree.
+ *
+ * The tree key is the channel's hash slot as two bytes, high byte first,
+ * followed by the channel name. Keys that fit use a 64 byte stack buffer;
+ * longer ones use a temporary heap buffer freed before returning. */
+void slotToChannelUpdate(sds channel, int add) {
+    unsigned char stackbuf[64];
+    size_t namelen = sdslen(channel);
+    size_t entrylen = namelen+2;
+    unsigned int slot = keyHashSlot(channel,namelen);
+    unsigned char *entry = (entrylen > sizeof(stackbuf)) ? zmalloc(entrylen) : stackbuf;
+
+    entry[0] = (slot >> 8) & 0xff;
+    entry[1] = slot & 0xff;
+    memcpy(entry+2,channel,namelen);
+
+    if (!add) {
+        raxRemove(server.cluster->slots_to_channels,entry,entrylen,NULL);
+    } else {
+        raxInsert(server.cluster->slots_to_channels,entry,entrylen,NULL,NULL);
+    }
+
+    if (entry != stackbuf) zfree(entry);
+}
+
+/* Record 'channel' in the slots -> channels tree. */
+void slotToChannelAdd(sds channel) {
+    const int add = 1;
+    slotToChannelUpdate(channel,add);
+}
+
+/* Drop 'channel' from the slots -> channels tree. */
+void slotToChannelDel(sds channel) {
+    const int add = 0;
+    slotToChannelUpdate(channel,add);
+}
+
+/* Return the number of entries of server.cluster->slots_to_channels whose
+ * key starts with the two byte encoding of 'hashslot' (high byte first).
+ * The scan begins at the first key >= that prefix and ends at the first
+ * key with a different prefix, or when the tree is exhausted. */
+unsigned int countChannelsInSlot(unsigned int hashslot) {
+    unsigned char prefix[2];
+    raxIterator it;
+    int found;
+
+    prefix[0] = (hashslot >> 8) & 0xff;
+    prefix[1] = hashslot & 0xff;
+
+    raxStart(&it,server.cluster->slots_to_channels);
+    raxSeek(&it,">=",prefix,2);
+    for (found = 0; raxNext(&it); found++) {
+        if (it.key[0] != prefix[0] || it.key[1] != prefix[1]) break;
+    }
+    raxStop(&it);
+    return found;
+}