summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-03-20 16:42:49 +0100
committerantirez <antirez@gmail.com>2015-03-20 16:42:52 +0100
commit4f2555aa17504b1f99a5f35a69302cc425b30d74 (patch)
treee6584ee635a5b909b558264abe189383c517b940
parent25c0f5ac63320e85f81750bd0f999fc6f5eba5c1 (diff)
downloadredis-4f2555aa17504b1f99a5f35a69302cc425b30d74.tar.gz
Cluster: refactoring around configEpoch handling.
This commit moves the process of generating a new config epoch without consensus out of the clusterCommand() implementation, in order to make it reusable for other reasons (current target is to have a CLUSTER FAILOVER option forcing the failover when no master majority is reachable). Moreover the commit moves other functions which are similarly related to config epochs in a new logical section of the cluster.c file, just for clarity.
-rw-r--r--src/cluster.c281
1 files changed, 157 insertions, 124 deletions
diff --git a/src/cluster.c b/src/cluster.c
index a5f682ba8..4ff8fb4d6 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -74,27 +74,13 @@ void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representRedisNodeFlags(sds ci, uint16_t flags);
+uint64_t clusterGetMaxEpoch(void);
+int clusterBumpConfigEpochWithoutConsensus(void);
/* -----------------------------------------------------------------------------
* Initialization
* -------------------------------------------------------------------------- */
-/* Return the greatest configEpoch found in the cluster. */
-uint64_t clusterGetMaxEpoch(void) {
- uint64_t max = 0;
- dictIterator *di;
- dictEntry *de;
-
- di = dictGetSafeIterator(server.cluster->nodes);
- while((de = dictNext(di)) != NULL) {
- clusterNode *node = dictGetVal(de);
- if (node->configEpoch > max) max = node->configEpoch;
- }
- dictReleaseIterator(di);
- if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
- return max;
-}
-
/* Load the cluster config from 'filename'.
*
* If the file does not exist or is zero-length (this may happen because
@@ -928,6 +914,137 @@ void clusterRenameNode(clusterNode *node, char *newname) {
}
/* -----------------------------------------------------------------------------
+ * CLUSTER config epoch handling
+ * -------------------------------------------------------------------------- */
+
+/* Return the greatest configEpoch found in the cluster. */
+uint64_t clusterGetMaxEpoch(void) {
+ uint64_t max = 0;
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetSafeIterator(server.cluster->nodes);
+ while((de = dictNext(di)) != NULL) {
+ clusterNode *node = dictGetVal(de);
+ if (node->configEpoch > max) max = node->configEpoch;
+ }
+ dictReleaseIterator(di);
+ if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
+ return max;
+}
+
+/* If this node epoch is zero or is not already the greatest across the
+ * cluster (from the POV of the local configuration), this function will:
+ *
+ * 1) Generate a new config epoch increment the current epoch.
+ * 2) Assign the new epoch to this node, WITHOUT any consensus.
+ * 3) Persist the configuration on disk before sending packets with the
+ * new configuration.
+ *
+ * If the new config epoch is generated and assigend, REDIS_OK is returned,
+ * otherwise REDIS_ERR is returned (since the node has already the greatest
+ * configuration around) and no operation is performed.
+ *
+ * Important note: this function violates the principle that config epochs
+ * should be generated with consensus and should be unique across the cluster.
+ * However Redis Cluster uses this auto-generated new config epochs in two
+ * cases:
+ *
+ * 1) When slots are closed after importing. Otherwise resharding would be
+ * too exansive.
+ * 2) When CLUSTER FAILOVER is called with options that force a slave to
+ * failover its master even if there is not master majority able to
+ * create a new configuration epoch.
+ *
+ * Redis Cluster does not explode using this function, even in the case of
+ * a collision between this node and another node, generating the same
+ * configuration epoch unilaterally, because the config epoch conflict
+ * resolution algorithm will eventually move colliding nodes to different
+ * config epochs. However usign this function may violate the "last failover
+ * wins" rule, so should only be used with care. */
+int clusterBumpConfigEpochWithoutConsensus(void) {
+ uint64_t maxEpoch = clusterGetMaxEpoch();
+
+ if (myself->configEpoch == 0 ||
+ myself->configEpoch != maxEpoch)
+ {
+ server.cluster->currentEpoch++;
+ myself->configEpoch = server.cluster->currentEpoch;
+ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
+ CLUSTER_TODO_FSYNC_CONFIG);
+ redisLog(REDIS_WARNING,
+ "New configEpoch set to %llu",
+ (unsigned long long) myself->configEpoch);
+ return REDIS_OK;
+ } else {
+ return REDIS_ERR;
+ }
+}
+
+/* This function is called when this node is a master, and we receive from
+ * another master a configuration epoch that is equal to our configuration
+ * epoch.
+ *
+ * BACKGROUND
+ *
+ * It is not possible that different slaves get the same config
+ * epoch during a failover election, because the slaves need to get voted
+ * by a majority. However when we perform a manual resharding of the cluster
+ * the node will assign a configuration epoch to itself without to ask
+ * for agreement. Usually resharding happens when the cluster is working well
+ * and is supervised by the sysadmin, however it is possible for a failover
+ * to happen exactly while the node we are resharding a slot to assigns itself
+ * a new configuration epoch, but before it is able to propagate it.
+ *
+ * So technically it is possible in this condition that two nodes end with
+ * the same configuration epoch.
+ *
+ * Another possibility is that there are bugs in the implementation causing
+ * this to happen.
+ *
+ * Moreover when a new cluster is created, all the nodes start with the same
+ * configEpoch. This collision resolution code allows nodes to automatically
+ * end with a different configEpoch at startup automatically.
+ *
+ * In all the cases, we want a mechanism that resolves this issue automatically
+ * as a safeguard. The same configuration epoch for masters serving different
+ * set of slots is not harmful, but it is if the nodes end serving the same
+ * slots for some reason (manual errors or software bugs) without a proper
+ * failover procedure.
+ *
+ * In general we want a system that eventually always ends with different
+ * masters having different configuration epochs whatever happened, since
+ * nothign is worse than a split-brain condition in a distributed system.
+ *
+ * BEHAVIOR
+ *
+ * When this function gets called, what happens is that if this node
+ * has the lexicographically smaller Node ID compared to the other node
+ * with the conflicting epoch (the 'sender' node), it will assign itself
+ * the greatest configuration epoch currently detected among nodes plus 1.
+ *
+ * This means that even if there are multiple nodes colliding, the node
+ * with the greatest Node ID never moves forward, so eventually all the nodes
+ * end with a different configuration epoch.
+ */
+void clusterHandleConfigEpochCollision(clusterNode *sender) {
+ /* Prerequisites: nodes have the same configEpoch and are both masters. */
+ if (sender->configEpoch != myself->configEpoch ||
+ !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
+ /* Don't act if the colliding node has a smaller Node ID. */
+ if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return;
+ /* Get the next ID available at the best of this node knowledge. */
+ server.cluster->currentEpoch++;
+ myself->configEpoch = server.cluster->currentEpoch;
+ clusterSaveConfigOrDie(1);
+ redisLog(REDIS_VERBOSE,
+ "WARNING: configEpoch collision with node %.40s."
+ " configEpoch set to %llu",
+ sender->name,
+ (unsigned long long) myself->configEpoch);
+}
+
+/* -----------------------------------------------------------------------------
* CLUSTER nodes blacklist
*
* The nodes blacklist is just a way to ensure that a given node with a given
@@ -1399,69 +1516,6 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
}
}
-/* This function is called when this node is a master, and we receive from
- * another master a configuration epoch that is equal to our configuration
- * epoch.
- *
- * BACKGROUND
- *
- * It is not possible that different slaves get the same config
- * epoch during a failover election, because the slaves need to get voted
- * by a majority. However when we perform a manual resharding of the cluster
- * the node will assign a configuration epoch to itself without to ask
- * for agreement. Usually resharding happens when the cluster is working well
- * and is supervised by the sysadmin, however it is possible for a failover
- * to happen exactly while the node we are resharding a slot to assigns itself
- * a new configuration epoch, but before it is able to propagate it.
- *
- * So technically it is possible in this condition that two nodes end with
- * the same configuration epoch.
- *
- * Another possibility is that there are bugs in the implementation causing
- * this to happen.
- *
- * Moreover when a new cluster is created, all the nodes start with the same
- * configEpoch. This collision resolution code allows nodes to automatically
- * end with a different configEpoch at startup automatically.
- *
- * In all the cases, we want a mechanism that resolves this issue automatically
- * as a safeguard. The same configuration epoch for masters serving different
- * set of slots is not harmful, but it is if the nodes end serving the same
- * slots for some reason (manual errors or software bugs) without a proper
- * failover procedure.
- *
- * In general we want a system that eventually always ends with different
- * masters having different configuration epochs whatever happened, since
- * nothign is worse than a split-brain condition in a distributed system.
- *
- * BEHAVIOR
- *
- * When this function gets called, what happens is that if this node
- * has the lexicographically smaller Node ID compared to the other node
- * with the conflicting epoch (the 'sender' node), it will assign itself
- * the greatest configuration epoch currently detected among nodes plus 1.
- *
- * This means that even if there are multiple nodes colliding, the node
- * with the greatest Node ID never moves forward, so eventually all the nodes
- * end with a different configuration epoch.
- */
-void clusterHandleConfigEpochCollision(clusterNode *sender) {
- /* Prerequisites: nodes have the same configEpoch and are both masters. */
- if (sender->configEpoch != myself->configEpoch ||
- !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
- /* Don't act if the colliding node has a smaller Node ID. */
- if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return;
- /* Get the next ID available at the best of this node knowledge. */
- server.cluster->currentEpoch++;
- myself->configEpoch = server.cluster->currentEpoch;
- clusterSaveConfigOrDie(1);
- redisLog(REDIS_VERBOSE,
- "WARNING: configEpoch collision with node %.40s."
- " configEpoch set to %llu",
- sender->name,
- (unsigned long long) myself->configEpoch);
-}
-
/* When this function is called, there is a packet to process starting
* at node->rcvbuf. Releasing the buffer is up to the caller, so this
* function should just handle the higher level stuff of processing the
@@ -3547,30 +3601,28 @@ sds representRedisNodeFlags(sds ci, uint16_t flags) {
/* Generate a csv-alike representation of the specified cluster node.
* See clusterGenNodesDescription() top comment for more information.
*
- * The function appends the node representation to the SDS string 'ci' and
- * returns it (that may point to a different string as usually with the
- * SDS-style API). */
-sds clusterGenNodeDescription(sds ci, clusterNode *node) {
+ * The function returns the string representation as an SDS string. */
+sds clusterGenNodeDescription(clusterNode *node) {
int j, start;
+ sds ci;
/* Node coordinates */
- ci = sdscatlen(ci,node->name,40);
- ci = sdscatfmt(ci," %s:%i ",node->ip,node->port);
+ ci = sdscatprintf(sdsempty(),"%.40s %s:%d ",
+ node->name,
+ node->ip,
+ node->port);
/* Flags */
ci = representRedisNodeFlags(ci, node->flags);
/* Slave of... or just "-" */
- if (node->slaveof) {
- ci = sdscatlen(ci," ",1);
- ci = sdscatlen(ci,node->slaveof->name,40);
- ci = sdscatlen(ci," ",1);
- } else {
+ if (node->slaveof)
+ ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
+ else
ci = sdscatlen(ci," - ",3);
- }
/* Latency from the POV of this node, link status */
- ci = sdscatfmt(ci,"%I %I %U %s",
+ ci = sdscatprintf(ci,"%lld %lld %llu %s",
(long long) node->ping_sent,
(long long) node->pong_received,
(unsigned long long) node->configEpoch,
@@ -3582,19 +3634,6 @@ sds clusterGenNodeDescription(sds ci, clusterNode *node) {
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
int bit;
- /* It is common for a node to have pretty contiguous slots, so
- * optimize this loop by skipping whole 32bit words if they have
- * no set bits. We stop to the penultimate word because last word
- * has special handling when start != -1 (later in the loop). */
- if ((j&31)==0 && j < REDIS_CLUSTER_SLOTS-32) {
- uint32_t *slotword = ((uint32_t*)node->slots)+(j/32);
- if ((start == -1 && *slotword == 0) ||
- (start != -1 && *slotword == UINT32_MAX)) {
- j += 31; /* The for loop will increment j one more time. */
- continue;
- }
- }
-
if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
if (start == -1) start = j;
}
@@ -3640,19 +3679,18 @@ sds clusterGenNodeDescription(sds ci, clusterNode *node) {
* of the CLUSTER NODES function, and as format for the cluster
* configuration file (nodes.conf) for a given node. */
sds clusterGenNodesDescription(int filter) {
- sds ci = sdsempty();
+ sds ci = sdsempty(), ni;
dictIterator *di;
dictEntry *de;
- /* Make room to avoid multiple resizes of the buffer. */
- ci = sdsMakeRoomFor(ci,256*dictSize(server.cluster->nodes));
-
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->flags & filter) continue;
- ci = clusterGenNodeDescription(ci,node);
+ ni = clusterGenNodeDescription(node);
+ ci = sdscatsds(ci,ni);
+ sdsfree(ni);
ci = sdscatlen(ci,"\n",1);
}
dictReleaseIterator(di);
@@ -3918,17 +3956,9 @@ void clusterCommand(redisClient *c) {
* failover happens at the same time we close the slot, the
* configEpoch collision resolution will fix it assigning
* a different epoch to each node. */
- uint64_t maxEpoch = clusterGetMaxEpoch();
-
- if (myself->configEpoch == 0 ||
- myself->configEpoch != maxEpoch)
- {
- server.cluster->currentEpoch++;
- myself->configEpoch = server.cluster->currentEpoch;
- clusterDoBeforeSleep(CLUSTER_TODO_FSYNC_CONFIG);
+ if (clusterBumpConfigEpochWithoutConsensus() == REDIS_OK) {
redisLog(REDIS_WARNING,
- "configEpoch set to %llu after importing slot %d",
- (unsigned long long) myself->configEpoch, slot);
+ "configEpoch updated after importing slot %d", slot);
}
server.cluster->importing_slots_from[slot] = NULL;
}
@@ -3989,7 +4019,10 @@ void clusterCommand(redisClient *c) {
server.cluster->stats_bus_messages_sent,
server.cluster->stats_bus_messages_received
);
- addReplyBulkSds(c, info);
+ addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
+ (unsigned long)sdslen(info)));
+ addReplySds(c,info);
+ addReply(c,shared.crlf);
} else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
int retval = clusterSaveConfig(1);
@@ -4109,7 +4142,7 @@ void clusterCommand(redisClient *c) {
addReplyMultiBulkLen(c,n->numslaves);
for (j = 0; j < n->numslaves; j++) {
- sds ni = clusterGenNodeDescription(sdsempty(),n->slaves[j]);
+ sds ni = clusterGenNodeDescription(n->slaves[j]);
addReplyBulkCString(c,ni);
sdsfree(ni);
}
@@ -4526,7 +4559,7 @@ try_again:
/* Check if the key is here. If not we reply with success as there is
* nothing to migrate (for instance the key expired in the meantime), but
* we include such information in the reply string. */
- if ((o = lookupKeyWrite(c->db,c->argv[3])) == NULL) {
+ if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
@@ -4579,7 +4612,7 @@ try_again:
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
- ssize_t nwritten = 0;
+ int nwritten = 0;
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);