summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- redis.conf 8
-rw-r--r-- src/config.c 3
-rw-r--r-- src/networking.c 6
-rw-r--r-- src/replication.c 78
-rw-r--r-- src/server.h 5
-rw-r--r-- tests/assets/default.conf 1
-rw-r--r-- tests/cluster/tests/05-slave-selection.tcl 12
-rw-r--r-- tests/cluster/tests/06-slave-stop-cond.tcl 6
-rw-r--r-- tests/cluster/tests/14-consistency-check.tcl 6
-rw-r--r-- tests/instances.tcl 1
-rw-r--r-- tests/integration/replication-2.tcl 1
-rw-r--r-- tests/integration/replication-4.tcl 6
-rw-r--r-- tests/integration/replication-buffer.tcl 3
-rw-r--r-- tests/integration/replication.tcl 8
-rw-r--r-- tests/support/util.tcl 8
-rw-r--r-- tests/unit/moduleapi/hooks.tcl 6
16 files changed, 102 insertions, 56 deletions
diff --git a/redis.conf b/redis.conf
index 0753f3166..ca3d4d6ad 100644
--- a/redis.conf
+++ b/redis.conf
@@ -591,7 +591,7 @@ replica-read-only yes
#
# With slow disks and fast (large bandwidth) networks, diskless replication
# works better.
-repl-diskless-sync no
+repl-diskless-sync yes
# When diskless replication is enabled, it is possible to configure the delay
# the server waits in order to spawn the child that transfers the RDB via socket
@@ -605,6 +605,12 @@ repl-diskless-sync no
# it entirely just set it to 0 seconds and the transfer will start ASAP.
repl-diskless-sync-delay 5
+# When diskless replication is enabled with a delay, it is possible to let
+# the replication start before the maximum delay is reached, as soon as the
+# expected number of replicas have connected. The default of 0 means that no
+# such number is defined and Redis will wait the full delay.
+repl-diskless-sync-max-replicas 0
+
# -----------------------------------------------------------------------------
# WARNING: RDB diskless load is experimental. Since in this setup the replica
# does not immediately store an RDB on disk, it may cause data loss during
diff --git a/src/config.c b/src/config.c
index f206bcc8c..d11445775 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2752,7 +2752,7 @@ standardConfig configs[] = {
createBoolConfig("lazyfree-lazy-user-del", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_del , 0, NULL, NULL),
createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush , 0, NULL, NULL),
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
- createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 0, NULL, NULL),
+ createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL),
createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL),
createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL),
createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL),
@@ -2859,6 +2859,7 @@ standardConfig configs[] = {
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
+ createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
diff --git a/src/networking.c b/src/networking.c
index 22d9c6812..57576032e 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -162,7 +162,7 @@ client *createClient(connection *conn) {
c->ctime = c->lastinteraction = server.unixtime;
clientSetDefaultAuth(c);
c->replstate = REPL_STATE_NONE;
- c->repl_put_online_on_ack = 0;
+ c->repl_start_cmd_stream_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
c->repl_ack_off = 0;
@@ -225,7 +225,7 @@ void clientInstallWriteHandler(client *c) {
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
- (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
+ (c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
@@ -3588,7 +3588,7 @@ void flushSlavesOutputBuffers(void) {
*/
if (slave->replstate == SLAVE_STATE_ONLINE &&
can_receive_writes &&
- !slave->repl_put_online_on_ack &&
+ !slave->repl_start_cmd_stream_on_ack &&
clientHasPendingReplies(slave))
{
writeToClient(slave,0);
diff --git a/src/replication.c b/src/replication.c
index 75472bf9e..e387a8fd4 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -44,7 +44,8 @@
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(connection *conn);
void replicationSendAck(void);
-void putSlaveOnline(client *slave);
+void replicaPutOnline(client *slave);
+void replicaStartCommandStream(client *slave);
int cancelReplicationHandshake(int reconnect);
/* We take a global flag to remember if this instance generated an RDB
@@ -768,7 +769,7 @@ int masterTryPartialResynchronization(client *c) {
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
- c->repl_put_online_on_ack = 0;
+ c->repl_start_cmd_stream_on_ack = 0;
listAddNodeTail(server.slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
@@ -1183,8 +1184,8 @@ void replconfCommand(client *c) {
* quick check first (instead of waiting for the next ACK. */
if (server.child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
checkChildrenDone();
- if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
- putSlaveOnline(c);
+ if (c->repl_start_cmd_stream_on_ack && c->replstate == SLAVE_STATE_ONLINE)
+ replicaStartCommandStream(c);
/* Note: this command does not reply anything! */
return;
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
@@ -1238,37 +1239,20 @@ void replconfCommand(client *c) {
}
/* This function puts a replica in the online state, and should be called just
- * after a replica received the RDB file for the initial synchronization, and
- * we are finally ready to send the incremental stream of commands.
+ * after a replica received the RDB file for the initial synchronization.
+ * It is a no-op for RDB-only replicas (flagged CLIENT_REPL_RDBONLY).
*
* It does a few things:
- * 1) Close the replica's connection async if it doesn't need replication
- * commands buffer stream, since it actually isn't a valid replica.
- * 2) Put the slave in ONLINE state. Note that the function may also be called
- * for a replicas that are already in ONLINE state, but having the flag
- * repl_put_online_on_ack set to true: we still have to install the write
- * handler in that case. This function will take care of that.
- * 3) Make sure the writable event is re-installed, since calling the SYNC
- * command disables it, so that we can accumulate output buffer without
- * sending it to the replica.
- * 4) Update the count of "good replicas". */
-void putSlaveOnline(client *slave) {
- slave->replstate = SLAVE_STATE_ONLINE;
- slave->repl_put_online_on_ack = 0;
- slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
-
+ * 1) Put the slave in ONLINE state.
+ * 2) Update the count of "good replicas".
+ * 3) Trigger the module event. */
+void replicaPutOnline(client *slave) {
if (slave->flags & CLIENT_REPL_RDBONLY) {
- serverLog(LL_NOTICE,
- "Close the connection with replica %s as RDB transfer is complete",
- replicationGetSlaveName(slave));
- freeClientAsync(slave);
- return;
- }
- if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) {
- serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
- freeClient(slave);
return;
}
+
+ slave->replstate = SLAVE_STATE_ONLINE;
+ slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
+
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
@@ -1278,6 +1262,30 @@ void putSlaveOnline(client *slave) {
replicationGetSlaveName(slave));
}
+/* Start sending the incremental stream of commands to a replica. This
+ * function should be called just after the replica received the RDB file
+ * for the initial synchronization.
+ *
+ * It does a few things:
+ * 1) Clear the repl_start_cmd_stream_on_ack flag, so that a later ACK from
+ * the replica does not trigger this function again.
+ * 2) If the replica is flagged CLIENT_REPL_RDBONLY it doesn't need the
+ * replication commands buffer stream (it actually isn't a valid
+ * replica): log it, close its connection asynchronously and return.
+ * 3) Otherwise call clientInstallWriteHandler() so that the accumulated
+ * output buffer can be sent. The writable event was disabled when the
+ * SYNC command was processed (there were no replies to send), which let
+ * output buffer data accumulate without being sent to the replica and
+ * getting mixed with the RDB stream. */
+void replicaStartCommandStream(client *slave) {
+ slave->repl_start_cmd_stream_on_ack = 0;
+ if (slave->flags & CLIENT_REPL_RDBONLY) {
+ serverLog(LL_NOTICE,
+ "Close the connection with replica %s as RDB transfer is complete",
+ replicationGetSlaveName(slave));
+ freeClientAsync(slave);
+ return;
+ }
+
+ clientInstallWriteHandler(slave);
+}
+
/* We call this function periodically to remove an RDB file that was
* generated because of replication, in an instance that is otherwise
* without any persistence. We don't want instances without persistence
@@ -1376,7 +1384,8 @@ void sendBulkToSlave(connection *conn) {
close(slave->repldbfd);
slave->repldbfd = -1;
connSetWriteHandler(slave->conn,NULL);
- putSlaveOnline(slave);
+ replicaPutOnline(slave);
+ replicaStartCommandStream(slave);
}
}
@@ -1583,9 +1592,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
* after such final EOF. So we don't want to glue the end of
* the RDB transfer with the start of the other replication
* data. */
- slave->replstate = SLAVE_STATE_ONLINE;
- slave->repl_put_online_on_ack = 1;
- slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
+ replicaPutOnline(slave);
+ slave->repl_start_cmd_stream_on_ack = 1;
} else {
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
@@ -3721,6 +3729,8 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) {
if (slaves_waiting &&
(!server.repl_diskless_sync ||
+ (server.repl_diskless_sync_max_replicas > 0 &&
+ slaves_waiting >= server.repl_diskless_sync_max_replicas) ||
max_idle >= server.repl_diskless_sync_delay))
{
if (mincapa_out)
diff --git a/src/server.h b/src/server.h
index b98883b33..2570302c3 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1084,7 +1084,7 @@ typedef struct client {
uint64_t flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
- int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
+ int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
@@ -1706,6 +1706,8 @@ struct redisServer {
int repl_diskless_load; /* Slave parse RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
+ int repl_diskless_sync_max_replicas;/* Max replicas for diskless repl BGSAVE
+ * delay (start sooner if they all connect). */
size_t repl_buffer_mem; /* The memory of replication buffer. */
list *repl_buffer_blocks; /* Replication buffers blocks list
* (serving replica clients and repl backlog) */
@@ -2458,6 +2460,7 @@ void unprotectClient(client *c);
void initThreadedIO(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
+void clientInstallWriteHandler(client *c);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)
diff --git a/tests/assets/default.conf b/tests/assets/default.conf
index ef15b1041..42297903f 100644
--- a/tests/assets/default.conf
+++ b/tests/assets/default.conf
@@ -11,6 +11,7 @@ loglevel verbose
logfile ''
databases 16
latency-monitor-threshold 1
+repl-diskless-sync-delay 0
save 900 1
save 300 10
diff --git a/tests/cluster/tests/05-slave-selection.tcl b/tests/cluster/tests/05-slave-selection.tcl
index 6769d3070..f0ce863c6 100644
--- a/tests/cluster/tests/05-slave-selection.tcl
+++ b/tests/cluster/tests/05-slave-selection.tcl
@@ -14,7 +14,11 @@ test "Cluster is up" {
}
test "The first master has actually two slaves" {
- assert {[llength [lindex [R 0 role] 2]] == 2}
+ wait_for_condition 1000 50 {
+ [llength [lindex [R 0 role] 2]] == 2
+ } else {
+ fail "replicas didn't connect"
+ }
}
test {Slaves of #0 are instance #5 and #10 as expected} {
@@ -106,7 +110,11 @@ test "Cluster is up" {
}
test "The first master has actually 5 slaves" {
- assert {[llength [lindex [R 0 role] 2]] == 5}
+ wait_for_condition 1000 50 {
+ [llength [lindex [R 0 role] 2]] == 5
+ } else {
+ fail "replicas didn't connect"
+ }
}
test {Slaves of #0 are instance #3, #6, #9, #12 and #15 as expected} {
diff --git a/tests/cluster/tests/06-slave-stop-cond.tcl b/tests/cluster/tests/06-slave-stop-cond.tcl
index f2e67050b..80a2d17c8 100644
--- a/tests/cluster/tests/06-slave-stop-cond.tcl
+++ b/tests/cluster/tests/06-slave-stop-cond.tcl
@@ -14,7 +14,11 @@ test "Cluster is up" {
}
test "The first master has actually one slave" {
- assert {[llength [lindex [R 0 role] 2]] == 1}
+ wait_for_condition 1000 50 {
+ [llength [lindex [R 0 role] 2]] == 1
+ } else {
+ fail "replicas didn't connect"
+ }
}
test {Slaves of #0 is instance #5 as expected} {
diff --git a/tests/cluster/tests/14-consistency-check.tcl b/tests/cluster/tests/14-consistency-check.tcl
index 98a49dbcd..e3b9a1918 100644
--- a/tests/cluster/tests/14-consistency-check.tcl
+++ b/tests/cluster/tests/14-consistency-check.tcl
@@ -18,12 +18,18 @@ proc find_non_empty_master {} {
foreach_redis_id id {
if {[RI $id role] eq {master} && [R $id dbsize] > 0} {
set master_id_no $id
+ break
}
}
return $master_id_no
}
proc get_one_of_my_replica {id} {
+ wait_for_condition 1000 50 {
+ [llength [lindex [R $id role] 2]] > 0
+ } else {
+ fail "replicas didn't connect"
+ }
set replica_port [lindex [lindex [lindex [R $id role] 2] 0] 1]
set replica_id_num [get_instance_id_by_port redis $replica_port]
return $replica_id_num
diff --git a/tests/instances.tcl b/tests/instances.tcl
index d423291ac..1d2cbed49 100644
--- a/tests/instances.tcl
+++ b/tests/instances.tcl
@@ -99,6 +99,7 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} {
} else {
puts $cfg "port $port"
}
+ puts $cfg "repl-diskless-sync-delay 0"
puts $cfg "dir ./$dirname"
puts $cfg "logfile log.txt"
# Add additional config files
diff --git a/tests/integration/replication-2.tcl b/tests/integration/replication-2.tcl
index 9a1a1d1d7..f9f259211 100644
--- a/tests/integration/replication-2.tcl
+++ b/tests/integration/replication-2.tcl
@@ -2,6 +2,7 @@ start_server {tags {"repl external:skip"}} {
start_server {} {
test {First server should have role slave after SLAVEOF} {
r -1 slaveof [srv 0 host] [srv 0 port]
+ wait_replica_online r
wait_for_condition 50 100 {
[s -1 master_link_status] eq {up}
} else {
diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl
index 00c5d8ae0..2d3d93222 100644
--- a/tests/integration/replication-4.tcl
+++ b/tests/integration/replication-4.tcl
@@ -48,11 +48,7 @@ start_server {tags {"repl external:skip"}} {
test {First server should have role slave after SLAVEOF} {
$slave slaveof $master_host $master_port
- wait_for_condition 50 100 {
- [s 0 master_link_status] eq {up}
- } else {
- fail "Replication not started."
- }
+ wait_replica_online $master
}
test {With min-slaves-to-write (1,3): master should be writable} {
diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl
index 5d9669f75..07e2cbc1b 100644
--- a/tests/integration/replication-buffer.tcl
+++ b/tests/integration/replication-buffer.tcl
@@ -15,6 +15,8 @@ start_server {} {
$master config set save ""
$master config set repl-backlog-size 16384
+ $master config set repl-diskless-sync-delay 5
+ $master config set repl-diskless-sync-max-replicas 1
$master config set client-output-buffer-limit "replica 0 0 0"
# Make sure replica3 is synchronized with master
@@ -26,6 +28,7 @@ start_server {} {
populate 100 "" 16
# Make sure replica1 and replica2 are waiting bgsave
+ $master config set repl-diskless-sync-max-replicas 2
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 100 {
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index f2f1fe7e9..2af07e806 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -73,7 +73,7 @@ start_server {tags {"repl external:skip"}} {
test {INCRBYFLOAT replication, should not remove expire} {
r set test 1 EX 100
r incrbyfloat test 0.1
- after 1000
+ wait_for_ofs_sync $A $B
assert_equal [$A debug digest] [$B debug digest]
}
@@ -255,7 +255,8 @@ foreach mdl {no yes} {
start_server {tags {"repl external:skip"}} {
set master [srv 0 client]
$master config set repl-diskless-sync $mdl
- $master config set repl-diskless-sync-delay 1
+ $master config set repl-diskless-sync-delay 5
+ $master config set repl-diskless-sync-max-replicas 3
set master_host [srv 0 host]
set master_port [srv 0 port]
set slaves {}
@@ -780,7 +781,8 @@ proc compute_cpu_usage {start end} {
start_server {tags {"repl external:skip"}} {
set master [srv 0 client]
$master config set repl-diskless-sync yes
- $master config set repl-diskless-sync-delay 1
+ $master config set repl-diskless-sync-delay 5
+ $master config set repl-diskless-sync-max-replicas 2
set master_host [srv 0 host]
set master_port [srv 0 port]
set master_pid [srv 0 pid]
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 24199077d..5fc319254 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -118,6 +118,14 @@ proc wait_for_sync r {
}
}
+# Wait until the "info replication" output of server $r reports replica
+# number $replica_id (the "slaveN:" line) with state=online. The condition is
+# polled up to $maxtries times, $delay milliseconds apart. The defaults keep
+# the original behavior: slave0, 50 tries, 100 ms.
+proc wait_replica_online {r {replica_id 0} {maxtries 50} {delay 100}} {
+ wait_for_condition $maxtries $delay {
+ [string match "*slave${replica_id}:*,state=online*" [$r info replication]]
+ } else {
+ fail "replica didn't sync in time"
+ }
+}
+
proc wait_for_ofs_sync {r1 r2} {
wait_for_condition 50 100 {
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl
index 17570d00f..cb36c9f71 100644
--- a/tests/unit/moduleapi/hooks.tcl
+++ b/tests/unit/moduleapi/hooks.tcl
@@ -92,11 +92,7 @@ tags "modules" {
set replica_port [srv 0 port]
$replica replicaof $master_host $master_port
- wait_for_condition 50 100 {
- [string match {*master_link_status:up*} [r info replication]]
- } else {
- fail "Can't turn the instance into a replica"
- }
+ wait_replica_online $master
test {Test master link up hook} {
assert_equal [r hooks.event_count masterlink-up] 1