author      Oran Agra <oran@redislabs.com>        2019-08-19 12:18:25 +0300
committer   Yossi Gottlieb <yossigo@gmail.com>    2019-10-07 21:06:30 +0300
commit      6b6294807c0bca50041da117c1abb35f5114e972 (patch)
tree        e13c13d22cab5b36d1ad70c73d4398d30dd6eb15
parent      5a477946065bcf05b335ededd6b794e82882ab73 (diff)
download    redis-6b6294807c0bca50041da117c1abb35f5114e972.tar.gz
TLS: Implement support for write barrier.
-rw-r--r--  TLS.md                       10
-rw-r--r--  src/aof.c                     4
-rw-r--r--  src/cluster.c                 2
-rw-r--r--  src/connection.c             32
-rw-r--r--  src/connection.h             14
-rw-r--r--  src/debug.c                   6
-rw-r--r--  src/networking.c              9
-rw-r--r--  src/server.c                 11
-rw-r--r--  src/server.h                  1
-rw-r--r--  src/tls.c                    47
-rw-r--r--  tests/integration/aof.tcl    31
11 files changed, 136 insertions, 31 deletions
diff --git a/TLS.md b/TLS.md
index 90ed08e7c..c627f814c 100644
--- a/TLS.md
+++ b/TLS.md
@@ -57,22 +57,18 @@ Connections
The connection abstraction API is mostly done and seems to hold up well for
hiding implementation details between TLS and TCP.
-1. Still need to implement the equivalent of AE_BARRIER. Because TLS
- socket-level read/write events don't correspond to logical operations, this
- should probably be done at the Read/Write handler level.
-
-2. Multi-threading I/O is not supported. The main issue to address is the need
+1. Multi-threading I/O is not supported. The main issue to address is the need
to manipulate AE based on OpenSSL return codes. We can either propagate this
out of the thread, or explore ways of further optimizing MT I/O by having
event loops that live inside the thread and borrow connections in/out.
-3. Finish cleaning up the implementation. Make sure all error cases are handled
+2. Finish cleaning up the implementation. Make sure all error cases are handled
and reflected into connection state, connection state validated before
certain operations, etc.
- Clean (non-errno) interface to report would-block.
- Consistent error reporting.
-4. Sync IO for TLS is currently implemented in a hackish way, i.e. making the
+3. Sync IO for TLS is currently implemented in a hackish way, i.e. making the
socket blocking and configuring socket-level timeout. This means the timeout
value may not be so accurate, and there would be a lot of syscall overhead.
However I believe that getting rid of syncio completely in favor of pure
diff --git a/src/aof.c b/src/aof.c
index ef34097ed..181995bbc 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -385,6 +385,10 @@ void flushAppendOnlyFile(int force) {
* there is much to do about the whole server stopping for power problems
* or alike */
+ if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
+ usleep(server.aof_flush_sleep);
+ }
+
latencyStartMonitor(latency);
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
diff --git a/src/cluster.c b/src/cluster.c
index 639ab1ea7..1d922e87a 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -2277,7 +2277,7 @@ void clusterReadHandler(connection *conn) {
* from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
if (sdslen(link->sndbuf) == 0 && msglen != 0)
- connSetWriteHandler(link->conn, clusterWriteHandler); /* TODO: Handle AE_BARRIER in conns */
+ connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
diff --git a/src/connection.c b/src/connection.c
index 601f175bc..85bf572ad 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -194,10 +194,14 @@ static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_hand
/* Register a write handler, to be called when the connection is writable.
* If NULL, the existing handler is removed.
*/
-static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
+static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
if (func == conn->write_handler) return C_OK;
conn->write_handler = func;
+ if (barrier)
+ conn->flags |= CONN_FLAG_WRITE_BARRIER;
+ else
+ conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
if (!conn->write_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
else
@@ -247,13 +251,35 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
conn->conn_handler = NULL;
}
+ /* Normally we execute the readable event first, and the writable
+ * event later. This is useful as sometimes we may be able
+ * to serve the reply of a query immediately after processing the
+ * query.
+ *
+ * However if WRITE_BARRIER is set in the mask, our application is
+ * asking us to do the reverse: never fire the writable event
+ * after the readable. In such a case, we invert the calls.
+ * This is useful when, for instance, we want to do things
+ * in the beforeSleep() hook, like fsynching a file to disk,
+ * before replying to a client. */
+ int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
+
+ int call_write = (mask & AE_WRITABLE) && conn->write_handler;
+ int call_read = (mask & AE_READABLE) && conn->read_handler;
+
/* Handle normal I/O flows */
- if ((mask & AE_READABLE) && conn->read_handler) {
+ if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
- if ((mask & AE_WRITABLE) && conn->write_handler) {
+ /* Fire the writable event. */
+ if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
+ /* If we have to invert the call, fire the readable event now
+ * after the writable one. */
+ if (invert && call_read) {
+ if (!callHandler(conn, conn->read_handler)) return;
+ }
}
static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
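The inverted dispatch above is the core of the barrier semantics: read-then-write
normally, write-then-read when the barrier flag is set. A minimal standalone sketch
(hypothetical names, not code from this patch) shows the same ordering rule in
isolation:

    #include <stdio.h>

    typedef void (*handler_fn)(void *ctx);

    struct demo_conn {
        int write_barrier;        /* mirrors CONN_FLAG_WRITE_BARRIER */
        handler_fn read_handler;
        handler_fn write_handler;
    };

    /* Same rule as connSocketEventHandler: read first by default,
     * write first when the barrier is set. */
    static void dispatch(struct demo_conn *c, int readable, int writable) {
        int invert = c->write_barrier;
        int call_read = readable && c->read_handler;
        int call_write = writable && c->write_handler;

        if (!invert && call_read) c->read_handler(c);
        if (call_write) c->write_handler(c);
        if (invert && call_read) c->read_handler(c);
    }

    static void on_read(void *ctx)  { (void)ctx; puts("read handler"); }
    static void on_write(void *ctx) { (void)ctx; puts("write handler"); }

    int main(void) {
        struct demo_conn c = { 1, on_read, on_write };
        dispatch(&c, 1, 1);       /* barrier set: prints "write handler" first */
        return 0;
    }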
diff --git a/src/connection.h b/src/connection.h
index 3fdcddebe..97622f8d6 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -47,6 +47,7 @@ typedef enum {
#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */
#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */
+#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */
typedef void (*ConnectionCallbackFunc)(struct connection *conn);
@@ -57,7 +58,7 @@ typedef struct ConnectionType {
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
- int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler);
+ int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
@@ -144,7 +145,7 @@ static inline int connRead(connection *conn, void *buf, size_t buf_len) {
* If NULL, the existing handler is removed.
*/
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
- return conn->type->set_write_handler(conn, func);
+ return conn->type->set_write_handler(conn, func, 0);
}
/* Register a read handler, to be called when the connection is readable.
@@ -154,6 +155,15 @@ static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc fu
return conn->type->set_read_handler(conn, func);
}
+/* Set a write handler, and possibly enable a write barrier; this flag is
+ * cleared when the write handler is changed or removed.
+ * With the barrier enabled, the writable event is never fired after the read
+ * handler has already fired in the same event loop iteration. Useful when you
+ * want to persist things to disk before sending replies, and want to do that
+ * in a group fashion. */
+static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier) {
+ return conn->type->set_write_handler(conn, func, barrier);
+}
+
static inline void connClose(connection *conn) {
conn->type->close(conn);
}
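A caller-side sketch, condensed from the handleClientsWithPendingWrites() hunk in
networking.c later in this patch (error paths and the surrounding loop trimmed):
the barrier is requested only under the appendfsync=always policy, so a reply
produced for a query read in one iteration is only written out after beforeSleep()
has had a chance to fsync the AOF.

    /* Condensed from handleClientsWithPendingWrites(); not a new API. */
    int barrier = (server.aof_state == AOF_ON &&
                   server.aof_fsync == AOF_FSYNC_ALWAYS);
    if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, barrier) == C_ERR)
        freeClientAsync(c);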
diff --git a/src/debug.c b/src/debug.c
index a2d61d8ab..c1546c4c1 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -319,6 +319,7 @@ void debugCommand(client *c) {
"SDSLEN <key> -- Show low level SDS string info representing key and value.",
"SEGFAULT -- Crash the server with sigsegv.",
"SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.",
+"AOF-FLUSH-SLEEP <microsec> -- Server will sleep before flushing the AOF, this is used for testing",
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
"STRUCTSIZE -- Return the size of different Redis core C structures.",
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
@@ -595,6 +596,11 @@ NULL
{
server.active_expire_enabled = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"aof-flush-sleep") &&
+ c->argc == 3)
+ {
+ server.aof_flush_sleep = atoi(c->argv[2]->ptr);
+ addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"lua-always-replicate-commands") &&
c->argc == 3)
{
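For completeness, a hypothetical standalone driver for the new subcommand using
hiredis (the test added at the end of this patch drives it through the Tcl harness
instead); the host, port and 500000-microsecond value are illustrative only:

    #include <stdio.h>
    #include <hiredis/hiredis.h>

    int main(void) {
        /* Illustrative host/port; adjust for the server under test. */
        redisContext *c = redisConnect("127.0.0.1", 6379);
        if (c == NULL || c->err) {
            fprintf(stderr, "connect failed: %s\n", c ? c->errstr : "oom");
            return 1;
        }

        /* Make every AOF flush sleep 500000 microseconds (0.5s), widening
         * the window in which a reply could race ahead of the fsync. */
        redisReply *reply = redisCommand(c, "DEBUG AOF-FLUSH-SLEEP 500000");
        if (reply != NULL) {
            printf("%s\n", reply->str);   /* expected: OK */
            freeReplyObject(reply);
        }

        redisFree(c);
        return 0;
    }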
diff --git a/src/networking.c b/src/networking.c
index d52eb1eba..97035c931 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1305,19 +1305,18 @@ int handleClientsWithPendingWrites(void) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
- int ae_flags = AE_WRITABLE;
+ int ae_barrier = 0;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
- * actual fsync of AOF to disk. AE_BARRIER ensures that. */
+ * actual fsync of AOF to disk. The write barrier ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
- ae_flags |= AE_BARRIER;
+ ae_barrier = 1;
}
- /* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */
- if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) {
+ if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
diff --git a/src/server.c b/src/server.c
index f05ce3151..0f5837bc7 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2048,6 +2048,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
+ /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
+ tlsProcessPendingData();
+ /* If TLS still has pending unread data, don't sleep at all. */
+ aeDontWait(server.el, tlsHasPendingData());
+
/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
@@ -2093,11 +2098,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
- /* TODO: How do i handle write barriers flag */
- tlsProcessPendingData();
- /* If tls already has pending unread data don't sleep at all. */
- aeDontWait(server.el, tlsHasPendingData());
-
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
@@ -2286,6 +2286,7 @@ void initServerConfig(void) {
server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
server.aof_rewrite_base_size = 0;
server.aof_rewrite_scheduled = 0;
+ server.aof_flush_sleep = 0;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
diff --git a/src/server.h b/src/server.h
index 89f219033..df5e42c41 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1192,6 +1192,7 @@ struct redisServer {
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
+ int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
diff --git a/src/tls.c b/src/tls.c
index c939ebf55..68d15ef9e 100644
--- a/src/tls.c
+++ b/src/tls.c
@@ -361,18 +361,47 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
conn->c.conn_handler = NULL;
break;
case CONN_STATE_CONNECTED:
- if ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ)) {
+ {
+ int call_read = ((mask & AE_READABLE) && conn->c.read_handler) ||
+ ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE));
+ int call_write = ((mask & AE_WRITABLE) && conn->c.write_handler) ||
+ ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ));
+
+ /* Normally we execute the readable event first, and the writable
+ * event later. This is useful as sometimes we may be able
+ * to serve the reply of a query immediately after processing the
+ * query.
+ *
+ * However if WRITE_BARRIER is set in the mask, our application is
+ * asking us to do the reverse: never fire the writable event
+ * after the readable. In such a case, we invert the calls.
+ * This is useful when, for instance, we want to do things
+ * in the beforeSleep() hook, like fsynching a file to disk,
+ * before replying to a client. */
+ int invert = conn->c.flags & CONN_FLAG_WRITE_BARRIER;
+
+ if (!invert && call_read) {
+ conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
+ if (!callHandler((connection *) conn, conn->c.read_handler)) return;
+ }
+
+ /* Fire the writable event. */
+ if (call_write) {
conn->flags &= ~TLS_CONN_FLAG_WRITE_WANT_READ;
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
}
- if ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE)) {
+ /* If we have to invert the call, fire the readable event now
+ * after the writable one. */
+ if (invert && call_read) {
conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
}
- if ((mask & AE_READABLE) && conn->c.read_handler) {
- if (!callHandler((connection *) conn, conn->c.read_handler)) return;
+ /* If SSL has pending data that was already read from the socket, we're
+ * at risk of not calling the read handler again; make sure to add the
+ * connection to a list of pending connections that should be handled anyway. */
+ if ((mask & AE_READABLE)) {
if (SSL_has_pending(conn->ssl)) {
if (!conn->pending_list_node) {
listAddNodeTail(pending_list, conn);
@@ -384,10 +413,8 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
}
}
- if ((mask & AE_WRITABLE) && conn->c.write_handler) {
- if (!callHandler((connection *) conn, conn->c.write_handler)) return;
- }
break;
+ }
default:
break;
}
@@ -535,8 +562,12 @@ static const char *connTLSGetLastError(connection *conn_) {
return NULL;
}
-int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
+int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
conn->write_handler = func;
+ if (barrier)
+ conn->flags |= CONN_FLAG_WRITE_BARRIER;
+ else
+ conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
updateSSLEvent((tls_connection *) conn);
return C_OK;
}
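The pending list exists because OpenSSL buffers whole TLS records: a single
readable event can leave decrypted (or decryptable) data inside OpenSSL after the
socket has been drained, and the kernel will never report the socket readable for
it again. A minimal sketch of the check, assuming OpenSSL 1.1.0+ for
SSL_has_pending() and with error handling omitted:

    #include <stdbool.h>
    #include <openssl/ssl.h>

    /* Returns true when more data is already buffered inside OpenSSL and
     * the caller must schedule another read pass itself -- the role played
     * by pending_list / tlsProcessPendingData() in the hunk above. */
    static bool tls_read_once(SSL *ssl, void *buf, int buflen) {
        int n = SSL_read(ssl, buf, buflen);
        if (n <= 0) {
            /* Real code would inspect SSL_get_error() (WANT_READ etc.). */
            return false;
        }
        return SSL_has_pending(ssl) != 0;
    }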
diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl
index e276a6254..2734de7f1 100644
--- a/tests/integration/aof.tcl
+++ b/tests/integration/aof.tcl
@@ -257,4 +257,35 @@ tags {"aof"} {
r expire x -1
}
}
+
+ start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} appendfsync always}} {
+ test {AOF fsync always barrier issue} {
+ set rd [redis_deferring_client]
+ # Set a sleep when aof is flushed, so that we have a chance to look
+ # at the aof size and detect if the response of an incr command
+ # arrives before the data was written (and hopefully fsynced).
+ # We create a big reply, which will hopefully not have room in the
+ # socket buffers, and will install a write handler, then we sleep
+ # a bit and issue the incr command, hoping that the write of the last
+ # portion of the output buffer and the processing of the incr will
+ # happen in the same event loop cycle.
+ # Since the socket buffers and timing are unpredictable, we fuzz this
+ # test with slightly different sizes and sleeps a few times.
+ for {set i 0} {$i < 10} {incr i} {
+ r debug aof-flush-sleep 0
+ r del x
+ r setrange x [expr {int(rand()*5000000)+10000000}] x
+ r debug aof-flush-sleep 500000
+ set aof [file join [lindex [r config get dir] 1] appendonly.aof]
+ set size1 [file size $aof]
+ $rd get x
+ after [expr {int(rand()*30)}]
+ $rd incr new_value
+ $rd read
+ $rd read
+ set size2 [file size $aof]
+ assert {$size1 != $size2}
+ }
+ }
+ }
}