From 2280f4f7c4d2e1b86323c1bbcfbb2968ae6584c5 Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 27 Nov 2018 12:26:56 +0100 Subject: Cluster Manager: code cleanup. --- src/redis-cli.c | 128 ++++++++++++++++++-------------------------------------- 1 file changed, 41 insertions(+), 87 deletions(-) (limited to 'src/redis-cli.c') diff --git a/src/redis-cli.c b/src/redis-cli.c index 84e0e8020..c1a81b5cd 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2746,10 +2746,23 @@ cleanup: return success; } -static int clusterManagerDelSlot(clusterManagerNode *node, int slot) { +static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) { + redisReply *reply = CLUSTER_MANAGER_COMMAND(node, + "CLUSTER SETSLOT %d %s", slot, "STABLE"); + int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (reply) freeReplyObject(reply); + return success; +} + +static int clusterManagerDelSlot(clusterManagerNode *node, int slot, + int ignore_unassigned_err) +{ redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER DELSLOTS %d", slot); int success = clusterManagerCheckRedisReply(node, reply, NULL); + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + ignore_unassigned_err && + strstr(reply->str, "already unassigned") != NULL) success = 1; if (reply) freeReplyObject(reply); return success; } @@ -3658,24 +3671,18 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(none, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(n, s, 1); + if (!clusterManagerAddSlot(n, s)) fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ - n->slots[atoi(slot)] = 1; + n->slots[s] = 1; fixed++; } } @@ -3691,6 +3698,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { listRewind(single, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; + int s = atoi(slot); dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot); assert(entry != NULL); list *nodes = (list *) dictGetVal(entry); @@ -3700,16 +3708,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(n, s, 1); + if (!clusterManagerAddSlot(n, s)) fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3744,21 +3745,12 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "to %s:%d\n", slot, target->ip, target->port); /* Ensure the slot is not already assigned. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER DELSLOTS %s", slot); - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER ADDSLOTS %s", slot); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + clusterManagerDelSlot(target, s, 1); + if (!clusterManagerAddSlot(target, s)) fixed = -1; if (fixed < 0) goto cleanup; - r = CLUSTER_MANAGER_COMMAND(target, - "CLUSTER SETSLOT %s %s", slot, "STABLE"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); - r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); - if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; - if (r) freeReplyObject(r); + if (!clusterManagerClearSlotStatus(target, s)) + fixed = -1; + if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1; if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3770,23 +3762,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *src = nln->value; if (src == target) continue; /* Assign the slot to target node in the source node. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "NODE", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerSetSlot(src, target, s, "NODE", NULL)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s %s", slot, - "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(src, r, NULL)) - fixed = -1; - if (r) freeReplyObject(r); + if (!clusterManagerSetSlot(src, target, s, + "IMPORTING", NULL)) fixed = -1; if (fixed < 0) goto cleanup; int opts = CLUSTER_MANAGER_OPT_VERBOSE | CLUSTER_MANAGER_OPT_COLD; @@ -3794,12 +3778,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } - r = CLUSTER_MANAGER_COMMAND(src, - "CLUSTER SETSLOT %s %s", slot, - "STABLE"); - if (!clusterManagerCheckRedisReply(src, r, NULL)) + if (!clusterManagerClearSlotStatus(src, s)) fixed = -1; - if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; } fixed++; @@ -3923,24 +3903,13 @@ static int clusterManagerFixOpenSlot(int slot) { // Use ADDSLOTS to assign the slot. clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n", owner->ip, owner->port); - redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER " - "SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerClearSlotStatus(owner, slot); if (!success) goto cleanup; /* Ensure that the slot is unassigned before assigning it to the * owner. */ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - /* Ignore "already unassigned" error. */ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); + success = clusterManagerDelSlot(owner, slot, 1); if (!success) goto cleanup; - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerAddSlot(owner, slot); if (!success) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3948,9 +3917,7 @@ static int clusterManagerFixOpenSlot(int slot) { /* Make sure this information will propagate. Not strictly needed * since there is no past owner, so all the other nodes will accept * whatever epoch this node will claim the slot with. */ - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); + success = clusterManagerBumpEpoch(owner); if (!success) goto cleanup; /* Remove the owner from the list of migrating/importing * nodes. */ @@ -3970,16 +3937,10 @@ static int clusterManagerFixOpenSlot(int slot) { * the owner has been set in the previous condition (owner == NULL). */ assert(owner != NULL); listRewind(owners, &li); - redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; - reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); - success = clusterManagerCheckRedisReply(n, reply, NULL); - /* Ignore "already unassigned" error. */ - if (!success && reply && reply->type == REDIS_REPLY_ERROR && - strstr(reply->str, "already unassigned") != NULL) success = 1; - if (reply) freeReplyObject(reply); + success = clusterManagerDelSlot(n, slot, 1); if (!success) goto cleanup; n->slots[slot] = 0; /* Assign the slot to the owner in the node 'n' configuration.' */ @@ -4021,11 +3982,7 @@ static int clusterManagerFixOpenSlot(int slot) { if (!success) goto cleanup; clusterManagerLogInfo(">>> Setting %d as STABLE in " "%s:%d\n", slot, n->ip, n->port); - - redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", - slot, "STABLE"); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerClearSlotStatus(n, slot); if (!success) goto cleanup; } /* Since the slot has been moved in "cold" mode, ensure that all the @@ -4035,10 +3992,7 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerNode *n = ln->value; if (n == owner) continue; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) freeReplyObject(r); + success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL); if (!success) goto cleanup; } } else { @@ -4104,7 +4058,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { * could reply with an "already unassigned" error and if it should fail * for other reasons, it would lead to a failure in the follwing ADDSLOTS * command. */ - clusterManagerDelSlot(owner, slot); + clusterManagerDelSlot(owner, slot, 1); if (!clusterManagerAddSlot(owner, slot)) return 0; if (!clusterManagerBumpEpoch(owner)) return 0; listIter li; @@ -4120,7 +4074,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) { int count = clusterManagerCountKeysInSlot(n, slot); success = (count >= 0); if (!success) break; - clusterManagerDelSlot(n, slot); + clusterManagerDelSlot(n, slot, 1); if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0; if (count > 0) { int opts = CLUSTER_MANAGER_OPT_VERBOSE | -- cgit v1.2.1