summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPing Xie <pingxie@google.com>2022-11-16 19:24:18 -0800
committerGitHub <noreply@github.com>2022-11-16 19:24:18 -0800
commit203b12e41ff7981f0fae5b23819f072d61594813 (patch)
treeff5f2f829bbfcc6928190a01d401ee4b1ebbaf9e /src
parent2168ccc661791ced6271c5e4ab0f5eb60b1559e2 (diff)
downloadredis-203b12e41ff7981f0fae5b23819f072d61594813.tar.gz
Introduce Shard IDs to logically group nodes in cluster mode (#10536)
Introduce Shard IDs to logically group nodes in cluster mode. 1. Added a new "shard_id" field to "cluster nodes" output and nodes.conf after "hostname" 2. Added a new PING extension to propagate "shard_id" 3. Handled upgrade from pre-7.2 releases automatically 4. Refactored PING extension assembling/parsing logic Behavior of Shard IDs: Replicas will always follow the shards of their reported primaries. If a primary updates its shard ID, the replica will follow. (This need not follow for cluster v2) This is not an expected use case.
Diffstat (limited to 'src')
-rw-r--r--src/cluster.c579
-rw-r--r--src/cluster.h8
-rw-r--r--src/commands.c12
-rw-r--r--src/commands/cluster-myshardid.json18
-rw-r--r--src/server.h4
5 files changed, 462 insertions, 159 deletions
diff --git a/src/cluster.c b/src/cluster.c
index d3b800198..436ed014c 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -40,6 +40,7 @@
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>
+#include <ctype.h>
/* A global reference to myself is handy to make code more clear.
* Myself always points to server.cluster->myself, that is, the clusterNode
@@ -56,7 +57,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter, int use_pport);
-list *clusterGetNodesServingMySlots(clusterNode *node);
+list *clusterGetNodesInMyShard(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
@@ -83,6 +84,11 @@ void removeChannelsInSlot(unsigned int slot);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
+void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
+list *clusterLookupNodeListByShardId(const char *shard_id);
+void clusterRemoveNodeFromShard(clusterNode *node);
+int auxShardIdSetter(clusterNode *n, void *value, int length);
+sds auxShardIdGetter(clusterNode *n, sds s);
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
/* Links to the next and previous entries for keys in the same slot are stored
@@ -127,6 +133,85 @@ static ConnectionType *connTypeOfCluster() {
return connectionTypeTcp();
}
+/* Cluster shards hash table, mapping shard id to list of nodes */
+dictType clusterSdsToListType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ dictListDestructor, /* val destructor */
+ NULL /* allow to expand */
+};
+
+/* Aux fields are introduced in Redis 7.2 to support the persistence
+ * of various important node properties, such as shard id, in nodes.conf.
+ * Aux fields take an explicit format of name=value pairs and have no
+ * intrinsic order among them. Aux fields are always grouped together
+ * at the end of the second column of each row after the node's IP
+ * address/port/cluster_port and the optional hostname. Aux fields
+ * are separated by ','. */
+
+/* Aux field setter function prototype
+ * return C_OK when the update is successful; C_ERR otherwise */
+typedef int (aux_value_setter) (clusterNode* n, void *value, int length);
+/* Aux field getter function prototype
+ * return an sds that is a concatenation of the input sds string and
+ * the aux value */
+typedef sds (aux_value_getter) (clusterNode* n, sds s);
+
+typedef struct {
+ char *field;
+ aux_value_setter *setter;
+ aux_value_getter *getter;
+ int present;
+} auxFieldHandler;
+
+/* Assign index to each aux field */
+typedef enum {
+ af_start,
+ af_shard_id = af_start,
+ af_count,
+} auxFieldIndex;
+
+/* Note that
+ * 1. the order of the elements below must match that of their
+ * indices as defined in auxFieldIndex
+ * 2. aux name can contain characters that pass the isValidAuxChar check only */
+auxFieldHandler auxFieldHandlers[] = {
+ {"shard-id", auxShardIdSetter, auxShardIdGetter, 0},
+};
+
+int isValidAuxChar(int c) {
+ return isalnum(c) || (strchr("!#$%&()*+-.:;<>?@[]^_{|}~", c) != NULL);
+}
+
+int isValidAuxString(sds s) {
+ for (unsigned i = 0; i < sdslen(s); i++) {
+ if (!isValidAuxChar(s[i])) return 0;
+ }
+ return 1;
+}
+
+int auxShardIdSetter(clusterNode *n, void *value, int length) {
+ if (verifyClusterNodeId(value, length) == C_ERR) {
+ return C_ERR;
+ }
+ memcpy(n->shard_id, value, CLUSTER_NAMELEN);
+ /* if n already has replicas, make sure they all agree
+ * on the shard id */
+ for (int i = 0; i < n->numslaves; i++) {
+ if (memcmp(n->slaves[i]->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) {
+ return C_ERR;
+ }
+ }
+ clusterAddNodeToShard(value, n);
+ return C_OK;
+}
+
+sds auxShardIdGetter(clusterNode *n, sds s) {
+ return sdscatprintf(s, "%.40s", n->shard_id);
+}
/* clusterLink send queue blocks */
typedef struct {
@@ -185,8 +270,8 @@ int clusterLoadConfig(char *filename) {
maxline = 1024+CLUSTER_SLOTS*128;
line = zmalloc(maxline);
while(fgets(line,maxline,fp) != NULL) {
- int argc;
- sds *argv;
+ int argc, aux_argc;
+ sds *argv, *aux_argv;
clusterNode *n, *master;
char *p, *s;
@@ -236,27 +321,85 @@ int clusterLoadConfig(char *filename) {
n = createClusterNode(argv[0],0);
clusterAddNode(n);
}
- /* Format for the node address information:
- * ip:port[@cport][,hostname] */
+ /* Format for the node address and auxiliary argument information:
+ * ip:port[@cport][,hostname[,aux=val]*] */
+
+ aux_argv = sdssplitlen(argv[1], sdslen(argv[1]), ",", 1, &aux_argc);
+ if (aux_argv == NULL) {
+ sdsfreesplitres(argv,argc);
+ goto fmterr;
+ }
/* Hostname is an optional argument that defines the endpoint
* that can be reported to clients instead of IP. */
- char *hostname = strchr(argv[1], ',');
- if (hostname) {
- *hostname = '\0';
- hostname++;
- n->hostname = sdscpy(n->hostname, hostname);
+ if (aux_argc > 1 && sdslen(aux_argv[1]) > 0) {
+ n->hostname = sdscpy(n->hostname, aux_argv[1]);
} else if (sdslen(n->hostname) != 0) {
sdsclear(n->hostname);
}
+ /* All fields after hostname are auxiliary and they take on
+ * the format of "aux=val" where both aux and val can contain
+ * characters that pass the isValidAuxChar check only. The order
+ * of the aux fields is insignificant. */
+
+ for (int i = 2; i < aux_argc; i++) {
+ int field_argc;
+ sds *field_argv;
+ field_argv = sdssplitlen(aux_argv[i], sdslen(aux_argv[i]), "=", 1, &field_argc);
+ if (field_argv == NULL || field_argc != 2) {
+ /* Invalid aux field format */
+ if (field_argv != NULL) sdsfreesplitres(field_argv, field_argc);
+ sdsfreesplitres(argv,argc);
+ goto fmterr;
+ }
+
+ /* Validate that both aux and value contain valid characters only */
+ for (unsigned j = 0; j < 2; j++) {
+ if (!isValidAuxString(field_argv[j])) {
+ /* Invalid aux field format */
+ sdsfreesplitres(field_argv, field_argc);
+ sdsfreesplitres(argv,argc);
+ goto fmterr;
+ }
+ }
+
+ /* Note that we don't expect lots of aux fields in the foreseeable
+ * future so a linear search is completely fine. */
+ int field_found = 0;
+ for (unsigned j = 0; j < numElements(auxFieldHandlers); j++) {
+ if (sdslen(field_argv[0]) != strlen(auxFieldHandlers[j].field) ||
+ memcmp(field_argv[0], auxFieldHandlers[j].field, sdslen(field_argv[0])) != 0) {
+ continue;
+ }
+ field_found = 1;
+ if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) {
+ /* Invalid aux field format */
+ sdsfreesplitres(field_argv, field_argc);
+ sdsfreesplitres(argv,argc);
+ goto fmterr;
+ }
+ auxFieldHandlers[j].present = 1;
+ }
+
+ if (field_found == 0) {
+ /* Invalid aux field format */
+ sdsfreesplitres(field_argv, field_argc);
+ sdsfreesplitres(argv,argc);
+ goto fmterr;
+ }
+
+ sdsfreesplitres(field_argv, field_argc);
+ }
+
/* Address and port */
- if ((p = strrchr(argv[1],':')) == NULL) {
+ if ((p = strrchr(aux_argv[0],':')) == NULL) {
+ sdsfreesplitres(aux_argv, aux_argc);
sdsfreesplitres(argv,argc);
goto fmterr;
}
*p = '\0';
- memcpy(n->ip,argv[1],strlen(argv[1])+1);
+ memcpy(n->ip,aux_argv[0],strlen(aux_argv[0])+1);
char *port = p+1;
char *busp = strchr(port,'@');
if (busp) {
@@ -272,6 +415,8 @@ int clusterLoadConfig(char *filename) {
/* The plaintext port for client in a TLS cluster (n->pport) is not
* stored in nodes.conf. It is received later over the bus protocol. */
+ sdsfreesplitres(aux_argv, aux_argc);
+
/* Parse flags */
p = s = argv[2];
while(p) {
@@ -316,8 +461,27 @@ int clusterLoadConfig(char *filename) {
master = createClusterNode(argv[3],0);
clusterAddNode(master);
}
+ /* shard_id can be absent if we are loading a nodes.conf generated
+ * by an older version of Redis; we should follow the primary's
+ * shard_id in this case */
+ if (auxFieldHandlers[af_shard_id].present == 0) {
+ memcpy(n->shard_id, master->shard_id, CLUSTER_NAMELEN);
+ clusterAddNodeToShard(master->shard_id, n);
+ } else if (clusterGetNodesInMyShard(master) != NULL &&
+ memcmp(master->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0)
+ {
+ /* If the primary has been added to a shard, make sure this
+ * node has the same persisted shard id as the primary. */
+ goto fmterr;
+ }
n->slaveof = master;
clusterNodeAddSlave(master,n);
+ } else if (auxFieldHandlers[af_shard_id].present == 0) {
+ /* n is a primary but it does not have a persisted shard_id.
+ * This happens if we are loading a nodes.conf generated by
+ * an older version of Redis. We should manually update the
+ * shard membership in this case */
+ clusterAddNodeToShard(n->shard_id, n);
}
/* Set ping sent / pong received timestamps */
@@ -403,7 +567,7 @@ int clusterLoadConfig(char *filename) {
fmterr:
serverLog(LL_WARNING,
- "Unrecoverable error: corrupted cluster config file.");
+ "Unrecoverable error: corrupted cluster config file \"%s\".", line);
zfree(line);
if (fp) fclose(fp);
exit(1);
@@ -622,6 +786,8 @@ static void updateAnnouncedHostname(clusterNode *node, char *new) {
/* Previous and new hostname are the same, no need to update. */
if (new && !strcmp(new, node->hostname)) {
return;
+ } else if (!new && (sdslen(node->hostname) == 0)) {
+ return;
}
if (new) {
@@ -629,6 +795,26 @@ static void updateAnnouncedHostname(clusterNode *node, char *new) {
} else if (sdslen(node->hostname) != 0) {
sdsclear(node->hostname);
}
+ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
+}
+
+static void updateShardId(clusterNode *node, const char *shard_id) {
+ if (memcmp(node->shard_id, shard_id, CLUSTER_NAMELEN) != 0) {
+ clusterRemoveNodeFromShard(node);
+ memcpy(node->shard_id, shard_id, CLUSTER_NAMELEN);
+ clusterAddNodeToShard(shard_id, node);
+ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
+ }
+ if (myself != node && myself->slaveof == node) {
+ if (memcmp(myself->shard_id, shard_id, CLUSTER_NAMELEN) != 0) {
+ /* shard-id can diverge right after a rolling upgrade
+ * from pre-7.2 releases */
+ clusterRemoveNodeFromShard(myself);
+ memcpy(myself->shard_id, shard_id, CLUSTER_NAMELEN);
+ clusterAddNodeToShard(shard_id, myself);
+ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
+ }
+ }
}
/* Update my hostname based on server configuration values */
@@ -647,6 +833,7 @@ void clusterInit(void) {
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType);
+ server.cluster->shards = dictCreate(&clusterSdsToListType);
server.cluster->nodes_black_list =
dictCreate(&clusterNodesBlackListDictType);
server.cluster->failover_auth_time = 0;
@@ -682,6 +869,7 @@ void clusterInit(void) {
serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
+ clusterAddNodeToShard(myself->shard_id, myself);
saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1);
@@ -772,6 +960,9 @@ void clusterReset(int hard) {
/* Unassign all the slots. */
for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);
+ /* Recreate shards dict */
+ dictEmpty(server.cluster->shards, NULL);
+
/* Forget all the nodes, but myself. */
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
@@ -797,10 +988,14 @@ void clusterReset(int hard) {
dictDelete(server.cluster->nodes,oldname);
sdsfree(oldname);
getRandomHexChars(myself->name, CLUSTER_NAMELEN);
+ getRandomHexChars(myself->shard_id, CLUSTER_NAMELEN);
clusterAddNode(myself);
serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
}
+ /* Re-populate shards */
+ clusterAddNodeToShard(myself->shard_id, myself);
+
/* Make sure to persist the new config and update the state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
@@ -1029,6 +1224,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
memcpy(node->name, nodename, CLUSTER_NAMELEN);
else
getRandomHexChars(node->name, CLUSTER_NAMELEN);
+ getRandomHexChars(node->shard_id, CLUSTER_NAMELEN);
node->ctime = mstime();
node->configEpoch = 0;
node->flags = flags;
@@ -1238,7 +1434,8 @@ void clusterAddNode(clusterNode *node) {
* 1) Mark all the slots handled by it as unassigned.
* 2) Remove all the failure reports sent by this node and referenced by
* other nodes.
- * 3) Free the node with freeClusterNode() that will in turn remove it
+ * 3) Remove the node from the owning shard
+ * 4) Free the node with freeClusterNode() that will in turn remove it
* from the hash table and from the list of slaves of its master, if
* it is a slave node.
*/
@@ -1267,7 +1464,10 @@ void clusterDelNode(clusterNode *delnode) {
}
dictReleaseIterator(di);
- /* 3) Free the node, unlinking it from the cluster. */
+ /* 3) Remove the node from the owning shard */
+ clusterRemoveNodeFromShard(delnode);
+
+ /* 4) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode);
}
@@ -1293,19 +1493,17 @@ clusterNode *clusterLookupNode(const char *name, int length) {
return dictGetVal(de);
}
-/* Get all the nodes serving the same slots as the given node. */
-list *clusterGetNodesServingMySlots(clusterNode *node) {
- list *nodes_for_slot = listCreate();
- clusterNode *my_primary = nodeIsMaster(node) ? node : node->slaveof;
-
- /* This function is only valid for fully connected nodes, so
- * they should have a known primary. */
- serverAssert(my_primary);
- listAddNodeTail(nodes_for_slot, my_primary);
- for (int i=0; i < my_primary->numslaves; i++) {
- listAddNodeTail(nodes_for_slot, my_primary->slaves[i]);
- }
- return nodes_for_slot;
+/* Get all the nodes in my shard.
+ * Note that the list returned is not computed on the fly
+ * via slaveof; rather, it is maintained permanently to
+ * track the shard membership and its life cycle is tied
+ * to this Redis process. Therefore, the caller must not
+ * release the list. */
+list *clusterGetNodesInMyShard(clusterNode *node) {
+ sds s = sdsnewlen(node->shard_id, CLUSTER_NAMELEN);
+ dictEntry *de = dictFind(server.cluster->shards,s);
+ sdsfree(s);
+ return (de != NULL) ? dictGetVal(de) : NULL;
}
/* This is only used after the handshake. When we connect a given IP/PORT
@@ -1325,6 +1523,38 @@ void clusterRenameNode(clusterNode *node, char *newname) {
clusterAddNode(node);
}
+void clusterAddNodeToShard(const char *shard_id, clusterNode *node) {
+ sds s = sdsnewlen(shard_id, CLUSTER_NAMELEN);
+ dictEntry *de = dictFind(server.cluster->shards,s);
+ if (de == NULL) {
+ list *l = listCreate();
+ listAddNodeTail(l, node);
+ serverAssert(dictAdd(server.cluster->shards, s, l) == DICT_OK);
+ } else {
+ list *l = dictGetVal(de);
+ if (listSearchKey(l, node) == NULL) {
+ listAddNodeTail(l, node);
+ }
+ sdsfree(s);
+ }
+}
+
+void clusterRemoveNodeFromShard(clusterNode *node) {
+ sds s = sdsnewlen(node->shard_id, CLUSTER_NAMELEN);
+ dictEntry *de = dictFind(server.cluster->shards, s);
+ if (de != NULL) {
+ list *l = dictGetVal(de);
+ listNode *ln = listSearchKey(l, node);
+ if (ln != NULL) {
+ listDelNode(l, ln);
+ }
+ if (listLength(l) == 0) {
+ dictDelete(server.cluster->shards, s);
+ }
+ }
+ sdsfree(s);
+}
+
/* -----------------------------------------------------------------------------
* CLUSTER config epoch handling
* -------------------------------------------------------------------------- */
@@ -2060,7 +2290,7 @@ static uint32_t getPingExtLength(clusterMsgPingExt *ext) {
/* Returns the initial position of ping extensions. May return an invalid
* address if there are no ping extensions. */
-static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, uint16_t count) {
+static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, int count) {
clusterMsgPingExt *initial = (clusterMsgPingExt*) &(hdr->data.ping.gossip[count]);
return initial;
}
@@ -2072,49 +2302,108 @@ static clusterMsgPingExt *getNextPingExt(clusterMsgPingExt *ext) {
return next;
}
-/* Returns the exact size needed to store the hostname. The returned value
- * will be 8 byte padded. */
-int getHostnamePingExtSize() {
- /* If hostname is not set, we don't send this extension */
- if (sdslen(myself->hostname) == 0) return 0;
+/* All PING extensions must be 8-byte aligned */
+uint32_t getAlignedPingExtSize(uint32_t dataSize) {
- int totlen = sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(sdslen(myself->hostname) + 1);
- return totlen;
+ return sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(dataSize);
+}
+
+uint32_t getHostnamePingExtSize() {
+ if (sdslen(myself->hostname) == 0) {
+ return 0;
+ }
+ return getAlignedPingExtSize(sdslen(myself->hostname) + 1);
+}
+
+uint32_t getShardIdPingExtSize() {
+ return getAlignedPingExtSize(sizeof(clusterMsgPingExtShardId));
+}
+
+uint32_t getForgottenNodeExtSize() {
+ return getAlignedPingExtSize(sizeof(clusterMsgPingExtForgottenNode));
}
-/* Write the hostname ping extension at the start of the cursor. This function
- * will update the cursor to point to the end of the written extension and
- * will return the amount of bytes written. */
-int writeHostnamePingExt(clusterMsgPingExt **cursor) {
- /* If hostname is not set, we don't send this extension */
- if (sdslen(myself->hostname) == 0) return 0;
-
- /* Add the hostname information at the extension cursor */
- clusterMsgPingExtHostname *ext = &(*cursor)->ext[0].hostname;
- memcpy(ext->hostname, myself->hostname, sdslen(myself->hostname));
- uint32_t extension_size = getHostnamePingExtSize();
-
- /* Move the write cursor */
- (*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_HOSTNAME);
- (*cursor)->length = htonl(extension_size);
- /* Make sure the string is NULL terminated by adding 1 */
- *cursor = (clusterMsgPingExt *) ((intptr_t)ext + EIGHT_BYTE_ALIGN(sdslen(myself->hostname) + 1));
- return extension_size;
-}
-
-/* Write the forgotten node ping extension at the start of the cursor, update
- * the cursor to point to the end of the written extension and return the number
- * of bytes written. */
-int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl) {
- serverAssert(sdslen(name) == CLUSTER_NAMELEN);
- clusterMsgPingExtForgottenNode *ext = &(*cursor)->ext[0].forgotten_node;
- memcpy(ext->name, name, CLUSTER_NAMELEN);
- ext->ttl = htonu64(ttl);
- uint32_t extension_size = sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode);
- (*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE);
- (*cursor)->length = htonl(extension_size);
- *cursor = (clusterMsgPingExt *) (ext + 1);
- return extension_size;
+void *preparePingExt(clusterMsgPingExt *ext, uint16_t type, uint32_t length) {
+ ext->type = htons(type);
+ ext->length = htonl(length);
+ return &ext->ext[0];
+}
+
+clusterMsgPingExt *nextPingExt(clusterMsgPingExt *ext) {
+ return (clusterMsgPingExt *)((char*)ext + ntohl(ext->length));
+}
+
+/* 1. If a NULL hdr is provided, compute the extension size;
+ * 2. If a non-NULL hdr is provided, write the hostname ping
+ * extension at the start of the cursor. This function
+ * will update the cursor to point to the end of the
+ * written extension and will return the amount of bytes
+ * written. */
+uint32_t writePingExt(clusterMsg *hdr, int gossipcount) {
+ uint16_t extensions = 0;
+ uint32_t totlen = 0;
+ clusterMsgPingExt *cursor = NULL;
+ /* Set the initial extension position */
+ if (hdr != NULL) {
+ cursor = getInitialPingExt(hdr, gossipcount);
+ }
+
+ /* hostname is optional */
+ if (sdslen(myself->hostname) != 0) {
+ if (cursor != NULL) {
+ /* Populate hostname */
+ clusterMsgPingExtHostname *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_HOSTNAME, getHostnamePingExtSize());
+ memcpy(ext->hostname, myself->hostname, sdslen(myself->hostname));
+
+ /* Move the write cursor */
+ cursor = nextPingExt(cursor);
+ }
+
+ totlen += getHostnamePingExtSize();
+ extensions++;
+ }
+
+ /* Gossip forgotten nodes */
+ if (dictSize(server.cluster->nodes_black_list) > 0) {
+ dictIterator *di = dictGetIterator(server.cluster->nodes_black_list);
+ dictEntry *de;
+ while ((de = dictNext(di)) != NULL) {
+ if (cursor != NULL) {
+ uint64_t expire = dictGetUnsignedIntegerVal(de);
+ if ((time_t)expire < server.unixtime) continue; /* already expired */
+ uint64_t ttl = expire - server.unixtime;
+ clusterMsgPingExtForgottenNode *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE, getForgottenNodeExtSize());
+ memcpy(ext->name, dictGetKey(de), CLUSTER_NAMELEN);
+ ext->ttl = htonu64(ttl);
+
+ /* Move the write cursor */
+ cursor = nextPingExt(cursor);
+ }
+ totlen += getForgottenNodeExtSize();
+ extensions++;
+ }
+ dictReleaseIterator(di);
+ }
+
+ /* Populate shard_id */
+ if (cursor != NULL) {
+ clusterMsgPingExtShardId *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_SHARDID, getShardIdPingExtSize());
+ memcpy(ext->shard_id, myself->shard_id, CLUSTER_NAMELEN);
+
+ /* Move the write cursor */
+ cursor = nextPingExt(cursor);
+ }
+ totlen += getShardIdPingExtSize();
+ extensions++;
+
+ if (hdr != NULL) {
+ if (extensions != 0) {
+ hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
+ }
+ hdr->extensions = htons(extensions);
+ }
+
+ return totlen;
}
/* We previously validated the extensions, so this function just needs to
@@ -2122,6 +2411,7 @@ int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl
void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
char *ext_hostname = NULL;
+ char *ext_shardid = NULL;
uint16_t extensions = ntohs(hdr->extensions);
/* Loop through all the extensions and process them */
clusterMsgPingExt *ext = getInitialPingExt(hdr, ntohs(hdr->count));
@@ -2143,6 +2433,9 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG);
}
+ } else if (type == CLUSTERMSG_EXT_TYPE_SHARDID) {
+ clusterMsgPingExtShardId *shardid_ext = (clusterMsgPingExtShardId *) &(ext->ext[0].shard_id);
+ ext_shardid = shardid_ext->shard_id;
} else {
/* Unknown type, we will ignore it but log what happened. */
serverLog(LL_WARNING, "Received unknown extension type %d", type);
@@ -2155,6 +2448,7 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
* they don't have an announced hostname. Otherwise, we'll
* set it now. */
updateAnnouncedHostname(sender, ext_hostname);
+ updateShardId(sender, ext_shardid);
}
static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
@@ -2232,7 +2526,7 @@ int clusterProcessPacket(clusterLink *link) {
while (extensions--) {
uint16_t extlen = getPingExtLength(ext);
if (extlen % 8 != 0) {
- serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)",
+ serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)",
clusterGetMessageTypeString(type), (int) extlen);
return 1;
}
@@ -2273,10 +2567,10 @@ int clusterProcessPacket(clusterLink *link) {
}
if (totlen != explen) {
- serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld",
+ serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld",
clusterGetMessageTypeString(type), (unsigned long long) totlen, (unsigned long long) explen);
return 1;
- }
+ }
sender = getNodeFromLinkAndMsg(link, hdr);
@@ -3065,9 +3359,7 @@ void clusterSendPing(clusterLink *link, int type) {
* to put inside the packet. */
estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
- estlen += getHostnamePingExtSize();
- estlen += dictSize(server.cluster->nodes_black_list) *
- (sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode));
+ estlen += writePingExt(NULL, 0);
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
@@ -3134,39 +3426,13 @@ void clusterSendPing(clusterLink *link, int type) {
dictReleaseIterator(di);
}
-
- int totlen = 0;
- int extensions = 0;
- /* Set the initial extension position */
- clusterMsgPingExt *cursor = getInitialPingExt(hdr, gossipcount);
- /* Add in the extensions */
- if (sdslen(myself->hostname) != 0) {
- hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
- totlen += writeHostnamePingExt(&cursor);
- extensions++;
- }
-
- /* Gossip forgotten nodes */
- if (dictSize(server.cluster->nodes_black_list) > 0) {
- dictIterator *di = dictGetIterator(server.cluster->nodes_black_list);
- dictEntry *de;
- while ((de = dictNext(di)) != NULL) {
- sds name = dictGetKey(de);
- uint64_t expire = dictGetUnsignedIntegerVal(de);
- if ((time_t)expire < server.unixtime) continue; /* already expired */
- uint64_t ttl = expire - server.unixtime;
- hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
- totlen += writeForgottenNodePingExt(&cursor, name, ttl);
- extensions++;
- }
- dictReleaseIterator(di);
- }
-
/* Compute the actual total length and send! */
+ uint32_t totlen = 0;
+ totlen += writePingExt(hdr, gossipcount);
totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
+ serverAssert(gossipcount < USHRT_MAX);
hdr->count = htons(gossipcount);
- hdr->extensions = htons(extensions);
hdr->totlen = htonl(totlen);
clusterSendMessage(link,msgblock);
@@ -3341,20 +3607,18 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
return;
}
+ listIter li;
+ listNode *ln;
+ list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself);
+ serverAssert(nodes_for_slot != NULL);
+ listRewind(nodes_for_slot, &li);
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
- list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
- if (listLength(nodes_for_slot) != 0) {
- listIter li;
- listNode *ln;
- listRewind(nodes_for_slot, &li);
- while((ln = listNext(&li))) {
- clusterNode *node = listNodeValue(ln);
- if (node != myself) {
- clusterSendMessage(node->link,msgblock);
- }
+ while((ln = listNext(&li))) {
+ clusterNode *node = listNodeValue(ln);
+ if (node != myself) {
+ clusterSendMessage(node->link,msgblock);
}
}
- listRelease(nodes_for_slot);
clusterMsgSendBlockDecrRefCount(msgblock);
}
@@ -4418,12 +4682,12 @@ int clusterDelSlot(int slot) {
if (!n) return C_ERR;
/* Cleanup the channels in master/replica as part of slot deletion. */
- list *nodes_for_slot = clusterGetNodesServingMySlots(n);
+ list *nodes_for_slot = clusterGetNodesInMyShard(n);
+ serverAssert(nodes_for_slot != NULL);
listNode *ln = listSearchKey(nodes_for_slot, myself);
if (ln != NULL) {
removeChannelsInSlot(slot);
}
- listRelease(nodes_for_slot);
serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
server.cluster->slots[slot] = NULL;
return C_OK;
@@ -4650,6 +4914,7 @@ void clusterSetMaster(clusterNode *n) {
clusterNodeRemoveSlave(myself->slaveof,myself);
}
myself->slaveof = n;
+ updateShardId(myself, n->shard_id);
clusterNodeAddSlave(n,myself);
replicationSetMaster(n->ip, n->port);
resetManualFailover();
@@ -4717,20 +4982,28 @@ sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
/* Node coordinates */
ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
+ /* Node's ip/port and optional announced hostname */
if (sdslen(node->hostname) != 0) {
- ci = sdscatfmt(ci," %s:%i@%i,%s ",
+ ci = sdscatprintf(ci," %s:%i@%i,%s",
node->ip,
port,
node->cport,
node->hostname);
} else {
- ci = sdscatfmt(ci," %s:%i@%i ",
+ ci = sdscatprintf(ci," %s:%i@%i,",
node->ip,
port,
node->cport);
}
+ /* Node's aux fields */
+ for (int i = af_start; i < af_count; i++) {
+ ci = sdscatprintf(ci, ",%s=", auxFieldHandlers[i].field);
+ ci = auxFieldHandlers[i].getter(node, ci);
+ }
+
/* Flags */
+ ci = sdscatlen(ci," ",1);
ci = representClusterNodeFlags(ci, node->flags);
/* Slave of... or just "-" */
@@ -5163,33 +5436,33 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
}
/* Add the shard reply of a single shard based off the given primary node. */
-void addShardReplyForClusterShards(client *c, clusterNode *node, uint16_t *slot_info_pairs, int slot_pairs_count) {
+void addShardReplyForClusterShards(client *c, list *nodes) {
+ serverAssert(listLength(nodes) > 0);
+ clusterNode *n = listNodeValue(listFirst(nodes));
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");
- if (slot_info_pairs) {
- serverAssert((slot_pairs_count % 2) == 0);
- addReplyArrayLen(c, slot_pairs_count);
- for (int i = 0; i < slot_pairs_count; i++)
- addReplyLongLong(c, (unsigned long)slot_info_pairs[i]);
+
+ /* Use slot_info_pairs from the primary only */
+ while (n->slaveof != NULL) n = n->slaveof;
+
+ if (n->slot_info_pairs != NULL) {
+ serverAssert((n->slot_info_pairs_count % 2) == 0);
+ addReplyArrayLen(c, n->slot_info_pairs_count);
+ for (int i = 0; i < n->slot_info_pairs_count; i++)
+ addReplyBulkLongLong(c, (unsigned long)n->slot_info_pairs[i]);
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
}
addReplyBulkCString(c, "nodes");
- list *nodes_for_slot = clusterGetNodesServingMySlots(node);
- /* At least the provided node should be serving its slots */
- serverAssert(nodes_for_slot);
- addReplyArrayLen(c, listLength(nodes_for_slot));
- if (listLength(nodes_for_slot) != 0) {
- listIter li;
- listNode *ln;
- listRewind(nodes_for_slot, &li);
- while ((ln = listNext(&li))) {
- clusterNode *node = listNodeValue(ln);
- addNodeDetailsToShardReply(c, node);
- }
- listRelease(nodes_for_slot);
+ addReplyArrayLen(c, listLength(nodes));
+ listIter li;
+ listRewind(nodes, &li);
+ for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
+ clusterNode *n = listNodeValue(ln);
+ addNodeDetailsToShardReply(c, n);
+ clusterFreeNodesSlotsInfo(n);
}
}
@@ -5197,31 +5470,14 @@ void addShardReplyForClusterShards(client *c, clusterNode *node, uint16_t *slot_
* pair owned by the shard, also the primary and set of replica(s) along with
* information about each node. */
void clusterReplyShards(client *c) {
- void *shard_replylen = addReplyDeferredLen(c);
- int shard_count = 0;
+ addReplyArrayLen(c, dictSize(server.cluster->shards));
/* This call will add slot_info_pairs to all nodes */
clusterGenNodesSlotsInfo(0);
- dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
- dictEntry *de;
- /* Iterate over all the available nodes in the cluster, for each primary
- * node return generate the cluster shards response. if the primary node
- * doesn't own any slot, cluster shard response contains the node related
- * information and an empty slots array. */
- while((de = dictNext(di)) != NULL) {
- clusterNode *n = dictGetVal(de);
- if (!nodeIsMaster(n)) {
- /* You can force a replica to own slots, even though it'll get reverted,
- * so freeing the slot pair here just in case. */
- clusterFreeNodesSlotsInfo(n);
- continue;
- }
- shard_count++;
- /* n->slot_info_pairs is set to NULL when the node owns no slots. */
- addShardReplyForClusterShards(c, n, n->slot_info_pairs, n->slot_info_pairs_count);
- clusterFreeNodesSlotsInfo(n);
+ dictIterator *di = dictGetSafeIterator(server.cluster->shards);
+ for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
+ addShardReplyForClusterShards(c, dictGetVal(de));
}
dictReleaseIterator(di);
- setDeferredArrayLen(c, shard_replylen, shard_count);
}
void clusterReplyMultiBulkSlots(client * c) {
@@ -5299,6 +5555,8 @@ void clusterCommand(client *c) {
" Connect nodes into a working cluster.",
"MYID",
" Return the node id.",
+"MYSHARDID",
+" Return the node's shard id.",
"NODES",
" Return cluster configuration seen by node. Output format:",
" <id> <ip:port@bus-port[,hostname]> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
@@ -5365,6 +5623,9 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
/* CLUSTER MYID */
addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
+ } else if (!strcasecmp(c->argv[1]->ptr,"myshardid") && c->argc == 2) {
+ /* CLUSTER MYSHARDID */
+ addReplyBulkCBuffer(c,myself->shard_id, CLUSTER_NAMELEN);
} else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
/* CLUSTER SLOTS */
clusterReplyMultiBulkSlots(c);
diff --git a/src/cluster.h b/src/cluster.h
index f27072f20..332624ea5 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -117,6 +117,7 @@ typedef struct clusterNodeFailReport {
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
+ char shard_id[CLUSTER_NAMELEN]; /* shard id, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
@@ -175,6 +176,7 @@ typedef struct clusterState {
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */
+ dict *shards; /* Hash table of shard_id -> list (of nodes) structures */
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
@@ -256,6 +258,7 @@ typedef struct {
typedef enum {
CLUSTERMSG_EXT_TYPE_HOSTNAME,
CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE,
+ CLUSTERMSG_EXT_TYPE_SHARDID,
} clusterMsgPingtypes;
/* Helper function for making sure extensions are eight byte aligned. */
@@ -273,12 +276,17 @@ typedef struct {
static_assert(sizeof(clusterMsgPingExtForgottenNode) % 8 == 0, "");
typedef struct {
+ char shard_id[CLUSTER_NAMELEN]; /* The shard_id, 40 bytes fixed. */
+} clusterMsgPingExtShardId;
+
+typedef struct {
uint32_t length; /* Total length of this extension message (including this header) */
uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */
uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
union {
clusterMsgPingExtHostname hostname;
clusterMsgPingExtForgottenNode forgotten_node;
+ clusterMsgPingExtShardId shard_id;
} ext[]; /* Actual extension information, formatted so that the data is 8
* byte aligned, regardless of its content. */
} clusterMsgPingExt;
diff --git a/src/commands.c b/src/commands.c
index caf7bda89..7e43b1e6d 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -476,6 +476,17 @@ struct redisCommandArg CLUSTER_MEET_Args[] = {
/* CLUSTER MYID tips */
#define CLUSTER_MYID_tips NULL
+/********** CLUSTER MYSHARDID ********************/
+
+/* CLUSTER MYSHARDID history */
+#define CLUSTER_MYSHARDID_History NULL
+
+/* CLUSTER MYSHARDID tips */
+const char *CLUSTER_MYSHARDID_tips[] = {
+"nondeterministic_output",
+NULL
+};
+
/********** CLUSTER NODES ********************/
/* CLUSTER NODES history */
@@ -647,6 +658,7 @@ struct redisCommand CLUSTER_Subcommands[] = {
{"links","Returns a list of all TCP links to and from peer nodes in cluster","O(N) where N is the total number of Cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_LINKS_History,CLUSTER_LINKS_tips,clusterCommand,2,CMD_STALE,0},
{"meet","Force a node cluster to handshake with another node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MEET_History,CLUSTER_MEET_tips,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,.args=CLUSTER_MEET_Args},
{"myid","Return the node id","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MYID_History,CLUSTER_MYID_tips,clusterCommand,2,CMD_STALE,0},
+{"myshardid","Return the node shard id","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MYSHARDID_History,CLUSTER_MYSHARDID_tips,clusterCommand,2,CMD_STALE,0},
{"nodes","Get Cluster config for the node","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,CLUSTER_NODES_tips,clusterCommand,2,CMD_STALE,0},
{"replicas","List replica nodes of the specified master node","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_REPLICAS_History,CLUSTER_REPLICAS_tips,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,.args=CLUSTER_REPLICAS_Args},
{"replicate","Reconfigure a node as a replica of the specified master node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,CLUSTER_REPLICATE_tips,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,.args=CLUSTER_REPLICATE_Args},
diff --git a/src/commands/cluster-myshardid.json b/src/commands/cluster-myshardid.json
new file mode 100644
index 000000000..ffd26eec6
--- /dev/null
+++ b/src/commands/cluster-myshardid.json
@@ -0,0 +1,18 @@
+{
+ "MYSHARDID": {
+ "summary": "Return the node shard id",
+ "complexity": "O(1)",
+ "group": "cluster",
+ "since": "7.2.0",
+ "arity": 2,
+ "container": "CLUSTER",
+ "function": "clusterCommand",
+ "history": [],
+ "command_flags": [
+ "STALE"
+ ],
+ "command_tips": [
+ "NONDETERMINISTIC_OUTPUT"
+ ]
+ }
+}
diff --git a/src/server.h b/src/server.h
index 3af135114..23e9d8e8e 100644
--- a/src/server.h
+++ b/src/server.h
@@ -91,6 +91,9 @@ typedef struct redisObject robj;
#include "endianconv.h"
#include "crc64.h"
+/* helpers */
+#define numElements(x) (sizeof(x)/sizeof((x)[0]))
+
/* min/max */
#undef min
#undef max
@@ -3301,6 +3304,7 @@ uint64_t dictSdsCaseHash(const void *key);
int dictSdsKeyCompare(dict *d, const void *key1, const void *key2);
int dictSdsKeyCaseCompare(dict *d, const void *key1, const void *key2);
void dictSdsDestructor(dict *d, void *val);
+void dictListDestructor(dict *d, void *val);
void *dictSdsDup(dict *d, const void *key);
/* Git SHA1 */