summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-03-24 11:07:10 +0100
committerantirez <antirez@gmail.com>2015-03-24 16:16:44 +0100
commit3468cd3664e7a7bd123e27493fe551c311c7a63c (patch)
tree9575bb57dd44b7c26d7340eb7c0df6bde7c0a33d
parentd1b5c5defd5651b463cb676f30903463036bf4c3 (diff)
downloadredis-3468cd3664e7a7bd123e27493fe551c311c7a63c.tar.gz
Cluster: redirection refactoring + handling of blocked clients.
There was a bug in Redis Cluster caused by clients blocked in a blocking list pop operation, for keys no longer handled by the instance, or in a condition where the cluster became down after the client blocked. A typical situation is: 1) BLPOP <somekey> 0 2) <somekey> hash slot is resharded to another master. The client will block forever int this case. A symmentrical non-cluster-specific bug happens when an instance is turned from master to slave. In that case it is more serious since this will desynchronize data between slaves and masters. This other bug was discovered as a side effect of thinking about the bug explained and fixed in this commit, but will be fixed in a separated commit.
-rw-r--r--src/blocked.c2
-rw-r--r--src/cluster.c90
-rw-r--r--src/cluster.h9
-rw-r--r--src/redis.c30
4 files changed, 102 insertions, 29 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 8ea2cfc43..9e1b88317 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -59,6 +59,8 @@
* When implementing a new type of blocking opeation, the implementation
* should modify unblockClient() and replyToBlockedClientTimedOut() in order
* to handle the btype-specific behavior of this two functions.
+ * If the blocking operation waits for certain keys to change state, the
+ * clusterRedirectBlockedClientIfNeeded() function should also be updated.
*/
#include "redis.h"
diff --git a/src/cluster.c b/src/cluster.c
index e1ae92f02..916a4be6a 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4768,10 +4768,10 @@ void readwriteCommand(redisClient *c) {
* belonging to the same slot, but the slot is not stable (in migration or
* importing state, likely because a resharding is in progress).
*
- * REDIS_CLUSTER_REDIR_DOWN if the request addresses a slot which is not
- * bound to any node. In this case the cluster global state should be already
- * "down" but it is fragile to rely on the update of the global state, so
- * we also handle it here. */
+ * REDIS_CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
+ * not bound to any node. In this case the cluster global state should be
+ * already "down" but it is fragile to rely on the update of the global state,
+ * so we also handle it here. */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
clusterNode *n = NULL;
robj *firstkey = NULL;
@@ -4833,7 +4833,7 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
if (n == NULL) {
getKeysFreeResult(keyindex);
if (error_code)
- *error_code = REDIS_CLUSTER_REDIR_DOWN;
+ *error_code = REDIS_CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}
@@ -4925,3 +4925,83 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
return n;
}
+
+/* Send the client the right redirection code, according to error_code
+ * that should be set to one of REDIS_CLUSTER_REDIR_* macros.
+ *
+ * If REDIS_CLUSTER_REDIR_ASK or REDIS_CLUSTER_REDIR_MOVED error codes
+ * are used, then the node 'n' should not be NULL, but should be the
+ * node we want to mention in the redirection. Moreover hashslot should
+ * be set to the hash slot that caused the redirection. */
+void clusterRedirectClient(redisClient *c, clusterNode *n, int hashslot, int error_code) {
+ if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
+ addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
+ } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
+ /* The request spawns mutliple keys in the same slot,
+ * but the slot is not "stable" currently as there is
+ * a migration or import in progress. */
+ addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
+ } else if (error_code == REDIS_CLUSTER_REDIR_DOWN_STATE) {
+ addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
+ } else if (error_code == REDIS_CLUSTER_REDIR_DOWN_UNBOUND) {
+ addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
+ } else if (error_code == REDIS_CLUSTER_REDIR_MOVED ||
+ error_code == REDIS_CLUSTER_REDIR_ASK)
+ {
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-%s %d %s:%d\r\n",
+ (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
+ hashslot,n->ip,n->port));
+ } else {
+ redisPanic("getNodeByQuery() unknown error.");
+ }
+}
+
+/* This function is called by the function processing clients incrementally
+ * to detect timeouts, in order to handle the following case:
+ *
+ * 1) A client blocks with BLPOP or similar blocking operation.
+ * 2) The master migrates the hash slot elsewhere or turns into a slave.
+ * 3) The client may remain blocked forever (or up to the max timeout time)
+ * waiting for a key change that will never happen.
+ *
+ * If the client is found to be blocked into an hash slot this node no
+ * longer handles, the client is sent a redirection error, and the function
+ * returns 1. Otherwise 0 is returned and no operation is performed. */
+int clusterRedirectBlockedClientIfNeeded(redisClient *c) {
+ if (c->flags & REDIS_BLOCKED && c->btype == REDIS_BLOCKED_LIST) {
+ dictEntry *de;
+ dictIterator *di;
+
+ /* If the cluster is down, unblock the client with the right error. */
+ if (server.cluster->state == REDIS_CLUSTER_FAIL) {
+ clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
+ return 1;
+ }
+
+ di = dictGetIterator(c->bpop.keys);
+ while((de = dictNext(di)) != NULL) {
+ robj *key = dictGetKey(de);
+ int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
+ clusterNode *node = server.cluster->slots[slot];
+
+ /* We send an error and unblock the client if:
+ * 1) The slot is unassigned, emitting a cluster down error.
+ * 2) The slot is not handled by this node, nor being imported. */
+ if (node != myself &&
+ server.cluster->importing_slots_from[slot] == NULL)
+ {
+ if (node == NULL) {
+ clusterRedirectClient(c,NULL,0,
+ REDIS_CLUSTER_REDIR_DOWN_UNBOUND);
+ } else {
+ clusterRedirectClient(c,node,slot,
+ REDIS_CLUSTER_REDIR_MOVED);
+ }
+ return 1;
+ }
+ }
+ dictReleaseIterator(di);
+ }
+ return 0;
+}
diff --git a/src/cluster.h b/src/cluster.h
index 8eaa0ab98..bf442a222 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -26,11 +26,12 @@
/* Redirection errors returned by getNodeByQuery(). */
#define REDIS_CLUSTER_REDIR_NONE 0 /* Node can serve the request. */
-#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /* Keys in different slots. */
-#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /* Keys in slot resharding. */
+#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /* -CROSSSLOT request. */
+#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /* -TRYAGAIN redirection required */
#define REDIS_CLUSTER_REDIR_ASK 3 /* -ASK redirection required. */
#define REDIS_CLUSTER_REDIR_MOVED 4 /* -MOVED redirection required. */
-#define REDIS_CLUSTER_REDIR_DOWN 5 /* -CLUSTERDOWN error. */
+#define REDIS_CLUSTER_REDIR_DOWN_STATE 5 /* -CLUSTERDOWN, global state. */
+#define REDIS_CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */
struct clusterNode;
@@ -249,5 +250,7 @@ typedef struct {
/* ---------------------- API exported outside cluster.c -------------------- */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
+int clusterRedirectBlockedClientIfNeeded(redisClient *c);
+void clusterRedirectClient(redisClient *c, clusterNode *n, int hashslot, int error_code);
#endif /* __REDIS_CLUSTER_H */
diff --git a/src/redis.c b/src/redis.c
index 5867c4c9a..309e9088b 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -923,8 +923,14 @@ int clientsCronHandleTimeout(redisClient *c) {
mstime_t now_ms = mstime();
if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
+ /* Handle blocking operation specific timeout. */
replyToBlockedClientTimedOut(c);
unblockClient(c);
+ } else if (server.cluster_enabled) {
+ /* Cluster: handle unblock & redirect of clients blocked
+ * into keys no longer served by this server. */
+ if (clusterRedirectBlockedClientIfNeeded(c))
+ unblockClient(c);
}
}
return 0;
@@ -2166,32 +2172,14 @@ int processCommand(redisClient *c) {
if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTransaction(c);
- addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
+ clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
return REDIS_OK;
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
- if (n == NULL) {
+ if (n == NULL || n != server.cluster->myself) {
flagTransaction(c);
- if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
- addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
- } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
- /* The request spawns mutliple keys in the same slot,
- * but the slot is not "stable" currently as there is
- * a migration or import in progress. */
- addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
- } else if (error_code == REDIS_CLUSTER_REDIR_DOWN) {
- addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Hash slot is unbound\r\n"));
- } else {
- redisPanic("getNodeByQuery() unknown error.");
- }
- return REDIS_OK;
- } else if (n != server.cluster->myself) {
- flagTransaction(c);
- addReplySds(c,sdscatprintf(sdsempty(),
- "-%s %d %s:%d\r\n",
- (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
- hashslot,n->ip,n->port));
+ clusterRedirectClient(c,n,hashslot,error_code);
return REDIS_OK;
}
}