diff options
Diffstat (limited to 'src/cluster.c')
-rw-r--r-- | src/cluster.c | 53 |
1 files changed, 52 insertions, 1 deletions
diff --git a/src/cluster.c b/src/cluster.c index b4630299a..1612c9e83 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2039,6 +2039,21 @@ int writeHostnamePingExt(clusterMsgPingExt **cursor) { return extension_size; } +/* Write the forgotten node ping extension at the start of the cursor, update + * the cursor to point to the end of the written extension and return the number + * of bytes written. */ +int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl) { + serverAssert(sdslen(name) == CLUSTER_NAMELEN); + clusterMsgPingExtForgottenNode *ext = &(*cursor)->ext[0].forgotten_node; + memcpy(ext->name, name, CLUSTER_NAMELEN); + ext->ttl = htonu64(ttl); + uint32_t extension_size = sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode); + (*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE); + (*cursor)->length = htonl(extension_size); + *cursor = (clusterMsgPingExt *) (ext->name + sizeof(clusterMsgPingExtForgottenNode)); + return extension_size; +} + /* We previously validated the extensions, so this function just needs to * handle the extensions. */ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { @@ -2052,6 +2067,19 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) { clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname); ext_hostname = hostname_ext->hostname; + } else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) { + clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node); + clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN); + if (n && n != myself && !(nodeIsSlave(myself) && myself->slaveof == n)) { + sds id = sdsnewlen(forgotten_node_ext->name, CLUSTER_NAMELEN); + dictEntry *de = dictAddRaw(server.cluster->nodes_black_list, id, NULL); + serverAssert(de != NULL); + uint64_t expire = server.unixtime + ntohu64(forgotten_node_ext->ttl); + dictSetUnsignedIntegerVal(de, expire); + clusterDelNode(n); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_SAVE_CONFIG); + } } else { /* Unknown type, we will ignore it but log what happened. */ serverLog(LL_WARNING, "Received unknown extension type %d", type); @@ -2951,6 +2979,8 @@ void clusterSendPing(clusterLink *link, int type) { estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData); estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted)); estlen += getHostnamePingExtSize(); + estlen += dictSize(server.cluster->nodes_black_list) * + (sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode)); /* Note: clusterBuildMessageHdr() expects the buffer to be always at least * sizeof(clusterMsg) or more. */ @@ -3031,6 +3061,22 @@ void clusterSendPing(clusterLink *link, int type) { extensions++; } + /* Gossip forgotten nodes */ + if (dictSize(server.cluster->nodes_black_list) > 0) { + dictIterator *di = dictGetIterator(server.cluster->nodes_black_list); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + sds name = dictGetKey(de); + uint64_t expire = dictGetUnsignedIntegerVal(de); + if ((time_t)expire < server.unixtime) continue; /* already expired */ + uint64_t ttl = expire - server.unixtime; + hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA; + totlen += writeForgottenNodePingExt(&cursor, name, ttl); + extensions++; + } + dictReleaseIterator(di); + } + /* Compute the actual total length and send! */ totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += (sizeof(clusterMsgDataGossip)*gossipcount); @@ -5639,7 +5685,12 @@ NULL /* CLUSTER FORGET <NODE ID> */ clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); if (!n) { - addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); + if (clusterBlacklistExists((char*)c->argv[2]->ptr)) + /* Already forgotten. The deletion may have been gossipped by + * another node, so we pretend it succeeded. */ + addReply(c,shared.ok); + else + addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); return; } else if (n == myself) { addReplyError(c,"I tried hard but I can't forget myself..."); |