summaryrefslogtreecommitdiff
path: root/src/redis-cli.c
diff options
context:
space:
mode:
authorartix <artix2@gmail.com>2018-11-22 11:47:59 +0100
committerantirez <antirez@gmail.com>2018-12-11 17:59:16 +0100
commite084b8ccbf5d1f29dce94be3dbca8deec5419910 (patch)
treeba536224d3163dc6e6d5924e52dd70f2ad7ad19d /src/redis-cli.c
parentfa726e2af5aae647b5322e4836f0c3ee2597e05e (diff)
downloadredis-e084b8ccbf5d1f29dce94be3dbca8deec5419910.tar.gz
Cluster Manager: check/fix commands now handle multiple owners even if
all slots are covered and not open.
Diffstat (limited to 'src/redis-cli.c')
-rw-r--r--src/redis-cli.c135
1 files changed, 129 insertions, 6 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 8c7ba2b6f..84e0e8020 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -2746,6 +2746,41 @@ cleanup:
return success;
}
+static int clusterManagerDelSlot(clusterManagerNode *node, int slot) {
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+ "CLUSTER DELSLOTS %d", slot);
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
+static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+ "CLUSTER ADDSLOTS %d", slot);
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
+static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node,
+ int slot)
+{
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+ "CLUSTER COUNTKEYSINSLOT %d", slot);
+ int count = -1;
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
+ if (reply) freeReplyObject(reply);
+ return count;
+}
+
+static int clusterManagerBumpEpoch(clusterManagerNode *node) {
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
/* Migrate keys taken from reply->elements. It returns the reply from the
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
* is not NULL, a dot will be printed for every migrated key. */
@@ -4053,17 +4088,62 @@ cleanup:
return success;
}
+static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
+ clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot);
+ int success = 0;
+ assert(listLength(owners) > 1);
+ clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners,
+ slot,
+ NULL);
+ if (!owner) owner = listFirst(owners)->value;
+ clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d",
+ slot, owner->ip, owner->port);
+ /* Set the owner node by calling DELSLOTS in order to unassign the slot
+ * in case it's already assigned to another node and by finally calling
+ * ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it
+ * could reply with an "already unassigned" error and if it should fail
+ * for other reasons, it would lead to a failure in the follwing ADDSLOTS
+ * command. */
+ clusterManagerDelSlot(owner, slot);
+ if (!clusterManagerAddSlot(owner, slot)) return 0;
+ if (!clusterManagerBumpEpoch(owner)) return 0;
+ listIter li;
+ listNode *ln;
+ listRewind(cluster_manager.nodes, &li);
+ /* Update configuration in all the other master nodes by assigning the slot
+ * itself to the new owner, and by eventually migrating keys if the node
+ * has keys for the slot. */
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ if (n == owner) continue;
+ if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+ int count = clusterManagerCountKeysInSlot(n, slot);
+ success = (count >= 0);
+ if (!success) break;
+ clusterManagerDelSlot(n, slot);
+ if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
+ if (count > 0) {
+ int opts = CLUSTER_MANAGER_OPT_VERBOSE |
+ CLUSTER_MANAGER_OPT_COLD;
+ success = clusterManagerMoveSlot(n, owner, slot, opts, NULL);
+ if (!success) break;
+ }
+ }
+ return success;
+}
+
static int clusterManagerCheckCluster(int quiet) {
listNode *ln = listFirst(cluster_manager.nodes);
if (!ln) return 0;
- int result = 1;
- int do_fix = config.cluster_manager_command.flags &
- CLUSTER_MANAGER_CMD_FLAG_FIX;
clusterManagerNode *node = ln->value;
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
node->ip, node->port);
+ int result = 1, consistent = 0;
+ int do_fix = config.cluster_manager_command.flags &
+ CLUSTER_MANAGER_CMD_FLAG_FIX;
if (!quiet) clusterManagerShowNodes();
- if (!clusterManagerIsConfigConsistent()) {
+ consistent = clusterManagerIsConfigConsistent();
+ if (!consistent) {
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
clusterManagerOnError(err);
result = 0;
@@ -4071,7 +4151,7 @@ static int clusterManagerCheckCluster(int quiet) {
clusterManagerLogOk("[OK] All nodes agree about slots "
"configuration.\n");
}
- // Check open slots
+ /* Check open slots */
clusterManagerLogInfo(">>> Check for open slots...\n");
listIter li;
listRewind(cluster_manager.nodes, &li);
@@ -4130,7 +4210,7 @@ static int clusterManagerCheckCluster(int quiet) {
clusterManagerLogErr("%s.\n", (char *) errstr);
sdsfree(errstr);
if (do_fix) {
- // Fix open slots.
+ /* Fix open slots. */
dictReleaseIterator(iter);
iter = dictGetIterator(open_slots);
while ((entry = dictNext(iter)) != NULL) {
@@ -4165,6 +4245,49 @@ static int clusterManagerCheckCluster(int quiet) {
if (fixed > 0) result = 1;
}
}
+ if (!consistent) {
+ /* Check whether there are multiple owners, even when slots are
+ * fully covered and there are no open slots. */
+ clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
+ int slot = 0;
+ for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
+ listIter li;
+ listNode *ln;
+ listRewind(cluster_manager.nodes, &li);
+ list *owners = listCreate();
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
+ if (n->slots[slot]) listAddNodeTail(owners, n);
+ else {
+ /* Nodes having keys for the slot will be considered
+ * owners too. */
+ int count = clusterManagerCountKeysInSlot(n, slot);
+ if (count > 0) listAddNodeTail(owners, n);
+ }
+ }
+ if (listLength(owners) > 1) {
+ result = 0;
+ clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n",
+ slot, listLength(owners));
+ listRewind(owners, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ clusterManagerNode *n = ln->value;
+ clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
+ }
+ if (do_fix) {
+ result = clusterManagerFixMultipleSlotOwners(slot, owners);
+ if (!result) {
+ clusterManagerLogErr("Failed to fix multiple owners "
+ "for slot %d\n", slot);
+ listRelease(owners);
+ break;
+ }
+ }
+ }
+ listRelease(owners);
+ }
+ }
return result;
}