summaryrefslogtreecommitdiff
path: root/src/cluster.c
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2022-04-27 16:32:17 +0300
committerGitHub <noreply@github.com>2022-04-27 16:32:17 +0300
commitd375595d5e3ae2e5c29e6c00a2dc3d60578fd9fc (patch)
treec4d753d3ee0109e3513a879af8c5487e002d10a3 /src/cluster.c
parentfb4e0d400ff82117104bde5296c477ad95f8dd41 (diff)
parentc1f3020631ea8640f2b6aa666a245dd76635a807 (diff)
downloadredis-7.0.0.tar.gz
Merge pull request #10652 from oranagra/redis-7.0.07.0.0
Redis 7.0.0
Diffstat (limited to 'src/cluster.c')
-rw-r--r--src/cluster.c36
1 files changed, 13 insertions, 23 deletions
diff --git a/src/cluster.c b/src/cluster.c
index 701871b36..adad07e19 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -960,7 +960,6 @@ clusterNode *createClusterNode(char *nodename, int flags) {
memset(node->slots,0,sizeof(node->slots));
node->slot_info_pairs = NULL;
node->slot_info_pairs_count = 0;
- node->slot_info_pairs_alloc = 0;
node->numslots = 0;
node->numslaves = 0;
node->slaves = NULL;
@@ -2507,11 +2506,7 @@ int clusterProcessPacket(clusterLink *link) {
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
- if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
- pubsubPublishMessageShard(channel, message);
- } else {
- pubsubPublishMessage(channel,message);
- }
+ pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
decrRefCount(channel);
decrRefCount(message);
}
@@ -3200,20 +3195,19 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin
/* -----------------------------------------------------------------------------
* CLUSTER Pub/Sub support
*
- * For now we do very little, just propagating PUBLISH messages across the whole
+ * If `sharded` is 0:
+ * For now we do very little, just propagating [S]PUBLISH messages across the whole
* cluster. In the future we'll try to get smarter and avoiding propagating those
* messages to hosts without receives for a given channel.
- * -------------------------------------------------------------------------- */
-void clusterPropagatePublish(robj *channel, robj *message) {
- clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
-}
-
-/* -----------------------------------------------------------------------------
- * CLUSTER Pub/Sub shard support
- *
+ * Otherwise:
* Publish this message across the slot (primary/replica).
* -------------------------------------------------------------------------- */
-void clusterPropagatePublishShard(robj *channel, robj *message) {
+void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
+ if (!sharded) {
+ clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
+ return;
+ }
+
list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
if (listLength(nodes_for_slot) != 0) {
listIter li;
@@ -4726,13 +4720,10 @@ void clusterGenNodesSlotsInfo(int filter) {
* or end of slot. */
if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
if (!(n->flags & filter)) {
- if (n->slot_info_pairs_count+2 > n->slot_info_pairs_alloc) {
- if (n->slot_info_pairs_alloc == 0)
- n->slot_info_pairs_alloc = 8;
- else
- n->slot_info_pairs_alloc *= 2;
- n->slot_info_pairs = zrealloc(n->slot_info_pairs, n->slot_info_pairs_alloc * sizeof(uint16_t));
+ if (!n->slot_info_pairs) {
+ n->slot_info_pairs = zmalloc(2 * n->numslots * sizeof(uint16_t));
}
+ serverAssert((n->slot_info_pairs_count + 1) < (2 * n->numslots));
n->slot_info_pairs[n->slot_info_pairs_count++] = start;
n->slot_info_pairs[n->slot_info_pairs_count++] = i-1;
}
@@ -4747,7 +4738,6 @@ void clusterFreeNodesSlotsInfo(clusterNode *n) {
zfree(n->slot_info_pairs);
n->slot_info_pairs = NULL;
n->slot_info_pairs_count = 0;
- n->slot_info_pairs_alloc = 0;
}
/* Generate a csv-alike representation of the nodes we are aware of,