summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2019-08-11 16:07:53 +0300
committerYossi Gottlieb <yossigo@gmail.com>2019-10-07 21:06:30 +0300
commit5a477946065bcf05b335ededd6b794e82882ab73 (patch)
tree53dee2990f0d86e042f979322d998c25ade4879d
parentb087dd1db60ed23d9e59304deb0b1599437f6e23 (diff)
downloadredis-5a477946065bcf05b335ededd6b794e82882ab73.tar.gz
diskless replication rdb transfer uses pipe, and writes to sockets form the parent process.
misc: - handle SSL_has_pending by iterating though these in beforeSleep, and setting timeout of 0 to aeProcessEvents - fix issue with epoll signaling EPOLLHUP and EPOLLERR only to the write handlers. (needed to detect the rdb pipe was closed) - add key-load-delay config for testing - trim connShutdown which is no longer needed - rioFdsetWrite -> rioFdWrite - simplified since there's no longer need to write to multiple FDs - don't detect rdb child exited (don't call wait3) until we detect the pipe is closed - Cleanup bad optimization from rio.c, add another one
-rw-r--r--TLS.md21
-rw-r--r--src/ae.c14
-rw-r--r--src/ae.h2
-rw-r--r--src/ae_epoll.c4
-rw-r--r--src/aof.c2
-rw-r--r--src/config.c10
-rw-r--r--src/connection.c5
-rw-r--r--src/connection.h9
-rw-r--r--src/networking.c27
-rw-r--r--src/rdb.c192
-rw-r--r--src/replication.c150
-rw-r--r--src/rio.c142
-rw-r--r--src/rio.h14
-rw-r--r--src/server.c85
-rw-r--r--src/server.h15
-rw-r--r--src/tls.c71
-rw-r--r--tests/integration/replication.tcl170
17 files changed, 580 insertions, 353 deletions
diff --git a/TLS.md b/TLS.md
index ee24a8df5..90ed08e7c 100644
--- a/TLS.md
+++ b/TLS.md
@@ -81,23 +81,6 @@ implementation details between TLS and TCP.
difficult, but there are probably other good reasons to improve that part
anyway.
-5. A mechanism to re-trigger read callbacks for connections with unread buffers
- (the case of reading partial TLS frames):
-
- a) Before sleep should iterate connections looking for those with a read handler,
- SSL_pending() != 0 and no read event.
- b) If found, trigger read handler for these conns.
- c) After iteration if this state persists, epoll should be called in a way
- that won't block so the process continues and this behave the same as a
- level trigerred epoll.
-
-Replication
------------
-
-Diskless master replication is broken, until child/parent connection proxying is
-implemented.
-
-
TLS Features
------------
@@ -119,6 +102,10 @@ most actions.
This will need to be cleaned up for proper TLS support. The best approach is
probably to migrate to hiredis async mode.
+redis-cli
+---------
+1. Support tls in --slave and --rdb
+
Others
------
diff --git a/src/ae.c b/src/ae.c
index 53629ef77..54c0d994e 100644
--- a/src/ae.c
+++ b/src/ae.c
@@ -76,6 +76,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
+ eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
@@ -97,6 +98,14 @@ int aeGetSetSize(aeEventLoop *eventLoop) {
return eventLoop->setsize;
}
+/* Tells the next iteration/s of the event processing to set timeout of 0. */
+void aeDontWait(aeEventLoop *eventLoop, int noWait) {
+ if (noWait)
+ eventLoop->flags |= AE_DONT_WAIT;
+ else
+ eventLoop->flags &= ~AE_DONT_WAIT;
+}
+
/* Resize the maximum set size of the event loop.
* If the requested set size is smaller than the current set size, but
* there is already a file descriptor in use that is >= the requested
@@ -406,6 +415,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
}
}
+ if (eventLoop->flags & AE_DONT_WAIT) {
+ tv.tv_sec = tv.tv_usec = 0;
+ tvp = &tv;
+ }
+
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
diff --git a/src/ae.h b/src/ae.h
index 184fe3d1b..bbb43fe6e 100644
--- a/src/ae.h
+++ b/src/ae.h
@@ -106,6 +106,7 @@ typedef struct aeEventLoop {
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
+ int flags;
} aeEventLoop;
/* Prototypes */
@@ -128,5 +129,6 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
+void aeDontWait(aeEventLoop *eventLoop, int noWait);
#endif
diff --git a/src/ae_epoll.c b/src/ae_epoll.c
index 410aac70d..fa197297e 100644
--- a/src/ae_epoll.c
+++ b/src/ae_epoll.c
@@ -121,8 +121,8 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
- if (e->events & EPOLLERR) mask |= AE_WRITABLE;
- if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
+ if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
+ if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
diff --git a/src/aof.c b/src/aof.c
index eed994bf2..ef34097ed 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -836,6 +836,8 @@ int loadAppendOnlyFile(char *filename) {
freeFakeClientArgv(fakeClient);
fakeClient->cmd = NULL;
if (server.aof_load_truncated) valid_up_to = ftello(fp);
+ if (server.key_load_delay)
+ usleep(server.key_load_delay);
}
/* This point can only be reached when EOF is reached without errors.
diff --git a/src/config.c b/src/config.c
index 456fb0226..792e45057 100644
--- a/src/config.c
+++ b/src/config.c
@@ -522,6 +522,12 @@ void loadServerConfigFromString(char *config) {
err = "rdb-key-save-delay can't be negative";
goto loaderr;
}
+ } else if (!strcasecmp(argv[0],"key-load-delay") && argc==2) {
+ server.key_load_delay = atoi(argv[1]);
+ if (server.key_load_delay < 0) {
+ err = "key-load-delay can't be negative";
+ goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
@@ -1192,6 +1198,8 @@ void configSetCommand(client *c) {
} config_set_numerical_field(
"rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) {
} config_set_numerical_field(
+ "key-load-delay",server.key_load_delay,0,LLONG_MAX) {
+ } config_set_numerical_field(
"slave-announce-port",server.slave_announce_port,0,65535) {
} config_set_numerical_field(
"replica-announce-port",server.slave_announce_port,0,65535) {
@@ -1452,6 +1460,7 @@ void configGetCommand(client *c) {
config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay);
config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay);
+ config_get_numerical_field("key-load-delay",server.key_load_delay);
config_get_numerical_field("tcp-keepalive",server.tcpkeepalive);
/* Bool (yes/no) values */
@@ -2272,6 +2281,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ);
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY);
+ rewriteConfigNumericalOption(state,"key-load-delay",server.key_load_delay,CONFIG_DEFAULT_KEY_LOAD_DELAY);
rewriteConfigStringOption(state,"tls-cert-file",server.tls_cert_file,NULL);
rewriteConfigStringOption(state,"tls-key-file",server.tls_key_file,NULL);
rewriteConfigStringOption(state,"tls-dh-params-file",server.tls_dh_params_file,NULL);
diff --git a/src/connection.c b/src/connection.c
index 62c6f3506..601f175bc 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -140,10 +140,6 @@ void *connGetPrivateData(connection *conn) {
* move here as we implement additional connection types.
*/
-static int connSocketShutdown(connection *conn, int how) {
- return shutdown(conn->fd, how);
-}
-
/* Close the connection and free resources. */
static void connSocketClose(connection *conn) {
if (conn->fd != -1) {
@@ -298,7 +294,6 @@ static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size,
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
- .shutdown = connSocketShutdown,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
diff --git a/src/connection.h b/src/connection.h
index e3e844f95..3fdcddebe 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -55,7 +55,6 @@ typedef struct ConnectionType {
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len);
- int (*shutdown)(struct connection *conn, int how);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler);
@@ -159,10 +158,6 @@ static inline void connClose(connection *conn) {
conn->type->close(conn);
}
-static inline int connShutdown(connection *conn, int how) {
- return conn->type->shutdown(conn, how);
-}
-
/* Returns the last error encountered by the connection, as a string. If no error,
* a NULL is returned.
*/
@@ -208,4 +203,8 @@ int connFormatPeer(connection *conn, char *buf, size_t buf_len);
int connSockName(connection *conn, char *ip, size_t ip_len, int *port);
const char *connGetInfo(connection *conn, char *buf, size_t buf_len);
+/* Helpers for tls special considerations */
+int tlsHasPendingData();
+void tlsProcessPendingData();
+
#endif /* __REDIS_CONNECTION_H */
diff --git a/src/networking.c b/src/networking.c
index 2d00b0e74..d52eb1eba 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -881,7 +881,7 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) {
serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
- connClose(conn);
+ freeClient(connGetPrivateData(conn));
return;
}
}
@@ -984,14 +984,21 @@ void unlinkClient(client *c) {
c->client_list_node = NULL;
}
- /* In the case of diskless replication the fork is writing to the
- * sockets and just closing the fd isn't enough, if we don't also
- * shutdown the socket the fork will continue to write to the slave
- * and the salve will only find out that it was disconnected when
- * it will finish reading the rdb. */
- int need_shutdown = ((c->flags & CLIENT_SLAVE) &&
- (c->replstate == SLAVE_STATE_WAIT_BGSAVE_END));
- if (need_shutdown) connShutdown(c->conn, SHUT_RDWR);
+ /* Check if this is a replica waiting for diskless replication (rdb pipe),
+ * in which case it needs to be cleaned from that list */
+ if (c->flags & CLIENT_SLAVE &&
+ c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
+ server.rdb_pipe_conns)
+ {
+ int i;
+ for (i=0; i < server.rdb_pipe_numconns; i++) {
+ if (server.rdb_pipe_conns[i] == c->conn) {
+ rdbPipeWriteHandlerConnRemoved(c->conn);
+ server.rdb_pipe_conns[i] = NULL;
+ break;
+ }
+ }
+ }
connClose(c->conn);
c->conn = NULL;
}
@@ -1309,7 +1316,7 @@ int handleClientsWithPendingWrites(void) {
{
ae_flags |= AE_BARRIER;
}
- /* TODO: Handle write barriers in connection */
+ /* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */
if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) {
freeClientAsync(c);
}
diff --git a/src/rdb.c b/src/rdb.c
index 3b98f57bb..59f22916b 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -2211,6 +2211,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* own reference. */
decrRefCount(key);
}
+ if (server.key_load_delay)
+ usleep(server.key_load_delay);
/* Reset the state that is key-specified and is populated by
* opcodes before the key, so that we start from scratch again. */
@@ -2306,8 +2308,6 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
* This function covers the case of RDB -> Salves socket transfers for
* diskless replication. */
void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
- uint64_t *ok_slaves;
-
if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE,
"Background RDB transfer terminated with success");
@@ -2321,79 +2321,6 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_save_time_start = -1;
- /* If the child returns an OK exit code, read the set of slave client
- * IDs and the associated status code. We'll terminate all the slaves
- * in error state.
- *
- * If the process returned an error, consider the list of slaves that
- * can continue to be empty, so that it's just a special case of the
- * normal code path. */
- ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
- ok_slaves[0] = 0;
- if (!bysignal && exitcode == 0) {
- int readlen = sizeof(uint64_t);
-
- if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) ==
- readlen)
- {
- readlen = ok_slaves[0]*sizeof(uint64_t)*2;
-
- /* Make space for enough elements as specified by the first
- * uint64_t element in the array. */
- ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
- if (readlen &&
- read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
- readlen) != readlen)
- {
- ok_slaves[0] = 0;
- }
- }
- }
-
- close(server.rdb_pipe_read_result_from_child);
- close(server.rdb_pipe_write_result_to_parent);
-
- /* We can continue the replication process with all the slaves that
- * correctly received the full payload. Others are terminated. */
- listNode *ln;
- listIter li;
-
- listRewind(server.slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
-
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
- uint64_t j;
- int errorcode = 0;
-
- /* Search for the slave ID in the reply. In order for a slave to
- * continue the replication process, we need to find it in the list,
- * and it must have an error code set to 0 (which means success). */
- for (j = 0; j < ok_slaves[0]; j++) {
- if (slave->id == ok_slaves[2*j+1]) {
- errorcode = ok_slaves[2*j+2];
- break; /* Found in slaves list. */
- }
- }
- if (j == ok_slaves[0] || errorcode != 0) {
- serverLog(LL_WARNING,
- "Closing slave %s: child->slave RDB transfer failed: %s",
- replicationGetSlaveName(slave),
- (errorcode == 0) ? "RDB transfer child aborted"
- : strerror(errorcode));
- freeClient(slave);
- } else {
- serverLog(LL_WARNING,
- "Slave %s correctly received the streamed RDB file.",
- replicationGetSlaveName(slave));
- /* Restore the socket as non-blocking. */
- connNonBlock(slave->conn);
- connSendTimeout(slave->conn,0);
- }
- }
- }
- zfree(ok_slaves);
-
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET);
}
@@ -2425,9 +2352,6 @@ void killRDBChild(void) {
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
- connection **conns;
- uint64_t *clientids;
- int numconns;
listNode *ln;
listIter li;
pid_t childpid;
@@ -2436,35 +2360,30 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
- /* Before to fork, create a pipe that will be used in order to
- * send back to the parent the IDs of the slaves that successfully
- * received all the writes. */
+ /* Even if the previous fork child exited, don't start a new one until we
+ * drained the pipe. */
+ if (server.rdb_pipe_conns) return C_ERR;
+
+ /* Before to fork, create a pipe that is used to transfer the rdb bytes to
+ * the parant, we can't let it write directly to the sockets, since in case
+ * of TLS we must let the parent handle a contineous TLS state when the
+ * child terminates and parent takes over. */
if (pipe(pipefds) == -1) return C_ERR;
- server.rdb_pipe_read_result_from_child = pipefds[0];
- server.rdb_pipe_write_result_to_parent = pipefds[1];
+ server.rdb_pipe_read = pipefds[0];
+ server.rdb_pipe_write = pipefds[1];
+ anetNonBlock(NULL, server.rdb_pipe_read);
- /* Collect the file descriptors of the slaves we want to transfer
+ /* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
- conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
- /* We also allocate an array of corresponding client IDs. This will
- * be useful for the child process in order to build the report
- * (sent via unix pipe) that will be sent to the parent. */
- clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves));
- numconns = 0;
-
+ server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
+ server.rdb_pipe_numconns = 0;
+ server.rdb_pipe_numconns_writing = 0;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
-
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
- clientids[numconns] = slave->id;
- conns[numconns++] = slave->conn;
+ server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
- /* Put the socket in blocking mode to simplify RDB transfer.
- * We'll restore it when the children returns (since duped socket
- * will share the O_NONBLOCK attribute with the parent). */
- connBlock(slave->conn);
- connSendTimeout(slave->conn,server.repl_timeout*1000);
}
}
@@ -2474,16 +2393,15 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
if ((childpid = fork()) == 0) {
/* Child */
int retval;
- rio slave_sockets;
+ rio rdb;
- rioInitWithConnset(&slave_sockets,conns,numconns);
- zfree(conns);
+ rioInitWithFd(&rdb,server.rdb_pipe_write);
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-to-slaves");
- retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
- if (retval == C_OK && rioFlush(&slave_sockets) == 0)
+ retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
+ if (retval == C_OK && rioFlush(&rdb) == 0)
retval = C_ERR;
if (retval == C_OK) {
@@ -2497,48 +2415,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
-
- /* If we are returning OK, at least one slave was served
- * with the RDB file as expected, so we need to send a report
- * to the parent via the pipe. The format of the message is:
- *
- * <len> <slave[0].id> <slave[0].error> ...
- *
- * len, slave IDs, and slave errors, are all uint64_t integers,
- * so basically the reply is composed of 64 bits for the len field
- * plus 2 additional 64 bit integers for each entry, for a total
- * of 'len' entries.
- *
- * The 'id' represents the slave's client ID, so that the master
- * can match the report with a specific slave, and 'error' is
- * set to 0 if the replication process terminated with a success
- * or the error code if an error occurred. */
- void *msg = zmalloc(sizeof(uint64_t)*(1+2*numconns));
- uint64_t *len = msg;
- uint64_t *ids = len+1;
- int j, msglen;
-
- *len = numconns;
- for (j = 0; j < numconns; j++) {
- *ids++ = clientids[j];
- *ids++ = slave_sockets.io.connset.state[j];
- }
-
- /* Write the message to the parent. If we have no good slaves or
- * we are unable to transfer the message to the parent, we exit
- * with an error so that the parent will abort the replication
- * process with all the childre that were waiting. */
- msglen = sizeof(uint64_t)*(1+2*numconns);
- if (*len == 0 ||
- write(server.rdb_pipe_write_result_to_parent,msg,msglen)
- != msglen)
- {
- retval = C_ERR;
- }
- zfree(msg);
}
- zfree(clientids);
- rioFreeConnset(&slave_sockets);
+ rioFreeFd(&rdb);
+ close(server.rdb_pipe_write); /* wake up the reader, tell it we're done. */
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
@@ -2552,17 +2431,16 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
- int j;
-
- for (j = 0; j < numconns; j++) {
- if (slave->id == clientids[j]) {
- slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
- break;
- }
+ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
+ slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}
- close(pipefds[0]);
- close(pipefds[1]);
+ close(server.rdb_pipe_write);
+ close(server.rdb_pipe_read);
+ zfree(server.rdb_pipe_conns);
+ server.rdb_pipe_conns = NULL;
+ server.rdb_pipe_numconns = 0;
+ server.rdb_pipe_numconns_writing = 0;
closeChildInfoPipe();
} else {
server.stat_fork_time = ustime()-start;
@@ -2574,10 +2452,12 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
+ close(server.rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
+ if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
+ serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
+ }
updateDictResizePolicy();
}
- zfree(clientids);
- zfree(conns);
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
diff --git a/src/replication.c b/src/replication.c
index cbc52cb2e..9f5c19053 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -884,7 +884,7 @@ void sendBulkToSlave(connection *conn) {
nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
- strerror(errno));
+ connGetLastError(conn));
freeClient(slave);
return;
}
@@ -911,7 +911,7 @@ void sendBulkToSlave(connection *conn) {
if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
- strerror(errno));
+ connGetLastError(conn));
freeClient(slave);
}
return;
@@ -926,6 +926,152 @@ void sendBulkToSlave(connection *conn) {
}
}
+/* Remove one write handler from the list of connections waiting to be writable
+ * during rdb pipe transfer. */
+void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
+ if (!connHasWriteHandler(conn))
+ return;
+ connSetWriteHandler(conn, NULL);
+ server.rdb_pipe_numconns_writing--;
+ /* if there are no more writes for now for this conn, or write error: */
+ if (server.rdb_pipe_numconns_writing == 0) {
+ if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
+ serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
+ }
+ }
+}
+
+/* Called in diskless master during transfer of data from the rdb pipe, when
+ * the replica becomes writable again. */
+void rdbPipeWriteHandler(struct connection *conn) {
+ serverAssert(server.rdb_pipe_bufflen>0);
+ client *slave = connGetPrivateData(conn);
+ int nwritten;
+ if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
+ server.rdb_pipe_bufflen - slave->repldboff)) == -1)
+ {
+ if (connGetState(conn) == CONN_STATE_CONNECTED)
+ return; /* equivalent to EAGAIN */
+ serverLog(LL_WARNING,"Write error sending DB to replica: %s",
+ connGetLastError(conn));
+ freeClient(slave);
+ return;
+ } else {
+ slave->repldboff += nwritten;
+ server.stat_net_output_bytes += nwritten;
+ if (slave->repldboff < server.rdb_pipe_bufflen)
+ return; /* more data to write.. */
+ }
+ rdbPipeWriteHandlerConnRemoved(conn);
+}
+
+/* When the the pipe serving diskless rdb transfer is drained (write end was
+ * closed), we can clean up all the temporary variables, and cleanup after the
+ * fork child. */
+void RdbPipeCleanup() {
+ close(server.rdb_pipe_read);
+ zfree(server.rdb_pipe_conns);
+ server.rdb_pipe_conns = NULL;
+ server.rdb_pipe_numconns = 0;
+ server.rdb_pipe_numconns_writing = 0;
+ zfree(server.rdb_pipe_buff);
+ server.rdb_pipe_buff = NULL;
+ server.rdb_pipe_bufflen = 0;
+
+ /* Since we're avoiding to detect the child exited as long as the pipe is
+ * not drained, so now is the time to check. */
+ checkChildrenDone();
+}
+
+/* Called in diskless master, when there's data to read from the child's rdb pipe */
+void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
+ UNUSED(mask);
+ UNUSED(clientData);
+ UNUSED(eventLoop);
+ int i;
+ if (!server.rdb_pipe_buff)
+ server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
+ serverAssert(server.rdb_pipe_numconns_writing==0);
+
+ while (1) {
+ server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
+ if (server.rdb_pipe_bufflen < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return;
+ serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno));
+ for (i=0; i < server.rdb_pipe_numconns; i++) {
+ connection *conn = server.rdb_pipe_conns[i];
+ if (!conn)
+ continue;
+ client *slave = connGetPrivateData(conn);
+ freeClient(slave);
+ server.rdb_pipe_conns[i] = NULL;
+ }
+ killRDBChild();
+ return;
+ }
+
+ if (server.rdb_pipe_bufflen == 0) {
+ /* EOF - write end was closed. */
+ int stillUp = 0;
+ aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
+ for (i=0; i < server.rdb_pipe_numconns; i++)
+ {
+ connection *conn = server.rdb_pipe_conns[i];
+ if (!conn)
+ continue;
+ stillUp++;
+ }
+ serverLog(LL_WARNING,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp);
+ RdbPipeCleanup();
+ return;
+ }
+
+ int stillAlive = 0;
+ for (i=0; i < server.rdb_pipe_numconns; i++)
+ {
+ int nwritten;
+ connection *conn = server.rdb_pipe_conns[i];
+ if (!conn)
+ continue;
+
+ client *slave = connGetPrivateData(conn);
+ if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
+ if (connGetState(conn) != CONN_STATE_CONNECTED) {
+ serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",
+ connGetLastError(conn));
+ freeClient(slave);
+ server.rdb_pipe_conns[i] = NULL;
+ continue;
+ }
+ /* An error and still in connected state, is equivalent to EAGAIN */
+ slave->repldboff = 0;
+ } else {
+ slave->repldboff = nwritten;
+ server.stat_net_output_bytes += nwritten;
+ }
+ /* If we were unable to write all the data to one of the replicas,
+ * setup write handler (and disable pipe read handler, below) */
+ if (nwritten != server.rdb_pipe_bufflen) {
+ server.rdb_pipe_numconns_writing++;
+ connSetWriteHandler(conn, rdbPipeWriteHandler);
+ }
+ stillAlive++;
+ }
+
+ if (stillAlive == 0) {
+ serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child.");
+ killRDBChild();
+ RdbPipeCleanup();
+ }
+ /* Remove the pipe read handler if at least one write handler was set. */
+ if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
+ aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
+ break;
+ }
+ }
+}
+
/* This function is called at the end of every background saving,
* or when the replication RDB transfer strategy is modified from
* disk to socket or the other way around.
diff --git a/src/rio.c b/src/rio.c
index 5e3b4a06e..c8c924380 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -272,85 +272,67 @@ void rioFreeConn(rio *r, sds *remaining) {
r->io.conn.buf = NULL;
}
-/* ------------------- File descriptors set implementation ------------------
- * This target is used to write the RDB file to N different replicas via
- * sockets, when the master just streams the data to the replicas without
- * creating an RDB on-disk image (diskless replication option).
+/* ------------------- File descriptor implementation ------------------
+ * This target is used to write the RDB file to pipe, when the master just
+ * streams the data to the replicas without creating an RDB on-disk image
+ * (diskless replication option).
* It only implements writes. */
/* Returns 1 or 0 for success/failure.
- * The function returns success as long as we are able to correctly write
- * to at least one file descriptor.
*
* When buf is NULL and len is 0, the function performs a flush operation
* if there is some pending buffer, so this function is also used in order
- * to implement rioFdsetFlush(). */
-static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
+ * to implement rioFdFlush(). */
+static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
ssize_t retval;
- int j;
unsigned char *p = (unsigned char*) buf;
int doflush = (buf == NULL && len == 0);
- /* To start we always append to our buffer. If it gets larger than
- * a given size, we actually write to the sockets. */
- if (len) {
- r->io.connset.buf = sdscatlen(r->io.connset.buf,buf,len);
- len = 0; /* Prevent entering the while below if we don't flush. */
- if (sdslen(r->io.connset.buf) > PROTO_IOBUF_LEN) doflush = 1;
- }
-
- if (doflush) {
- p = (unsigned char*) r->io.connset.buf;
- len = sdslen(r->io.connset.buf);
+ /* For small writes, we rather keep the data in user-space buffer, and flush
+ * it only when it grows. however for larger writes, we prefer to flush
+ * any pre-existing buffer, and write the new one directly without reallocs
+ * and memory copying. */
+ if (len > PROTO_IOBUF_LEN) {
+ /* First, flush any pre-existing buffered data. */
+ if (sdslen(r->io.fd.buf)) {
+ if (rioFdWrite(r, NULL, 0) == 0)
+ return 0;
+ }
+ /* Write the new data, keeping 'p' and 'len' from the input. */
+ } else {
+ if (len) {
+ r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
+ if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
+ doflush = 1;
+ if (!doflush)
+ return 1;
+ }
+ /* Flusing the buffered data. set 'p' and 'len' accordintly. */
+ p = (unsigned char*) r->io.fd.buf;
+ len = sdslen(r->io.fd.buf);
}
- /* Write in little chunchs so that when there are big writes we
- * parallelize while the kernel is sending data in background to
- * the TCP socket. */
- while(len) {
- size_t count = len < 1024 ? len : 1024;
- int broken = 0;
- for (j = 0; j < r->io.connset.numconns; j++) {
- if (r->io.connset.state[j] != 0) {
- /* Skip FDs alraedy in error. */
- broken++;
- continue;
- }
-
- /* Make sure to write 'count' bytes to the socket regardless
- * of short writes. */
- size_t nwritten = 0;
- while(nwritten != count) {
- retval = connWrite(r->io.connset.conns[j],p+nwritten,count-nwritten);
- if (retval <= 0) {
- /* With blocking sockets, which is the sole user of this
- * rio target, EWOULDBLOCK is returned only because of
- * the SO_SNDTIMEO socket option, so we translate the error
- * into one more recognizable by the user. */
- if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
- break;
- }
- nwritten += retval;
- }
-
- if (nwritten != count) {
- /* Mark this FD as broken. */
- r->io.connset.state[j] = errno;
- if (r->io.connset.state[j] == 0) r->io.connset.state[j] = EIO;
- }
+ size_t nwritten = 0;
+ while(nwritten != len) {
+ retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
+ if (retval <= 0) {
+ /* With blocking io, which is the sole user of this
+ * rio target, EWOULDBLOCK is returned only because of
+ * the SO_SNDTIMEO socket option, so we translate the error
+ * into one more recognizable by the user. */
+ if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
+ return 0; /* error. */
}
- if (broken == r->io.connset.numconns) return 0; /* All the FDs in error. */
- p += count;
- len -= count;
- r->io.connset.pos += count;
+ nwritten += retval;
}
- if (doflush) sdsclear(r->io.connset.buf);
+ r->io.fd.pos += len;
+ sdsclear(r->io.fd.buf);
return 1;
}
/* Returns 1 or 0 for success/failure. */
-static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
+static size_t rioFdRead(rio *r, void *buf, size_t len) {
UNUSED(r);
UNUSED(buf);
UNUSED(len);
@@ -358,23 +340,23 @@ static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
}
/* Returns read/write position in file. */
-static off_t rioFdsetTell(rio *r) {
- return r->io.connset.pos;
+static off_t rioFdTell(rio *r) {
+ return r->io.fd.pos;
}
/* Flushes any buffer to target device if applicable. Returns 1 on success
* and 0 on failures. */
-static int rioFdsetFlush(rio *r) {
+static int rioFdFlush(rio *r) {
/* Our flush is implemented by the write method, that recognizes a
* buffer set to NULL with a count of zero as a flush request. */
- return rioFdsetWrite(r,NULL,0);
+ return rioFdWrite(r,NULL,0);
}
-static const rio rioFdsetIO = {
- rioFdsetRead,
- rioFdsetWrite,
- rioFdsetTell,
- rioFdsetFlush,
+static const rio rioFdIO = {
+ rioFdRead,
+ rioFdWrite,
+ rioFdTell,
+ rioFdFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* flags */
@@ -383,24 +365,16 @@ static const rio rioFdsetIO = {
{ { NULL, 0 } } /* union for io-specific vars */
};
-void rioInitWithConnset(rio *r, connection **conns, int numconns) {
- int j;
-
- *r = rioFdsetIO;
- r->io.connset.conns = zmalloc(sizeof(connection *)*numconns);
- r->io.connset.state = zmalloc(sizeof(int)*numconns);
- memcpy(r->io.connset.conns,conns,sizeof(connection *)*numconns);
- for (j = 0; j < numconns; j++) r->io.connset.state[j] = 0;
- r->io.connset.numconns = numconns;
- r->io.connset.pos = 0;
- r->io.connset.buf = sdsempty();
+void rioInitWithFd(rio *r, int fd) {
+ *r = rioFdIO;
+ r->io.fd.fd = fd;
+ r->io.fd.pos = 0;
+ r->io.fd.buf = sdsempty();
}
/* release the rio stream. */
-void rioFreeConnset(rio *r) {
- zfree(r->io.connset.conns);
- zfree(r->io.connset.state);
- sdsfree(r->io.connset.buf);
+void rioFreeFd(rio *r) {
+ sdsfree(r->io.fd.buf);
}
/* ---------------------------- Generic functions ---------------------------- */
diff --git a/src/rio.h b/src/rio.h
index fdde7c20e..9576335e8 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -77,7 +77,7 @@ struct _rio {
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
- /* Connection object */
+ /* Connection object (used to read from socket) */
struct {
connection *conn; /* Connection */
off_t pos; /* pos in buf that was returned */
@@ -85,14 +85,12 @@ struct _rio {
size_t read_limit; /* don't allow to buffer/read more than that */
size_t read_so_far; /* amount of data read from the rio (not buffered) */
} conn;
- /* Multiple FDs target (used to write to N sockets). */
+ /* FD target (used to write to pipe). */
struct {
- connection **conns; /* Connections */
- int *state; /* Error state of each fd. 0 (if ok) or errno. */
- int numconns;
+ int fd; /* File descriptor. */
off_t pos;
sds buf;
- } connset;
+ } fd;
} io;
};
@@ -161,9 +159,9 @@ static inline void rioClearErrors(rio *r) {
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
void rioInitWithConn(rio *r, connection *conn, size_t read_limit);
-void rioInitWithConnset(rio *r, connection **conns, int numconns);
+void rioInitWithFd(rio *r, int fd);
-void rioFreeConnset(rio *r);
+void rioFreeFd(rio *r);
void rioFreeConn(rio *r, sds* out_remainingBufferedData);
size_t rioWriteBulkCount(rio *r, char prefix, long count);
diff --git a/src/server.c b/src/server.c
index 930a01723..f05ce3151 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1746,6 +1746,48 @@ void updateCachedTime(void) {
server.daylight_active = tm.tm_isdst;
}
+void checkChildrenDone(void) {
+ int statloc;
+ pid_t pid;
+
+ /* If we have a diskless rdb child (note that we support only one concurrent
+ * child), we want to avoid collecting it's exit status and acting on it
+ * as long as we didn't finish to drain the pipe, since then we're at risk
+ * of starting a new fork and a new pipe before we're done with the previous
+ * one. */
+ if (server.rdb_child_pid != -1 && server.rdb_pipe_conns)
+ return;
+
+ if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
+ int exitcode = WEXITSTATUS(statloc);
+ int bysignal = 0;
+
+ if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
+
+ if (pid == -1) {
+ serverLog(LL_WARNING,"wait3() returned an error: %s. "
+ "rdb_child_pid = %d, aof_child_pid = %d",
+ strerror(errno),
+ (int) server.rdb_child_pid,
+ (int) server.aof_child_pid);
+ } else if (pid == server.rdb_child_pid) {
+ backgroundSaveDoneHandler(exitcode,bysignal);
+ if (!bysignal && exitcode == 0) receiveChildInfo();
+ } else if (pid == server.aof_child_pid) {
+ backgroundRewriteDoneHandler(exitcode,bysignal);
+ if (!bysignal && exitcode == 0) receiveChildInfo();
+ } else {
+ if (!ldbRemoveChild(pid)) {
+ serverLog(LL_WARNING,
+ "Warning, detected child with unmatched pid: %ld",
+ (long)pid);
+ }
+ }
+ updateDictResizePolicy();
+ closeChildInfoPipe();
+ }
+}
+
/* This is our timer interrupt, called server.hz times per second.
* Here is where we do a number of things that need to be done asynchronously.
* For instance:
@@ -1898,37 +1940,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
- int statloc;
- pid_t pid;
-
- if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
- int exitcode = WEXITSTATUS(statloc);
- int bysignal = 0;
-
- if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
-
- if (pid == -1) {
- serverLog(LL_WARNING,"wait3() returned an error: %s. "
- "rdb_child_pid = %d, aof_child_pid = %d",
- strerror(errno),
- (int) server.rdb_child_pid,
- (int) server.aof_child_pid);
- } else if (pid == server.rdb_child_pid) {
- backgroundSaveDoneHandler(exitcode,bysignal);
- if (!bysignal && exitcode == 0) receiveChildInfo();
- } else if (pid == server.aof_child_pid) {
- backgroundRewriteDoneHandler(exitcode,bysignal);
- if (!bysignal && exitcode == 0) receiveChildInfo();
- } else {
- if (!ldbRemoveChild(pid)) {
- serverLog(LL_WARNING,
- "Warning, detected child with unmatched pid: %ld",
- (long)pid);
- }
- }
- updateDictResizePolicy();
- closeChildInfoPipe();
- }
+ checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
@@ -2081,6 +2093,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
+ /* TODO: How do i handle write barriers flag */
+ tlsProcessPendingData();
+ /* If tls already has pending unread data don't sleep at all. */
+ aeDontWait(server.el, tlsHasPendingData());
+
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
@@ -2280,6 +2297,7 @@ void initServerConfig(void) {
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC;
server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY;
+ server.key_load_delay = CONFIG_DEFAULT_KEY_LOAD_DELAY;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL;
@@ -2813,6 +2831,11 @@ void initServer(void) {
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
+ server.rdb_pipe_conns = NULL;
+ server.rdb_pipe_numconns = 0;
+ server.rdb_pipe_numconns_writing = 0;
+ server.rdb_pipe_buff = NULL;
+ server.rdb_pipe_bufflen = 0;
server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
diff --git a/src/server.h b/src/server.h
index 1b41c9ac5..89f219033 100644
--- a/src/server.h
+++ b/src/server.h
@@ -135,6 +135,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0
+#define CONFIG_DEFAULT_KEY_LOAD_DELAY 0
#define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define CONFIG_DEFAULT_SLAVE_READ_ONLY 1
#define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1
@@ -1236,10 +1237,17 @@ struct redisServer {
int rdb_child_type; /* Type of save by active child. */
int lastbgsave_status; /* C_OK or C_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
- int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
- int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
+ int rdb_pipe_write; /* RDB pipes used to transfer the rdb */
+ int rdb_pipe_read; /* data to the parent process in diskless repl. */
+ connection **rdb_pipe_conns; /* Connections which are currently the */
+ int rdb_pipe_numconns; /* target of diskless rdb fork child. */
+ int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */
+ char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */
+ int rdb_pipe_bufflen; /* that was read from the the rdb pipe. */
int rdb_key_save_delay; /* Delay in microseconds between keys while
* writing the RDB. (for testings) */
+ int key_load_delay; /* Delay in microseconds between keys while
+ * loading aof or rdb. (for testings) */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
@@ -1779,6 +1787,8 @@ void clearReplicationId2(void);
void chopReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
+void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
+void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
/* Generic persistence functions */
void startLoadingFile(FILE* fp, char* filename);
@@ -1944,6 +1954,7 @@ unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void);
struct redisMemOverhead *getMemoryOverheadData(void);
void freeMemoryOverheadData(struct redisMemOverhead *mh);
+void checkChildrenDone(void);
#define RESTART_SERVER_NONE 0
#define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */
diff --git a/src/tls.c b/src/tls.c
index dabb2ee0f..c939ebf55 100644
--- a/src/tls.c
+++ b/src/tls.c
@@ -30,6 +30,7 @@
#include "server.h"
#include "connhelpers.h"
+#include "adlist.h"
#ifdef USE_OPENSSL
@@ -41,6 +42,10 @@ extern ConnectionType CT_Socket;
SSL_CTX *redis_tls_ctx;
+/* list of connections with pending data already read from the socket, but not
+ * served to the reader yet. */
+static list *pending_list = NULL;
+
void tlsInit(void) {
ERR_load_crypto_strings();
SSL_load_error_strings();
@@ -49,6 +54,8 @@ void tlsInit(void) {
if (!RAND_poll()) {
serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator.");
}
+
+ pending_list = listCreate();
}
int tlsConfigureServer(void) {
@@ -188,6 +195,7 @@ typedef struct tls_connection {
int flags;
SSL *ssl;
char *ssl_error;
+ listNode *pending_list_node;
} tls_connection;
connection *connCreateTLS(void) {
@@ -288,11 +296,7 @@ void updateSSLEvent(tls_connection *conn) {
aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE);
}
-
-static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
- UNUSED(el);
- UNUSED(fd);
- tls_connection *conn = clientData;
+static void tlsHandleEvent(tls_connection *conn, int mask) {
int ret;
TLSCONN_DEBUG("tlsEventHandler(): fd=%d, state=%d, mask=%d, r=%d, w=%d, flags=%d",
@@ -369,6 +373,15 @@ static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, in
if ((mask & AE_READABLE) && conn->c.read_handler) {
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
+ if (SSL_has_pending(conn->ssl)) {
+ if (!conn->pending_list_node) {
+ listAddNodeTail(pending_list, conn);
+ conn->pending_list_node = listLast(pending_list);
+ }
+ } else if (conn->pending_list_node) {
+ listDelNode(pending_list, conn->pending_list_node);
+ conn->pending_list_node = NULL;
+ }
}
if ((mask & AE_WRITABLE) && conn->c.write_handler) {
@@ -382,6 +395,13 @@ static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, in
updateSSLEvent(conn);
}
+static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
+ UNUSED(el);
+ UNUSED(fd);
+ tls_connection *conn = clientData;
+ tlsHandleEvent(conn, mask);
+}
+
static void connTLSClose(connection *conn_) {
tls_connection *conn = (tls_connection *) conn_;
@@ -395,6 +415,11 @@ static void connTLSClose(connection *conn_) {
conn->ssl_error = NULL;
}
+ if (conn->pending_list_node) {
+ listDelNode(pending_list, conn->pending_list_node);
+ conn->pending_list_node = NULL;
+ }
+
CT_Socket.close(conn_);
}
@@ -610,17 +635,6 @@ exit:
return nread;
}
-/* TODO: This is probably not the right thing to do, but as we handle proxying from child
- * processes we'll probably not need any shutdown mechanism anyway so this is just a
- * place holder for now.
- */
-static int connTLSShutdown(connection *conn_, int how) {
- UNUSED(how);
- tls_connection *conn = (tls_connection *) conn_;
-
- return SSL_shutdown(conn->ssl);
-}
-
ConnectionType CT_TLS = {
.ae_handler = tlsEventHandler,
.accept = connTLSAccept,
@@ -635,9 +649,25 @@ ConnectionType CT_TLS = {
.sync_write = connTLSSyncWrite,
.sync_read = connTLSSyncRead,
.sync_readline = connTLSSyncReadLine,
- .shutdown = connTLSShutdown
};
+int tlsHasPendingData() {
+ if (!pending_list)
+ return 0;
+ return listLength(pending_list) > 0;
+}
+
+void tlsProcessPendingData() {
+ listIter li;
+ listNode *ln;
+
+ listRewind(pending_list,&li);
+ while((ln = listNext(&li))) {
+ tls_connection *conn = listNodeValue(ln);
+ tlsHandleEvent(conn, AE_READABLE);
+ }
+}
+
#else /* USE_OPENSSL */
void tlsInit(void) {
@@ -666,4 +696,11 @@ connection *connCreateAcceptedTLS(int fd, int require_auth) {
return NULL;
}
+int tlsHasPendingData() {
+ return 0;
+}
+
+void tlsProcessPendingData() {
+}
+
#endif
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 43bc684b4..192137e87 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -29,9 +29,6 @@ start_server {tags {"repl"}} {
$slave slaveof $master_host $master_port
test {Slave enters handshake} {
- if {$::tls} {
- fail "TLS with repl-diskless-sync not supported yet."
- }
wait_for_condition 50 1000 {
[string match *handshake* [$slave role]]
} else {
@@ -187,10 +184,6 @@ start_server {tags {"repl"}} {
}
foreach mdl {no yes} {
- if {$::tls && $mdl eq "yes"} {
- puts "** Skipping test: TLS with repl-diskless-sync not supported yet."
- continue
- }
foreach sdl {disabled swapdb} {
start_server {tags {"repl"}} {
set master [srv 0 client]
@@ -327,9 +320,6 @@ start_server {tags {"repl"}} {
}
test {slave fails full sync and diskless load swapdb recoveres it} {
- if {$::tls} {
- fail ""
- }
start_server {tags {"repl"}} {
set slave [srv 0 client]
set slave_host [srv 0 host]
@@ -397,10 +387,6 @@ test {slave fails full sync and diskless load swapdb recoveres it} {
}
test {diskless loading short read} {
- if {$::tls} {
- fail "TLS with repl-diskless-sync not supported yet."
- }
-
start_server {tags {"repl"}} {
set replica [srv 0 client]
set replica_host [srv 0 host]
@@ -480,3 +466,159 @@ test {diskless loading short read} {
}
}
+# get current stime and utime metrics for a thread (since it's creation)
+proc get_cpu_metrics { statfile } {
+ if { [ catch {
+ set fid [ open $statfile r ]
+ set data [ read $fid 1024 ]
+ ::close $fid
+ set data [ split $data ]
+
+ ;## number of jiffies it has been scheduled...
+ set utime [ lindex $data 13 ]
+ set stime [ lindex $data 14 ]
+ } err ] } {
+ error "assertion:can't parse /proc: $err"
+ }
+ set mstime [clock milliseconds]
+ return [ list $mstime $utime $stime ]
+}
+
+# compute %utime and %stime of a thread between two measurements
+proc compute_cpu_usage {start end} {
+ set clock_ticks [exec getconf CLK_TCK]
+ # convert ms time to jiffies and calc delta
+ set dtime [ expr { ([lindex $end 0] - [lindex $start 0]) * double($clock_ticks) / 1000 } ]
+ set utime [ expr { [lindex $end 1] - [lindex $start 1] } ]
+ set stime [ expr { [lindex $end 2] - [lindex $start 2] } ]
+ set pucpu [ expr { ($utime / $dtime) * 100 } ]
+ set pscpu [ expr { ($stime / $dtime) * 100 } ]
+ return [ list $pucpu $pscpu ]
+}
+
+
+# test diskless rdb pipe with multiple replicas, which may drop half way
+start_server {tags {"repl"}} {
+ set master [srv 0 client]
+ $master config set repl-diskless-sync yes
+ $master config set repl-diskless-sync-delay 1
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ # put enough data in the db that the rdb file will be bigger than the socket buffers
+ # and since we'll have key-load-delay of 100, 10000 keys will take at least 1 second
+ # we also need the replica to process requests during transfer (which it does only once in 2mb)
+ $master debug populate 10000 test 10000
+ $master config set rdbcompression no
+ foreach all_drop {no slow fast all} {
+ test "diskless $all_drop replicas drop during rdb pipe" {
+ set replicas {}
+ set replicas_alive {}
+ # start one replica that will read the rdb fast, and one that will be slow
+ start_server {} {
+ lappend replicas [srv 0 client]
+ lappend replicas_alive [srv 0 client]
+ start_server {} {
+ lappend replicas [srv 0 client]
+ lappend replicas_alive [srv 0 client]
+
+ # start replication
+ # it's enough for just one replica to be slow, and have it's write handler enabled
+ # so that the whole rdb generation process is bound to that
+ [lindex $replicas 0] config set repl-diskless-load swapdb
+ [lindex $replicas 0] config set key-load-delay 100
+ [lindex $replicas 0] replicaof $master_host $master_port
+ [lindex $replicas 1] replicaof $master_host $master_port
+
+ # wait for the replicas to start reading the rdb
+ # using the log file since the replica only responds to INFO once in 2mb
+ wait_for_log_message -1 "*Loading DB in memory*" 8 800 10
+
+ set master_statfile [format "/proc/%s/stat" [srv -2 pid]]
+ set master_start_metrics [get_cpu_metrics $master_statfile]
+ set start_time [clock seconds]
+
+ # wait a while so that the pipe socket writer will be
+ # blocked on write (since replica 0 is slow to read from the socket)
+ after 500
+
+ # add some command to be present in the command stream after the rdb.
+ $master incr $all_drop
+
+ # disconnect replicas depending on the current test
+ if {$all_drop == "all" || $all_drop == "fast"} {
+ exec kill [srv 0 pid]
+ set replicas_alive [lreplace $replicas_alive 1 1]
+ }
+ if {$all_drop == "all" || $all_drop == "slow"} {
+ exec kill [srv -1 pid]
+ set replicas_alive [lreplace $replicas_alive 0 0]
+ }
+
+ # wait for rdb child to exit
+ wait_for_condition 500 100 {
+ [s -2 rdb_bgsave_in_progress] == 0
+ } else {
+ fail "rdb child didn't terminate"
+ }
+
+ # make sure we got what we were aiming for, by looking for the message in the log file
+ if {$all_drop == "all"} {
+ wait_for_log_message -2 "*Diskless rdb transfer, last replica dropped, killing fork child*" 12 1 1
+ }
+ if {$all_drop == "no"} {
+ wait_for_log_message -2 "*Diskless rdb transfer, done reading from pipe, 2 replicas still up*" 12 1 1
+ }
+ if {$all_drop == "slow" || $all_drop == "fast"} {
+ wait_for_log_message -2 "*Diskless rdb transfer, done reading from pipe, 1 replicas still up*" 12 1 1
+ }
+
+ # make sure we don't have a busy loop going thought epoll_wait
+ set master_end_metrics [get_cpu_metrics $master_statfile]
+ set time_elapsed [expr {[clock seconds]-$start_time}]
+ set master_cpu [compute_cpu_usage $master_start_metrics $master_end_metrics]
+ set master_utime [lindex $master_cpu 0]
+ set master_stime [lindex $master_cpu 1]
+ if {$::verbose} {
+ puts "elapsed: $time_elapsed"
+ puts "master utime: $master_utime"
+ puts "master stime: $master_stime"
+ }
+ if {$all_drop == "all" || $all_drop == "slow"} {
+ assert {$master_utime < 30}
+ assert {$master_stime < 30}
+ }
+ if {$all_drop == "none" || $all_drop == "fast"} {
+ assert {$master_utime < 15}
+ assert {$master_stime < 15}
+ }
+
+ # verify the data integrity
+ foreach replica $replicas_alive {
+ # Wait that replicas acknowledge they are online so
+ # we are sure that DBSIZE and DEBUG DIGEST will not
+ # fail because of timing issues.
+ wait_for_condition 50 100 {
+ [lindex [$replica role] 3] eq {connected}
+ } else {
+ fail "replicas still not connected after some time"
+ }
+
+ # Make sure that replicas and master have same
+ # number of keys
+ wait_for_condition 50 100 {
+ [$master dbsize] == [$replica dbsize]
+ } else {
+ fail "Different number of keys between master and replicas after too long time."
+ }
+
+ # Check digests
+ set digest [$master debug digest]
+ set digest0 [$replica debug digest]
+ assert {$digest ne 0000000000000000000000000000000000000000}
+ assert {$digest eq $digest0}
+ }
+ }
+ }
+ }
+ }
+}