summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/redis-cli.c111
-rw-r--r--tests/support/util.tcl4
-rw-r--r--tests/unit/cluster.tcl94
3 files changed, 153 insertions, 56 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 444d9af0a..0de78c67e 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -81,6 +81,7 @@
#define REDIS_CLI_CLUSTER_YES_ENV "REDISCLI_CLUSTER_YES"
#define CLUSTER_MANAGER_SLOTS 16384
+#define CLUSTER_MANAGER_PORT_INCR 10000 /* same as CLUSTER_PORT_INCR */
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
#define CLUSTER_MANAGER_MIGRATE_PIPELINE 10
#define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2
@@ -2867,6 +2868,7 @@ typedef struct clusterManagerNode {
sds name;
char *ip;
int port;
+ int bus_port; /* cluster-port */
uint64_t current_epoch;
time_t ping_sent;
time_t ping_recv;
@@ -2937,7 +2939,7 @@ typedef int (*clusterManagerOnReplyError)(redisReply *reply,
/* Cluster Manager helper functions */
-static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
+static clusterManagerNode *clusterManagerNewNode(char *ip, int port, int bus_port);
static clusterManagerNode *clusterManagerNodeByName(const char *name);
static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n);
static void clusterManagerNodeResetSlots(clusterManagerNode *node);
@@ -2997,15 +2999,15 @@ typedef struct clusterManagerCommandDef {
clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
"replicas <arg>"},
- {"check", clusterManagerCommandCheck, -1, "host:port",
+ {"check", clusterManagerCommandCheck, -1, "<host:port> or <host> <port> - separated by either colon or space",
"search-multiple-owners"},
- {"info", clusterManagerCommandInfo, -1, "host:port", NULL},
- {"fix", clusterManagerCommandFix, -1, "host:port",
+ {"info", clusterManagerCommandInfo, -1, "<host:port> or <host> <port> - separated by either colon or space", NULL},
+ {"fix", clusterManagerCommandFix, -1, "<host:port> or <host> <port> - separated by either colon or space",
"search-multiple-owners,fix-with-unreachable-masters"},
- {"reshard", clusterManagerCommandReshard, -1, "host:port",
+ {"reshard", clusterManagerCommandReshard, -1, "<host:port> or <host> <port> - separated by either colon or space",
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
"replace"},
- {"rebalance", clusterManagerCommandRebalance, -1, "host:port",
+ {"rebalance", clusterManagerCommandRebalance, -1, "<host:port> or <host> <port> - separated by either colon or space",
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
{"add-node", clusterManagerCommandAddNode, 2,
@@ -3094,6 +3096,7 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) {
static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,
int *bus_port_ptr)
{
+ /* ip:port[@bus_port] */
char *c = strrchr(addr, '@');
if (c != NULL) {
*c = '\0';
@@ -3203,12 +3206,15 @@ static void freeClusterManager(void) {
dictRelease(clusterManagerUncoveredSlots);
}
-static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
+static clusterManagerNode *clusterManagerNewNode(char *ip, int port, int bus_port) {
clusterManagerNode *node = zmalloc(sizeof(*node));
node->context = NULL;
node->name = NULL;
node->ip = ip;
node->port = port;
+ /* We don't need to know the bus_port, at this point this value may be wrong.
+ * If it is used, it will be corrected in clusterManagerLoadInfoFromNode. */
+ node->bus_port = bus_port ? bus_port : port + CLUSTER_MANAGER_PORT_INCR;
node->current_epoch = 0;
node->ping_sent = 0;
node->ping_recv = 0;
@@ -4611,9 +4617,20 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
success = 0;
goto cleanup;
}
+
+ char *ip = NULL;
+ int port = 0, bus_port = 0;
+ if (addr == NULL || !parseClusterNodeAddress(addr, &ip, &port, &bus_port)) {
+ fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
+ success = 0;
+ goto cleanup;
+ }
+
int myself = (strstr(flags, "myself") != NULL);
clusterManagerNode *currentNode = NULL;
if (myself) {
+ /* bus-port could be wrong, correct it here, see clusterManagerNewNode. */
+ node->bus_port = bus_port;
node->flags |= CLUSTER_MANAGER_FLAG_MYSELF;
currentNode = node;
clusterManagerNodeResetSlots(node);
@@ -4681,22 +4698,7 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
if (!(node->flags & CLUSTER_MANAGER_FLAG_MYSELF)) continue;
else break;
} else {
- if (addr == NULL) {
- fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
- success = 0;
- goto cleanup;
- }
- char *c = strrchr(addr, '@');
- if (c != NULL) *c = '\0';
- c = strrchr(addr, ':');
- if (c == NULL) {
- fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
- success = 0;
- goto cleanup;
- }
- *c = '\0';
- int port = atoi(++c);
- currentNode = clusterManagerNewNode(sdsnew(addr), port);
+ currentNode = clusterManagerNewNode(sdsnew(ip), port, bus_port);
currentNode->flags |= CLUSTER_MANAGER_FLAG_FRIEND;
if (node->friends == NULL) node->friends = listCreate();
listAddNodeTail(node->friends, currentNode);
@@ -6110,17 +6112,14 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
cluster_manager.nodes = listCreate();
for (i = 0; i < argc; i++) {
char *addr = argv[i];
- char *c = strrchr(addr, '@');
- if (c != NULL) *c = '\0';
- c = strrchr(addr, ':');
- if (c == NULL) {
+ char *ip = NULL;
+ int port = 0;
+ if (!parseClusterNodeAddress(addr, &ip, &port, NULL)) {
fprintf(stderr, "Invalid address format: %s\n", addr);
return 0;
}
- *c = '\0';
- char *ip = addr;
- int port = atoi(++c);
- clusterManagerNode *node = clusterManagerNewNode(ip, port);
+
+ clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerNodeConnect(node)) {
freeClusterManagerNode(node);
return 0;
@@ -6327,8 +6326,16 @@ assign_replicas:
continue;
}
redisReply *reply = NULL;
- reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d",
- first_ip, first->port);
+ if (first->bus_port == 0 || (first->bus_port == first->port + CLUSTER_MANAGER_PORT_INCR)) {
+ /* CLUSTER MEET bus-port parameter was added in 4.0.
+ * So if (bus_port == 0) or (bus_port == port + CLUSTER_MANAGER_PORT_INCR),
+ * we just call CLUSTER MEET with 2 arguments, using the old form. */
+ reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d",
+ first->ip, first->port);
+ } else {
+ reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d %d",
+ first->ip, first->port, first->bus_port);
+ }
int is_err = 0;
if (reply != NULL) {
if ((is_err = reply->type == REDIS_REPLY_ERROR))
@@ -6362,6 +6369,8 @@ assign_replicas:
}
success = 0;
goto cleanup;
+ } else if (err != NULL) {
+ zfree(err);
}
}
// Reset Nodes
@@ -6405,7 +6414,7 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
clusterManagerLogInfo(">>> Adding node %s:%d to cluster %s:%d\n", ip, port,
ref_ip, ref_port);
// Check the existing cluster
- clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port);
+ clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
if (!clusterManagerCheckCluster(0)) return 0;
@@ -6429,7 +6438,7 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
}
// Add the new node
- clusterManagerNode *new_node = clusterManagerNewNode(ip, port);
+ clusterManagerNode *new_node = clusterManagerNewNode(ip, port, 0);
int added = 0;
if (!clusterManagerNodeConnect(new_node)) {
clusterManagerLogErr("[ERR] Sorry, can't connect to node %s:%d\n",
@@ -6507,8 +6516,18 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
success = 0;
goto cleanup;
}
- reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d",
- first_ip, first->port);
+
+ if (first->bus_port == 0 || (first->bus_port == first->port + CLUSTER_MANAGER_PORT_INCR)) {
+ /* CLUSTER MEET bus-port parameter was added in 4.0.
+ * So if (bus_port == 0) or (bus_port == port + CLUSTER_MANAGER_PORT_INCR),
+ * we just call CLUSTER MEET with 2 arguments, using the old form. */
+ reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d",
+ first_ip, first->port);
+ } else {
+ reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d %d",
+ first->ip, first->port, first->bus_port);
+ }
+
if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL)))
goto cleanup;
@@ -6545,7 +6564,7 @@ static int clusterManagerCommandDeleteNode(int argc, char **argv) {
char *node_id = argv[1];
clusterManagerLogInfo(">>> Removing node %s from cluster %s:%d\n",
node_id, ip, port);
- clusterManagerNode *ref_node = clusterManagerNewNode(ip, port);
+ clusterManagerNode *ref_node = clusterManagerNewNode(ip, port, 0);
clusterManagerNode *node = NULL;
// Load cluster information
@@ -6607,7 +6626,7 @@ static int clusterManagerCommandInfo(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
- clusterManagerNode *node = clusterManagerNewNode(ip, port);
+ clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerShowClusterInfo();
return 1;
@@ -6620,7 +6639,7 @@ static int clusterManagerCommandCheck(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
- clusterManagerNode *node = clusterManagerNewNode(ip, port);
+ clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerShowClusterInfo();
return clusterManagerCheckCluster(0);
@@ -6638,7 +6657,7 @@ static int clusterManagerCommandReshard(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
- clusterManagerNode *node = clusterManagerNewNode(ip, port);
+ clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerCheckCluster(0);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
@@ -6827,7 +6846,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
clusterManagerNode **weightedNodes = NULL;
list *involved = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
- clusterManagerNode *node = clusterManagerNewNode(ip, port);
+ clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
int result = 1, i;
if (config.cluster_manager_command.weight != NULL) {
@@ -7028,7 +7047,7 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv) {
return 0;
}
// Load cluster information
- clusterManagerNode *node = clusterManagerNewNode(ip, port);
+ clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
int ok_count = 0, err_count = 0;
@@ -7098,7 +7117,7 @@ static int clusterManagerCommandImport(int argc, char **argv) {
clusterManagerLogInfo(">>> Importing data from %s:%d to cluster %s:%d\n",
src_ip, src_port, ip, port);
- clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
+ clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
if (!clusterManagerCheckCluster(0)) return 0;
char *reply_err = NULL;
@@ -7233,7 +7252,7 @@ static int clusterManagerCommandCall(int argc, char **argv) {
int port = 0, i;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
- clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
+ clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
argc--;
argv++;
@@ -7278,7 +7297,7 @@ static int clusterManagerCommandBackup(int argc, char **argv) {
int success = 1, port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
- clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
+ clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
int no_issues = clusterManagerCheckCluster(0);
int cluster_errors_count = (no_issues ? 0 :
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 6741b719a..fd72dcf75 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -72,8 +72,8 @@ proc sanitizer_errors_from_file {filename} {
}
proc getInfoProperty {infostr property} {
- if {[regexp "\r\n$property:(.*?)\r\n" $infostr _ value]} {
- set _ $value
+ if {[regexp -lineanchor "^$property:(.*?)\r\n" $infostr _ value]} {
+ return $value
}
}
diff --git a/tests/unit/cluster.tcl b/tests/unit/cluster.tcl
index 9d49a2dee..180a19257 100644
--- a/tests/unit/cluster.tcl
+++ b/tests/unit/cluster.tcl
@@ -3,9 +3,7 @@
source tests/support/cli.tcl
proc cluster_info {r field} {
- if {[regexp "^$field:(.*?)\r\n" [$r cluster info] _ value]} {
- set _ $value
- }
+ set _ [getInfoProperty [$r cluster info] $field]
}
# Provide easy access to CLUSTER INFO properties. Same semantic as "proc s".
@@ -110,7 +108,7 @@ start_multiple_servers 3 [list overrides $base_conf] {
}
$node3_rd close
-
+
test "Run blocking command again on cluster node1" {
$node1 del key9184688
# key9184688 is mapped to slot 10923 which has been moved to node1
@@ -123,9 +121,9 @@ start_multiple_servers 3 [list overrides $base_conf] {
fail "Client not blocked"
}
}
-
+
test "Kill a cluster node and wait for fail state" {
- # kill node3 in cluster
+ # kill node3 in cluster
exec kill -SIGSTOP $node3_pid
wait_for_condition 1000 50 {
@@ -135,7 +133,7 @@ start_multiple_servers 3 [list overrides $base_conf] {
fail "Cluster doesn't fail"
}
}
-
+
test "Verify command got unblocked after cluster failure" {
assert_error {*CLUSTERDOWN*} {$node1_rd read}
@@ -208,7 +206,7 @@ start_multiple_servers 5 [list overrides $base_conf] {
127.0.0.1:[srv -4 port] \
127.0.0.1:[srv 0 port]
} e
- assert_match {*node already contains functions*} $e
+ assert_match {*node already contains functions*} $e
}
} ;# stop servers
@@ -315,6 +313,86 @@ test {Migrate the last slot away from a node using redis-cli} {
}
}
+# Test redis-cli --cluster create, add-node with cluster-port.
+# Create five nodes, three with custom cluster_port and two with default values.
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] {
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] {
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
+
+ # The first three are used to test --cluster create.
+ # The last two are used to test --cluster add-node
+ set node1_rd [redis_client 0]
+ set node2_rd [redis_client -1]
+ set node3_rd [redis_client -2]
+ set node4_rd [redis_client -3]
+ set node5_rd [redis_client -4]
+
+ test {redis-cli --cluster create with cluster-port} {
+ exec src/redis-cli --cluster-yes --cluster create \
+ 127.0.0.1:[srv 0 port] \
+ 127.0.0.1:[srv -1 port] \
+ 127.0.0.1:[srv -2 port]
+
+ wait_for_condition 1000 50 {
+ [csi 0 cluster_state] eq {ok} &&
+ [csi -1 cluster_state] eq {ok} &&
+ [csi -2 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Make sure each node can meet other nodes
+ assert_equal 3 [csi 0 cluster_known_nodes]
+ assert_equal 3 [csi -1 cluster_known_nodes]
+ assert_equal 3 [csi -2 cluster_known_nodes]
+ }
+
+ test {redis-cli --cluster add-node with cluster-port} {
+ # Adding node to the cluster (without cluster-port)
+ exec src/redis-cli --cluster-yes --cluster add-node \
+ 127.0.0.1:[srv -3 port] \
+ 127.0.0.1:[srv 0 port]
+
+ wait_for_condition 1000 50 {
+ [csi 0 cluster_state] eq {ok} &&
+ [csi -1 cluster_state] eq {ok} &&
+ [csi -2 cluster_state] eq {ok} &&
+ [csi -3 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Adding node to the cluster (with cluster-port)
+ exec src/redis-cli --cluster-yes --cluster add-node \
+ 127.0.0.1:[srv -4 port] \
+ 127.0.0.1:[srv 0 port]
+
+ wait_for_condition 1000 50 {
+ [csi 0 cluster_state] eq {ok} &&
+ [csi -1 cluster_state] eq {ok} &&
+ [csi -2 cluster_state] eq {ok} &&
+ [csi -3 cluster_state] eq {ok} &&
+ [csi -4 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Make sure each node can meet other nodes
+ assert_equal 5 [csi 0 cluster_known_nodes]
+ assert_equal 5 [csi -1 cluster_known_nodes]
+ assert_equal 5 [csi -2 cluster_known_nodes]
+ assert_equal 5 [csi -3 cluster_known_nodes]
+ assert_equal 5 [csi -4 cluster_known_nodes]
+ }
+# stop 5 servers
+}
+}
+}
+}
+}
+
} ;# tags
set ::singledb $old_singledb