diff options
author | Ping Xie <pingxie@google.com> | 2022-11-16 19:24:18 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-16 19:24:18 -0800 |
commit | 203b12e41ff7981f0fae5b23819f072d61594813 (patch) | |
tree | ff5f2f829bbfcc6928190a01d401ee4b1ebbaf9e /src | |
parent | 2168ccc661791ced6271c5e4ab0f5eb60b1559e2 (diff) | |
download | redis-203b12e41ff7981f0fae5b23819f072d61594813.tar.gz |
Introduce Shard IDs to logically group nodes in cluster mode (#10536)
Introduce Shard IDs to logically group nodes in cluster mode.
1. Added a new "shard_id" field to "cluster nodes" output and nodes.conf after "hostname"
2. Added a new PING extension to propagate "shard_id"
3. Handled upgrade from pre-7.2 releases automatically
4. Refactored PING extension assembling/parsing logic
Behavior of Shard IDs:
Replicas will always follow the shards of their reported primaries. If a primary updates its shard ID, the replica will follow. (This need not follow for cluster v2) This is not an expected use case.
Diffstat (limited to 'src')
-rw-r--r-- | src/cluster.c | 579 | ||||
-rw-r--r-- | src/cluster.h | 8 | ||||
-rw-r--r-- | src/commands.c | 12 | ||||
-rw-r--r-- | src/commands/cluster-myshardid.json | 18 | ||||
-rw-r--r-- | src/server.h | 4 |
5 files changed, 462 insertions, 159 deletions
diff --git a/src/cluster.c b/src/cluster.c index d3b800198..436ed014c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -40,6 +40,7 @@ #include <sys/stat.h> #include <sys/file.h> #include <math.h> +#include <ctype.h> /* A global reference to myself is handy to make code more clear. * Myself always points to server.cluster->myself, that is, the clusterNode @@ -56,7 +57,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request); void clusterUpdateState(void); int clusterNodeGetSlotBit(clusterNode *n, int slot); sds clusterGenNodesDescription(int filter, int use_pport); -list *clusterGetNodesServingMySlots(clusterNode *node); +list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); @@ -83,6 +84,11 @@ void removeChannelsInSlot(unsigned int slot); unsigned int countKeysInSlot(unsigned int hashslot); unsigned int countChannelsInSlot(unsigned int hashslot); unsigned int delKeysInSlot(unsigned int hashslot); +void clusterAddNodeToShard(const char *shard_id, clusterNode *node); +list *clusterLookupNodeListByShardId(const char *shard_id); +void clusterRemoveNodeFromShard(clusterNode *node); +int auxShardIdSetter(clusterNode *n, void *value, int length); +sds auxShardIdGetter(clusterNode *n, sds s); static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen); /* Links to the next and previous entries for keys in the same slot are stored @@ -127,6 +133,85 @@ static ConnectionType *connTypeOfCluster() { return connectionTypeTcp(); } +/* Cluster shards hash table, mapping shard id to list of nodes */ +dictType clusterSdsToListType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictListDestructor, /* val destructor */ + NULL /* allow to expand */ +}; + +/* Aux fields are introduced in Redis 7.2 to support the persistence + * of various important node properties, such as shard id, in nodes.conf. + * Aux fields take an explicit format of name=value pairs and have no + * intrinsic order among them. Aux fields are always grouped together + * at the end of the second column of each row after the node's IP + * address/port/cluster_port and the optional hostname. Aux fields + * are separated by ','. */ + +/* Aux field setter function prototype + * return C_OK when the update is successful; C_ERR otherwise */ +typedef int (aux_value_setter) (clusterNode* n, void *value, int length); +/* Aux field getter function prototype + * return an sds that is a concatenation of the input sds string and + * the aux value */ +typedef sds (aux_value_getter) (clusterNode* n, sds s); + +typedef struct { + char *field; + aux_value_setter *setter; + aux_value_getter *getter; + int present; +} auxFieldHandler; + +/* Assign index to each aux field */ +typedef enum { + af_start, + af_shard_id = af_start, + af_count, +} auxFieldIndex; + +/* Note that + * 1. the order of the elements below must match that of their + * indices as defined in auxFieldIndex + * 2. aux name can contain characters that pass the isValidAuxChar check only */ +auxFieldHandler auxFieldHandlers[] = { + {"shard-id", auxShardIdSetter, auxShardIdGetter, 0}, +}; + +int isValidAuxChar(int c) { + return isalnum(c) || (strchr("!#$%&()*+-.:;<>?@[]^_{|}~", c) != NULL); +} + +int isValidAuxString(sds s) { + for (unsigned i = 0; i < sdslen(s); i++) { + if (!isValidAuxChar(s[i])) return 0; + } + return 1; +} + +int auxShardIdSetter(clusterNode *n, void *value, int length) { + if (verifyClusterNodeId(value, length) == C_ERR) { + return C_ERR; + } + memcpy(n->shard_id, value, CLUSTER_NAMELEN); + /* if n already has replicas, make sure they all agree + * on the shard id */ + for (int i = 0; i < n->numslaves; i++) { + if (memcmp(n->slaves[i]->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) { + return C_ERR; + } + } + clusterAddNodeToShard(value, n); + return C_OK; +} + +sds auxShardIdGetter(clusterNode *n, sds s) { + return sdscatprintf(s, "%.40s", n->shard_id); +} /* clusterLink send queue blocks */ typedef struct { @@ -185,8 +270,8 @@ int clusterLoadConfig(char *filename) { maxline = 1024+CLUSTER_SLOTS*128; line = zmalloc(maxline); while(fgets(line,maxline,fp) != NULL) { - int argc; - sds *argv; + int argc, aux_argc; + sds *argv, *aux_argv; clusterNode *n, *master; char *p, *s; @@ -236,27 +321,85 @@ int clusterLoadConfig(char *filename) { n = createClusterNode(argv[0],0); clusterAddNode(n); } - /* Format for the node address information: - * ip:port[@cport][,hostname] */ + /* Format for the node address and auxiliary argument information: + * ip:port[@cport][,hostname[,aux=val]*] */ + + aux_argv = sdssplitlen(argv[1], sdslen(argv[1]), ",", 1, &aux_argc); + if (aux_argv == NULL) { + sdsfreesplitres(argv,argc); + goto fmterr; + } /* Hostname is an optional argument that defines the endpoint * that can be reported to clients instead of IP. */ - char *hostname = strchr(argv[1], ','); - if (hostname) { - *hostname = '\0'; - hostname++; - n->hostname = sdscpy(n->hostname, hostname); + if (aux_argc > 1 && sdslen(aux_argv[1]) > 0) { + n->hostname = sdscpy(n->hostname, aux_argv[1]); } else if (sdslen(n->hostname) != 0) { sdsclear(n->hostname); } + /* All fields after hostname are auxiliary and they take on + * the format of "aux=val" where both aux and val can contain + * characters that pass the isValidAuxChar check only. The order + * of the aux fields is insignificant. */ + + for (int i = 2; i < aux_argc; i++) { + int field_argc; + sds *field_argv; + field_argv = sdssplitlen(aux_argv[i], sdslen(aux_argv[i]), "=", 1, &field_argc); + if (field_argv == NULL || field_argc != 2) { + /* Invalid aux field format */ + if (field_argv != NULL) sdsfreesplitres(field_argv, field_argc); + sdsfreesplitres(argv,argc); + goto fmterr; + } + + /* Validate that both aux and value contain valid characters only */ + for (unsigned j = 0; j < 2; j++) { + if (!isValidAuxString(field_argv[j])) { + /* Invalid aux field format */ + sdsfreesplitres(field_argv, field_argc); + sdsfreesplitres(argv,argc); + goto fmterr; + } + } + + /* Note that we don't expect lots of aux fields in the foreseeable + * future so a linear search is completely fine. */ + int field_found = 0; + for (unsigned j = 0; j < numElements(auxFieldHandlers); j++) { + if (sdslen(field_argv[0]) != strlen(auxFieldHandlers[j].field) || + memcmp(field_argv[0], auxFieldHandlers[j].field, sdslen(field_argv[0])) != 0) { + continue; + } + field_found = 1; + if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) { + /* Invalid aux field format */ + sdsfreesplitres(field_argv, field_argc); + sdsfreesplitres(argv,argc); + goto fmterr; + } + auxFieldHandlers[j].present = 1; + } + + if (field_found == 0) { + /* Invalid aux field format */ + sdsfreesplitres(field_argv, field_argc); + sdsfreesplitres(argv,argc); + goto fmterr; + } + + sdsfreesplitres(field_argv, field_argc); + } + /* Address and port */ - if ((p = strrchr(argv[1],':')) == NULL) { + if ((p = strrchr(aux_argv[0],':')) == NULL) { + sdsfreesplitres(aux_argv, aux_argc); sdsfreesplitres(argv,argc); goto fmterr; } *p = '\0'; - memcpy(n->ip,argv[1],strlen(argv[1])+1); + memcpy(n->ip,aux_argv[0],strlen(aux_argv[0])+1); char *port = p+1; char *busp = strchr(port,'@'); if (busp) { @@ -272,6 +415,8 @@ int clusterLoadConfig(char *filename) { /* The plaintext port for client in a TLS cluster (n->pport) is not * stored in nodes.conf. It is received later over the bus protocol. */ + sdsfreesplitres(aux_argv, aux_argc); + /* Parse flags */ p = s = argv[2]; while(p) { @@ -316,8 +461,27 @@ int clusterLoadConfig(char *filename) { master = createClusterNode(argv[3],0); clusterAddNode(master); } + /* shard_id can be absent if we are loading a nodes.conf generated + * by an older version of Redis; we should follow the primary's + * shard_id in this case */ + if (auxFieldHandlers[af_shard_id].present == 0) { + memcpy(n->shard_id, master->shard_id, CLUSTER_NAMELEN); + clusterAddNodeToShard(master->shard_id, n); + } else if (clusterGetNodesInMyShard(master) != NULL && + memcmp(master->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) + { + /* If the primary has been added to a shard, make sure this + * node has the same persisted shard id as the primary. */ + goto fmterr; + } n->slaveof = master; clusterNodeAddSlave(master,n); + } else if (auxFieldHandlers[af_shard_id].present == 0) { + /* n is a primary but it does not have a persisted shard_id. + * This happens if we are loading a nodes.conf generated by + * an older version of Redis. We should manually update the + * shard membership in this case */ + clusterAddNodeToShard(n->shard_id, n); } /* Set ping sent / pong received timestamps */ @@ -403,7 +567,7 @@ int clusterLoadConfig(char *filename) { fmterr: serverLog(LL_WARNING, - "Unrecoverable error: corrupted cluster config file."); + "Unrecoverable error: corrupted cluster config file \"%s\".", line); zfree(line); if (fp) fclose(fp); exit(1); @@ -622,6 +786,8 @@ static void updateAnnouncedHostname(clusterNode *node, char *new) { /* Previous and new hostname are the same, no need to update. */ if (new && !strcmp(new, node->hostname)) { return; + } else if (!new && (sdslen(node->hostname) == 0)) { + return; } if (new) { @@ -629,6 +795,26 @@ static void updateAnnouncedHostname(clusterNode *node, char *new) { } else if (sdslen(node->hostname) != 0) { sdsclear(node->hostname); } + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); +} + +static void updateShardId(clusterNode *node, const char *shard_id) { + if (memcmp(node->shard_id, shard_id, CLUSTER_NAMELEN) != 0) { + clusterRemoveNodeFromShard(node); + memcpy(node->shard_id, shard_id, CLUSTER_NAMELEN); + clusterAddNodeToShard(shard_id, node); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); + } + if (myself != node && myself->slaveof == node) { + if (memcmp(myself->shard_id, shard_id, CLUSTER_NAMELEN) != 0) { + /* shard-id can diverge right after a rolling upgrade + * from pre-7.2 releases */ + clusterRemoveNodeFromShard(myself); + memcpy(myself->shard_id, shard_id, CLUSTER_NAMELEN); + clusterAddNodeToShard(shard_id, myself); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG); + } + } } /* Update my hostname based on server configuration values */ @@ -647,6 +833,7 @@ void clusterInit(void) { server.cluster->size = 1; server.cluster->todo_before_sleep = 0; server.cluster->nodes = dictCreate(&clusterNodesDictType); + server.cluster->shards = dictCreate(&clusterSdsToListType); server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType); server.cluster->failover_auth_time = 0; @@ -682,6 +869,7 @@ void clusterInit(void) { serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s", myself->name); clusterAddNode(myself); + clusterAddNodeToShard(myself->shard_id, myself); saveconf = 1; } if (saveconf) clusterSaveConfigOrDie(1); @@ -772,6 +960,9 @@ void clusterReset(int hard) { /* Unassign all the slots. */ for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j); + /* Recreate shards dict */ + dictEmpty(server.cluster->shards, NULL); + /* Forget all the nodes, but myself. */ di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { @@ -797,10 +988,14 @@ void clusterReset(int hard) { dictDelete(server.cluster->nodes,oldname); sdsfree(oldname); getRandomHexChars(myself->name, CLUSTER_NAMELEN); + getRandomHexChars(myself->shard_id, CLUSTER_NAMELEN); clusterAddNode(myself); serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name); } + /* Re-populate shards */ + clusterAddNodeToShard(myself->shard_id, myself); + /* Make sure to persist the new config and update the state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE| @@ -1029,6 +1224,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { memcpy(node->name, nodename, CLUSTER_NAMELEN); else getRandomHexChars(node->name, CLUSTER_NAMELEN); + getRandomHexChars(node->shard_id, CLUSTER_NAMELEN); node->ctime = mstime(); node->configEpoch = 0; node->flags = flags; @@ -1238,7 +1434,8 @@ void clusterAddNode(clusterNode *node) { * 1) Mark all the slots handled by it as unassigned. * 2) Remove all the failure reports sent by this node and referenced by * other nodes. - * 3) Free the node with freeClusterNode() that will in turn remove it + * 3) Remove the node from the owning shard + * 4) Free the node with freeClusterNode() that will in turn remove it * from the hash table and from the list of slaves of its master, if * it is a slave node. */ @@ -1267,7 +1464,10 @@ void clusterDelNode(clusterNode *delnode) { } dictReleaseIterator(di); - /* 3) Free the node, unlinking it from the cluster. */ + /* 3) Remove the node from the owning shard */ + clusterRemoveNodeFromShard(delnode); + + /* 4) Free the node, unlinking it from the cluster. */ freeClusterNode(delnode); } @@ -1293,19 +1493,17 @@ clusterNode *clusterLookupNode(const char *name, int length) { return dictGetVal(de); } -/* Get all the nodes serving the same slots as the given node. */ -list *clusterGetNodesServingMySlots(clusterNode *node) { - list *nodes_for_slot = listCreate(); - clusterNode *my_primary = nodeIsMaster(node) ? node : node->slaveof; - - /* This function is only valid for fully connected nodes, so - * they should have a known primary. */ - serverAssert(my_primary); - listAddNodeTail(nodes_for_slot, my_primary); - for (int i=0; i < my_primary->numslaves; i++) { - listAddNodeTail(nodes_for_slot, my_primary->slaves[i]); - } - return nodes_for_slot; +/* Get all the nodes in my shard. + * Note that the list returned is not computed on the fly + * via slaveof; rather, it is maintained permanently to + * track the shard membership and its life cycle is tied + * to this Redis process. Therefore, the caller must not + * release the list. */ +list *clusterGetNodesInMyShard(clusterNode *node) { + sds s = sdsnewlen(node->shard_id, CLUSTER_NAMELEN); + dictEntry *de = dictFind(server.cluster->shards,s); + sdsfree(s); + return (de != NULL) ? dictGetVal(de) : NULL; } /* This is only used after the handshake. When we connect a given IP/PORT @@ -1325,6 +1523,38 @@ void clusterRenameNode(clusterNode *node, char *newname) { clusterAddNode(node); } +void clusterAddNodeToShard(const char *shard_id, clusterNode *node) { + sds s = sdsnewlen(shard_id, CLUSTER_NAMELEN); + dictEntry *de = dictFind(server.cluster->shards,s); + if (de == NULL) { + list *l = listCreate(); + listAddNodeTail(l, node); + serverAssert(dictAdd(server.cluster->shards, s, l) == DICT_OK); + } else { + list *l = dictGetVal(de); + if (listSearchKey(l, node) == NULL) { + listAddNodeTail(l, node); + } + sdsfree(s); + } +} + +void clusterRemoveNodeFromShard(clusterNode *node) { + sds s = sdsnewlen(node->shard_id, CLUSTER_NAMELEN); + dictEntry *de = dictFind(server.cluster->shards, s); + if (de != NULL) { + list *l = dictGetVal(de); + listNode *ln = listSearchKey(l, node); + if (ln != NULL) { + listDelNode(l, ln); + } + if (listLength(l) == 0) { + dictDelete(server.cluster->shards, s); + } + } + sdsfree(s); +} + /* ----------------------------------------------------------------------------- * CLUSTER config epoch handling * -------------------------------------------------------------------------- */ @@ -2060,7 +2290,7 @@ static uint32_t getPingExtLength(clusterMsgPingExt *ext) { /* Returns the initial position of ping extensions. May return an invalid * address if there are no ping extensions. */ -static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, uint16_t count) { +static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, int count) { clusterMsgPingExt *initial = (clusterMsgPingExt*) &(hdr->data.ping.gossip[count]); return initial; } @@ -2072,49 +2302,108 @@ static clusterMsgPingExt *getNextPingExt(clusterMsgPingExt *ext) { return next; } -/* Returns the exact size needed to store the hostname. The returned value - * will be 8 byte padded. */ -int getHostnamePingExtSize() { - /* If hostname is not set, we don't send this extension */ - if (sdslen(myself->hostname) == 0) return 0; +/* All PING extensions must be 8-byte aligned */ +uint32_t getAlignedPingExtSize(uint32_t dataSize) { - int totlen = sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(sdslen(myself->hostname) + 1); - return totlen; + return sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(dataSize); +} + +uint32_t getHostnamePingExtSize() { + if (sdslen(myself->hostname) == 0) { + return 0; + } + return getAlignedPingExtSize(sdslen(myself->hostname) + 1); +} + +uint32_t getShardIdPingExtSize() { + return getAlignedPingExtSize(sizeof(clusterMsgPingExtShardId)); +} + +uint32_t getForgottenNodeExtSize() { + return getAlignedPingExtSize(sizeof(clusterMsgPingExtForgottenNode)); } -/* Write the hostname ping extension at the start of the cursor. This function - * will update the cursor to point to the end of the written extension and - * will return the amount of bytes written. */ -int writeHostnamePingExt(clusterMsgPingExt **cursor) { - /* If hostname is not set, we don't send this extension */ - if (sdslen(myself->hostname) == 0) return 0; - - /* Add the hostname information at the extension cursor */ - clusterMsgPingExtHostname *ext = &(*cursor)->ext[0].hostname; - memcpy(ext->hostname, myself->hostname, sdslen(myself->hostname)); - uint32_t extension_size = getHostnamePingExtSize(); - - /* Move the write cursor */ - (*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_HOSTNAME); - (*cursor)->length = htonl(extension_size); - /* Make sure the string is NULL terminated by adding 1 */ - *cursor = (clusterMsgPingExt *) ((intptr_t)ext + EIGHT_BYTE_ALIGN(sdslen(myself->hostname) + 1)); - return extension_size; -} - -/* Write the forgotten node ping extension at the start of the cursor, update - * the cursor to point to the end of the written extension and return the number - * of bytes written. */ -int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl) { - serverAssert(sdslen(name) == CLUSTER_NAMELEN); - clusterMsgPingExtForgottenNode *ext = &(*cursor)->ext[0].forgotten_node; - memcpy(ext->name, name, CLUSTER_NAMELEN); - ext->ttl = htonu64(ttl); - uint32_t extension_size = sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode); - (*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE); - (*cursor)->length = htonl(extension_size); - *cursor = (clusterMsgPingExt *) (ext + 1); - return extension_size; +void *preparePingExt(clusterMsgPingExt *ext, uint16_t type, uint32_t length) { + ext->type = htons(type); + ext->length = htonl(length); + return &ext->ext[0]; +} + +clusterMsgPingExt *nextPingExt(clusterMsgPingExt *ext) { + return (clusterMsgPingExt *)((char*)ext + ntohl(ext->length)); +} + +/* 1. If a NULL hdr is provided, compute the extension size; + * 2. If a non-NULL hdr is provided, write the hostname ping + * extension at the start of the cursor. This function + * will update the cursor to point to the end of the + * written extension and will return the amount of bytes + * written. */ +uint32_t writePingExt(clusterMsg *hdr, int gossipcount) { + uint16_t extensions = 0; + uint32_t totlen = 0; + clusterMsgPingExt *cursor = NULL; + /* Set the initial extension position */ + if (hdr != NULL) { + cursor = getInitialPingExt(hdr, gossipcount); + } + + /* hostname is optional */ + if (sdslen(myself->hostname) != 0) { + if (cursor != NULL) { + /* Populate hostname */ + clusterMsgPingExtHostname *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_HOSTNAME, getHostnamePingExtSize()); + memcpy(ext->hostname, myself->hostname, sdslen(myself->hostname)); + + /* Move the write cursor */ + cursor = nextPingExt(cursor); + } + + totlen += getHostnamePingExtSize(); + extensions++; + } + + /* Gossip forgotten nodes */ + if (dictSize(server.cluster->nodes_black_list) > 0) { + dictIterator *di = dictGetIterator(server.cluster->nodes_black_list); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + if (cursor != NULL) { + uint64_t expire = dictGetUnsignedIntegerVal(de); + if ((time_t)expire < server.unixtime) continue; /* already expired */ + uint64_t ttl = expire - server.unixtime; + clusterMsgPingExtForgottenNode *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE, getForgottenNodeExtSize()); + memcpy(ext->name, dictGetKey(de), CLUSTER_NAMELEN); + ext->ttl = htonu64(ttl); + + /* Move the write cursor */ + cursor = nextPingExt(cursor); + } + totlen += getForgottenNodeExtSize(); + extensions++; + } + dictReleaseIterator(di); + } + + /* Populate shard_id */ + if (cursor != NULL) { + clusterMsgPingExtShardId *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_SHARDID, getShardIdPingExtSize()); + memcpy(ext->shard_id, myself->shard_id, CLUSTER_NAMELEN); + + /* Move the write cursor */ + cursor = nextPingExt(cursor); + } + totlen += getShardIdPingExtSize(); + extensions++; + + if (hdr != NULL) { + if (extensions != 0) { + hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA; + } + hdr->extensions = htons(extensions); + } + + return totlen; } /* We previously validated the extensions, so this function just needs to @@ -2122,6 +2411,7 @@ int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN); char *ext_hostname = NULL; + char *ext_shardid = NULL; uint16_t extensions = ntohs(hdr->extensions); /* Loop through all the extensions and process them */ clusterMsgPingExt *ext = getInitialPingExt(hdr, ntohs(hdr->count)); @@ -2143,6 +2433,9 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE| CLUSTER_TODO_SAVE_CONFIG); } + } else if (type == CLUSTERMSG_EXT_TYPE_SHARDID) { + clusterMsgPingExtShardId *shardid_ext = (clusterMsgPingExtShardId *) &(ext->ext[0].shard_id); + ext_shardid = shardid_ext->shard_id; } else { /* Unknown type, we will ignore it but log what happened. */ serverLog(LL_WARNING, "Received unknown extension type %d", type); @@ -2155,6 +2448,7 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { * they don't have an announced hostname. Otherwise, we'll * set it now. */ updateAnnouncedHostname(sender, ext_hostname); + updateShardId(sender, ext_shardid); } static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) { @@ -2232,7 +2526,7 @@ int clusterProcessPacket(clusterLink *link) { while (extensions--) { uint16_t extlen = getPingExtLength(ext); if (extlen % 8 != 0) { - serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)", + serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)", clusterGetMessageTypeString(type), (int) extlen); return 1; } @@ -2273,10 +2567,10 @@ int clusterProcessPacket(clusterLink *link) { } if (totlen != explen) { - serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld", + serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld", clusterGetMessageTypeString(type), (unsigned long long) totlen, (unsigned long long) explen); return 1; - } + } sender = getNodeFromLinkAndMsg(link, hdr); @@ -3065,9 +3359,7 @@ void clusterSendPing(clusterLink *link, int type) { * to put inside the packet. */ estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData); estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted)); - estlen += getHostnamePingExtSize(); - estlen += dictSize(server.cluster->nodes_black_list) * - (sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode)); + estlen += writePingExt(NULL, 0); /* Note: clusterBuildMessageHdr() expects the buffer to be always at least * sizeof(clusterMsg) or more. */ @@ -3134,39 +3426,13 @@ void clusterSendPing(clusterLink *link, int type) { dictReleaseIterator(di); } - - int totlen = 0; - int extensions = 0; - /* Set the initial extension position */ - clusterMsgPingExt *cursor = getInitialPingExt(hdr, gossipcount); - /* Add in the extensions */ - if (sdslen(myself->hostname) != 0) { - hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA; - totlen += writeHostnamePingExt(&cursor); - extensions++; - } - - /* Gossip forgotten nodes */ - if (dictSize(server.cluster->nodes_black_list) > 0) { - dictIterator *di = dictGetIterator(server.cluster->nodes_black_list); - dictEntry *de; - while ((de = dictNext(di)) != NULL) { - sds name = dictGetKey(de); - uint64_t expire = dictGetUnsignedIntegerVal(de); - if ((time_t)expire < server.unixtime) continue; /* already expired */ - uint64_t ttl = expire - server.unixtime; - hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA; - totlen += writeForgottenNodePingExt(&cursor, name, ttl); - extensions++; - } - dictReleaseIterator(di); - } - /* Compute the actual total length and send! */ + uint32_t totlen = 0; + totlen += writePingExt(hdr, gossipcount); totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += (sizeof(clusterMsgDataGossip)*gossipcount); + serverAssert(gossipcount < USHRT_MAX); hdr->count = htons(gossipcount); - hdr->extensions = htons(extensions); hdr->totlen = htonl(totlen); clusterSendMessage(link,msgblock); @@ -3341,20 +3607,18 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { return; } + listIter li; + listNode *ln; + list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself); + serverAssert(nodes_for_slot != NULL); + listRewind(nodes_for_slot, &li); msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD); - list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself); - if (listLength(nodes_for_slot) != 0) { - listIter li; - listNode *ln; - listRewind(nodes_for_slot, &li); - while((ln = listNext(&li))) { - clusterNode *node = listNodeValue(ln); - if (node != myself) { - clusterSendMessage(node->link,msgblock); - } + while((ln = listNext(&li))) { + clusterNode *node = listNodeValue(ln); + if (node != myself) { + clusterSendMessage(node->link,msgblock); } } - listRelease(nodes_for_slot); clusterMsgSendBlockDecrRefCount(msgblock); } @@ -4418,12 +4682,12 @@ int clusterDelSlot(int slot) { if (!n) return C_ERR; /* Cleanup the channels in master/replica as part of slot deletion. */ - list *nodes_for_slot = clusterGetNodesServingMySlots(n); + list *nodes_for_slot = clusterGetNodesInMyShard(n); + serverAssert(nodes_for_slot != NULL); listNode *ln = listSearchKey(nodes_for_slot, myself); if (ln != NULL) { removeChannelsInSlot(slot); } - listRelease(nodes_for_slot); serverAssert(clusterNodeClearSlotBit(n,slot) == 1); server.cluster->slots[slot] = NULL; return C_OK; @@ -4650,6 +4914,7 @@ void clusterSetMaster(clusterNode *n) { clusterNodeRemoveSlave(myself->slaveof,myself); } myself->slaveof = n; + updateShardId(myself, n->shard_id); clusterNodeAddSlave(n,myself); replicationSetMaster(n->ip, n->port); resetManualFailover(); @@ -4717,20 +4982,28 @@ sds clusterGenNodeDescription(clusterNode *node, int use_pport) { /* Node coordinates */ ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN); + /* Node's ip/port and optional announced hostname */ if (sdslen(node->hostname) != 0) { - ci = sdscatfmt(ci," %s:%i@%i,%s ", + ci = sdscatprintf(ci," %s:%i@%i,%s", node->ip, port, node->cport, node->hostname); } else { - ci = sdscatfmt(ci," %s:%i@%i ", + ci = sdscatprintf(ci," %s:%i@%i,", node->ip, port, node->cport); } + /* Node's aux fields */ + for (int i = af_start; i < af_count; i++) { + ci = sdscatprintf(ci, ",%s=", auxFieldHandlers[i].field); + ci = auxFieldHandlers[i].getter(node, ci); + } + /* Flags */ + ci = sdscatlen(ci," ",1); ci = representClusterNodeFlags(ci, node->flags); /* Slave of... or just "-" */ @@ -5163,33 +5436,33 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) { } /* Add the shard reply of a single shard based off the given primary node. */ -void addShardReplyForClusterShards(client *c, clusterNode *node, uint16_t *slot_info_pairs, int slot_pairs_count) { +void addShardReplyForClusterShards(client *c, list *nodes) { + serverAssert(listLength(nodes) > 0); + clusterNode *n = listNodeValue(listFirst(nodes)); addReplyMapLen(c, 2); addReplyBulkCString(c, "slots"); - if (slot_info_pairs) { - serverAssert((slot_pairs_count % 2) == 0); - addReplyArrayLen(c, slot_pairs_count); - for (int i = 0; i < slot_pairs_count; i++) - addReplyLongLong(c, (unsigned long)slot_info_pairs[i]); + + /* Use slot_info_pairs from the primary only */ + while (n->slaveof != NULL) n = n->slaveof; + + if (n->slot_info_pairs != NULL) { + serverAssert((n->slot_info_pairs_count % 2) == 0); + addReplyArrayLen(c, n->slot_info_pairs_count); + for (int i = 0; i < n->slot_info_pairs_count; i++) + addReplyBulkLongLong(c, (unsigned long)n->slot_info_pairs[i]); } else { /* If no slot info pair is provided, the node owns no slots */ addReplyArrayLen(c, 0); } addReplyBulkCString(c, "nodes"); - list *nodes_for_slot = clusterGetNodesServingMySlots(node); - /* At least the provided node should be serving its slots */ - serverAssert(nodes_for_slot); - addReplyArrayLen(c, listLength(nodes_for_slot)); - if (listLength(nodes_for_slot) != 0) { - listIter li; - listNode *ln; - listRewind(nodes_for_slot, &li); - while ((ln = listNext(&li))) { - clusterNode *node = listNodeValue(ln); - addNodeDetailsToShardReply(c, node); - } - listRelease(nodes_for_slot); + addReplyArrayLen(c, listLength(nodes)); + listIter li; + listRewind(nodes, &li); + for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) { + clusterNode *n = listNodeValue(ln); + addNodeDetailsToShardReply(c, n); + clusterFreeNodesSlotsInfo(n); } } @@ -5197,31 +5470,14 @@ void addShardReplyForClusterShards(client *c, clusterNode *node, uint16_t *slot_ * pair owned by the shard, also the primary and set of replica(s) along with * information about each node. */ void clusterReplyShards(client *c) { - void *shard_replylen = addReplyDeferredLen(c); - int shard_count = 0; + addReplyArrayLen(c, dictSize(server.cluster->shards)); /* This call will add slot_info_pairs to all nodes */ clusterGenNodesSlotsInfo(0); - dictIterator *di = dictGetSafeIterator(server.cluster->nodes); - dictEntry *de; - /* Iterate over all the available nodes in the cluster, for each primary - * node return generate the cluster shards response. if the primary node - * doesn't own any slot, cluster shard response contains the node related - * information and an empty slots array. */ - while((de = dictNext(di)) != NULL) { - clusterNode *n = dictGetVal(de); - if (!nodeIsMaster(n)) { - /* You can force a replica to own slots, even though it'll get reverted, - * so freeing the slot pair here just in case. */ - clusterFreeNodesSlotsInfo(n); - continue; - } - shard_count++; - /* n->slot_info_pairs is set to NULL when the node owns no slots. */ - addShardReplyForClusterShards(c, n, n->slot_info_pairs, n->slot_info_pairs_count); - clusterFreeNodesSlotsInfo(n); + dictIterator *di = dictGetSafeIterator(server.cluster->shards); + for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) { + addShardReplyForClusterShards(c, dictGetVal(de)); } dictReleaseIterator(di); - setDeferredArrayLen(c, shard_replylen, shard_count); } void clusterReplyMultiBulkSlots(client * c) { @@ -5299,6 +5555,8 @@ void clusterCommand(client *c) { " Connect nodes into a working cluster.", "MYID", " Return the node id.", +"MYSHARDID", +" Return the node's shard id.", "NODES", " Return cluster configuration seen by node. Output format:", " <id> <ip:port@bus-port[,hostname]> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...", @@ -5365,6 +5623,9 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) { /* CLUSTER MYID */ addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN); + } else if (!strcasecmp(c->argv[1]->ptr,"myshardid") && c->argc == 2) { + /* CLUSTER MYSHARDID */ + addReplyBulkCBuffer(c,myself->shard_id, CLUSTER_NAMELEN); } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) { /* CLUSTER SLOTS */ clusterReplyMultiBulkSlots(c); diff --git a/src/cluster.h b/src/cluster.h index f27072f20..332624ea5 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -117,6 +117,7 @@ typedef struct clusterNodeFailReport { typedef struct clusterNode { mstime_t ctime; /* Node object creation time. */ char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ + char shard_id[CLUSTER_NAMELEN]; /* shard id, hex string, sha1-size */ int flags; /* CLUSTER_NODE_... */ uint64_t configEpoch; /* Last configEpoch observed for this node */ unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */ @@ -175,6 +176,7 @@ typedef struct clusterState { int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ int size; /* Num of master nodes with at least one slot */ dict *nodes; /* Hash table of name -> clusterNode structures */ + dict *shards; /* Hash table of shard_id -> list (of nodes) structures */ dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; @@ -256,6 +258,7 @@ typedef struct { typedef enum { CLUSTERMSG_EXT_TYPE_HOSTNAME, CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE, + CLUSTERMSG_EXT_TYPE_SHARDID, } clusterMsgPingtypes; /* Helper function for making sure extensions are eight byte aligned. */ @@ -273,12 +276,17 @@ typedef struct { static_assert(sizeof(clusterMsgPingExtForgottenNode) % 8 == 0, ""); typedef struct { + char shard_id[CLUSTER_NAMELEN]; /* The shard_id, 40 bytes fixed. */ +} clusterMsgPingExtShardId; + +typedef struct { uint32_t length; /* Total length of this extension message (including this header) */ uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */ uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */ union { clusterMsgPingExtHostname hostname; clusterMsgPingExtForgottenNode forgotten_node; + clusterMsgPingExtShardId shard_id; } ext[]; /* Actual extension information, formatted so that the data is 8 * byte aligned, regardless of its content. */ } clusterMsgPingExt; diff --git a/src/commands.c b/src/commands.c index caf7bda89..7e43b1e6d 100644 --- a/src/commands.c +++ b/src/commands.c @@ -476,6 +476,17 @@ struct redisCommandArg CLUSTER_MEET_Args[] = { /* CLUSTER MYID tips */ #define CLUSTER_MYID_tips NULL +/********** CLUSTER MYSHARDID ********************/ + +/* CLUSTER MYSHARDID history */ +#define CLUSTER_MYSHARDID_History NULL + +/* CLUSTER MYSHARDID tips */ +const char *CLUSTER_MYSHARDID_tips[] = { +"nondeterministic_output", +NULL +}; + /********** CLUSTER NODES ********************/ /* CLUSTER NODES history */ @@ -647,6 +658,7 @@ struct redisCommand CLUSTER_Subcommands[] = { {"links","Returns a list of all TCP links to and from peer nodes in cluster","O(N) where N is the total number of Cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_LINKS_History,CLUSTER_LINKS_tips,clusterCommand,2,CMD_STALE,0}, {"meet","Force a node cluster to handshake with another node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MEET_History,CLUSTER_MEET_tips,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,.args=CLUSTER_MEET_Args}, {"myid","Return the node id","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MYID_History,CLUSTER_MYID_tips,clusterCommand,2,CMD_STALE,0}, +{"myshardid","Return the node shard id","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MYSHARDID_History,CLUSTER_MYSHARDID_tips,clusterCommand,2,CMD_STALE,0}, {"nodes","Get Cluster config for the node","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,CLUSTER_NODES_tips,clusterCommand,2,CMD_STALE,0}, {"replicas","List replica nodes of the specified master node","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_REPLICAS_History,CLUSTER_REPLICAS_tips,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,.args=CLUSTER_REPLICAS_Args}, {"replicate","Reconfigure a node as a replica of the specified master node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,CLUSTER_REPLICATE_tips,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,.args=CLUSTER_REPLICATE_Args}, diff --git a/src/commands/cluster-myshardid.json b/src/commands/cluster-myshardid.json new file mode 100644 index 000000000..ffd26eec6 --- /dev/null +++ b/src/commands/cluster-myshardid.json @@ -0,0 +1,18 @@ +{ + "MYSHARDID": { + "summary": "Return the node shard id", + "complexity": "O(1)", + "group": "cluster", + "since": "7.2.0", + "arity": 2, + "container": "CLUSTER", + "function": "clusterCommand", + "history": [], + "command_flags": [ + "STALE" + ], + "command_tips": [ + "NONDETERMINISTIC_OUTPUT" + ] + } +} diff --git a/src/server.h b/src/server.h index 3af135114..23e9d8e8e 100644 --- a/src/server.h +++ b/src/server.h @@ -91,6 +91,9 @@ typedef struct redisObject robj; #include "endianconv.h" #include "crc64.h" +/* helpers */ +#define numElements(x) (sizeof(x)/sizeof((x)[0])) + /* min/max */ #undef min #undef max @@ -3301,6 +3304,7 @@ uint64_t dictSdsCaseHash(const void *key); int dictSdsKeyCompare(dict *d, const void *key1, const void *key2); int dictSdsKeyCaseCompare(dict *d, const void *key1, const void *key2); void dictSdsDestructor(dict *d, void *val); +void dictListDestructor(dict *d, void *val); void *dictSdsDup(dict *d, const void *key); /* Git SHA1 */ |