diff options
Diffstat (limited to 'src/cluster.c')
-rw-r--r-- | src/cluster.c | 169 |
1 files changed, 154 insertions, 15 deletions
diff --git a/src/cluster.c b/src/cluster.c index 78e273f34..81322a8aa 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -57,6 +57,7 @@ void clusterUpdateState(void); int clusterNodeGetSlotBit(clusterNode *n, int slot); sds clusterGenNodesDescription(int filter, int use_pport); clusterNode *clusterLookupNode(const char *name); +list *clusterGetNodesServingMySlots(clusterNode *node); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); @@ -77,7 +78,9 @@ uint64_t clusterGetMaxEpoch(void); int clusterBumpConfigEpochWithoutConsensus(void); void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len); const char *clusterGetMessageTypeString(int type); +void removeChannelsInSlot(unsigned int slot); unsigned int countKeysInSlot(unsigned int hashslot); +unsigned int countChannelsInSlot(unsigned int hashslot); unsigned int delKeysInSlot(unsigned int hashslot); /* Links to the next and previous entries for keys in the same slot are stored @@ -631,6 +634,9 @@ void clusterInit(void) { /* Initialize data for the Slot to key API. */ slotToKeyInit(server.db); + /* The slots -> channels map is a radix tree. Initialize it here. */ + server.cluster->slots_to_channels = raxNew(); + /* Set myself->port/cport/pport to my listening ports, we'll just need to * discover the IP address via MEET messages. */ deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport); @@ -1146,6 +1152,17 @@ clusterNode *clusterLookupNode(const char *name) { return dictGetVal(de); } +/* Get all the nodes serving the same slots as myself. */ +list *clusterGetNodesServingMySlots(clusterNode *node) { + list *nodes_for_slot = listCreate(); + clusterNode *my_primary = nodeIsMaster(node) ? node : node->slaveof; + listAddNodeTail(nodes_for_slot, my_primary); + for (int i=0; i < my_primary->numslaves; i++) { + listAddNodeTail(nodes_for_slot, my_primary->slaves[i]); + } + return nodes_for_slot; +} + /* This is only used after the handshake. When we connect a given IP/PORT * as a result of CLUSTER MEET we don't have the node name yet, so we * pick a random one, and will fix it when we receive the PONG request using @@ -1921,7 +1938,7 @@ int clusterProcessPacket(clusterLink *link) { explen += sizeof(clusterMsgDataFail); if (totlen != explen) return 1; - } else if (type == CLUSTERMSG_TYPE_PUBLISH) { + } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataPublish) - @@ -2278,7 +2295,7 @@ int clusterProcessPacket(clusterLink *link) { "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); } - } else if (type == CLUSTERMSG_TYPE_PUBLISH) { + } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { if (!sender) return 1; /* We don't know that node. */ robj *channel, *message; @@ -2286,8 +2303,10 @@ int clusterProcessPacket(clusterLink *link) { /* Don't bother creating useless objects if there are no * Pub/Sub subscribers. */ - if (dictSize(server.pubsub_channels) || - dictSize(server.pubsub_patterns)) + if ((type == CLUSTERMSG_TYPE_PUBLISH + && serverPubsubSubscriptionCount() > 0) + || (type == CLUSTERMSG_TYPE_PUBLISHSHARD + && serverPubsubShardSubscriptionCount() > 0)) { channel_len = ntohl(hdr->data.publish.msg.channel_len); message_len = ntohl(hdr->data.publish.msg.message_len); @@ -2296,7 +2315,11 @@ int clusterProcessPacket(clusterLink *link) { message = createStringObject( (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); - pubsubPublishMessage(channel,message); + if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) { + pubsubPublishMessageShard(channel, message); + } else { + pubsubPublishMessage(channel,message); + } decrRefCount(channel); decrRefCount(message); } @@ -2841,7 +2864,7 @@ void clusterBroadcastPong(int target) { * the 'bulk_data', sanitizer generates an out-of-bounds error which is a false * positive in this context. */ REDIS_NO_SANITIZE("bounds") -void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { +void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) { unsigned char *payload; clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; @@ -2853,7 +2876,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { channel_len = sdslen(channel->ptr); message_len = sdslen(message->ptr); - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH); + clusterBuildMessageHdr(hdr,type); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len; @@ -2976,7 +2999,28 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin * messages to hosts without receives for a given channel. * -------------------------------------------------------------------------- */ void clusterPropagatePublish(robj *channel, robj *message) { - clusterSendPublish(NULL, channel, message); + clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH); +} + +/* ----------------------------------------------------------------------------- + * CLUSTER Pub/Sub shard support + * + * Publish this message across the slot (primary/replica). + * -------------------------------------------------------------------------- */ +void clusterPropagatePublishShard(robj *channel, robj *message) { + list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself); + if (listLength(nodes_for_slot) != 0) { + listIter li; + listNode *ln; + listRewind(nodes_for_slot, &li); + while((ln = listNext(&li))) { + clusterNode *node = listNodeValue(ln); + if (node != myself) { + clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD); + } + } + } + listRelease(nodes_for_slot); } /* ----------------------------------------------------------------------------- @@ -4075,6 +4119,14 @@ int clusterDelSlot(int slot) { clusterNode *n = server.cluster->slots[slot]; if (!n) return C_ERR; + + /* Cleanup the channels in master/replica as part of slot deletion. */ + list *nodes_for_slot = clusterGetNodesServingMySlots(n); + listNode *ln = listSearchKey(nodes_for_slot, myself); + if (ln != NULL) { + removeChannelsInSlot(slot); + } + listRelease(nodes_for_slot); serverAssert(clusterNodeClearSlotBit(n,slot) == 1); server.cluster->slots[slot] = NULL; return C_OK; @@ -4574,6 +4626,7 @@ const char *clusterGetMessageTypeString(int type) { case CLUSTERMSG_TYPE_MEET: return "meet"; case CLUSTERMSG_TYPE_FAIL: return "fail"; case CLUSTERMSG_TYPE_PUBLISH: return "publish"; + case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard"; case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req"; case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack"; case CLUSTERMSG_TYPE_UPDATE: return "update"; @@ -5362,6 +5415,30 @@ NULL } } +void removeChannelsInSlot(unsigned int slot) { + unsigned int channelcount = countChannelsInSlot(slot); + if (channelcount == 0) return; + + /* Retrieve all the channels for the slot. */ + robj **channels = zmalloc(sizeof(robj*)*channelcount); + raxIterator iter; + int j = 0; + unsigned char indexed[2]; + + indexed[0] = (slot >> 8) & 0xff; + indexed[1] = slot & 0xff; + raxStart(&iter,server.cluster->slots_to_channels); + raxSeek(&iter,">=",indexed,2); + while(raxNext(&iter)) { + if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break; + channels[j++] = createStringObject((char*)iter.key + 2, iter.key_len - 2); + } + raxStop(&iter); + + pubsubUnsubscribeShardChannels(channels, channelcount); + zfree(channels); +} + /* ----------------------------------------------------------------------------- * DUMP, RESTORE and MIGRATE commands * -------------------------------------------------------------------------- */ @@ -6121,6 +6198,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in mc.cmd = cmd; } + int is_pubsubshard = cmd->proc == ssubscribeCommand || + cmd->proc == sunsubscribeCommand || + cmd->proc == spublishCommand; + /* Check that all the keys are in the same hash slot, and obtain this * slot and the node associated. */ for (i = 0; i < ms->count; i++) { @@ -6172,8 +6253,8 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in importing_slot = 1; } } else { - /* If it is not the first key, make sure it is exactly - * the same key as the first we saw. */ + /* If it is not the first key/channel, make sure it is exactly + * the same key/channel as the first we saw. */ if (!equalStringObjects(firstkey,thiskey)) { if (slot != thisslot) { /* Error: multiple keys from different slots. */ @@ -6183,15 +6264,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in return NULL; } else { /* Flag this request as one with multiple different - * keys. */ + * keys/channels. */ multiple_keys = 1; } } } - /* Migrating / Importing slot? Count keys we don't have. */ + /* Migrating / Importing slot? Count keys we don't have. + * If it is pubsubshard command, it isn't required to check + * the channel being present or not in the node during the + * slot migration, the channel will be served from the source + * node until the migration completes with CLUSTER SETSLOT <slot> + * NODE <node-id>. */ int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY; - if ((migrating_slot || importing_slot) && + if ((migrating_slot || importing_slot) && !is_pubsubshard && lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) { missing_keys++; @@ -6207,7 +6293,12 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in /* Cluster is globally down but we got keys? We only serve the request * if it is a read command and when allow_reads_when_down is enabled. */ if (server.cluster->state != CLUSTER_OK) { - if (!server.cluster_allow_reads_when_down) { + if (is_pubsubshard) { + if (!server.cluster_allow_pubsubshard_when_down) { + if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; + return NULL; + } + } else if (!server.cluster_allow_reads_when_down) { /* The cluster is configured to block commands when the * cluster is down. */ if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; @@ -6259,7 +6350,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * is serving, we can reply without redirection. */ int is_write_command = (c->cmd->flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); - if (c->flags & CLIENT_READONLY && + if (((c->flags & CLIENT_READONLY) || is_pubsubshard) && !is_write_command && nodeIsSlave(myself) && myself->slaveof == n) @@ -6482,3 +6573,51 @@ unsigned int delKeysInSlot(unsigned int hashslot) { unsigned int countKeysInSlot(unsigned int hashslot) { return (*server.db->slots_to_keys).by_slot[hashslot].count; } + +/* ----------------------------------------------------------------------------- + * Operation(s) on channel rax tree. + * -------------------------------------------------------------------------- */ + +void slotToChannelUpdate(sds channel, int add) { + size_t keylen = sdslen(channel); + unsigned int hashslot = keyHashSlot(channel,keylen); + unsigned char buf[64]; + unsigned char *indexed = buf; + + if (keylen+2 > 64) indexed = zmalloc(keylen+2); + indexed[0] = (hashslot >> 8) & 0xff; + indexed[1] = hashslot & 0xff; + memcpy(indexed+2,channel,keylen); + if (add) { + raxInsert(server.cluster->slots_to_channels,indexed,keylen+2,NULL,NULL); + } else { + raxRemove(server.cluster->slots_to_channels,indexed,keylen+2,NULL); + } + if (indexed != buf) zfree(indexed); +} + +void slotToChannelAdd(sds channel) { + slotToChannelUpdate(channel,1); +} + +void slotToChannelDel(sds channel) { + slotToChannelUpdate(channel,0); +} + +/* Get the count of the channels for a given slot. */ +unsigned int countChannelsInSlot(unsigned int hashslot) { + raxIterator iter; + int j = 0; + unsigned char indexed[2]; + + indexed[0] = (hashslot >> 8) & 0xff; + indexed[1] = hashslot & 0xff; + raxStart(&iter,server.cluster->slots_to_channels); + raxSeek(&iter,">=",indexed,2); + while(raxNext(&iter)) { + if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break; + j++; + } + raxStop(&iter); + return j; +} |