diff options
author | artix <artix2@gmail.com> | 2018-05-23 18:00:42 +0200 |
---|---|---|
committer | artix <artix2@gmail.com> | 2019-02-28 16:57:57 +0100 |
commit | 2f499304aafd865e40f121389834a871d4f441cc (patch) | |
tree | 957f6573c8d90f77aab115e292f9566380e84a9c /src | |
parent | 9b0b0b3942b10f4b89cfcff25573cb8ea10e1013 (diff) | |
download | redis-2f499304aafd865e40f121389834a871d4f441cc.tar.gz |
Cluster Manager: check for unreachable nodes during cluster join.
Diffstat (limited to 'src')
-rw-r--r-- | src/redis-cli.c | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c index 97d90837c..1b6355637 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1974,6 +1974,15 @@ typedef struct clusterManagerReshardTableItem { int slot; } clusterManagerReshardTableItem; +/* Info about a cluster internal link. */ + +typedef struct clusterManagerLink { + sds node_name; + sds node_addr; + int connected; + int handshaking; +} clusterManagerLink; + static dictType clusterManagerDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ @@ -2012,6 +2021,7 @@ static void clusterManagerWaitForClusterJoin(void); static int clusterManagerCheckCluster(int quiet); static void clusterManagerLog(int level, const char* fmt, ...); static int clusterManagerIsConfigConsistent(void); +static dict *clusterManagerGetLinkStatus(void); static void clusterManagerOnError(sds err); static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array, int len); @@ -3381,10 +3391,43 @@ cleanup: /* Wait until the cluster configuration is consistent. */ static void clusterManagerWaitForClusterJoin(void) { printf("Waiting for the cluster to join\n"); + int counter = 0, check_after = listLength(cluster_manager.nodes) * 2; while(!clusterManagerIsConfigConsistent()) { printf("."); fflush(stdout); sleep(1); + if (++counter > check_after) { + dict *status = clusterManagerGetLinkStatus(); + if (status != NULL && dictSize(status) > 0) { + printf("\n"); + clusterManagerLogErr("Warning: %d nodes may " + "be unreachable\n", dictSize(status)); + dictIterator *iter = dictGetIterator(status); + dictEntry *entry; + while ((entry = dictNext(iter)) != NULL) { + sds nodename = (sds) dictGetKey(entry); + list *from = (list *) dictGetVal(entry); + clusterManagerLogErr(" - Node %s may be unreachable " + "from:\n", nodename); + listIter li; + listNode *ln; + listRewind(from, &li); + while ((ln = listNext(&li)) != NULL) { + sds from_addr = ln->value; + clusterManagerLogErr(" %s\n", from_addr); + sdsfree(from_addr); + } + clusterManagerLogErr("Cluster bus ports must be reachable " + "by every node.\nRemember that " + "cluster bus ports are different " + "from standard instance port.\n"); + listEmpty(from); + } + dictReleaseIterator(iter); + dictRelease(status); + } + counter = 0; + } } printf("\n"); } @@ -3788,6 +3831,91 @@ static int clusterManagerIsConfigConsistent(void) { return consistent; } +static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) { + list *links = NULL; + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES"); + if (!clusterManagerCheckRedisReply(node, reply, NULL)) goto cleanup; + links = listCreate(); + char *lines = reply->str, *p, *line; + while ((p = strstr(lines, "\n")) != NULL) { + int i = 0; + *p = '\0'; + line = lines; + lines = p + 1; + char *nodename = NULL, *addr = NULL, *flags = NULL, *link_status = NULL; + while ((p = strchr(line, ' ')) != NULL) { + *p = '\0'; + char *token = line; + line = p + 1; + if (i == 0) nodename = token; + else if (i == 1) addr = token; + else if (i == 2) flags = token; + else if (i == 7) link_status = token; + else if (i == 8) break; + i++; + } + if (i == 7) link_status = line; + + if (nodename == NULL || addr == NULL || flags == NULL || + link_status == NULL) + continue; + if (strstr(flags, "myself") != NULL) continue; + int disconnected = ((strstr(flags, "disconnected") != NULL) || + (strstr(link_status, "disconnected"))); + if (disconnected) { + clusterManagerLink *link = malloc(sizeof(*link)); + link->node_name = sdsnew(nodename); + link->node_addr = sdsnew(addr); + link->connected = 0; + link->handshaking = (strstr(flags, "handshaking") != NULL); + listAddNodeTail(links, link); + } + } +cleanup: + if (reply != NULL) freeReplyObject(reply); + return links; +} + +/* Check for disconnected cluster links. It returns a dict whose keys + * are the unreachable node addresses and the values are lists of + * node addresses that cannot reach the unreachable node. */ +static dict *clusterManagerGetLinkStatus(void) { + if (cluster_manager.nodes == NULL) return NULL; + dictType dtype = clusterManagerDictType; + dtype.valDestructor = dictListDestructor; + dict *status = dictCreate(&dtype, NULL); + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *node = ln->value; + list *links = clusterManagerGetDisconnectedLinks(node); + if (links) { + listIter lli; + listNode *lln; + listRewind(links, &lli); + while ((lln = listNext(&lli)) != NULL) { + clusterManagerLink *link = lln->value; + list *from = NULL; + dictEntry *entry = dictFind(status, link->node_addr); + if (entry) from = dictGetVal(entry); + else { + from = listCreate(); + dictAdd(status, sdsdup(link->node_addr), from); + } + sds myaddr = sdsempty(); + myaddr = sdscatfmt(myaddr, "%s:%u", node->ip, node->port); + listAddNodeTail(from, myaddr); + sdsfree(link->node_name); + sdsfree(link->node_addr); + zfree(link); + } + listRelease(links); + } + } + return status; +} + /* Add the error string to cluster_manager.errors and print it. */ static void clusterManagerOnError(sds err) { if (cluster_manager.errors == NULL) |