summary | refs | log | tree | commit | diff
path: root/src/cluster.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cluster.c')
-rw-r--r-- src/cluster.c 53
1 files changed, 52 insertions, 1 deletions
diff --git a/src/cluster.c b/src/cluster.c
index b4630299a..1612c9e83 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -2039,6 +2039,21 @@ int writeHostnamePingExt(clusterMsgPingExt **cursor) {
return extension_size;
}
+/* Serialize a "forgotten node" ping extension into the slot at *cursor.
+ *
+ * cursor: in/out. On entry it points at the extension slot to fill; on
+ *         return it points at the first byte after the written payload.
+ * name:   sds holding the node name. Asserted to be exactly
+ *         CLUSTER_NAMELEN bytes long; that many bytes are copied verbatim.
+ * ttl:    value stored in the extension's ttl field, converted with
+ *         htonu64() before being written.
+ *
+ * Returns the number of bytes accounted for this extension, i.e.
+ * sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode).
+ *
+ * NOTE(review): no bounds check is done here; the caller presumably sized
+ * the buffer so that this many bytes are available after *cursor. */
+int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl) {
+    serverAssert(sdslen(name) == CLUSTER_NAMELEN);
+
+    clusterMsgPingExt *slot = *cursor;
+    clusterMsgPingExtForgottenNode *ext = &slot->ext[0].forgotten_node;
+    uint32_t extension_size = sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode);
+
+    /* Payload: raw node name followed by the ttl in network byte order. */
+    memcpy(ext->name, name, CLUSTER_NAMELEN);
+    ext->ttl = htonu64(ttl);
+
+    /* Extension header: type and total length, both in network byte order. */
+    slot->type = htons(CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE);
+    slot->length = htonl(extension_size);
+
+    /* Step over the whole payload struct. Using ext + 1 keeps the pointer
+     * arithmetic inside the payload object and does not depend on which
+     * member of the struct comes first. */
+    *cursor = (clusterMsgPingExt *) (ext + 1);
+    return extension_size;
+}
+
/* We previously validated the extensions, so this function just needs to
* handle the extensions. */
void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
@@ -2052,6 +2067,19 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
ext_hostname = hostname_ext->hostname;
+ } else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
+ clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node);
+ clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN);
+ if (n && n != myself && !(nodeIsSlave(myself) && myself->slaveof == n)) {
+ sds id = sdsnewlen(forgotten_node_ext->name, CLUSTER_NAMELEN);
+ dictEntry *de = dictAddRaw(server.cluster->nodes_black_list, id, NULL);
+ serverAssert(de != NULL);
+ uint64_t expire = server.unixtime + ntohu64(forgotten_node_ext->ttl);
+ dictSetUnsignedIntegerVal(de, expire);
+ clusterDelNode(n);
+ clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
+ CLUSTER_TODO_SAVE_CONFIG);
+ }
} else {
/* Unknown type, we will ignore it but log what happened. */
serverLog(LL_WARNING, "Received unknown extension type %d", type);
@@ -2951,6 +2979,8 @@ void clusterSendPing(clusterLink *link, int type) {
estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
estlen += getHostnamePingExtSize();
+ estlen += dictSize(server.cluster->nodes_black_list) *
+ (sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode));
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
@@ -3031,6 +3061,22 @@ void clusterSendPing(clusterLink *link, int type) {
extensions++;
}
+ /* Gossip forgotten nodes */
+ if (dictSize(server.cluster->nodes_black_list) > 0) {
+ dictIterator *di = dictGetIterator(server.cluster->nodes_black_list);
+ dictEntry *de;
+ while ((de = dictNext(di)) != NULL) {
+ sds name = dictGetKey(de);
+ uint64_t expire = dictGetUnsignedIntegerVal(de);
+ if ((time_t)expire < server.unixtime) continue; /* already expired */
+ uint64_t ttl = expire - server.unixtime;
+ hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
+ totlen += writeForgottenNodePingExt(&cursor, name, ttl);
+ extensions++;
+ }
+ dictReleaseIterator(di);
+ }
+
/* Compute the actual total length and send! */
totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
@@ -5639,7 +5685,12 @@ NULL
/* CLUSTER FORGET <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
- addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
+ if (clusterBlacklistExists((char*)c->argv[2]->ptr))
+ /* Already forgotten. The deletion may have been gossipped by
+ * another node, so we pretend it succeeded. */
+ addReply(c,shared.ok);
+ else
+ addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
} else if (n == myself) {
addReplyError(c,"I tried hard but I can't forget myself...");