Diffstat (limited to 'src/cluster.c')
-rw-r--r--  src/cluster.c | 318
1 file changed, 266 insertions(+), 52 deletions(-)
diff --git a/src/cluster.c b/src/cluster.c
index 32335bbf9..701871b36 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -56,7 +56,6 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter, int use_pport);
-clusterNode *clusterLookupNode(const char *name);
list *clusterGetNodesServingMySlots(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
@@ -74,6 +73,8 @@ void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
+sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count);
+void clusterFreeNodesSlotsInfo(clusterNode *n);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
@@ -210,7 +211,11 @@ int clusterLoadConfig(char *filename) {
}
/* Create this node if it does not exist */
- n = clusterLookupNode(argv[0]);
+ if (verifyClusterNodeId(argv[0], sdslen(argv[0])) == C_ERR) {
+ sdsfreesplitres(argv, argc);
+ goto fmterr;
+ }
+ n = clusterLookupNode(argv[0], sdslen(argv[0]));
if (!n) {
n = createClusterNode(argv[0],0);
clusterAddNode(n);
@@ -218,6 +223,17 @@ int clusterLoadConfig(char *filename) {
/* Format for the node address information:
* ip:port[@cport][,hostname] */
+ /* Hostname is an optional argument that defines the endpoint
+ * that can be reported to clients instead of IP. */
+ char *hostname = strchr(argv[1], ',');
+ if (hostname) {
+ *hostname = '\0';
+ hostname++;
+ n->hostname = sdscpy(n->hostname, hostname);
+ } else if (sdslen(n->hostname) != 0) {
+ sdsclear(n->hostname);
+ }
+
/* Address and port */
if ((p = strrchr(argv[1],':')) == NULL) {
sdsfreesplitres(argv,argc);
@@ -237,17 +253,6 @@ int clusterLoadConfig(char *filename) {
* base port. */
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
- /* Hostname is an optional argument that defines the endpoint
- * that can be reported to clients instead of IP. */
- char *hostname = strchr(p, ',');
- if (hostname) {
- *hostname = '\0';
- hostname++;
- n->hostname = sdscpy(n->hostname, hostname);
- } else if (sdslen(n->hostname) != 0) {
- sdsclear(n->hostname);
- }
-
/* The plaintext port for client in a TLS cluster (n->pport) is not
* stored in nodes.conf. It is received later over the bus protocol. */
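For reference, these two hunks move hostname parsing so that the optional ",hostname" suffix is split off before the address is parsed, per the ip:port[@cport][,hostname] layout named in the comment. A minimal standalone sketch of that parse order, using plain C buffers instead of the sds helpers (parse_addr is a made-up name for illustration; buffer sizing and error handling elided):

    #include <stdlib.h>
    #include <string.h>

    #define CLUSTER_PORT_INCR 10000 /* default bus port offset, as in Redis */

    static int parse_addr(char *field, char *ip, int *port, int *cport,
                          char **hostname) {
        char *comma = strchr(field, ',');  /* optional hostname comes last */
        *hostname = NULL;
        if (comma) { *comma = '\0'; *hostname = comma + 1; }

        char *colon = strrchr(field, ':'); /* last ':' splits ip and port */
        if (colon == NULL) return -1;
        *colon = '\0';
        strcpy(ip, field);

        char *at = strchr(colon + 1, '@'); /* optional cluster bus port */
        if (at) *at = '\0';
        *port = atoi(colon + 1);
        *cport = at ? atoi(at + 1) : *port + CLUSTER_PORT_INCR;
        return 0;
    }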
@@ -286,7 +291,11 @@ int clusterLoadConfig(char *filename) {
/* Get master if any. Set the master and populate master's
* slave list. */
if (argv[3][0] != '-') {
- master = clusterLookupNode(argv[3]);
+ if (verifyClusterNodeId(argv[3], sdslen(argv[3])) == C_ERR) {
+ sdsfreesplitres(argv, argc);
+ goto fmterr;
+ }
+ master = clusterLookupNode(argv[3], sdslen(argv[3]));
if (!master) {
master = createClusterNode(argv[3],0);
clusterAddNode(master);
@@ -322,7 +331,14 @@ int clusterLoadConfig(char *filename) {
goto fmterr;
}
p += 3;
- cn = clusterLookupNode(p);
+
+ char *pr = strchr(p, ']');
+ if (pr == NULL || verifyClusterNodeId(p, pr - p) == C_ERR) {
+ sdsfreesplitres(argv, argc);
+ goto fmterr;
+ }
+ cn = clusterLookupNode(p, CLUSTER_NAMELEN);
if (!cn) {
cn = createClusterNode(p,0);
clusterAddNode(cn);
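For context, the bracketed entries this branch parses encode slot migration state in nodes.conf: [slot->-id] marks a slot migrating to node id, and [slot-<-id] a slot being imported from it, which is why p += 3 skips the "->-" or "-<-" separator before the id. The added check now rejects entries whose id before the closing ']' is not a well-formed 40-character name. Illustrative entries (both ids are fabricated):

    [93->-292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f]
    [1002-<-67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1]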
@@ -796,7 +812,7 @@ void setClusterNodeToInboundClusterLink(clusterNode *node, clusterLink *link) {
* we would always process the disconnection of the existing inbound link before
* accepting a new inbound link. Therefore, it's possible to have more than
* one inbound link from the same node at the same time. */
- serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %s with fd %d",
+ serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %.40s with fd %d",
node->inbound_link->conn->fd, node->name, link->conn->fd);
}
node->inbound_link = link;
@@ -942,7 +958,9 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->configEpoch = 0;
node->flags = flags;
memset(node->slots,0,sizeof(node->slots));
- node->slots_info = NULL;
+ node->slot_info_pairs = NULL;
+ node->slot_info_pairs_count = 0;
+ node->slot_info_pairs_alloc = 0;
node->numslots = 0;
node->numslaves = 0;
node->slaves = NULL;
@@ -1178,12 +1196,23 @@ void clusterDelNode(clusterNode *delnode) {
freeClusterNode(delnode);
}
-/* Node lookup by name */
-clusterNode *clusterLookupNode(const char *name) {
- sds s = sdsnewlen(name, CLUSTER_NAMELEN);
- dictEntry *de;
+/* Cluster node sanity check. Returns C_OK if the node id
+ * is valid and C_ERR otherwise. */
+int verifyClusterNodeId(const char *name, int length) {
+ if (length != CLUSTER_NAMELEN) return C_ERR;
+ for (int i = 0; i < length; i++) {
+ if (name[i] >= 'a' && name[i] <= 'z') continue;
+ if (name[i] >= '0' && name[i] <= '9') continue;
+ return C_ERR;
+ }
+ return C_OK;
+}
- de = dictFind(server.cluster->nodes,s);
+/* Node lookup by name */
+clusterNode *clusterLookupNode(const char *name, int length) {
+ if (verifyClusterNodeId(name, length) != C_OK) return NULL;
+ sds s = sdsnewlen(name, length);
+ dictEntry *de = dictFind(server.cluster->nodes, s);
sdsfree(s);
if (de == NULL) return NULL;
return dictGetVal(de);
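With the lookup now length-aware, malformed or truncated ids coming from config files, gossip packets, and user commands are rejected before the dict lookup. A standalone sketch of the validation rule with a few probes (verify_id is a local copy for illustration, not the patch's symbol; the ids are fabricated):

    #include <stdio.h>

    #define CLUSTER_NAMELEN 40
    #define C_OK 0
    #define C_ERR -1

    /* Exactly 40 chars, each in [a-z] or [0-9]; hex ids satisfy this. */
    static int verify_id(const char *name, int length) {
        if (length != CLUSTER_NAMELEN) return C_ERR;
        for (int i = 0; i < length; i++) {
            if (name[i] >= 'a' && name[i] <= 'z') continue;
            if (name[i] >= '0' && name[i] <= '9') continue;
            return C_ERR;
        }
        return C_OK;
    }

    int main(void) {
        printf("%d\n", verify_id("292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f", 40)); /* 0, valid  */
        printf("%d\n", verify_id("292f8b36", 8));                                  /* -1, short */
        printf("%d\n", verify_id("292F8B365BB7EDB5E285CAF0B7E6DDC7265D2F4F", 40)); /* -1, uppercase */
        return 0;
    }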
@@ -1599,7 +1628,7 @@ int clusterStartHandshake(char *ip, int port, int cport) {
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
uint16_t count = ntohs(hdr->count);
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
- clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
+ clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
while(count--) {
uint16_t flags = ntohs(g->flags);
@@ -1618,7 +1647,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
}
/* Update our state accordingly to the gossip sections */
- node = clusterLookupNode(g->nodename);
+ node = clusterLookupNode(g->nodename, CLUSTER_NAMELEN);
if (node) {
/* We already know this node.
Handle failure reports, only when the sender is a master. */
@@ -1895,6 +1924,17 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
+ } else if (myself->slaveof && myself->slaveof->slaveof) {
+ /* Safeguard against sub-replicas. A replica's master can turn itself
+ * into a replica if its last slot is removed. If no other node takes
+ * over the slot, there is nothing else to trigger replica migration. */
+ serverLog(LL_WARNING,
+ "I'm a sub-replica! Reconfiguring myself as a replica of grandmaster %.40s",
+ myself->slaveof->slaveof->name);
+ clusterSetMaster(myself->slaveof->slaveof);
+ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
+ CLUSTER_TODO_UPDATE_STATE|
+ CLUSTER_TODO_FSYNC_CONFIG);
} else if (dirty_slots_count) {
/* If we are here, we received an update message which removed
* ownership for certain slots we still have keys about, but still
@@ -1970,7 +2010,7 @@ int writeHostnamePingExt(clusterMsgPingExt **cursor) {
/* We previously validated the extensions, so this function just needs to
* handle the extensions. */
void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
- clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
+ clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
char *ext_hostname = NULL;
uint16_t extensions = ntohs(hdr->extensions);
/* Loop through all the extensions and process them */
@@ -2003,7 +2043,7 @@ static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
sender = link->node;
} else {
/* Otherwise, fetch sender based on the message */
- sender = clusterLookupNode(hdr->sender);
+ sender = clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
/* We know the sender node but haven't associated it with the link. This must
* be an inbound link, because only inbound links are created before we know
* which node they belong to. */
@@ -2213,7 +2253,7 @@ int clusterProcessPacket(clusterLink *link) {
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
- serverLog(LL_DEBUG,"%s packet received: %s",
+ serverLog(LL_DEBUG,"%s packet received: %.40s",
clusterGetMessageTypeString(type),
link->node ? link->node->name : "NULL");
if (!link->inbound) {
@@ -2314,7 +2354,7 @@ int clusterProcessPacket(clusterLink *link) {
clusterSetNodeAsMaster(sender);
} else {
/* Node is a slave. */
- clusterNode *master = clusterLookupNode(hdr->slaveof);
+ clusterNode *master = clusterLookupNode(hdr->slaveof, CLUSTER_NAMELEN);
if (nodeIsMaster(sender)) {
/* Master turned into a slave! Reconfigure the node. */
@@ -2429,7 +2469,7 @@ int clusterProcessPacket(clusterLink *link) {
clusterNode *failing;
if (sender) {
- failing = clusterLookupNode(hdr->data.fail.about.nodename);
+ failing = clusterLookupNode(hdr->data.fail.about.nodename, CLUSTER_NAMELEN);
if (failing &&
!(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
{
@@ -2517,7 +2557,7 @@ int clusterProcessPacket(clusterLink *link) {
ntohu64(hdr->data.update.nodecfg.configEpoch);
if (!sender) return 1; /* We don't know the sender. */
- n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
+ n = clusterLookupNode(hdr->data.update.nodecfg.nodename, CLUSTER_NAMELEN);
if (!n) return 1; /* We don't know the reported node. */
if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
@@ -3148,7 +3188,7 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin
clusterNode *node = NULL;
if (target != NULL) {
- node = clusterLookupNode(target);
+ node = clusterLookupNode(target, strlen(target));
if (node == NULL || node->link == NULL) return C_ERR;
}
@@ -4561,6 +4601,22 @@ sds representClusterNodeFlags(sds ci, uint16_t flags) {
return ci;
}
+/* Concatenate the slot ownership information to the given SDS string 'ci'.
+ * Each (start, end) pair describes a contiguous block of owned slots; a range
+ * is rendered as "start-end", and a single slot (start == end) on its own. */
+sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count) {
+ for (int i = 0; i < slot_info_pairs_count; i += 2) {
+ int start = slot_info_pairs[i];
+ int end = slot_info_pairs[i+1];
+ if (start == end) {
+ ci = sdscatfmt(ci, " %i", start);
+ } else {
+ ci = sdscatfmt(ci, " %i-%i", start, end);
+ }
+ }
+ return ci;
+}
+
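The pair encoding replaces the per-node slots_info sds string: two uint16_t values per contiguous range instead of pre-formatted text. A minimal sketch of what the renderer above produces, using snprintf into a fixed buffer instead of sdscatfmt (the pair values are example data):

    #include <stdio.h>
    #include <stdint.h>

    int main(void) {
        uint16_t pairs[] = {0, 5460, 7000, 7000, 10923, 16383};
        int count = sizeof(pairs) / sizeof(pairs[0]);
        char buf[256] = "";
        size_t off = 0;
        for (int i = 0; i < count; i += 2) {
            if (pairs[i] == pairs[i+1])
                off += snprintf(buf + off, sizeof(buf) - off, " %d", pairs[i]);
            else
                off += snprintf(buf + off, sizeof(buf) - off, " %d-%d",
                                pairs[i], pairs[i+1]);
        }
        printf("%s\n", buf); /* prints: " 0-5460 7000 10923-16383" */
        return 0;
    }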
/* Generate a csv-alike representation of the specified cluster node.
* See clusterGenNodesDescription() top comment for more information.
*
@@ -4609,8 +4665,8 @@ sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
/* Slots served by this instance. If we already have the slots info,
* append it directly; otherwise generate it from the slots bitmap. */
- if (node->slots_info) {
- ci = sdscatsds(ci, node->slots_info);
+ if (node->slot_info_pairs) {
+ ci = representSlotInfo(ci, node->slot_info_pairs, node->slot_info_pairs_count);
} else if (node->numslots > 0) {
start = -1;
for (j = 0; j < CLUSTER_SLOTS; j++) {
@@ -4670,12 +4726,15 @@ void clusterGenNodesSlotsInfo(int filter) {
* or end of slot. */
if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
if (!(n->flags & filter)) {
- if (n->slots_info == NULL) n->slots_info = sdsempty();
- if (start == i-1) {
- n->slots_info = sdscatfmt(n->slots_info," %i",start);
- } else {
- n->slots_info = sdscatfmt(n->slots_info," %i-%i",start,i-1);
+ if (n->slot_info_pairs_count+2 > n->slot_info_pairs_alloc) {
+ if (n->slot_info_pairs_alloc == 0)
+ n->slot_info_pairs_alloc = 8;
+ else
+ n->slot_info_pairs_alloc *= 2;
+ n->slot_info_pairs = zrealloc(n->slot_info_pairs, n->slot_info_pairs_alloc * sizeof(uint16_t));
}
+ n->slot_info_pairs[n->slot_info_pairs_count++] = start;
+ n->slot_info_pairs[n->slot_info_pairs_count++] = i-1;
}
if (i == CLUSTER_SLOTS) break;
n = server.cluster->slots[i];
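The growth policy above is the usual amortized-doubling append: start at 8 entries and double when full, so building a node's full pair list costs only O(log n) reallocations. A generic sketch of the same pattern (plain realloc stands in for Redis' zrealloc wrapper; allocation-failure handling elided):

    #include <stdint.h>
    #include <stdlib.h>

    typedef struct { uint16_t *v; int count, alloc; } u16vec;

    /* Append one (start, end) pair, doubling capacity as needed. */
    static void u16vec_push2(u16vec *a, uint16_t start, uint16_t end) {
        if (a->count + 2 > a->alloc) {
            a->alloc = a->alloc ? a->alloc * 2 : 8;
            a->v = realloc(a->v, a->alloc * sizeof(uint16_t));
        }
        a->v[a->count++] = start;
        a->v[a->count++] = end;
    }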
@@ -4684,6 +4743,13 @@ void clusterGenNodesSlotsInfo(int filter) {
}
}
+void clusterFreeNodesSlotsInfo(clusterNode *n) {
+ zfree(n->slot_info_pairs);
+ n->slot_info_pairs = NULL;
+ n->slot_info_pairs_count = 0;
+ n->slot_info_pairs_alloc = 0;
+}
+
/* Generate a csv-alike representation of the nodes we are aware of,
* including the "myself" node, and return an SDS string containing the
* representation (it is up to the caller to free it).
@@ -4718,10 +4784,7 @@ sds clusterGenNodesDescription(int filter, int use_pport) {
ci = sdscatlen(ci,"\n",1);
/* Release slots info. */
- if (node->slots_info) {
- sdsfree(node->slots_info);
- node->slots_info = NULL;
- }
+ clusterFreeNodesSlotsInfo(node);
}
dictReleaseIterator(di);
return ci;
@@ -4942,6 +5005,136 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
setDeferredArrayLen(c, nested_replylen, nested_elements);
}
+/* Add detailed information of a node to the output buffer of the given client. */
+void addNodeDetailsToShardReply(client *c, clusterNode *node) {
+ int reply_count = 0;
+ void *node_replylen = addReplyDeferredLen(c);
+ addReplyBulkCString(c, "id");
+ addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
+ reply_count++;
+
+ /* We use server.tls_cluster as a proxy for whether or not
+ * the remote port is the TLS port. */
+ int plaintext_port = server.tls_cluster ? node->pport : node->port;
+ int tls_port = server.tls_cluster ? node->port : 0;
+ if (plaintext_port) {
+ addReplyBulkCString(c, "port");
+ addReplyLongLong(c, plaintext_port);
+ reply_count++;
+ }
+
+ if (tls_port) {
+ addReplyBulkCString(c, "tls-port");
+ addReplyLongLong(c, tls_port);
+ reply_count++;
+ }
+
+ addReplyBulkCString(c, "ip");
+ addReplyBulkCString(c, node->ip);
+ reply_count++;
+
+ addReplyBulkCString(c, "endpoint");
+ addReplyBulkCString(c, getPreferredEndpoint(node));
+ reply_count++;
+
+ if (sdslen(node->hostname) != 0) {
+ addReplyBulkCString(c, "hostname");
+ addReplyBulkCString(c, node->hostname);
+ reply_count++;
+ }
+
+ long long node_offset;
+ if (node->flags & CLUSTER_NODE_MYSELF) {
+ node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
+ } else {
+ node_offset = node->repl_offset;
+ }
+
+ addReplyBulkCString(c, "role");
+ addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
+ reply_count++;
+
+ addReplyBulkCString(c, "replication-offset");
+ addReplyLongLong(c, node_offset);
+ reply_count++;
+
+ addReplyBulkCString(c, "health");
+ const char *health_msg = NULL;
+ if (nodeFailed(node)) {
+ health_msg = "fail";
+ } else if (nodeIsSlave(node) && node_offset == 0) {
+ health_msg = "loading";
+ } else {
+ health_msg = "online";
+ }
+ addReplyBulkCString(c, health_msg);
+ reply_count++;
+
+ setDeferredMapLen(c, node_replylen, reply_count);
+}
+
+/* Add the shard reply of a single shard based off the given primary node. */
+void addShardReplyForClusterShards(client *c, clusterNode *node, uint16_t *slot_info_pairs, int slot_pairs_count) {
+ addReplyMapLen(c, 2);
+ addReplyBulkCString(c, "slots");
+ if (slot_info_pairs) {
+ serverAssert((slot_pairs_count % 2) == 0);
+ addReplyArrayLen(c, slot_pairs_count);
+ for (int i = 0; i < slot_pairs_count; i++)
+ addReplyBulkLongLong(c, (unsigned long)slot_info_pairs[i]);
+ } else {
+ /* If no slot info pair is provided, the node owns no slots */
+ addReplyArrayLen(c, 0);
+ }
+
+ addReplyBulkCString(c, "nodes");
+ list *nodes_for_slot = clusterGetNodesServingMySlots(node);
+ /* At least the provided node should be serving its slots */
+ serverAssert(nodes_for_slot);
+ addReplyArrayLen(c, listLength(nodes_for_slot));
+ if (listLength(nodes_for_slot) != 0) {
+ listIter li;
+ listNode *ln;
+ listRewind(nodes_for_slot, &li);
+ while ((ln = listNext(&li))) {
+ clusterNode *node = listNodeValue(ln);
+ addNodeDetailsToShardReply(c, node);
+ }
+ listRelease(nodes_for_slot);
+ }
+}
+
+/* Add to the output buffer of the given client an array of the slot (start, end)
+ * pairs owned by the shard, plus the primary and the set of replicas, along with
+ * information about each node. */
+void clusterReplyShards(client *c) {
+ void *shard_replylen = addReplyDeferredLen(c);
+ int shard_count = 0;
+ /* This call will add slot_info_pairs to all nodes */
+ clusterGenNodesSlotsInfo(0);
+ dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
+ dictEntry *de;
+ /* Iterate over all the available nodes in the cluster, and for each primary
+ * node generate the cluster shards response. If the primary node
+ * doesn't own any slot, the cluster shards response contains only the
+ * node related information and an empty slots array. */
+ while((de = dictNext(di)) != NULL) {
+ clusterNode *n = dictGetVal(de);
+ if (nodeIsSlave(n)) {
+ /* A replica can be forced to own slots, even though it'll get reverted,
+ * so free the slot info pairs here just in case. */
+ clusterFreeNodesSlotsInfo(n);
+ continue;
+ }
+ shard_count++;
+ /* n->slot_info_pairs is set to NULL when the node owns no slots. */
+ addShardReplyForClusterShards(c, n, n->slot_info_pairs, n->slot_info_pairs_count);
+ clusterFreeNodesSlotsInfo(n);
+ }
+ dictReleaseIterator(di);
+ setDeferredArrayLen(c, shard_replylen, shard_count);
+}
+
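Together these functions implement the new CLUSTER SHARDS subcommand wired up below: one map per shard, each carrying the shard's slot pairs and the details of every node serving them. An illustrative reply for a single-shard cluster (values fabricated, not captured output):

    127.0.0.1:6379> CLUSTER SHARDS
    1) 1) "slots"
       2) 1) (integer) 0
          2) (integer) 16383
       3) "nodes"
       4) 1)  1) "id"
               2) "292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f"
               3) "port"
               4) (integer) 6379
               5) "ip"
               6) "127.0.0.1"
               7) "endpoint"
               8) "127.0.0.1"
               9) "role"
              10) "master"
              11) "replication-offset"
              12) (integer) 0
              13) "health"
              14) "online"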
void clusterReplyMultiBulkSlots(client * c) {
/* Format: 1) 1) start slot
* 2) end slot
@@ -5035,6 +5228,8 @@ void clusterCommand(client *c) {
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids",
+"SHARDS",
+" Return information about slot range mappings and the nodes associated with them.",
"LINKS",
" Return information about all network links between this node and its peers.",
" Output format is an array where each array element is a map containing attributes of a link",
@@ -5084,6 +5279,9 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
/* CLUSTER SLOTS */
clusterReplyMultiBulkSlots(c);
+ } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) {
+ /* CLUSTER SHARDS */
+ clusterReplyShards(c);
} else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
if (dictSize(server.db[0].dict) != 0) {
@@ -5181,7 +5379,8 @@ NULL
addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
return;
}
- if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
+ n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
+ if (n == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
@@ -5197,7 +5396,8 @@ NULL
"I'm already the owner of hash slot %u",slot);
return;
}
- if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
+ n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
+ if (n == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
@@ -5213,8 +5413,7 @@ NULL
server.cluster->migrating_slots_to[slot] = NULL;
} else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
- clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
-
+ n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
if (!n) {
addReplyErrorFormat(c,"Unknown node %s",
(char*)c->argv[4]->ptr);
@@ -5241,9 +5440,26 @@ NULL
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;
+ int slot_was_mine = server.cluster->slots[slot] == myself;
clusterDelSlot(slot);
clusterAddSlot(n,slot);
+ /* If we are a master left without slots, we should turn into a
+ * replica of the new master. */
+ if (slot_was_mine &&
+ n != myself &&
+ myself->numslots == 0 &&
+ server.cluster_allow_replica_migration)
+ {
+ serverLog(LL_WARNING,
+ "Configuration change detected. Reconfiguring myself "
+ "as a replica of %.40s", n->name);
+ clusterSetMaster(n);
+ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
+ CLUSTER_TODO_UPDATE_STATE |
+ CLUSTER_TODO_FSYNC_CONFIG);
+ }
+
/* If this node was importing this slot, assigning the slot to
* itself also clears the importing status. */
if (n == myself &&
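The block added above closes a gap in resharding: when CLUSTER SETSLOT ... NODE moves a master's last slot away, the emptied master now demotes itself to a replica of the new owner (subject to cluster-allow-replica-migration) instead of lingering as an empty master. An illustrative sequence against the emptied node (the id is fabricated):

    127.0.0.1:7002> CLUSTER SETSLOT 42 NODE 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1
    OK
    (slot 42 was the last slot owned by :7002, so it reconfigures itself as a
     replica of the new owner and emits the warning log added above)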
@@ -5409,8 +5625,7 @@ NULL
}
} else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
/* CLUSTER FORGET <NODE ID> */
- clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
-
+ clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
@@ -5428,9 +5643,8 @@ NULL
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
/* CLUSTER REPLICATE <NODE ID> */
- clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
-
/* Lookup the specified node in our table. */
+ clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
@@ -5466,7 +5680,7 @@ NULL
} else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
!strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
/* CLUSTER SLAVES <NODE ID> */
- clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
+ clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
int j;
/* Lookup the specified node in our table. */
@@ -5493,7 +5707,7 @@ NULL
c->argc == 3)
{
/* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
- clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
+ clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);