summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorartix <artix2@gmail.com>2018-11-27 12:26:56 +0100
committerartix <artix2@gmail.com>2018-12-10 17:43:58 +0100
commiteaac9f9e930d8959d681b7d03d7411bfb18db3a7 (patch)
tree38f0b05b80e89a16a39c60b1a3a16315649c2583
parent5bf13eaaf8f444bb547b00ed5ecd727a93cd8399 (diff)
downloadredis-eaac9f9e930d8959d681b7d03d7411bfb18db3a7.tar.gz
Cluster Manager: code cleanup.
-rw-r--r--src/redis-cli.c128
1 files changed, 41 insertions, 87 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 9d8c0bcaa..a3eadf364 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -2746,10 +2746,23 @@ cleanup:
return success;
}
-static int clusterManagerDelSlot(clusterManagerNode *node, int slot) {
+static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) {
+ redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
+ "CLUSTER SETSLOT %d %s", slot, "STABLE");
+ int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (reply) freeReplyObject(reply);
+ return success;
+}
+
+static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
+ int ignore_unassigned_err)
+{
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
"CLUSTER DELSLOTS %d", slot);
int success = clusterManagerCheckRedisReply(node, reply, NULL);
+ if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
+ ignore_unassigned_err &&
+ strstr(reply->str, "already unassigned") != NULL) success = 1;
if (reply) freeReplyObject(reply);
return success;
}
@@ -3658,24 +3671,18 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
listRewind(none, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
+ int s = atoi(slot);
clusterManagerNode *n = clusterManagerNodeMasterRandom();
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
/* Ensure the slot is not already assigned. */
- redisReply *r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER DELSLOTS %s", slot);
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER ADDSLOTS %s", slot);
- if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
- if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
+ clusterManagerDelSlot(n, s, 1);
+ if (!clusterManagerAddSlot(n, s)) fixed = -1;
+ if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1;
if (fixed < 0) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
- n->slots[atoi(slot)] = 1;
+ n->slots[s] = 1;
fixed++;
}
}
@@ -3691,6 +3698,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
listRewind(single, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
+ int s = atoi(slot);
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
assert(entry != NULL);
list *nodes = (list *) dictGetVal(entry);
@@ -3700,16 +3708,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
/* Ensure the slot is not already assigned. */
- redisReply *r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER DELSLOTS %s", slot);
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER ADDSLOTS %s", slot);
- if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
- if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
+ clusterManagerDelSlot(n, s, 1);
+ if (!clusterManagerAddSlot(n, s)) fixed = -1;
+ if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1;
if (fixed < 0) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
@@ -3744,21 +3745,12 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
"to %s:%d\n", slot,
target->ip, target->port);
/* Ensure the slot is not already assigned. */
- redisReply *r = CLUSTER_MANAGER_COMMAND(target,
- "CLUSTER DELSLOTS %s", slot);
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(target,
- "CLUSTER ADDSLOTS %s", slot);
- if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
+ clusterManagerDelSlot(target, s, 1);
+ if (!clusterManagerAddSlot(target, s)) fixed = -1;
if (fixed < 0) goto cleanup;
- r = CLUSTER_MANAGER_COMMAND(target,
- "CLUSTER SETSLOT %s %s", slot, "STABLE");
- if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
- r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
- if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
- if (r) freeReplyObject(r);
+ if (!clusterManagerClearSlotStatus(target, s))
+ fixed = -1;
+ if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1;
if (fixed < 0) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
@@ -3770,23 +3762,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerNode *src = nln->value;
if (src == target) continue;
/* Assign the slot to target node in the source node. */
- redisReply *r = CLUSTER_MANAGER_COMMAND(src,
- "CLUSTER SETSLOT %s %s %s", slot,
- "NODE", target->name);
- if (!clusterManagerCheckRedisReply(src, r, NULL))
+ if (!clusterManagerSetSlot(src, target, s, "NODE", NULL))
fixed = -1;
- if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
/* Set the source node in 'importing' state
* (even if we will actually migrate keys away)
* in order to avoid receiving redirections
* for MIGRATE. */
- r = CLUSTER_MANAGER_COMMAND(src,
- "CLUSTER SETSLOT %s %s %s", slot,
- "IMPORTING", target->name);
- if (!clusterManagerCheckRedisReply(src, r, NULL))
- fixed = -1;
- if (r) freeReplyObject(r);
+ if (!clusterManagerSetSlot(src, target, s,
+ "IMPORTING", NULL)) fixed = -1;
if (fixed < 0) goto cleanup;
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
CLUSTER_MANAGER_OPT_COLD;
@@ -3794,12 +3778,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
fixed = -1;
goto cleanup;
}
- r = CLUSTER_MANAGER_COMMAND(src,
- "CLUSTER SETSLOT %s %s", slot,
- "STABLE");
- if (!clusterManagerCheckRedisReply(src, r, NULL))
+ if (!clusterManagerClearSlotStatus(src, s))
fixed = -1;
- if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
}
fixed++;
@@ -3923,24 +3903,13 @@ static int clusterManagerFixOpenSlot(int slot) {
// Use ADDSLOTS to assign the slot.
clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
owner->ip, owner->port);
- redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
- "SETSLOT %d %s",
- slot, "STABLE");
- success = clusterManagerCheckRedisReply(owner, reply, NULL);
- if (reply) freeReplyObject(reply);
+ success = clusterManagerClearSlotStatus(owner, slot);
if (!success) goto cleanup;
/* Ensure that the slot is unassigned before assigning it to the
* owner. */
- reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
- success = clusterManagerCheckRedisReply(owner, reply, NULL);
- /* Ignore "already unassigned" error. */
- if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
- strstr(reply->str, "already unassigned") != NULL) success = 1;
- if (reply) freeReplyObject(reply);
+ success = clusterManagerDelSlot(owner, slot, 1);
if (!success) goto cleanup;
- reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
- success = clusterManagerCheckRedisReply(owner, reply, NULL);
- if (reply) freeReplyObject(reply);
+ success = clusterManagerAddSlot(owner, slot);
if (!success) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
@@ -3948,9 +3917,7 @@ static int clusterManagerFixOpenSlot(int slot) {
/* Make sure this information will propagate. Not strictly needed
* since there is no past owner, so all the other nodes will accept
* whatever epoch this node will claim the slot with. */
- reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
- success = clusterManagerCheckRedisReply(owner, reply, NULL);
- if (reply) freeReplyObject(reply);
+ success = clusterManagerBumpEpoch(owner);
if (!success) goto cleanup;
/* Remove the owner from the list of migrating/importing
* nodes. */
@@ -3970,16 +3937,10 @@ static int clusterManagerFixOpenSlot(int slot) {
* the owner has been set in the previous condition (owner == NULL). */
assert(owner != NULL);
listRewind(owners, &li);
- redisReply *reply = NULL;
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
- reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
- success = clusterManagerCheckRedisReply(n, reply, NULL);
- /* Ignore "already unassigned" error. */
- if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
- strstr(reply->str, "already unassigned") != NULL) success = 1;
- if (reply) freeReplyObject(reply);
+ success = clusterManagerDelSlot(n, slot, 1);
if (!success) goto cleanup;
n->slots[slot] = 0;
/* Assign the slot to the owner in the node 'n' configuration.' */
@@ -4021,11 +3982,7 @@ static int clusterManagerFixOpenSlot(int slot) {
if (!success) goto cleanup;
clusterManagerLogInfo(">>> Setting %d as STABLE in "
"%s:%d\n", slot, n->ip, n->port);
-
- redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
- slot, "STABLE");
- success = clusterManagerCheckRedisReply(n, r, NULL);
- if (r) freeReplyObject(r);
+ success = clusterManagerClearSlotStatus(n, slot);
if (!success) goto cleanup;
}
/* Since the slot has been moved in "cold" mode, ensure that all the
@@ -4035,10 +3992,7 @@ static int clusterManagerFixOpenSlot(int slot) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
- redisReply *r = CLUSTER_MANAGER_COMMAND(n,
- "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
- success = clusterManagerCheckRedisReply(n, r, NULL);
- if (r) freeReplyObject(r);
+ success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL);
if (!success) goto cleanup;
}
} else {
@@ -4104,7 +4058,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
* could reply with an "already unassigned" error and if it should fail
* for other reasons, it would lead to a failure in the follwing ADDSLOTS
* command. */
- clusterManagerDelSlot(owner, slot);
+ clusterManagerDelSlot(owner, slot, 1);
if (!clusterManagerAddSlot(owner, slot)) return 0;
if (!clusterManagerBumpEpoch(owner)) return 0;
listIter li;
@@ -4120,7 +4074,7 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
int count = clusterManagerCountKeysInSlot(n, slot);
success = (count >= 0);
if (!success) break;
- clusterManagerDelSlot(n, slot);
+ clusterManagerDelSlot(n, slot, 1);
if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
if (count > 0) {
int opts = CLUSTER_MANAGER_OPT_VERBOSE |