diff options
Diffstat (limited to 'src/cluster.c')
-rw-r--r-- | src/cluster.c | 318 |
1 files changed, 266 insertions, 52 deletions
diff --git a/src/cluster.c b/src/cluster.c index 32335bbf9..701871b36 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -56,7 +56,6 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request); void clusterUpdateState(void); int clusterNodeGetSlotBit(clusterNode *n, int slot); sds clusterGenNodesDescription(int filter, int use_pport); -clusterNode *clusterLookupNode(const char *name); list *clusterGetNodesServingMySlots(clusterNode *node); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); int clusterAddSlot(clusterNode *n, int slot); @@ -74,6 +73,8 @@ void clusterCloseAllSlots(void); void clusterSetNodeAsMaster(clusterNode *n); void clusterDelNode(clusterNode *delnode); sds representClusterNodeFlags(sds ci, uint16_t flags); +sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count); +void clusterFreeNodesSlotsInfo(clusterNode *n); uint64_t clusterGetMaxEpoch(void); int clusterBumpConfigEpochWithoutConsensus(void); void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len); @@ -210,7 +211,11 @@ int clusterLoadConfig(char *filename) { } /* Create this node if it does not exist */ - n = clusterLookupNode(argv[0]); + if (verifyClusterNodeId(argv[0], sdslen(argv[0])) == C_ERR) { + sdsfreesplitres(argv, argc); + goto fmterr; + } + n = clusterLookupNode(argv[0], sdslen(argv[0])); if (!n) { n = createClusterNode(argv[0],0); clusterAddNode(n); @@ -218,6 +223,17 @@ int clusterLoadConfig(char *filename) { /* Format for the node address information: * ip:port[@cport][,hostname] */ + /* Hostname is an optional argument that defines the endpoint + * that can be reported to clients instead of IP. */ + char *hostname = strchr(argv[1], ','); + if (hostname) { + *hostname = '\0'; + hostname++; + n->hostname = sdscpy(n->hostname, hostname); + } else if (sdslen(n->hostname) != 0) { + sdsclear(n->hostname); + } + /* Address and port */ if ((p = strrchr(argv[1],':')) == NULL) { sdsfreesplitres(argv,argc); @@ -237,17 +253,6 @@ int clusterLoadConfig(char *filename) { * base port. */ n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR; - /* Hostname is an optional argument that defines the endpoint - * that can be reported to clients instead of IP. */ - char *hostname = strchr(p, ','); - if (hostname) { - *hostname = '\0'; - hostname++; - n->hostname = sdscpy(n->hostname, hostname); - } else if (sdslen(n->hostname) != 0) { - sdsclear(n->hostname); - } - /* The plaintext port for client in a TLS cluster (n->pport) is not * stored in nodes.conf. It is received later over the bus protocol. */ @@ -286,7 +291,11 @@ int clusterLoadConfig(char *filename) { /* Get master if any. Set the master and populate master's * slave list. */ if (argv[3][0] != '-') { - master = clusterLookupNode(argv[3]); + if (verifyClusterNodeId(argv[3], sdslen(argv[3])) == C_ERR) { + sdsfreesplitres(argv, argc); + goto fmterr; + } + master = clusterLookupNode(argv[3], sdslen(argv[3])); if (!master) { master = createClusterNode(argv[3],0); clusterAddNode(master); @@ -322,7 +331,14 @@ int clusterLoadConfig(char *filename) { goto fmterr; } p += 3; - cn = clusterLookupNode(p); + + char *pr = strchr(p, ']'); + size_t node_len = pr - p; + if (pr == NULL || verifyClusterNodeId(p, node_len) == C_ERR) { + sdsfreesplitres(argv, argc); + goto fmterr; + } + cn = clusterLookupNode(p, CLUSTER_NAMELEN); if (!cn) { cn = createClusterNode(p,0); clusterAddNode(cn); @@ -796,7 +812,7 @@ void setClusterNodeToInboundClusterLink(clusterNode *node, clusterLink *link) { * we would always process the disconnection of the existing inbound link before * accepting a new existing inbound link. Therefore, it's possible to have more than * one inbound link from the same node at the same time. */ - serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %s with fd %d", + serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %.40s with fd %d", node->inbound_link->conn->fd, node->name, link->conn->fd); } node->inbound_link = link; @@ -942,7 +958,9 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->configEpoch = 0; node->flags = flags; memset(node->slots,0,sizeof(node->slots)); - node->slots_info = NULL; + node->slot_info_pairs = NULL; + node->slot_info_pairs_count = 0; + node->slot_info_pairs_alloc = 0; node->numslots = 0; node->numslaves = 0; node->slaves = NULL; @@ -1178,12 +1196,23 @@ void clusterDelNode(clusterNode *delnode) { freeClusterNode(delnode); } -/* Node lookup by name */ -clusterNode *clusterLookupNode(const char *name) { - sds s = sdsnewlen(name, CLUSTER_NAMELEN); - dictEntry *de; +/* Cluster node sanity check. Returns C_OK if the node id + * is valid an C_ERR otherwise. */ +int verifyClusterNodeId(const char *name, int length) { + if (length != CLUSTER_NAMELEN) return C_ERR; + for (int i = 0; i < length; i++) { + if (name[i] >= 'a' && name[i] <= 'z') continue; + if (name[i] >= '0' && name[i] <= '9') continue; + return C_ERR; + } + return C_OK; +} - de = dictFind(server.cluster->nodes,s); +/* Node lookup by name */ +clusterNode *clusterLookupNode(const char *name, int length) { + if (verifyClusterNodeId(name, length) != C_OK) return NULL; + sds s = sdsnewlen(name, length); + dictEntry *de = dictFind(server.cluster->nodes, s); sdsfree(s); if (de == NULL) return NULL; return dictGetVal(de); @@ -1599,7 +1628,7 @@ int clusterStartHandshake(char *ip, int port, int cport) { void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { uint16_t count = ntohs(hdr->count); clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip; - clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender); + clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN); while(count--) { uint16_t flags = ntohs(g->flags); @@ -1618,7 +1647,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { } /* Update our state accordingly to the gossip sections */ - node = clusterLookupNode(g->nodename); + node = clusterLookupNode(g->nodename, CLUSTER_NAMELEN); if (node) { /* We already know this node. Handle failure reports, only when the sender is a master. */ @@ -1895,6 +1924,17 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE| CLUSTER_TODO_FSYNC_CONFIG); + } else if (myself->slaveof && myself->slaveof->slaveof) { + /* Safeguard against sub-replicas. A replica's master can turn itself + * into a replica if its last slot is removed. If no other node takes + * over the slot, there is nothing else to trigger replica migration. */ + serverLog(LL_WARNING, + "I'm a sub-replica! Reconfiguring myself as a replica of grandmaster %.40s", + myself->slaveof->slaveof->name); + clusterSetMaster(myself->slaveof->slaveof); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); } else if (dirty_slots_count) { /* If we are here, we received an update message which removed * ownership for certain slots we still have keys about, but still @@ -1970,7 +2010,7 @@ int writeHostnamePingExt(clusterMsgPingExt **cursor) { /* We previously validated the extensions, so this function just needs to * handle the extensions. */ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { - clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender); + clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN); char *ext_hostname = NULL; uint16_t extensions = ntohs(hdr->extensions); /* Loop through all the extensions and process them */ @@ -2003,7 +2043,7 @@ static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) { sender = link->node; } else { /* Otherwise, fetch sender based on the message */ - sender = clusterLookupNode(hdr->sender); + sender = clusterLookupNode(hdr->sender, CLUSTER_NAMELEN); /* We know the sender node but haven't associate it with the link. This must * be an inbound link because only for inbound links we didn't know which node * to associate when they were created. */ @@ -2213,7 +2253,7 @@ int clusterProcessPacket(clusterLink *link) { if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { - serverLog(LL_DEBUG,"%s packet received: %s", + serverLog(LL_DEBUG,"%s packet received: %.40s", clusterGetMessageTypeString(type), link->node ? link->node->name : "NULL"); if (!link->inbound) { @@ -2314,7 +2354,7 @@ int clusterProcessPacket(clusterLink *link) { clusterSetNodeAsMaster(sender); } else { /* Node is a slave. */ - clusterNode *master = clusterLookupNode(hdr->slaveof); + clusterNode *master = clusterLookupNode(hdr->slaveof, CLUSTER_NAMELEN); if (nodeIsMaster(sender)) { /* Master turned into a slave! Reconfigure the node. */ @@ -2429,7 +2469,7 @@ int clusterProcessPacket(clusterLink *link) { clusterNode *failing; if (sender) { - failing = clusterLookupNode(hdr->data.fail.about.nodename); + failing = clusterLookupNode(hdr->data.fail.about.nodename, CLUSTER_NAMELEN); if (failing && !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF))) { @@ -2517,7 +2557,7 @@ int clusterProcessPacket(clusterLink *link) { ntohu64(hdr->data.update.nodecfg.configEpoch); if (!sender) return 1; /* We don't know the sender. */ - n = clusterLookupNode(hdr->data.update.nodecfg.nodename); + n = clusterLookupNode(hdr->data.update.nodecfg.nodename, CLUSTER_NAMELEN); if (!n) return 1; /* We don't know the reported node. */ if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */ @@ -3148,7 +3188,7 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin clusterNode *node = NULL; if (target != NULL) { - node = clusterLookupNode(target); + node = clusterLookupNode(target, strlen(target)); if (node == NULL || node->link == NULL) return C_ERR; } @@ -4561,6 +4601,22 @@ sds representClusterNodeFlags(sds ci, uint16_t flags) { return ci; } +/* Concatenate the slot ownership information to the given SDS string 'ci'. + * If the slot ownership is in a contiguous block, it's represented as start-end pair, + * else each slot is added separately. */ +sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count) { + for (int i = 0; i< slot_info_pairs_count; i+=2) { + unsigned long start = slot_info_pairs[i]; + unsigned long end = slot_info_pairs[i+1]; + if (start == end) { + ci = sdscatfmt(ci, " %i", start); + } else { + ci = sdscatfmt(ci, " %i-%i", start, end); + } + } + return ci; +} + /* Generate a csv-alike representation of the specified cluster node. * See clusterGenNodesDescription() top comment for more information. * @@ -4609,8 +4665,8 @@ sds clusterGenNodeDescription(clusterNode *node, int use_pport) { /* Slots served by this instance. If we already have slots info, * append it directly, otherwise, generate slots only if it has. */ - if (node->slots_info) { - ci = sdscatsds(ci, node->slots_info); + if (node->slot_info_pairs) { + ci = representSlotInfo(ci, node->slot_info_pairs, node->slot_info_pairs_count); } else if (node->numslots > 0) { start = -1; for (j = 0; j < CLUSTER_SLOTS; j++) { @@ -4670,12 +4726,15 @@ void clusterGenNodesSlotsInfo(int filter) { * or end of slot. */ if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) { if (!(n->flags & filter)) { - if (n->slots_info == NULL) n->slots_info = sdsempty(); - if (start == i-1) { - n->slots_info = sdscatfmt(n->slots_info," %i",start); - } else { - n->slots_info = sdscatfmt(n->slots_info," %i-%i",start,i-1); + if (n->slot_info_pairs_count+2 > n->slot_info_pairs_alloc) { + if (n->slot_info_pairs_alloc == 0) + n->slot_info_pairs_alloc = 8; + else + n->slot_info_pairs_alloc *= 2; + n->slot_info_pairs = zrealloc(n->slot_info_pairs, n->slot_info_pairs_alloc * sizeof(uint16_t)); } + n->slot_info_pairs[n->slot_info_pairs_count++] = start; + n->slot_info_pairs[n->slot_info_pairs_count++] = i-1; } if (i == CLUSTER_SLOTS) break; n = server.cluster->slots[i]; @@ -4684,6 +4743,13 @@ void clusterGenNodesSlotsInfo(int filter) { } } +void clusterFreeNodesSlotsInfo(clusterNode *n) { + zfree(n->slot_info_pairs); + n->slot_info_pairs = NULL; + n->slot_info_pairs_count = 0; + n->slot_info_pairs_alloc = 0; +} + /* Generate a csv-alike representation of the nodes we are aware of, * including the "myself" node, and return an SDS string containing the * representation (it is up to the caller to free it). @@ -4718,10 +4784,7 @@ sds clusterGenNodesDescription(int filter, int use_pport) { ci = sdscatlen(ci,"\n",1); /* Release slots info. */ - if (node->slots_info) { - sdsfree(node->slots_info); - node->slots_info = NULL; - } + clusterFreeNodesSlotsInfo(node); } dictReleaseIterator(di); return ci; @@ -4942,6 +5005,136 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in setDeferredArrayLen(c, nested_replylen, nested_elements); } +/* Add detailed information of a node to the output buffer of the given client. */ +void addNodeDetailsToShardReply(client *c, clusterNode *node) { + int reply_count = 0; + void *node_replylen = addReplyDeferredLen(c); + addReplyBulkCString(c, "id"); + addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); + reply_count++; + + /* We use server.tls_cluster as a proxy for whether or not + * the remote port is the tls port or not */ + int plaintext_port = server.tls_cluster ? node->pport : node->port; + int tls_port = server.tls_cluster ? node->port : 0; + if (plaintext_port) { + addReplyBulkCString(c, "port"); + addReplyLongLong(c, plaintext_port); + reply_count++; + } + + if (tls_port) { + addReplyBulkCString(c, "tls-port"); + addReplyLongLong(c, tls_port); + reply_count++; + } + + addReplyBulkCString(c, "ip"); + addReplyBulkCString(c, node->ip); + reply_count++; + + addReplyBulkCString(c, "endpoint"); + addReplyBulkCString(c, getPreferredEndpoint(node)); + reply_count++; + + if (node->hostname) { + addReplyBulkCString(c, "hostname"); + addReplyBulkCString(c, node->hostname); + reply_count++; + } + + long long node_offset; + if (node->flags & CLUSTER_NODE_MYSELF) { + node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; + } else { + node_offset = node->repl_offset; + } + + addReplyBulkCString(c, "role"); + addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master"); + reply_count++; + + addReplyBulkCString(c, "replication-offset"); + addReplyLongLong(c, node_offset); + reply_count++; + + addReplyBulkCString(c, "health"); + const char *health_msg = NULL; + if (nodeFailed(node)) { + health_msg = "fail"; + } else if (nodeIsSlave(node) && node_offset == 0) { + health_msg = "loading"; + } else { + health_msg = "online"; + } + addReplyBulkCString(c, health_msg); + reply_count++; + + setDeferredMapLen(c, node_replylen, reply_count); +} + +/* Add the shard reply of a single shard based off the given primary node. */ +void addShardReplyForClusterShards(client *c, clusterNode *node, uint16_t *slot_info_pairs, int slot_pairs_count) { + addReplyMapLen(c, 2); + addReplyBulkCString(c, "slots"); + if (slot_info_pairs) { + serverAssert((slot_pairs_count % 2) == 0); + addReplyArrayLen(c, slot_pairs_count); + for (int i = 0; i < slot_pairs_count; i++) + addReplyBulkLongLong(c, (unsigned long)slot_info_pairs[i]); + } else { + /* If no slot info pair is provided, the node owns no slots */ + addReplyArrayLen(c, 0); + } + + addReplyBulkCString(c, "nodes"); + list *nodes_for_slot = clusterGetNodesServingMySlots(node); + /* At least the provided node should be serving its slots */ + serverAssert(nodes_for_slot); + addReplyArrayLen(c, listLength(nodes_for_slot)); + if (listLength(nodes_for_slot) != 0) { + listIter li; + listNode *ln; + listRewind(nodes_for_slot, &li); + while ((ln = listNext(&li))) { + clusterNode *node = listNodeValue(ln); + addNodeDetailsToShardReply(c, node); + } + listRelease(nodes_for_slot); + } +} + +/* Add to the output buffer of the given client, an array of slot (start, end) + * pair owned by the shard, also the primary and set of replica(s) along with + * information about each node. */ +void clusterReplyShards(client *c) { + void *shard_replylen = addReplyDeferredLen(c); + int shard_count = 0; + /* This call will add slot_info_pairs to all nodes */ + clusterGenNodesSlotsInfo(0); + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); + dictEntry *de; + /* Iterate over all the available nodes in the cluster, for each primary + * node return generate the cluster shards response. if the primary node + * doesn't own any slot, cluster shard response contains the node related + * information and an empty slots array. */ + while((de = dictNext(di)) != NULL) { + clusterNode *n = dictGetVal(de); + if (nodeIsSlave(n)) { + /* You can force a replica to own slots, even though it'll get reverted, + * so freeing the slot pair here just in case. */ + clusterFreeNodesSlotsInfo(n); + continue; + } + shard_count++; + /* n->slot_info_pairs is set to NULL when the the node owns no slots. */ + addShardReplyForClusterShards(c, n, n->slot_info_pairs, n->slot_info_pairs_count); + clusterFreeNodesSlotsInfo(n); + } + dictReleaseIterator(di); + setDeferredArrayLen(c, shard_replylen, shard_count); +} + void clusterReplyMultiBulkSlots(client * c) { /* Format: 1) 1) start slot * 2) end slot @@ -5035,6 +5228,8 @@ void clusterCommand(client *c) { "SLOTS", " Return information about slots range mappings. Each range is made of:", " start, end, master and replicas IP addresses, ports and ids", +"SHARDS", +" Return information about slot range mappings and the nodes associated with them.", "LINKS", " Return information about all network links between this node and its peers.", " Output format is an array where each array element is a map containing attributes of a link", @@ -5084,6 +5279,9 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) { /* CLUSTER SLOTS */ clusterReplyMultiBulkSlots(c); + } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) { + /* CLUSTER SHARDS */ + clusterReplyShards(c); } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) { /* CLUSTER FLUSHSLOTS */ if (dictSize(server.db[0].dict) != 0) { @@ -5181,7 +5379,8 @@ NULL addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); return; } - if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (n == NULL) { addReplyErrorFormat(c,"I don't know about node %s", (char*)c->argv[4]->ptr); return; @@ -5197,7 +5396,8 @@ NULL "I'm already the owner of hash slot %u",slot); return; } - if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (n == NULL) { addReplyErrorFormat(c,"I don't know about node %s", (char*)c->argv[4]->ptr); return; @@ -5213,8 +5413,7 @@ NULL server.cluster->migrating_slots_to[slot] = NULL; } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) { /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */ - clusterNode *n = clusterLookupNode(c->argv[4]->ptr); - + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); if (!n) { addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr); @@ -5241,9 +5440,26 @@ NULL server.cluster->migrating_slots_to[slot]) server.cluster->migrating_slots_to[slot] = NULL; + int slot_was_mine = server.cluster->slots[slot] == myself; clusterDelSlot(slot); clusterAddSlot(n,slot); + /* If we are a master left without slots, we should turn into a + * replica of the new master. */ + if (slot_was_mine && + n != myself && + myself->numslots == 0 && + server.cluster_allow_replica_migration) + { + serverLog(LL_WARNING, + "Configuration change detected. Reconfiguring myself " + "as a replica of %.40s", n->name); + clusterSetMaster(n); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | + CLUSTER_TODO_UPDATE_STATE | + CLUSTER_TODO_FSYNC_CONFIG); + } + /* If this node was importing this slot, assigning the slot to * itself also clears the importing status. */ if (n == myself && @@ -5409,8 +5625,7 @@ NULL } } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) { /* CLUSTER FORGET <NODE ID> */ - clusterNode *n = clusterLookupNode(c->argv[2]->ptr); - + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); if (!n) { addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); return; @@ -5428,9 +5643,8 @@ NULL addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) { /* CLUSTER REPLICATE <NODE ID> */ - clusterNode *n = clusterLookupNode(c->argv[2]->ptr); - /* Lookup the specified node in our table. */ + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); if (!n) { addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); return; @@ -5466,7 +5680,7 @@ NULL } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") || !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) { /* CLUSTER SLAVES <NODE ID> */ - clusterNode *n = clusterLookupNode(c->argv[2]->ptr); + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); int j; /* Lookup the specified node in our table. */ @@ -5493,7 +5707,7 @@ NULL c->argc == 3) { /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */ - clusterNode *n = clusterLookupNode(c->argv[2]->ptr); + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); if (!n) { addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); |