author    Salvatore Sanfilippo <antirez@gmail.com>    2018-12-11 08:23:54 +0100
committer GitHub <noreply@github.com>                 2018-12-11 08:23:54 +0100
commit    086363babf0e47162d975a265bd8d77e8d49dfb1 (patch)
tree      27872e0fc9b3230ce04c75d0590bdbf6f3f7ff67
parent    5bfd8ae25301820ae3c321a838263925e70849b5 (diff)
parent    27ddb2ba3a8759b306501882bd76760640e6705a (diff)
download  redis-086363babf0e47162d975a265bd8d77e8d49dfb1.tar.gz
Merge pull request #5681 from artix75/cluster_manager_fix_cmd
Cluster manager fix cmd
-rw-r--r--  src/redis-cli.c  429
1 files changed, 318 insertions, 111 deletions
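
This merge teaches the cluster manager's check and fix subcommands a new search-multiple-owners option (exposed on the command line as --cluster-search-multiple-owners), advertises the existing replace option for reshard and rebalance, wraps slot-ownership changes in MULTI/EXEC transactions, and adds a repair case for open slots with one migrating and several importing nodes. A likely invocation of the new option, with a placeholder address:

    redis-cli --cluster check 127.0.0.1:7000 --cluster-search-multiple-owners
    redis-cli --cluster fix 127.0.0.1:7000 --cluster-search-multiple-owners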
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 7e558a306..a3fb065d5 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -117,6 +117,7 @@
#define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6
#define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8
+#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9
#define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
#define CLUSTER_MANAGER_OPT_COLD 1 << 1
@@ -1378,6 +1379,9 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
+ } else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) {
+ config.cluster_manager_command.flags |=
+ CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
sds version = cliVersion();
printf("redis-cli %s\n", version);
@@ -1846,7 +1850,7 @@ static int evalMode(int argc, char **argv) {
if (eval_ldb) {
if (!config.eval_ldb) {
/* If the debugging session ended immediately, there was an
- * error compiling the script. Show it and don't enter
+ * error compiling the script. Show it and they don't enter
* the REPL at all. */
printf("Eval debugging session can't start:\n");
cliReadReply(0);
@@ -1929,6 +1933,7 @@ static dictType clusterManagerDictType = {
};
typedef int clusterManagerCommandProc(int argc, char **argv);
+typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx);
/* Cluster Manager helper functions */
@@ -1990,14 +1995,17 @@ typedef struct clusterManagerCommandDef {
clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
"replicas <arg>"},
- {"check", clusterManagerCommandCheck, -1, "host:port", NULL},
+ {"check", clusterManagerCommandCheck, -1, "host:port",
+ "search-multiple-owners"},
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
- {"fix", clusterManagerCommandFix, -1, "host:port", NULL},
+ {"fix", clusterManagerCommandFix, -1, "host:port",
+ "search-multiple-owners"},
{"reshard", clusterManagerCommandReshard, -1, "host:port",
- "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
+ "from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
+ "replace"},
{"rebalance", clusterManagerCommandRebalance, -1, "host:port",
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
- "timeout <arg>,simulate,pipeline <arg>,threshold <arg>"},
+ "timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
{"add-node", clusterManagerCommandAddNode, 2,
"new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
{"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
@@ -2188,6 +2196,44 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
return 1;
}
+/* Execute MULTI command on a cluster node. */
+static int clusterManagerStartTransaction(clusterManagerNode *node) {
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
+/* Execute EXEC command on a cluster node. */
+static int clusterManagerExecTransaction(clusterManagerNode *node,
+ clusterManagerOnReplyError onerror)
+{
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC");
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (success) {
+ if (reply->type != REDIS_REPLY_ARRAY) {
+ success = 0;
+ goto cleanup;
+ }
+ size_t i;
+ for (i = 0; i < reply->elements; i++) {
+ redisReply *r = reply->element[i];
+ char *err = NULL;
+ success = clusterManagerCheckRedisReply(node, r, &err);
+ if (!success && onerror) success = onerror(r, i);
+ if (err) {
+ if (!success)
+ CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
+ zfree(err);
+ }
+ if (!success) break;
+ }
+ }
+cleanup:
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
static int clusterManagerNodeConnect(clusterManagerNode *node) {
if (node->context) redisFree(node->context);
node->context = redisConnect(node->ip, node->port);
@@ -2746,6 +2792,84 @@ cleanup:
return success;
}
+static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) {
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+ "CLUSTER SETSLOT %d %s", slot, "STABLE");
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
+static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
+ int ignore_unassigned_err)
+{
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+ "CLUSTER DELSLOTS %d", slot);
+ char *err = NULL;
+ int success = clusterManagerCheckRedisReply(node, reply, &err);
+ if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
+ ignore_unassigned_err &&
+ strstr(reply->str, "already unassigned") != NULL) success = 1;
+ if (!success && err != NULL) {
+ CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
+ zfree(err);
+ }
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
+static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+ "CLUSTER ADDSLOTS %d", slot);
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
+static int clusterManagerCountKeysInSlot(clusterManagerNode *node,
+ int slot)
+{
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+ "CLUSTER COUNTKEYSINSLOT %d", slot);
+ int count = -1;
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
+ if (reply) freeReplyObject(reply);
+ return count;
+}
+
+static int clusterManagerBumpEpoch(clusterManagerNode *node) {
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
+static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) {
+ if (bulk_idx == 0 && reply) {
+ if (reply->type == REDIS_REPLY_ERROR)
+ return strstr(reply->str, "already unassigned") != NULL;
+ }
+ return 0;
+}
+
+static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
+ int slot,
+ int do_clear)
+{
+ int success = clusterManagerStartTransaction(owner);
+ if (!success) return 0;
+ /* Ensure the slot is not already assigned. */
+ clusterManagerDelSlot(owner, slot, 1);
+ /* Add the slot and bump epoch. */
+ clusterManagerAddSlot(owner, slot);
+ if (do_clear) clusterManagerClearSlotStatus(owner, slot);
+ clusterManagerBumpEpoch(owner);
+ success = clusterManagerExecTransaction(owner,
+ clusterManagerIgnoreUnassignedErr);
+ return success;
+}
+
/* Migrate keys taken from reply->elements. It returns the reply from the
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
* is not NULL, a dot will be printed for every migrated key. */
@@ -3623,24 +3747,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
listRewind(none, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
+ int s = atoi(slot);
clusterManagerNode *n = clusterManagerNodeMasterRandom();
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
- /* Ensure the slot is not already assigned. */
- redisReply *r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER DELSLOTS %s", slot);
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER ADDSLOTS %s", slot);
- if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
- if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- if (fixed < 0) goto cleanup;
+ if (!clusterManagerSetSlotOwner(n, s, 0)) {
+ fixed = -1;
+ goto cleanup;
+ }
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
- n->slots[atoi(slot)] = 1;
+ n->slots[s] = 1;
fixed++;
}
}
@@ -3648,7 +3765,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
/* Handle case "2": keys only in one node. */
if (listLength(single) > 0) {
- printf("The following uncovered slots have keys in just one node:\n");
+ printf("The following uncovered slots have keys in just one node:\n");
clusterManagerPrintSlotsList(single);
if (confirmWithYes("Fix these slots by covering with those nodes?")){
listIter li;
@@ -3656,6 +3773,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
listRewind(single, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
+ int s = atoi(slot);
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
assert(entry != NULL);
list *nodes = (list *) dictGetVal(entry);
@@ -3664,18 +3782,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerNode *n = fn->value;
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
- /* Ensure the slot is not already assigned. */
- redisReply *r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER DELSLOTS %s", slot);
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER ADDSLOTS %s", slot);
- if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
- if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- if (fixed < 0) goto cleanup;
+ if (!clusterManagerSetSlotOwner(n, s, 0)) {
+ fixed = -1;
+ goto cleanup;
+ }
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
n->slots[atoi(slot)] = 1;
@@ -3708,23 +3818,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerLogInfo(">>> Covering slot %s moving keys "
"to %s:%d\n", slot,
target->ip, target->port);
- /* Ensure the slot is not already assigned. */
- redisReply *r = CLUSTER_MANAGER_COMMAND(target,
- "CLUSTER DELSLOTS %s", slot);
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(target,
- "CLUSTER ADDSLOTS %s", slot);
- if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- if (fixed < 0) goto cleanup;
- r = CLUSTER_MANAGER_COMMAND(target,
- "CLUSTER SETSLOT %s %s", slot, "STABLE");
- if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
- if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- if (fixed < 0) goto cleanup;
+ if (!clusterManagerSetSlotOwner(target, s, 1)) {
+ fixed = -1;
+ goto cleanup;
+ }
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
target->slots[atoi(slot)] = 1;
@@ -3735,23 +3832,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerNode *src = nln->value;
if (src == target) continue;
/* Assign the slot to target node in the source node. */
- redisReply *r = CLUSTER_MANAGER_COMMAND(src,
- "CLUSTER SETSLOT %s %s %s", slot,
- "NODE", target->name);
- if (!clusterManagerCheckRedisReply(src, r, NULL))
+ if (!clusterManagerSetSlot(src, target, s, "NODE", NULL))
fixed = -1;
- if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
/* Set the source node in 'importing' state
* (even if we will actually migrate keys away)
* in order to avoid receiving redirections
* for MIGRATE. */
- r = CLUSTER_MANAGER_COMMAND(src,
- "CLUSTER SETSLOT %s %s %s", slot,
- "IMPORTING", target->name);
- if (!clusterManagerCheckRedisReply(src, r, NULL))
- fixed = -1;
- if (r) freeReplyObject(r);
+ if (!clusterManagerSetSlot(src, target, s,
+ "IMPORTING", NULL)) fixed = -1;
if (fixed < 0) goto cleanup;
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
CLUSTER_MANAGER_OPT_COLD;
@@ -3759,12 +3848,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
fixed = -1;
goto cleanup;
}
- r = CLUSTER_MANAGER_COMMAND(src,
- "CLUSTER SETSLOT %s %s", slot,
- "STABLE");
- if (!clusterManagerCheckRedisReply(src, r, NULL))
+ if (!clusterManagerClearSlotStatus(src, s))
fixed = -1;
- if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
}
fixed++;
@@ -3888,24 +3973,9 @@ static int clusterManagerFixOpenSlot(int slot) {
// Use ADDSLOTS to assign the slot.
clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
owner->ip, owner->port);
- redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
- "SETSLOT %d %s",
- slot, "STABLE");
- success = clusterManagerCheckRedisReply(owner, reply, NULL);
- if (reply) freeReplyObject(reply);
- if (!success) goto cleanup;
- /* Ensure that the slot is unassigned before assigning it to the
- * owner. */
- reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
- success = clusterManagerCheckRedisReply(owner, reply, NULL);
- /* Ignore "already unassigned" error. */
- if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
- strstr(reply->str, "already unassigned") != NULL) success = 1;
- if (reply) freeReplyObject(reply);
+ success = clusterManagerClearSlotStatus(owner, slot);
if (!success) goto cleanup;
- reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
- success = clusterManagerCheckRedisReply(owner, reply, NULL);
- if (reply) freeReplyObject(reply);
+ success = clusterManagerSetSlotOwner(owner, slot, 0);
if (!success) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
@@ -3913,9 +3983,7 @@ static int clusterManagerFixOpenSlot(int slot) {
/* Make sure this information will propagate. Not strictly needed
* since there is no past owner, so all the other nodes will accept
* whatever epoch this node will claim the slot with. */
- reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
- success = clusterManagerCheckRedisReply(owner, reply, NULL);
- if (reply) freeReplyObject(reply);
+ success = clusterManagerBumpEpoch(owner);
if (!success) goto cleanup;
/* Remove the owner from the list of migrating/importing
* nodes. */
@@ -3935,16 +4003,10 @@ static int clusterManagerFixOpenSlot(int slot) {
* the owner has been set in the previous condition (owner == NULL). */
assert(owner != NULL);
listRewind(owners, &li);
- redisReply *reply = NULL;
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
- reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
- success = clusterManagerCheckRedisReply(n, reply, NULL);
- /* Ignore "already unassigned" error. */
- if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
- strstr(reply->str, "already unassigned") != NULL) success = 1;
- if (reply) freeReplyObject(reply);
+ success = clusterManagerDelSlot(n, slot, 1);
if (!success) goto cleanup;
n->slots[slot] = 0;
/* Assign the slot to the owner in the node 'n' configuration.' */
@@ -3968,6 +4030,7 @@ static int clusterManagerFixOpenSlot(int slot) {
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
"%s:%d to %s:%d\n", slot,
src->ip, src->port, dst->ip, dst->port);
+ move_opts |= CLUSTER_MANAGER_OPT_UPDATE;
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
}
/* Case 2: There are multiple nodes that claim the slot as importing,
@@ -3986,11 +4049,7 @@ static int clusterManagerFixOpenSlot(int slot) {
if (!success) goto cleanup;
clusterManagerLogInfo(">>> Setting %d as STABLE in "
"%s:%d\n", slot, n->ip, n->port);
-
- redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
- slot, "STABLE");
- success = clusterManagerCheckRedisReply(n, r, NULL);
- if (r) freeReplyObject(r);
+ success = clusterManagerClearSlotStatus(n, slot);
if (!success) goto cleanup;
}
/* Since the slot has been moved in "cold" mode, ensure that all the
@@ -4000,12 +4059,76 @@ static int clusterManagerFixOpenSlot(int slot) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
- redisReply *r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
- success = clusterManagerCheckRedisReply(n, r, NULL);
- if (r) freeReplyObject(r);
+ success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL);
if (!success) goto cleanup;
}
+ }
+ /* Case 3: The slot is in migrating state in one node but multiple
+ * other nodes claim to be in importing state and don't have any key in
+ * the slot. We search for the importing node having the same ID as
+ * the destination node of the migrating node.
+ * In that case we move the slot from the migrating node to this node and
+ * we close the importing states on all the other importing nodes.
+ * If no importing node has the same ID as the destination node of the
+ * migrating node, the slot's state is closed on both the migrating node
+ * and the importing nodes. */
+ else if (listLength(migrating) == 1 && listLength(importing) > 1) {
+ int try_to_fix = 1;
+ clusterManagerNode *src = listFirst(migrating)->value;
+ clusterManagerNode *dst = NULL;
+ sds target_id = NULL;
+ for (int i = 0; i < src->migrating_count; i += 2) {
+ sds migrating_slot = src->migrating[i];
+ if (atoi(migrating_slot) == slot) {
+ target_id = src->migrating[i + 1];
+ break;
+ }
+ }
+ assert(target_id != NULL);
+ listIter li;
+ listNode *ln;
+ listRewind(importing, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ int count = clusterManagerCountKeysInSlot(n, slot);
+ if (count > 0) {
+ try_to_fix = 0;
+ break;
+ }
+ if (strcmp(n->name, target_id) == 0) dst = n;
+ }
+ if (!try_to_fix) goto unhandled_case;
+ if (dst != NULL) {
+ clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to "
+ "%s:%d and closing it on all the other "
+ "importing nodes.\n",
+ slot, src->ip, src->port,
+ dst->ip, dst->port);
+ /* Move the slot to the destination node. */
+ success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
+ if (!success) goto cleanup;
+ /* Close slot on all the other importing nodes. */
+ listRewind(importing, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ if (dst == n) continue;
+ success = clusterManagerClearSlotStatus(n, slot);
+ if (!success) goto cleanup;
+ }
+ } else {
+ clusterManagerLogInfo(">>> Case 3: Closing slot %d on both "
+ "migrating and importing nodes.\n", slot);
+ /* Close the slot on both the migrating node and the importing
+ * nodes. */
+ success = clusterManagerClearSlotStatus(src, slot);
+ if (!success) goto cleanup;
+ listRewind(importing, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ success = clusterManagerClearSlotStatus(n, slot);
+ if (!success) goto cleanup;
+ }
+ }
} else {
int try_to_close_slot = (listLength(importing) == 0 &&
listLength(migrating) == 1);
@@ -4022,13 +4145,13 @@ static int clusterManagerFixOpenSlot(int slot) {
if (!success) goto cleanup;
}
}
- /* Case 3: There are no slots claiming to be in importing state, but
- * there is a migrating node that actually don't have any key or is the
- * slot owner. We can just close the slot, probably a reshard interrupted
- * in the middle. */
+ /* Case 4: There are no slots claiming to be in importing state, but
+ * there is a migrating node that actually don't have any key or is the
+ * slot owner. We can just close the slot, probably a reshard
+ * interrupted in the middle. */
if (try_to_close_slot) {
clusterManagerNode *n = listFirst(migrating)->value;
- clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
+ clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n",
slot, n->ip, n->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
slot, "STABLE");
@@ -4036,6 +4159,7 @@ static int clusterManagerFixOpenSlot(int slot) {
if (r) freeReplyObject(r);
if (!success) goto cleanup;
} else {
+unhandled_case:
success = 0;
clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot "
"yet (work in progress). Slot is set as "
@@ -4053,17 +4177,55 @@ cleanup:
return success;
}
+static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
+ clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot);
+ int success = 0;
+ assert(listLength(owners) > 1);
+ clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners,
+ slot,
+ NULL);
+ if (!owner) owner = listFirst(owners)->value;
+ clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n",
+ slot, owner->ip, owner->port);
+ /* Set the slot owner. */
+ if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0;
+ listIter li;
+ listNode *ln;
+ listRewind(cluster_manager.nodes, &li);
+ /* Update configuration in all the other master nodes by assigning the slot
+ * itself to the new owner, and by eventually migrating keys if the node
+ * has keys for the slot. */
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ if (n == owner) continue;
+ if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+ int count = clusterManagerCountKeysInSlot(n, slot);
+ success = (count >= 0);
+ if (!success) break;
+ clusterManagerDelSlot(n, slot, 1);
+ if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
+ if (count > 0) {
+ int opts = CLUSTER_MANAGER_OPT_VERBOSE |
+ CLUSTER_MANAGER_OPT_COLD;
+ success = clusterManagerMoveSlot(n, owner, slot, opts, NULL);
+ if (!success) break;
+ }
+ }
+ return success;
+}
+
static int clusterManagerCheckCluster(int quiet) {
listNode *ln = listFirst(cluster_manager.nodes);
if (!ln) return 0;
- int result = 1;
- int do_fix = config.cluster_manager_command.flags &
- CLUSTER_MANAGER_CMD_FLAG_FIX;
clusterManagerNode *node = ln->value;
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
node->ip, node->port);
+ int result = 1, consistent = 0;
+ int do_fix = config.cluster_manager_command.flags &
+ CLUSTER_MANAGER_CMD_FLAG_FIX;
if (!quiet) clusterManagerShowNodes();
- if (!clusterManagerIsConfigConsistent()) {
+ consistent = clusterManagerIsConfigConsistent();
+ if (!consistent) {
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
clusterManagerOnError(err);
result = 0;
@@ -4071,7 +4233,7 @@ static int clusterManagerCheckCluster(int quiet) {
clusterManagerLogOk("[OK] All nodes agree about slots "
"configuration.\n");
}
- // Check open slots
+ /* Check open slots */
clusterManagerLogInfo(">>> Check for open slots...\n");
listIter li;
listRewind(cluster_manager.nodes, &li);
@@ -4130,7 +4292,7 @@ static int clusterManagerCheckCluster(int quiet) {
clusterManagerLogErr("%s.\n", (char *) errstr);
sdsfree(errstr);
if (do_fix) {
- // Fix open slots.
+ /* Fix open slots. */
dictReleaseIterator(iter);
iter = dictGetIterator(open_slots);
while ((entry = dictNext(iter)) != NULL) {
@@ -4165,6 +4327,51 @@ static int clusterManagerCheckCluster(int quiet) {
if (fixed > 0) result = 1;
}
}
+ int search_multiple_owners = config.cluster_manager_command.flags &
+ CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
+ if (search_multiple_owners) {
+ /* Check whether there are multiple owners, even when slots are
+ * fully covered and there are no open slots. */
+ clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
+ int slot = 0;
+ for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
+ listIter li;
+ listNode *ln;
+ listRewind(cluster_manager.nodes, &li);
+ list *owners = listCreate();
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+ if (n->slots[slot]) listAddNodeTail(owners, n);
+ else {
+ /* Nodes having keys for the slot will be considered
+ * owners too. */
+ int count = clusterManagerCountKeysInSlot(n, slot);
+ if (count > 0) listAddNodeTail(owners, n);
+ }
+ }
+ if (listLength(owners) > 1) {
+ result = 0;
+ clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n",
+ slot, listLength(owners));
+ listRewind(owners, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
+ }
+ if (do_fix) {
+ result = clusterManagerFixMultipleSlotOwners(slot, owners);
+ if (!result) {
+ clusterManagerLogErr("Failed to fix multiple owners "
+ "for slot %d\n", slot);
+ listRelease(owners);
+ break;
+ }
+ }
+ }
+ listRelease(owners);
+ }
+ }
return result;
}
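
The new helpers wrap slot-ownership changes in a Redis transaction: clusterManagerStartTransaction issues MULTI, the queued CLUSTER DELSLOTS/ADDSLOTS/SETSLOT/BUMPEPOCH commands run together on EXEC, and clusterManagerExecTransaction walks the per-command replies inside the EXEC array, handing any error together with its index to the optional onerror callback (clusterManagerIgnoreUnassignedErr tolerates the "already unassigned" error from the leading DELSLOTS). A minimal standalone hiredis sketch of that reply shape, assuming an instance reachable at 127.0.0.1:6379 (address and slot number are illustrative):

/* Sketch only: shows why clusterManagerExecTransaction iterates
 * reply->element[] and passes the element index (bulk_idx) to its
 * onerror callback. */
#include <stdio.h>
#include <hiredis/hiredis.h>

int main(void) {
    redisContext *c = redisConnect("127.0.0.1", 6379);
    if (c == NULL || c->err) return 1;

    redisReply *r = redisCommand(c, "MULTI");            /* expect +OK */
    if (r) freeReplyObject(r);
    r = redisCommand(c, "CLUSTER DELSLOTS %d", 100);     /* +QUEUED */
    if (r) freeReplyObject(r);
    r = redisCommand(c, "CLUSTER ADDSLOTS %d", 100);     /* +QUEUED */
    if (r) freeReplyObject(r);
    r = redisCommand(c, "CLUSTER BUMPEPOCH");            /* +QUEUED */
    if (r) freeReplyObject(r);

    /* EXEC returns an array with one element per queued command; runtime
     * failures show up as REDIS_REPLY_ERROR elements, e.g. DELSLOTS on an
     * unassigned slot reports "already unassigned" at index 0, which the
     * patch deliberately ignores via clusterManagerIgnoreUnassignedErr. */
    r = redisCommand(c, "EXEC");
    if (r && r->type == REDIS_REPLY_ARRAY) {
        for (size_t i = 0; i < r->elements; i++) {
            redisReply *e = r->element[i];
            if (e->type == REDIS_REPLY_ERROR)
                printf("command %zu failed: %s\n", i, e->str);
        }
    }
    if (r) freeReplyObject(r);
    redisFree(c);
    return 0;
}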