summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorartix <artix2@gmail.com>2018-05-23 18:00:42 +0200
committerartix <artix2@gmail.com>2019-02-28 16:57:57 +0100
commit2f499304aafd865e40f121389834a871d4f441cc (patch)
tree957f6573c8d90f77aab115e292f9566380e84a9c /src
parent9b0b0b3942b10f4b89cfcff25573cb8ea10e1013 (diff)
downloadredis-2f499304aafd865e40f121389834a871d4f441cc.tar.gz
Cluster Manager: check for unreachable nodes during cluster join.
Diffstat (limited to 'src')
-rw-r--r--src/redis-cli.c128
1 files changed, 128 insertions, 0 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 97d90837c..1b6355637 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -1974,6 +1974,15 @@ typedef struct clusterManagerReshardTableItem {
int slot;
} clusterManagerReshardTableItem;
+/* Info about a cluster internal link. */
+
+typedef struct clusterManagerLink {
+ sds node_name;
+ sds node_addr;
+ int connected;
+ int handshaking;
+} clusterManagerLink;
+
static dictType clusterManagerDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
@@ -2012,6 +2021,7 @@ static void clusterManagerWaitForClusterJoin(void);
static int clusterManagerCheckCluster(int quiet);
static void clusterManagerLog(int level, const char* fmt, ...);
static int clusterManagerIsConfigConsistent(void);
+static dict *clusterManagerGetLinkStatus(void);
static void clusterManagerOnError(sds err);
static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array,
int len);
@@ -3381,10 +3391,43 @@ cleanup:
/* Wait until the cluster configuration is consistent. */
static void clusterManagerWaitForClusterJoin(void) {
printf("Waiting for the cluster to join\n");
+ int counter = 0, check_after = listLength(cluster_manager.nodes) * 2;
while(!clusterManagerIsConfigConsistent()) {
printf(".");
fflush(stdout);
sleep(1);
+ if (++counter > check_after) {
+ dict *status = clusterManagerGetLinkStatus();
+ if (status != NULL && dictSize(status) > 0) {
+ printf("\n");
+ clusterManagerLogErr("Warning: %d nodes may "
+ "be unreachable\n", dictSize(status));
+ dictIterator *iter = dictGetIterator(status);
+ dictEntry *entry;
+ while ((entry = dictNext(iter)) != NULL) {
+ sds nodename = (sds) dictGetKey(entry);
+ list *from = (list *) dictGetVal(entry);
+ clusterManagerLogErr(" - Node %s may be unreachable "
+ "from:\n", nodename);
+ listIter li;
+ listNode *ln;
+ listRewind(from, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ sds from_addr = ln->value;
+ clusterManagerLogErr(" %s\n", from_addr);
+ sdsfree(from_addr);
+ }
+ clusterManagerLogErr("Cluster bus ports must be reachable "
+ "by every node.\nRemember that "
+ "cluster bus ports are different "
+ "from standard instance port.\n");
+ listEmpty(from);
+ }
+ dictReleaseIterator(iter);
+ dictRelease(status);
+ }
+ counter = 0;
+ }
}
printf("\n");
}
@@ -3788,6 +3831,91 @@ static int clusterManagerIsConfigConsistent(void) {
return consistent;
}
+static list *clusterManagerGetDisconnectedLinks(clusterManagerNode *node) {
+ list *links = NULL;
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER NODES");
+ if (!clusterManagerCheckRedisReply(node, reply, NULL)) goto cleanup;
+ links = listCreate();
+ char *lines = reply->str, *p, *line;
+ while ((p = strstr(lines, "\n")) != NULL) {
+ int i = 0;
+ *p = '\0';
+ line = lines;
+ lines = p + 1;
+ char *nodename = NULL, *addr = NULL, *flags = NULL, *link_status = NULL;
+ while ((p = strchr(line, ' ')) != NULL) {
+ *p = '\0';
+ char *token = line;
+ line = p + 1;
+ if (i == 0) nodename = token;
+ else if (i == 1) addr = token;
+ else if (i == 2) flags = token;
+ else if (i == 7) link_status = token;
+ else if (i == 8) break;
+ i++;
+ }
+ if (i == 7) link_status = line;
+
+ if (nodename == NULL || addr == NULL || flags == NULL ||
+ link_status == NULL)
+ continue;
+ if (strstr(flags, "myself") != NULL) continue;
+ int disconnected = ((strstr(flags, "disconnected") != NULL) ||
+ (strstr(link_status, "disconnected")));
+ if (disconnected) {
+ clusterManagerLink *link = malloc(sizeof(*link));
+ link->node_name = sdsnew(nodename);
+ link->node_addr = sdsnew(addr);
+ link->connected = 0;
+ link->handshaking = (strstr(flags, "handshaking") != NULL);
+ listAddNodeTail(links, link);
+ }
+ }
+cleanup:
+ if (reply != NULL) freeReplyObject(reply);
+ return links;
+}
+
+/* Check for disconnected cluster links. It returns a dict whose keys
+ * are the unreachable node addresses and the values are lists of
+ * node addresses that cannot reach the unreachable node. */
+static dict *clusterManagerGetLinkStatus(void) {
+ if (cluster_manager.nodes == NULL) return NULL;
+ dictType dtype = clusterManagerDictType;
+ dtype.valDestructor = dictListDestructor;
+ dict *status = dictCreate(&dtype, NULL);
+ listIter li;
+ listNode *ln;
+ listRewind(cluster_manager.nodes, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *node = ln->value;
+ list *links = clusterManagerGetDisconnectedLinks(node);
+ if (links) {
+ listIter lli;
+ listNode *lln;
+ listRewind(links, &lli);
+ while ((lln = listNext(&lli)) != NULL) {
+ clusterManagerLink *link = lln->value;
+ list *from = NULL;
+ dictEntry *entry = dictFind(status, link->node_addr);
+ if (entry) from = dictGetVal(entry);
+ else {
+ from = listCreate();
+ dictAdd(status, sdsdup(link->node_addr), from);
+ }
+ sds myaddr = sdsempty();
+ myaddr = sdscatfmt(myaddr, "%s:%u", node->ip, node->port);
+ listAddNodeTail(from, myaddr);
+ sdsfree(link->node_name);
+ sdsfree(link->node_addr);
+ zfree(link);
+ }
+ listRelease(links);
+ }
+ }
+ return status;
+}
+
/* Add the error string to cluster_manager.errors and print it. */
static void clusterManagerOnError(sds err) {
if (cluster_manager.errors == NULL)