summaryrefslogtreecommitdiff
path: root/src/cluster.c
diff options
context:
space:
mode:
authorViktor Söderqvist <viktor.soderqvist@est.tech>2022-07-26 09:28:13 +0200
committerGitHub <noreply@github.com>2022-07-26 10:28:13 +0300
commit5032de50f207b7d4969ee20f79a75d2f1c50dda3 (patch)
tree7dff6b02cb74a7594c8a9f02cb3a50db6cef74d7 /src/cluster.c
parent33bd8fb9810249a35d3bd9f5ddcb8a4f85a1c725 (diff)
downloadredis-5032de50f207b7d4969ee20f79a75d2f1c50dda3.tar.gz
Gossip forgotten nodes on `CLUSTER FORGET` (#10869)
Gossip the cluster node blacklist in ping and pong messages. This means that CLUSTER FORGET doesn't need to be sent to all nodes in a cluster. It can be sent to one or more nodes and then be propagated to the rest of them. For each blacklisted node, its node id and its remaining blacklist TTL is gossiped in a cluster bus ping extension (introduced in #9530).
Diffstat (limited to 'src/cluster.c')
-rw-r--r--src/cluster.c53
1 files changed, 52 insertions, 1 deletions
diff --git a/src/cluster.c b/src/cluster.c
index b4630299a..1612c9e83 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -2039,6 +2039,21 @@ int writeHostnamePingExt(clusterMsgPingExt **cursor) {
return extension_size;
}
+/* Write the forgotten node ping extension at the start of the cursor, update
+ * the cursor to point to the end of the written extension and return the number
+ * of bytes written. */
+int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl) {
+ serverAssert(sdslen(name) == CLUSTER_NAMELEN);
+ clusterMsgPingExtForgottenNode *ext = &(*cursor)->ext[0].forgotten_node;
+ memcpy(ext->name, name, CLUSTER_NAMELEN);
+ ext->ttl = htonu64(ttl);
+ uint32_t extension_size = sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode);
+ (*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE);
+ (*cursor)->length = htonl(extension_size);
+ *cursor = (clusterMsgPingExt *) (ext->name + sizeof(clusterMsgPingExtForgottenNode));
+ return extension_size;
+}
+
/* We previously validated the extensions, so this function just needs to
* handle the extensions. */
void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
@@ -2052,6 +2067,19 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
ext_hostname = hostname_ext->hostname;
+ } else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
+ clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node);
+ clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN);
+ if (n && n != myself && !(nodeIsSlave(myself) && myself->slaveof == n)) {
+ sds id = sdsnewlen(forgotten_node_ext->name, CLUSTER_NAMELEN);
+ dictEntry *de = dictAddRaw(server.cluster->nodes_black_list, id, NULL);
+ serverAssert(de != NULL);
+ uint64_t expire = server.unixtime + ntohu64(forgotten_node_ext->ttl);
+ dictSetUnsignedIntegerVal(de, expire);
+ clusterDelNode(n);
+ clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
+ CLUSTER_TODO_SAVE_CONFIG);
+ }
} else {
/* Unknown type, we will ignore it but log what happened. */
serverLog(LL_WARNING, "Received unknown extension type %d", type);
@@ -2951,6 +2979,8 @@ void clusterSendPing(clusterLink *link, int type) {
estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
estlen += getHostnamePingExtSize();
+ estlen += dictSize(server.cluster->nodes_black_list) *
+ (sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode));
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
@@ -3031,6 +3061,22 @@ void clusterSendPing(clusterLink *link, int type) {
extensions++;
}
+ /* Gossip forgotten nodes */
+ if (dictSize(server.cluster->nodes_black_list) > 0) {
+ dictIterator *di = dictGetIterator(server.cluster->nodes_black_list);
+ dictEntry *de;
+ while ((de = dictNext(di)) != NULL) {
+ sds name = dictGetKey(de);
+ uint64_t expire = dictGetUnsignedIntegerVal(de);
+ if ((time_t)expire < server.unixtime) continue; /* already expired */
+ uint64_t ttl = expire - server.unixtime;
+ hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
+ totlen += writeForgottenNodePingExt(&cursor, name, ttl);
+ extensions++;
+ }
+ dictReleaseIterator(di);
+ }
+
/* Compute the actual total length and send! */
totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
@@ -5639,7 +5685,12 @@ NULL
/* CLUSTER FORGET <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
- addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
+ if (clusterBlacklistExists((char*)c->argv[2]->ptr))
+ /* Already forgotten. The deletion may have been gossipped by
+ * another node, so we pretend it succeeded. */
+ addReply(c,shared.ok);
+ else
+ addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
} else if (n == myself) {
addReplyError(c,"I tried hard but I can't forget myself...");