diff options
-rw-r--r-- | redis.conf | 8 | ||||
-rw-r--r-- | src/config.c | 3 | ||||
-rw-r--r-- | src/networking.c | 6 | ||||
-rw-r--r-- | src/replication.c | 78 | ||||
-rw-r--r-- | src/server.h | 5 | ||||
-rw-r--r-- | tests/assets/default.conf | 1 | ||||
-rw-r--r-- | tests/cluster/tests/05-slave-selection.tcl | 12 | ||||
-rw-r--r-- | tests/cluster/tests/06-slave-stop-cond.tcl | 6 | ||||
-rw-r--r-- | tests/cluster/tests/14-consistency-check.tcl | 6 | ||||
-rw-r--r-- | tests/instances.tcl | 1 | ||||
-rw-r--r-- | tests/integration/replication-2.tcl | 1 | ||||
-rw-r--r-- | tests/integration/replication-4.tcl | 6 | ||||
-rw-r--r-- | tests/integration/replication-buffer.tcl | 3 | ||||
-rw-r--r-- | tests/integration/replication.tcl | 8 | ||||
-rw-r--r-- | tests/support/util.tcl | 8 | ||||
-rw-r--r-- | tests/unit/moduleapi/hooks.tcl | 6 |
16 files changed, 102 insertions, 56 deletions
diff --git a/redis.conf b/redis.conf index 0753f3166..ca3d4d6ad 100644 --- a/redis.conf +++ b/redis.conf @@ -591,7 +591,7 @@ replica-read-only yes # # With slow disks and fast (large bandwidth) networks, diskless replication # works better. -repl-diskless-sync no +repl-diskless-sync yes # When diskless replication is enabled, it is possible to configure the delay # the server waits in order to spawn the child that transfers the RDB via socket @@ -605,6 +605,12 @@ repl-diskless-sync no # it entirely just set it to 0 seconds and the transfer will start ASAP. repl-diskless-sync-delay 5 +# When diskless replication is enabled with a delay, it is possible to let +# the replication start before the maximum delay is reached if the maximum +# number of replicas expected have connected. Default of 0 means that the +# maximum is not defined and Redis will wait the full delay. +repl-diskless-sync-max-replicas 0 + # ----------------------------------------------------------------------------- # WARNING: RDB diskless load is experimental. Since in this setup the replica # does not immediately store an RDB on disk, it may cause data loss during diff --git a/src/config.c b/src/config.c index f206bcc8c..d11445775 100644 --- a/src/config.c +++ b/src/config.c @@ -2752,7 +2752,7 @@ standardConfig configs[] = { createBoolConfig("lazyfree-lazy-user-del", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_del , 0, NULL, NULL), createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush , 0, NULL, NULL), createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL), - createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 0, NULL, NULL), + createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL), createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL), createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL), createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL), @@ -2859,6 +2859,7 @@ standardConfig configs[] = { createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod), createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL), + createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), diff --git a/src/networking.c b/src/networking.c index 22d9c6812..57576032e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -162,7 +162,7 @@ client *createClient(connection *conn) { c->ctime = c->lastinteraction = server.unixtime; clientSetDefaultAuth(c); c->replstate = REPL_STATE_NONE; - c->repl_put_online_on_ack = 0; + c->repl_start_cmd_stream_on_ack = 0; c->reploff = 0; c->read_reploff = 0; c->repl_ack_off = 0; @@ -225,7 +225,7 @@ void clientInstallWriteHandler(client *c) { * writes at this stage. */ if (!(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || - (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) + (c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack))) { /* Here instead of installing the write handler, we just flag the * client and put it into a list of clients that have something @@ -3588,7 +3588,7 @@ void flushSlavesOutputBuffers(void) { */ if (slave->replstate == SLAVE_STATE_ONLINE && can_receive_writes && - !slave->repl_put_online_on_ack && + !slave->repl_start_cmd_stream_on_ack && clientHasPendingReplies(slave)) { writeToClient(slave,0); diff --git a/src/replication.c b/src/replication.c index 75472bf9e..e387a8fd4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -44,7 +44,8 @@ void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(connection *conn); void replicationSendAck(void); -void putSlaveOnline(client *slave); +void replicaPutOnline(client *slave); +void replicaStartCommandStream(client *slave); int cancelReplicationHandshake(int reconnect); /* We take a global flag to remember if this instance generated an RDB @@ -768,7 +769,7 @@ int masterTryPartialResynchronization(client *c) { c->flags |= CLIENT_SLAVE; c->replstate = SLAVE_STATE_ONLINE; c->repl_ack_time = server.unixtime; - c->repl_put_online_on_ack = 0; + c->repl_start_cmd_stream_on_ack = 0; listAddNodeTail(server.slaves,c); /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is @@ -1183,8 +1184,8 @@ void replconfCommand(client *c) { * quick check first (instead of waiting for the next ACK. */ if (server.child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END) checkChildrenDone(); - if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) - putSlaveOnline(c); + if (c->repl_start_cmd_stream_on_ack && c->replstate == SLAVE_STATE_ONLINE) + replicaStartCommandStream(c); /* Note: this command does not reply anything! */ return; } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { @@ -1238,37 +1239,20 @@ void replconfCommand(client *c) { } /* This function puts a replica in the online state, and should be called just - * after a replica received the RDB file for the initial synchronization, and - * we are finally ready to send the incremental stream of commands. + * after a replica received the RDB file for the initial synchronization. * * It does a few things: - * 1) Close the replica's connection async if it doesn't need replication - * commands buffer stream, since it actually isn't a valid replica. - * 2) Put the slave in ONLINE state. Note that the function may also be called - * for a replicas that are already in ONLINE state, but having the flag - * repl_put_online_on_ack set to true: we still have to install the write - * handler in that case. This function will take care of that. - * 3) Make sure the writable event is re-installed, since calling the SYNC - * command disables it, so that we can accumulate output buffer without - * sending it to the replica. - * 4) Update the count of "good replicas". */ -void putSlaveOnline(client *slave) { - slave->replstate = SLAVE_STATE_ONLINE; - slave->repl_put_online_on_ack = 0; - slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ - + * 1) Put the slave in ONLINE state. + * 2) Update the count of "good replicas". + * 3) Trigger the module event. */ +void replicaPutOnline(client *slave) { if (slave->flags & CLIENT_REPL_RDBONLY) { - serverLog(LL_NOTICE, - "Close the connection with replica %s as RDB transfer is complete", - replicationGetSlaveName(slave)); - freeClientAsync(slave); - return; - } - if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) { - serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); - freeClient(slave); return; } + + slave->replstate = SLAVE_STATE_ONLINE; + slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ + refreshGoodSlavesCount(); /* Fire the replica change modules event. */ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, @@ -1278,6 +1262,30 @@ void putSlaveOnline(client *slave) { replicationGetSlaveName(slave)); } +/* This function should be called just after a replica received the RDB file + * for the initial synchronization, and we are finally ready to send the + * incremental stream of commands. + * + * It does a few things: + * 1) Close the replica's connection async if it doesn't need replication + * commands buffer stream, since it actually isn't a valid replica. + * 2) Make sure the writable event is re-installed, since when calling the SYNC + * command we had no replies and it was disabled, and then we could + * accumulate output buffer data without sending it to the replica so it + * won't get mixed with the RDB stream. */ +void replicaStartCommandStream(client *slave) { + slave->repl_start_cmd_stream_on_ack = 0; + if (slave->flags & CLIENT_REPL_RDBONLY) { + serverLog(LL_NOTICE, + "Close the connection with replica %s as RDB transfer is complete", + replicationGetSlaveName(slave)); + freeClientAsync(slave); + return; + } + + clientInstallWriteHandler(slave); +} + /* We call this function periodically to remove an RDB file that was * generated because of replication, in an instance that is otherwise * without any persistence. We don't want instances without persistence @@ -1376,7 +1384,8 @@ void sendBulkToSlave(connection *conn) { close(slave->repldbfd); slave->repldbfd = -1; connSetWriteHandler(slave->conn,NULL); - putSlaveOnline(slave); + replicaPutOnline(slave); + replicaStartCommandStream(slave); } } @@ -1583,9 +1592,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { * after such final EOF. So we don't want to glue the end of * the RDB transfer with the start of the other replication * data. */ - slave->replstate = SLAVE_STATE_ONLINE; - slave->repl_put_online_on_ack = 1; - slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */ + replicaPutOnline(slave); + slave->repl_start_cmd_stream_on_ack = 1; } else { if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || redis_fstat(slave->repldbfd,&buf) == -1) { @@ -3721,6 +3729,8 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { if (slaves_waiting && (!server.repl_diskless_sync || + (server.repl_diskless_sync_max_replicas > 0 && + slaves_waiting >= server.repl_diskless_sync_max_replicas) || max_idle >= server.repl_diskless_sync_delay)) { if (mincapa_out) diff --git a/src/server.h b/src/server.h index b98883b33..2570302c3 100644 --- a/src/server.h +++ b/src/server.h @@ -1084,7 +1084,7 @@ typedef struct client { uint64_t flags; /* Client flags: CLIENT_* macros. */ int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. */ - int repl_put_online_on_ack; /* Install slave write handler on first ACK. */ + int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */ int repldbfd; /* Replication DB file descriptor. */ off_t repldboff; /* Replication DB file offset. */ off_t repldbsize; /* Replication DB file size. */ @@ -1706,6 +1706,8 @@ struct redisServer { int repl_diskless_load; /* Slave parse RDB directly from the socket. * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ + int repl_diskless_sync_max_replicas;/* Max replicas for diskless repl BGSAVE + * delay (start sooner if they all connect). */ size_t repl_buffer_mem; /* The memory of replication buffer. */ list *repl_buffer_blocks; /* Replication buffers blocks list * (serving replica clients and repl backlog) */ @@ -2458,6 +2460,7 @@ void unprotectClient(client *c); void initThreadedIO(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); +void clientInstallWriteHandler(client *c); #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...) diff --git a/tests/assets/default.conf b/tests/assets/default.conf index ef15b1041..42297903f 100644 --- a/tests/assets/default.conf +++ b/tests/assets/default.conf @@ -11,6 +11,7 @@ loglevel verbose logfile '' databases 16 latency-monitor-threshold 1 +repl-diskless-sync-delay 0 save 900 1 save 300 10 diff --git a/tests/cluster/tests/05-slave-selection.tcl b/tests/cluster/tests/05-slave-selection.tcl index 6769d3070..f0ce863c6 100644 --- a/tests/cluster/tests/05-slave-selection.tcl +++ b/tests/cluster/tests/05-slave-selection.tcl @@ -14,7 +14,11 @@ test "Cluster is up" { } test "The first master has actually two slaves" { - assert {[llength [lindex [R 0 role] 2]] == 2} + wait_for_condition 1000 50 { + [llength [lindex [R 0 role] 2]] == 2 + } else { + fail "replicas didn't connect" + } } test {Slaves of #0 are instance #5 and #10 as expected} { @@ -106,7 +110,11 @@ test "Cluster is up" { } test "The first master has actually 5 slaves" { - assert {[llength [lindex [R 0 role] 2]] == 5} + wait_for_condition 1000 50 { + [llength [lindex [R 0 role] 2]] == 5 + } else { + fail "replicas didn't connect" + } } test {Slaves of #0 are instance #3, #6, #9, #12 and #15 as expected} { diff --git a/tests/cluster/tests/06-slave-stop-cond.tcl b/tests/cluster/tests/06-slave-stop-cond.tcl index f2e67050b..80a2d17c8 100644 --- a/tests/cluster/tests/06-slave-stop-cond.tcl +++ b/tests/cluster/tests/06-slave-stop-cond.tcl @@ -14,7 +14,11 @@ test "Cluster is up" { } test "The first master has actually one slave" { - assert {[llength [lindex [R 0 role] 2]] == 1} + wait_for_condition 1000 50 { + [llength [lindex [R 0 role] 2]] == 1 + } else { + fail "replicas didn't connect" + } } test {Slaves of #0 is instance #5 as expected} { diff --git a/tests/cluster/tests/14-consistency-check.tcl b/tests/cluster/tests/14-consistency-check.tcl index 98a49dbcd..e3b9a1918 100644 --- a/tests/cluster/tests/14-consistency-check.tcl +++ b/tests/cluster/tests/14-consistency-check.tcl @@ -18,12 +18,18 @@ proc find_non_empty_master {} { foreach_redis_id id { if {[RI $id role] eq {master} && [R $id dbsize] > 0} { set master_id_no $id + break } } return $master_id_no } proc get_one_of_my_replica {id} { + wait_for_condition 1000 50 { + [llength [lindex [R $id role] 2]] > 0 + } else { + fail "replicas didn't connect" + } set replica_port [lindex [lindex [lindex [R $id role] 2] 0] 1] set replica_id_num [get_instance_id_by_port redis $replica_port] return $replica_id_num diff --git a/tests/instances.tcl b/tests/instances.tcl index d423291ac..1d2cbed49 100644 --- a/tests/instances.tcl +++ b/tests/instances.tcl @@ -99,6 +99,7 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} { } else { puts $cfg "port $port" } + puts $cfg "repl-diskless-sync-delay 0" puts $cfg "dir ./$dirname" puts $cfg "logfile log.txt" # Add additional config files diff --git a/tests/integration/replication-2.tcl b/tests/integration/replication-2.tcl index 9a1a1d1d7..f9f259211 100644 --- a/tests/integration/replication-2.tcl +++ b/tests/integration/replication-2.tcl @@ -2,6 +2,7 @@ start_server {tags {"repl external:skip"}} { start_server {} { test {First server should have role slave after SLAVEOF} { r -1 slaveof [srv 0 host] [srv 0 port] + wait_replica_online r wait_for_condition 50 100 { [s -1 master_link_status] eq {up} } else { diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl index 00c5d8ae0..2d3d93222 100644 --- a/tests/integration/replication-4.tcl +++ b/tests/integration/replication-4.tcl @@ -48,11 +48,7 @@ start_server {tags {"repl external:skip"}} { test {First server should have role slave after SLAVEOF} { $slave slaveof $master_host $master_port - wait_for_condition 50 100 { - [s 0 master_link_status] eq {up} - } else { - fail "Replication not started." - } + wait_replica_online $master } test {With min-slaves-to-write (1,3): master should be writable} { diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl index 5d9669f75..07e2cbc1b 100644 --- a/tests/integration/replication-buffer.tcl +++ b/tests/integration/replication-buffer.tcl @@ -15,6 +15,8 @@ start_server {} { $master config set save "" $master config set repl-backlog-size 16384 + $master config set repl-diskless-sync-delay 5 + $master config set repl-diskless-sync-max-replicas 1 $master config set client-output-buffer-limit "replica 0 0 0" # Make sure replica3 is synchronized with master @@ -26,6 +28,7 @@ start_server {} { populate 100 "" 16 # Make sure replica1 and replica2 are waiting bgsave + $master config set repl-diskless-sync-max-replicas 2 $replica1 replicaof $master_host $master_port $replica2 replicaof $master_host $master_port wait_for_condition 50 100 { diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index f2f1fe7e9..2af07e806 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -73,7 +73,7 @@ start_server {tags {"repl external:skip"}} { test {INCRBYFLOAT replication, should not remove expire} { r set test 1 EX 100 r incrbyfloat test 0.1 - after 1000 + wait_for_ofs_sync $A $B assert_equal [$A debug digest] [$B debug digest] } @@ -255,7 +255,8 @@ foreach mdl {no yes} { start_server {tags {"repl external:skip"}} { set master [srv 0 client] $master config set repl-diskless-sync $mdl - $master config set repl-diskless-sync-delay 1 + $master config set repl-diskless-sync-delay 5 + $master config set repl-diskless-sync-max-replicas 3 set master_host [srv 0 host] set master_port [srv 0 port] set slaves {} @@ -780,7 +781,8 @@ proc compute_cpu_usage {start end} { start_server {tags {"repl external:skip"}} { set master [srv 0 client] $master config set repl-diskless-sync yes - $master config set repl-diskless-sync-delay 1 + $master config set repl-diskless-sync-delay 5 + $master config set repl-diskless-sync-max-replicas 2 set master_host [srv 0 host] set master_port [srv 0 port] set master_pid [srv 0 pid] diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 24199077d..5fc319254 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -118,6 +118,14 @@ proc wait_for_sync r { } } +proc wait_replica_online r { + wait_for_condition 50 100 { + [string match "*slave0:*,state=online*" [$r info replication]] + } else { + fail "replica didn't sync in time" + } +} + proc wait_for_ofs_sync {r1 r2} { wait_for_condition 50 100 { [status $r1 master_repl_offset] eq [status $r2 master_repl_offset] diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl index 17570d00f..cb36c9f71 100644 --- a/tests/unit/moduleapi/hooks.tcl +++ b/tests/unit/moduleapi/hooks.tcl @@ -92,11 +92,7 @@ tags "modules" { set replica_port [srv 0 port] $replica replicaof $master_host $master_port - wait_for_condition 50 100 { - [string match {*master_link_status:up*} [r info replication]] - } else { - fail "Can't turn the instance into a replica" - } + wait_replica_online $master test {Test master link up hook} { assert_equal [r hooks.event_count masterlink-up] 1 |