summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis.conf26
-rw-r--r--src/cluster.c312
-rw-r--r--src/cluster.h33
-rw-r--r--src/config.c39
-rw-r--r--src/debug.c8
-rw-r--r--src/server.c1
-rw-r--r--src/server.h14
-rw-r--r--tests/cluster/cluster.tcl3
-rw-r--r--tests/cluster/tests/27-endpoints.tcl221
-rw-r--r--tests/cluster/tests/includes/init-tests.tcl2
10 files changed, 609 insertions, 50 deletions
diff --git a/redis.conf b/redis.conf
index 8804aac37..9d0f0dfb7 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1632,6 +1632,32 @@ lua-time-limit 5000
# PubSub message by default. (client-query-buffer-limit default value is 1gb)
#
# cluster-link-sendbuf-limit 0
+
+# Clusters can configure their announced hostname using this config. This is a common use case for
+# applications that need to use TLS Server Name Indication (SNI) or dealing with DNS based
+# routing. By default this value is only shown as additional metadata in the CLUSTER SLOTS
+# command, but can be changed using 'cluster-preferred-endpoint-type' config. This value is
+# communicated along the clusterbus to all nodes, setting it to an empty string will remove
+# the hostname and also propgate the removal.
+#
+# cluster-announce-hostname ""
+
+# Clusters can advertise how clients should connect to them using either their IP address,
+# a user defined hostname, or by declaring they have no endpoint. Which endpoint is
+# shown as the preferred endpoint is set by using the cluster-preferred-endpoint-type
+# config with values 'ip', 'hostname', or 'unknown-endpoint'. This value controls how
+# the endpoint returned for MOVED/ASKING requests as well as the first field of CLUSTER SLOTS.
+# If the preferred endpoint type is set to hostname, but no announced hostname is set, a '?'
+# will be returned instead.
+#
+# When a cluster advertises itself as having an unknown endpoint, it's indicating that
+# the server doesn't know how clients can reach the cluster. This can happen in certain
+# networking situations where there are multiple possible routes to the node, and the
+# server doesn't know which one the client took. In this case, the server is expecting
+# the client to reach out on the same endpoint it used for making the last request, but use
+# the port provided in the response.
+#
+# cluster-preferred-endpoint-type ip
# In order to setup your cluster make sure to read the documentation
# available at https://redis.io web site.
diff --git a/src/cluster.c b/src/cluster.c
index 81322a8aa..87a965d96 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -215,6 +215,9 @@ int clusterLoadConfig(char *filename) {
n = createClusterNode(argv[0],0);
clusterAddNode(n);
}
+ /* Format for the node address information:
+ * ip:port[@cport][,hostname] */
+
/* Address and port */
if ((p = strrchr(argv[1],':')) == NULL) {
sdsfreesplitres(argv,argc);
@@ -234,6 +237,18 @@ int clusterLoadConfig(char *filename) {
* base port. */
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
+ /* Hostname is an optional argument that defines the endpoint
+ * that can be reported to clients instead of IP. */
+ char *hostname = strchr(p, ',');
+ if (hostname) {
+ *hostname = '\0';
+ hostname++;
+ zfree(n->hostname);
+ n->hostname = zstrdup(hostname);
+ } else {
+ n->hostname = NULL;
+ }
+
/* The plaintext port for client in a TLS cluster (n->pport) is not
* stored in nodes.conf. It is received later over the bus protocol. */
@@ -553,6 +568,31 @@ void clusterUpdateMyselfIp(void) {
}
}
+/* Update the hostname for the specified node with the provided C string. */
+static void updateAnnouncedHostname(clusterNode *node, char *new) {
+ if (!node->hostname && !new) {
+ return;
+ }
+
+ /* Previous and new hostname are the same, no need to update. */
+ if (new && node->hostname && !strcmp(new, node->hostname)) {
+ return;
+ }
+
+ if (node->hostname) zfree(node->hostname);
+ if (new) {
+ node->hostname = zstrdup(new);
+ } else {
+ node->hostname = NULL;
+ }
+}
+
+/* Update my hostname based on server configuration values */
+void clusterUpdateMyselfHostname(void) {
+ if (!myself) return;
+ updateAnnouncedHostname(myself, server.cluster_announce_hostname);
+}
+
void clusterInit(void) {
int saveconf = 0;
@@ -646,6 +686,7 @@ void clusterInit(void) {
resetManualFailover();
clusterUpdateMyselfFlags();
clusterUpdateMyselfIp();
+ clusterUpdateMyselfHostname();
}
/* Reset a node performing a soft or hard reset:
@@ -918,6 +959,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->link = NULL;
node->inbound_link = NULL;
memset(node->ip,0,sizeof(node->ip));
+ node->hostname = NULL;
node->port = 0;
node->cport = 0;
node->pport = 0;
@@ -1083,6 +1125,7 @@ void freeClusterNode(clusterNode *n) {
nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
sdsfree(nodename);
+ zfree(n->hostname);
/* Release links and associated data structures. */
if (n->link) freeClusterLink(n->link);
@@ -1871,6 +1914,93 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
}
}
+/* Cluster ping extensions.
+ *
+ * The ping/pong/meet messages support arbitrary extensions to add additional
+ * metadata to the messages that are sent between the various nodes in the
+ * cluster. The extensions take the form:
+ * [ Header length + type (8 bytes) ]
+ * [ Extension information (Arbitrary length, but must be 8 byte padded) ]
+ */
+
+
+/* Returns the length of a given extension */
+static uint32_t getPingExtLength(clusterMsgPingExt *ext) {
+ return ntohl(ext->length);
+}
+
+/* Returns the initial position of ping extensions. May return an invalid
+ * address if there are no ping extensions. */
+static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, uint16_t count) {
+ clusterMsgPingExt *initial = (clusterMsgPingExt*) &(hdr->data.ping.gossip[count]);
+ return initial;
+}
+
+/* Given a current ping extension, returns the start of the next extension. May return
+ * an invalid address if there are no further ping extensions. */
+static clusterMsgPingExt *getNextPingExt(clusterMsgPingExt *ext) {
+ clusterMsgPingExt *next = (clusterMsgPingExt *) (((char *) ext) + getPingExtLength(ext));
+ return next;
+}
+
+/* Returns the exact size needed to store the hostname. The returned value
+ * will be 8 byte padded. */
+int getHostnamePingExtSize() {
+ /* If hostname is not set, we don't send this extension */
+ if (!myself->hostname) return 0;
+
+ int totlen = sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(strlen(myself->hostname) + 1);
+ return totlen;
+}
+
+/* Write the hostname ping extension at the start of the cursor. This function
+ * will update the cursor to point to the end of the written extension and
+ * will return the amount of bytes written. */
+int writeHostnamePingExt(clusterMsgPingExt **cursor) {
+ /* If hostname is not set, we don't send this extension */
+ if (!myself->hostname) return 0;
+
+ /* Add the hostname information at the extension cursor */
+ clusterMsgPingExtHostname *ext = &(*cursor)->ext[0].hostname;
+ size_t hostname_len = strlen(myself->hostname);
+ memcpy(ext->hostname, myself->hostname, hostname_len);
+ uint32_t extension_size = getHostnamePingExtSize();
+
+ /* Move the write cursor */
+ (*cursor)->type = CLUSTERMSG_EXT_TYPE_HOSTNAME;
+ (*cursor)->length = htonl(extension_size);
+ /* Make sure the string is NULL terminated by adding 1 */
+ *cursor = (clusterMsgPingExt *) (ext->hostname + EIGHT_BYTE_ALIGN(strlen(myself->hostname) + 1));
+ return extension_size;
+}
+
+/* We previously validated the extensions, so this function just needs to
+ * handle the extensions. */
+void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
+ clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
+ char *ext_hostname = NULL;
+ uint16_t extensions = ntohs(hdr->extensions);
+ /* Loop through all the extensions and process them */
+ clusterMsgPingExt *ext = getInitialPingExt(hdr, ntohs(hdr->count));
+ while (extensions--) {
+ uint16_t type = ntohs(ext->type);
+ if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
+ clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
+ ext_hostname = hostname_ext->hostname;
+ } else {
+ /* Unknown type, we will ignore it but log what happened. */
+ serverLog(LL_WARNING, "Received unknown extension type %d", type);
+ }
+
+ /* We know this will be valid since we validated it ahead of time */
+ ext = getNextPingExt(ext);
+ }
+ /* If the node did not send us a hostname extension, assume
+ * they don't have an announced hostname. Otherwise, we'll
+ * set it now. */
+ updateAnnouncedHostname(sender, ext_hostname);
+}
+
static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
clusterNode *sender;
if (link->node && !nodeInHandshake(link->node)) {
@@ -1920,52 +2050,78 @@ int clusterProcessPacket(clusterLink *link) {
return 1;
}
+ if (type == server.cluster_drop_packet_filter) {
+ serverLog(LL_WARNING, "Dropping packet that matches debug drop filter");
+ return 1;
+ }
+
uint16_t flags = ntohs(hdr->flags);
+ uint16_t extensions = ntohs(hdr->extensions);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
+ uint32_t explen; /* expected length of this packet */
clusterNode *sender;
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
uint16_t count = ntohs(hdr->count);
- uint32_t explen; /* expected length of this packet */
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += (sizeof(clusterMsgDataGossip)*count);
- if (totlen != explen) return 1;
- } else if (type == CLUSTERMSG_TYPE_FAIL) {
- uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
+ /* If there is extension data, which doesn't have a fixed length,
+ * loop through them and validate the length of it now. */
+ if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
+ clusterMsgPingExt *ext = getInitialPingExt(hdr, count);
+ while (extensions--) {
+ uint16_t extlen = getPingExtLength(ext);
+ if (extlen % 8 != 0) {
+ serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)",
+ clusterGetMessageTypeString(type), (int) extlen);
+ return 1;
+ }
+ if ((totlen - explen) < extlen) {
+ serverLog(LL_WARNING, "Received invalid %s packet with extension data that exceeds "
+ "total packet length (%lld)", clusterGetMessageTypeString(type),
+ (unsigned long long) totlen);
+ return 1;
+ }
+ explen += extlen;
+ ext = getNextPingExt(ext);
+ }
+ }
+ } else if (type == CLUSTERMSG_TYPE_FAIL) {
+ explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataFail);
- if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
- uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-
+ explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) -
8 +
ntohl(hdr->data.publish.msg.channel_len) +
ntohl(hdr->data.publish.msg.message_len);
- if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
type == CLUSTERMSG_TYPE_MFSTART)
{
- uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-
- if (totlen != explen) return 1;
+ explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
- uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-
+ explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate);
- if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_MODULE) {
- uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
-
+ explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) -
3 + ntohl(hdr->data.module.msg.len);
- if (totlen != explen) return 1;
+ } else {
+ /* We don't know this type of packet, so we assume it's well formed. */
+ explen = totlen;
}
+ if (totlen != explen) {
+ serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld",
+ clusterGetMessageTypeString(type), (unsigned long long) totlen, (unsigned long long) explen);
+ return 1;
+ }
+
sender = getNodeFromLinkAndMsg(link, hdr);
/* Update the last time we saw any data from this node. We
@@ -2272,7 +2428,10 @@ int clusterProcessPacket(clusterLink *link) {
}
/* Get info from the gossip section */
- if (sender) clusterProcessGossipSection(hdr,link);
+ if (sender) {
+ clusterProcessGossipSection(hdr,link);
+ clusterProcessPingExtensions(hdr,link);
+ }
} else if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;
@@ -2695,7 +2854,7 @@ void clusterSendPing(clusterLink *link, int type) {
clusterMsg *hdr;
int gossipcount = 0; /* Number of gossip sections added so far. */
int wanted; /* Number of gossip sections we want to append if possible. */
- int totlen; /* Total packet length. */
+ int estlen; /* Upper bound on estimated packet length */
/* freshnodes is the max number of nodes we can hope to append at all:
* nodes available minus two (ourself and the node we are sending the
* message to). However practically there may be less valid nodes since
@@ -2736,15 +2895,17 @@ void clusterSendPing(clusterLink *link, int type) {
* faster to propagate to go from PFAIL to FAIL state. */
int pfail_wanted = server.cluster->stats_pfail_nodes;
- /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
+ /* Compute the maximum estlen to allocate our buffer. We'll fix the estlen
* later according to the number of gossip sections we really were able
* to put inside the packet. */
- totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
- totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
+ estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
+ estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
+ estlen += sizeof(clusterMsgPingExt) + getHostnamePingExtSize();
+
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
- if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
- buf = zcalloc(totlen);
+ if (estlen < (int)sizeof(clusterMsg)) estlen = sizeof(clusterMsg);
+ buf = zcalloc(estlen);
hdr = (clusterMsg*) buf;
/* Populate the header. */
@@ -2808,11 +2969,23 @@ void clusterSendPing(clusterLink *link, int type) {
dictReleaseIterator(di);
}
- /* Ready to send... fix the totlen field and queue the message in the
- * output buffer. */
- totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
+
+ int totlen = 0;
+ int extensions = 0;
+ /* Set the initial extension position */
+ clusterMsgPingExt *cursor = getInitialPingExt(hdr, gossipcount);
+ /* Add in the extensions */
+ if (myself->hostname) {
+ hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
+ totlen += writeHostnamePingExt(&cursor);
+ extensions++;
+ }
+
+ /* Compute the actual total length and send! */
+ totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
hdr->count = htons(gossipcount);
+ hdr->extensions = htons(extensions);
hdr->totlen = htonl(totlen);
clusterSendMessage(link,buf,totlen);
zfree(buf);
@@ -3786,6 +3959,7 @@ void clusterCron(void) {
iteration++; /* Number of times this function was called so far. */
+ updateAnnouncedHostname(myself, server.cluster_announce_hostname);
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
@@ -4404,10 +4578,18 @@ sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
/* Node coordinates */
ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
- ci = sdscatfmt(ci," %s:%i@%i ",
- node->ip,
- port,
- node->cport);
+ if (node->hostname) {
+ ci = sdscatfmt(ci," %s:%i@%i,%s ",
+ node->ip,
+ port,
+ node->cport,
+ node->hostname);
+ } else {
+ ci = sdscatfmt(ci," %s:%i@%i ",
+ node->ip,
+ port,
+ node->cport);
+ }
/* Flags */
ci = representClusterNodeFlags(ci, node->flags);
@@ -4619,6 +4801,15 @@ void addReplyClusterLinksDescription(client *c) {
* CLUSTER command
* -------------------------------------------------------------------------- */
+const char *getPreferredEndpoint(clusterNode *n) {
+ switch(server.cluster_preferred_endpoint_type) {
+ case CLUSTER_ENDPOINT_TYPE_IP: return n->ip;
+ case CLUSTER_ENDPOINT_TYPE_HOSTNAME: return n->hostname ? n->hostname : "?";
+ case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT: return "";
+ }
+ return "unknown";
+}
+
const char *clusterGetMessageTypeString(int type) {
switch(type) {
case CLUSTERMSG_TYPE_PING: return "ping";
@@ -4702,31 +4893,56 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}
-void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
- int i, nested_elements = 3; /* slots (2) + master addr (1) */
- void *nested_replylen = addReplyDeferredLen(c);
- addReplyLongLong(c, start_slot);
- addReplyLongLong(c, end_slot);
- addReplyArrayLen(c, 3);
- addReplyBulkCString(c, node->ip);
+void addNodeToNodeReply(client *c, clusterNode *node) {
+ addReplyArrayLen(c, 4);
+ if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) {
+ addReplyBulkCString(c, node->ip);
+ } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
+ addReplyBulkCString(c, node->hostname ? node->hostname : "?");
+ } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) {
+ addReplyNull(c);
+ } else {
+ serverPanic("Unrecognized preferred endpoint type");
+ }
+
/* Report non-TLS ports to non-TLS client in TLS cluster if available. */
int use_pport = (server.tls_cluster &&
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port);
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
+ /* Add the additional endpoint information, this is all the known networking information
+ * that is not the preferred endpoint. */
+ void *deflen = addReplyDeferredLen(c);
+ int length = 0;
+ if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
+ addReplyBulkCString(c, "ip");
+ addReplyBulkCString(c, node->ip);
+ length++;
+ }
+ if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
+ && node->hostname)
+ {
+ addReplyBulkCString(c, "hostname");
+ addReplyBulkCString(c, node->hostname);
+ length++;
+ }
+ setDeferredMapLen(c, deflen, length);
+}
+
+void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
+ int i, nested_elements = 3; /* slots (2) + master addr (1) */
+ void *nested_replylen = addReplyDeferredLen(c);
+ addReplyLongLong(c, start_slot);
+ addReplyLongLong(c, end_slot);
+ addNodeToNodeReply(c, node);
+
/* Remaining nodes in reply are replicas for slot range */
for (i = 0; i < node->numslaves; i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isReplicaAvailable(node->slaves[i])) continue;
- addReplyArrayLen(c, 3);
- addReplyBulkCString(c, node->slaves[i]->ip);
- /* Report slave's non-TLS port to non-TLS client in TLS cluster */
- addReplyLongLong(c, (use_pport && node->slaves[i]->pport ?
- node->slaves[i]->pport :
- node->slaves[i]->port));
- addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
+ addNodeToNodeReply(c, node->slaves[i]);
nested_elements++;
}
setDeferredArrayLen(c, nested_replylen, nested_elements);
@@ -4864,7 +5080,7 @@ NULL
/* Report plaintext ports, only if cluster is TLS but client is known to
* be non-TLS). */
int use_pport = (server.tls_cluster &&
- c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
+ c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
sds nodes = clusterGenNodesDescription(0, use_pport);
addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
sdsfree(nodes);
@@ -6391,12 +6607,12 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
/* Redirect to IP:port. Include plaintext port if cluster is TLS but
* client is non-TLS. */
int use_pport = (server.tls_cluster &&
- c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
+ c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
int port = use_pport && n->pport ? n->pport : n->port;
addReplyErrorSds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d",
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
- hashslot, n->ip, port));
+ hashslot, getPreferredEndpoint(n), port));
} else {
serverPanic("getNodeByQuery() unknown error.");
}
diff --git a/src/cluster.h b/src/cluster.h
index a28176e4b..8b76ed19e 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -135,6 +135,7 @@ typedef struct clusterNode {
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
+ char *hostname; /* The known hostname for this node */
int port; /* Latest known clients port (TLS or plain). */
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
@@ -245,11 +246,38 @@ typedef struct {
unsigned char bulk_data[3]; /* 3 bytes just as placeholder. */
} clusterMsgModule;
+/* The cluster supports optional extension messages that can be sent
+ * along with ping/pong/meet messages to give additional info in a
+ * consistent manner. */
+typedef enum {
+ CLUSTERMSG_EXT_TYPE_HOSTNAME,
+} clusterMsgPingtypes;
+
+/* Helper function for making sure extensions are eight byte aligned. */
+#define EIGHT_BYTE_ALIGN(size) ((((size) + 7) / 8) * 8)
+
+typedef struct {
+ char hostname[1]; /* The announced hostname, ends with \0. */
+} clusterMsgPingExtHostname;
+
+typedef struct {
+ uint32_t length; /* Total length of this extension message (including this header) */
+ uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */
+ uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
+ union {
+ clusterMsgPingExtHostname hostname;
+ } ext[]; /* Actual extension information, formatted so that the data is 8
+ * byte aligned, regardless of its content. */
+} clusterMsgPingExt;
+
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
+ /* Extension data that can optionally be sent for ping/meet/pong
+ * messages. We can't explicitly define them here though, since
+ * the gossip array isn't the real length of the gossip data. */
} ping;
/* FAIL */
@@ -292,7 +320,8 @@ typedef struct {
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
- char notused1[32]; /* 32 bytes reserved for future usage. */
+ uint16_t extensions; /* Number of extensions sent along with this packet. */
+ char notused1[16]; /* 16 bytes reserved for future usage. */
uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
@@ -308,6 +337,7 @@ typedef struct {
#define CLUSTERMSG_FLAG0_PAUSED (1<<0) /* Master paused for manual failover. */
#define CLUSTERMSG_FLAG0_FORCEACK (1<<1) /* Give ACK to AUTH_REQUEST even if
master is up. */
+#define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */
/* ---------------------- API exported outside cluster.c -------------------- */
void clusterInit(void);
@@ -334,5 +364,6 @@ void clusterUpdateMyselfFlags(void);
void clusterUpdateMyselfIp(void);
void slotToChannelAdd(sds channel);
void slotToChannelDel(sds channel);
+void clusterUpdateMyselfHostname(void);
#endif /* __CLUSTER_H */
diff --git a/src/config.c b/src/config.c
index 317b92ea2..0fc93d1de 100644
--- a/src/config.c
+++ b/src/config.c
@@ -141,6 +141,13 @@ configEnum protected_action_enum[] = {
{NULL, 0}
};
+configEnum cluster_preferred_endpoint_type_enum[] = {
+ {"ip", CLUSTER_ENDPOINT_TYPE_IP},
+ {"hostname", CLUSTER_ENDPOINT_TYPE_HOSTNAME},
+ {"unknown-endpoint", CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT},
+ {NULL, 0}
+};
+
/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
@@ -2150,6 +2157,30 @@ static int isValidAOFfilename(char *val, const char **err) {
return 1;
}
+static int isValidAnnouncedHostname(char *val, const char **err) {
+ if (strlen(val) >= NET_HOST_STR_LEN) {
+ *err = "Hostnames must be less than "
+ STRINGIFY(NET_HOST_STR_LEN) " characters";
+ return 0;
+ }
+
+ int i = 0;
+ char c;
+ while ((c = val[i])) {
+ /* We just validate the character set to make sure that everything
+ * is parsed and handled correctly. */
+ if (!((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
+ || (c >= '0' && c <= '9') || (c == '-') || (c == '.')))
+ {
+ *err = "Hostnames may only contain alphanumeric characters, "
+ "hyphens or dots";
+ return 0;
+ }
+ c = val[i++];
+ }
+ return 1;
+}
+
/* Validate specified string is a valid proc-title-template */
static int isValidProcTitleTemplate(char *val, const char **err) {
if (!validateProcTitleTemplate(val)) {
@@ -2305,6 +2336,12 @@ static int updateClusterIp(const char **err) {
return 1;
}
+int updateClusterHostname(const char **err) {
+ UNUSED(err);
+ clusterUpdateMyselfHostname();
+ return 1;
+}
+
#ifdef USE_OPENSSL
static int applyTlsCfg(const char **err) {
UNUSED(err);
@@ -2652,6 +2689,7 @@ standardConfig configs[] = {
createStringConfig("masteruser", NULL, MODIFIABLE_CONFIG | SENSITIVE_CONFIG, EMPTY_STRING_IS_NULL, server.masteruser, NULL, NULL, NULL),
createStringConfig("cluster-announce-ip", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_ip, NULL, NULL, updateClusterIp),
createStringConfig("cluster-config-file", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.cluster_configfile, "nodes.conf", NULL, NULL),
+ createStringConfig("cluster-announce-hostname", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_hostname, NULL, isValidAnnouncedHostname, updateClusterHostname),
createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL),
createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG | PROTECTED_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL),
createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_filename, "appendonly.aof", isValidAOFfilename, NULL),
@@ -2681,6 +2719,7 @@ standardConfig configs[] = {
createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
+ createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL),
/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
diff --git a/src/debug.c b/src/debug.c
index f01509e84..0b38b1856 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -425,6 +425,8 @@ void debugCommand(client *c) {
#endif
"OBJECT <key>",
" Show low level info about `key` and associated value.",
+"DROP-CLUSTER-PACKET-FILTER <packet-type>",
+" Drop all packets that match the filtered type. Set to -1 allow all packets.",
"OOM",
" Crash the server simulating an out-of-memory error.",
"PANIC",
@@ -575,6 +577,12 @@ NULL
server.dirty = 0; /* Prevent AOF / replication */
serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF");
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"drop-cluster-packet-filter") && c->argc == 3) {
+ long packet_type;
+ if (getLongFromObjectOrReply(c, c->argv[2], &packet_type, NULL) != C_OK)
+ return;
+ server.cluster_drop_packet_filter = packet_type;
+ addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
dictEntry *de;
robj *val;
diff --git a/src/server.c b/src/server.c
index 4770a5df0..78f3c8bfb 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2293,6 +2293,7 @@ void initServer(void) {
server.blocked_last_cron = 0;
server.blocking_op_nesting = 0;
server.thp_enabled = 0;
+ server.cluster_drop_packet_filter = -1;
resetReplicationBuffer();
if ((server.tls_port || server.tls_replication || server.tls_cluster)
diff --git a/src/server.h b/src/server.h
index c1a0af355..fb22b3cf7 100644
--- a/src/server.h
+++ b/src/server.h
@@ -527,6 +527,13 @@ typedef struct {
mstime_t end;
} pause_event;
+/* Ways that a clusters endpoint can be described */
+typedef enum {
+ CLUSTER_ENDPOINT_TYPE_IP = 0, /* Show IP address */
+ CLUSTER_ENDPOINT_TYPE_HOSTNAME, /* Show hostname */
+ CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT /* Show NULL or empty */
+} cluster_endpoint_type;
+
/* RDB active child save type. */
#define RDB_CHILD_TYPE_NONE 0
#define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
@@ -1771,6 +1778,8 @@ struct redisServer {
int cluster_slave_no_failover; /* Prevent slave from starting a failover
if the master is in failure state. */
char *cluster_announce_ip; /* IP address to announce on cluster bus. */
+ char *cluster_announce_hostname; /* IP address to announce on cluster bus. */
+ int cluster_preferred_endpoint_type; /* Use the announced hostname when available. */
int cluster_announce_port; /* base port to announce on cluster bus. */
int cluster_announce_tls_port; /* TLS port to announce on cluster bus. */
int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
@@ -1782,6 +1791,8 @@ struct redisServer {
is down? */
int cluster_config_file_lock_fd; /* cluster config fd, will be flock */
unsigned long long cluster_link_sendbuf_limit_bytes; /* Memory usage limit on individual link send buffers*/
+ int cluster_drop_packet_filter; /* Debug config that allows tactically
+ * dropping packets of a specific type */
/* Scripting */
client *script_caller; /* The client running script right now, or NULL */
mstime_t script_time_limit; /* Script timeout in milliseconds */
@@ -3334,4 +3345,7 @@ int isTlsConfigured(void);
int iAmMaster(void);
+#define STRINGIFY_(x) #x
+#define STRINGIFY(x) STRINGIFY_(x)
+
#endif
diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl
index 31e0c3667..531e90d6c 100644
--- a/tests/cluster/cluster.tcl
+++ b/tests/cluster/cluster.tcl
@@ -142,7 +142,8 @@ proc cluster_allocate_with_continuous_slots {n} {
}
}
-# Create a cluster composed of the specified number of masters and slaves with continuous slots.
+# Create a cluster composed of the specified number of masters and slaves,
+# but with a continuous slot range.
proc cluster_create_with_continuous_slots {masters slaves} {
cluster_allocate_with_continuous_slots $masters
if {$slaves} {
diff --git a/tests/cluster/tests/27-endpoints.tcl b/tests/cluster/tests/27-endpoints.tcl
new file mode 100644
index 000000000..0cadee2eb
--- /dev/null
+++ b/tests/cluster/tests/27-endpoints.tcl
@@ -0,0 +1,221 @@
+source "../tests/includes/init-tests.tcl"
+
+# Check if cluster's view of hostnames is consistent
+proc are_hostnames_propagated {match_string} {
+ for {set j 0} {$j < $::cluster_master_nodes + $::cluster_replica_nodes} {incr j} {
+ set cfg [R $j cluster slots]
+ foreach node $cfg {
+ for {set i 2} {$i < [llength $node]} {incr i} {
+ if {! [string match $match_string [lindex [lindex [lindex $node $i] 3] 1]] } {
+ return 0
+ }
+ }
+ }
+ }
+ return 1
+}
+
+# Isolate a node from the cluster and give it a new nodeid
+proc isolate_node {id} {
+ set node_id [R $id CLUSTER MYID]
+ R 6 CLUSTER RESET HARD
+ for {set j 0} {$j < 20} {incr j} {
+ if { $j eq $id } {
+ continue
+ }
+ R $j CLUSTER FORGET $node_id
+ }
+}
+
+proc get_slot_field {slot_output shard_id node_id attrib_id} {
+ return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id]
+}
+
+test "Create a 6 nodes cluster" {
+ cluster_create_with_continuous_slots 3 3
+}
+
+test "Cluster should start ok" {
+ assert_cluster_state ok
+ wait_for_cluster_propagation
+}
+
+test "Set cluster hostnames and verify they are propagated" {
+ for {set j 0} {$j < $::cluster_master_nodes + $::cluster_replica_nodes} {incr j} {
+ R $j config set cluster-announce-hostname "host-$j.com"
+ }
+
+ wait_for_condition 50 100 {
+ [are_hostnames_propagated "host-*.com"] eq 1
+ } else {
+ fail "cluster hostnames were not propagated"
+ }
+
+ # Now that everything is propagated, assert everyone agrees
+ wait_for_cluster_propagation
+}
+
+test "Update hostnames and make sure they are all eventually propagated" {
+ for {set j 0} {$j < $::cluster_master_nodes + $::cluster_replica_nodes} {incr j} {
+ R $j config set cluster-announce-hostname "host-updated-$j.com"
+ }
+
+ wait_for_condition 50 100 {
+ [are_hostnames_propagated "host-updated-*.com"] eq 1
+ } else {
+ fail "cluster hostnames were not propagated"
+ }
+
+ # Now that everything is propagated, assert everyone agrees
+ wait_for_cluster_propagation
+}
+
+test "Remove hostnames and make sure they are all eventually propagated" {
+ for {set j 0} {$j < $::cluster_master_nodes + $::cluster_replica_nodes} {incr j} {
+ R $j config set cluster-announce-hostname ""
+ }
+
+ wait_for_condition 50 100 {
+ [are_hostnames_propagated ""] eq 1
+ } else {
+ fail "cluster hostnames were not propagated"
+ }
+
+ # Now that everything is propagated, assert everyone agrees
+ wait_for_cluster_propagation
+}
+
+test "Verify cluster-preferred-endpoint-type behavior for redirects and info" {
+ R 0 config set cluster-announce-hostname "me.com"
+ R 1 config set cluster-announce-hostname ""
+ R 2 config set cluster-announce-hostname "them.com"
+
+ wait_for_cluster_propagation
+
+ # Verify default behavior
+ set slot_result [R 0 cluster slots]
+ assert_equal "" [lindex [get_slot_field $slot_result 0 2 0] 1]
+ assert_equal "" [lindex [get_slot_field $slot_result 2 2 0] 1]
+ assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 0]
+ assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 1]
+ assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 0]
+ assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 1]
+
+ # Redirect will use the IP address
+ catch {R 0 set foo foo} redir_err
+ assert_match "MOVED * 127.0.0.1:*" $redir_err
+
+ # Verify prefer hostname behavior
+ R 0 config set cluster-preferred-endpoint-type hostname
+
+ set slot_result [R 0 cluster slots]
+ assert_equal "me.com" [get_slot_field $slot_result 0 2 0]
+ assert_equal "them.com" [get_slot_field $slot_result 2 2 0]
+
+ # Redirect should use hostname
+ catch {R 0 set foo foo} redir_err
+ assert_match "MOVED * them.com:*" $redir_err
+
+ # Redirect to an unknown hostname returns ?
+ catch {R 0 set barfoo bar} redir_err
+ assert_match "MOVED * ?:*" $redir_err
+
+ # Verify unknown hostname behavior
+ R 0 config set cluster-preferred-endpoint-type unknown-endpoint
+
+ # Verify default behavior
+ set slot_result [R 0 cluster slots]
+ assert_equal "ip" [lindex [get_slot_field $slot_result 0 2 3] 0]
+ assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 0 2 3] 1]
+ assert_equal "ip" [lindex [get_slot_field $slot_result 2 2 3] 0]
+ assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 2 2 3] 1]
+ assert_equal "ip" [lindex [get_slot_field $slot_result 1 2 3] 0]
+ assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 1 2 3] 1]
+ # Not required by the protocol, but IP comes before hostname
+ assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 2]
+ assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 3]
+ assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 2]
+ assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 3]
+
+ # This node doesn't have a hostname
+ assert_equal 2 [llength [get_slot_field $slot_result 1 2 3]]
+
+ # Redirect should use empty string
+ catch {R 0 set foo foo} redir_err
+ assert_match "MOVED * :*" $redir_err
+
+ R 0 config set cluster-preferred-endpoint-type ip
+}
+
+test "Verify the nodes configured with prefer hostname only show hostname for new nodes" {
+ # Have everyone forget node 6 and isolate it from the cluster.
+ isolate_node 6
+
+ # Set hostnames for the primaries, now that the node is isolated
+ R 0 config set cluster-announce-hostname "shard-1.com"
+ R 1 config set cluster-announce-hostname "shard-2.com"
+ R 2 config set cluster-announce-hostname "shard-3.com"
+
+ # Prevent Node 0 and Node 6 from properly meeting,
+ # they'll hang in the handshake phase. This allows us to
+ # test the case where we "know" about it but haven't
+ # successfully retrieved information about it yet.
+ R 0 DEBUG DROP-CLUSTER-PACKET-FILTER 0
+ R 6 DEBUG DROP-CLUSTER-PACKET-FILTER 0
+
+ # Have a replica meet the isolated node
+ R 3 cluster meet 127.0.0.1 [get_instance_attrib redis 6 port]
+
+ # Now, we wait until the two nodes that aren't filtering packets
+ # to accept our isolated nodes connections. At this point they will
+ # start showing up in cluster slots.
+ wait_for_condition 50 100 {
+ [llength [R 6 CLUSTER SLOTS]] eq 2
+ } else {
+ fail "Node did not learn about the 2 shards it can talk to"
+ }
+ set slot_result [R 6 CLUSTER SLOTS]
+ assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "shard-2.com"
+ assert_equal [lindex [get_slot_field $slot_result 1 2 3] 1] "shard-3.com"
+
+ # Also make sure we know about the isolated primary, we
+ # just can't reach it.
+ set primary_id [R 0 CLUSTER MYID]
+ assert_match "*$primary_id*" [R 6 CLUSTER NODES]
+
+ # Stop dropping cluster packets, and make sure everything
+ # stabilizes
+ R 0 DEBUG DROP-CLUSTER-PACKET-FILTER -1
+ R 6 DEBUG DROP-CLUSTER-PACKET-FILTER -1
+
+ wait_for_condition 50 100 {
+ [llength [R 6 CLUSTER SLOTS]] eq 3
+ } else {
+ fail "Node did not learn about the 2 shards it can talk to"
+ }
+ set slot_result [R 6 CLUSTER SLOTS]
+ assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "shard-1.com"
+ assert_equal [lindex [get_slot_field $slot_result 1 2 3] 1] "shard-2.com"
+ assert_equal [lindex [get_slot_field $slot_result 2 2 3] 1] "shard-3.com"
+}
+
+test "Test restart will keep hostname information" {
+ # Set a new hostname, reboot and make sure it sticks
+ R 0 config set cluster-announce-hostname "restart-1.com"
+ restart_instance redis 0
+ set slot_result [R 0 CLUSTER SLOTS]
+ assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "restart-1.com"
+
+ # As a sanity check, make sure everyone eventually agrees
+ wait_for_cluster_propagation
+}
+
+test "Test hostname validation" {
+ catch {R 0 config set cluster-announce-hostname [string repeat x 256]} err
+ assert_match "*Hostnames must be less than 256 characters*" $err
+ catch {R 0 config set cluster-announce-hostname "?.com"} err
+ assert_match "*Hostnames may only contain alphanumeric characters, hyphens or dots*" $err
+
+ # Note this isn't a valid hostname, but it passes our internal validation
+ R 0 config set cluster-announce-hostname "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-."
+} \ No newline at end of file
diff --git a/tests/cluster/tests/includes/init-tests.tcl b/tests/cluster/tests/includes/init-tests.tcl
index b787962a8..fc5897a1a 100644
--- a/tests/cluster/tests/includes/init-tests.tcl
+++ b/tests/cluster/tests/includes/init-tests.tcl
@@ -42,6 +42,8 @@ test "Cluster nodes hard reset" {
R $id config set loading-process-events-interval-bytes 2097152
R $id config set key-load-delay 0
R $id config set repl-diskless-load disabled
+ R $id config set cluster-announce-hostname ""
+ R $id DEBUG DROP-CLUSTER-PACKET-FILTER -1
R $id config rewrite
}
}