diff options
author | Oran Agra <oran@redislabs.com> | 2022-04-27 16:32:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-27 16:32:17 +0300 |
commit | d375595d5e3ae2e5c29e6c00a2dc3d60578fd9fc (patch) | |
tree | c4d753d3ee0109e3513a879af8c5487e002d10a3 /src/cluster.c | |
parent | fb4e0d400ff82117104bde5296c477ad95f8dd41 (diff) | |
parent | c1f3020631ea8640f2b6aa666a245dd76635a807 (diff) | |
download | redis-7.0.0.tar.gz |
Merge pull request #10652 from oranagra/redis-7.0.07.0.0
Redis 7.0.0
Diffstat (limited to 'src/cluster.c')
-rw-r--r-- | src/cluster.c | 36 |
1 files changed, 13 insertions, 23 deletions
diff --git a/src/cluster.c b/src/cluster.c index 701871b36..adad07e19 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -960,7 +960,6 @@ clusterNode *createClusterNode(char *nodename, int flags) { memset(node->slots,0,sizeof(node->slots)); node->slot_info_pairs = NULL; node->slot_info_pairs_count = 0; - node->slot_info_pairs_alloc = 0; node->numslots = 0; node->numslaves = 0; node->slaves = NULL; @@ -2507,11 +2506,7 @@ int clusterProcessPacket(clusterLink *link) { message = createStringObject( (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); - if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) { - pubsubPublishMessageShard(channel, message); - } else { - pubsubPublishMessage(channel,message); - } + pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD); decrRefCount(channel); decrRefCount(message); } @@ -3200,20 +3195,19 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin /* ----------------------------------------------------------------------------- * CLUSTER Pub/Sub support * - * For now we do very little, just propagating PUBLISH messages across the whole + * If `sharded` is 0: + * For now we do very little, just propagating [S]PUBLISH messages across the whole * cluster. In the future we'll try to get smarter and avoiding propagating those * messages to hosts without receives for a given channel. - * -------------------------------------------------------------------------- */ -void clusterPropagatePublish(robj *channel, robj *message) { - clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH); -} - -/* ----------------------------------------------------------------------------- - * CLUSTER Pub/Sub shard support - * + * Otherwise: * Publish this message across the slot (primary/replica). * -------------------------------------------------------------------------- */ -void clusterPropagatePublishShard(robj *channel, robj *message) { +void clusterPropagatePublish(robj *channel, robj *message, int sharded) { + if (!sharded) { + clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH); + return; + } + list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself); if (listLength(nodes_for_slot) != 0) { listIter li; @@ -4726,13 +4720,10 @@ void clusterGenNodesSlotsInfo(int filter) { * or end of slot. */ if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) { if (!(n->flags & filter)) { - if (n->slot_info_pairs_count+2 > n->slot_info_pairs_alloc) { - if (n->slot_info_pairs_alloc == 0) - n->slot_info_pairs_alloc = 8; - else - n->slot_info_pairs_alloc *= 2; - n->slot_info_pairs = zrealloc(n->slot_info_pairs, n->slot_info_pairs_alloc * sizeof(uint16_t)); + if (!n->slot_info_pairs) { + n->slot_info_pairs = zmalloc(2 * n->numslots * sizeof(uint16_t)); } + serverAssert((n->slot_info_pairs_count + 1) < (2 * n->numslots)); n->slot_info_pairs[n->slot_info_pairs_count++] = start; n->slot_info_pairs[n->slot_info_pairs_count++] = i-1; } @@ -4747,7 +4738,6 @@ void clusterFreeNodesSlotsInfo(clusterNode *n) { zfree(n->slot_info_pairs); n->slot_info_pairs = NULL; n->slot_info_pairs_count = 0; - n->slot_info_pairs_alloc = 0; } /* Generate a csv-alike representation of the nodes we are aware of, |