summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2019-08-19 12:18:25 +0300
committerYossi Gottlieb <yossigo@gmail.com>2019-10-07 21:06:30 +0300
commit6b6294807c0bca50041da117c1abb35f5114e972 (patch)
treee13c13d22cab5b36d1ad70c73d4398d30dd6eb15 /src
parent5a477946065bcf05b335ededd6b794e82882ab73 (diff)
downloadredis-6b6294807c0bca50041da117c1abb35f5114e972.tar.gz
TLS: Implement support for write barrier.
Diffstat (limited to 'src')
-rw-r--r--src/aof.c4
-rw-r--r--src/cluster.c2
-rw-r--r--src/connection.c32
-rw-r--r--src/connection.h14
-rw-r--r--src/debug.c6
-rw-r--r--src/networking.c9
-rw-r--r--src/server.c11
-rw-r--r--src/server.h1
-rw-r--r--src/tls.c47
9 files changed, 102 insertions, 24 deletions
diff --git a/src/aof.c b/src/aof.c
index ef34097ed..181995bbc 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -385,6 +385,10 @@ void flushAppendOnlyFile(int force) {
* there is much to do about the whole server stopping for power problems
* or alike */
+ if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
+ usleep(server.aof_flush_sleep);
+ }
+
latencyStartMonitor(latency);
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
diff --git a/src/cluster.c b/src/cluster.c
index 639ab1ea7..1d922e87a 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -2277,7 +2277,7 @@ void clusterReadHandler(connection *conn) {
* from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
if (sdslen(link->sndbuf) == 0 && msglen != 0)
- connSetWriteHandler(link->conn, clusterWriteHandler); /* TODO: Handle AE_BARRIER in conns */
+ connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
diff --git a/src/connection.c b/src/connection.c
index 601f175bc..85bf572ad 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -194,10 +194,14 @@ static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_hand
/* Register a write handler, to be called when the connection is writable.
* If NULL, the existing handler is removed.
*/
-static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
+static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
if (func == conn->write_handler) return C_OK;
conn->write_handler = func;
+ if (barrier)
+ conn->flags |= CONN_FLAG_WRITE_BARRIER;
+ else
+ conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
if (!conn->write_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
else
@@ -247,13 +251,35 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
conn->conn_handler = NULL;
}
+ /* Normally we execute the readable event first, and the writable
+ * event laster. This is useful as sometimes we may be able
+ * to serve the reply of a query immediately after processing the
+ * query.
+ *
+ * However if WRITE_BARRIER is set in the mask, our application is
+ * asking us to do the reverse: never fire the writable event
+ * after the readable. In such a case, we invert the calls.
+ * This is useful when, for instance, we want to do things
+ * in the beforeSleep() hook, like fsynching a file to disk,
+ * before replying to a client. */
+ int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
+
+ int call_write = (mask & AE_WRITABLE) && conn->write_handler;
+ int call_read = (mask & AE_READABLE) && conn->read_handler;
+
/* Handle normal I/O flows */
- if ((mask & AE_READABLE) && conn->read_handler) {
+ if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
- if ((mask & AE_WRITABLE) && conn->write_handler) {
+ /* Fire the writable event. */
+ if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
+ /* If we have to invert the call, fire the readable event now
+ * after the writable one. */
+ if (invert && call_read) {
+ if (!callHandler(conn, conn->read_handler)) return;
+ }
}
static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
diff --git a/src/connection.h b/src/connection.h
index 3fdcddebe..97622f8d6 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -47,6 +47,7 @@ typedef enum {
#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */
#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */
+#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */
typedef void (*ConnectionCallbackFunc)(struct connection *conn);
@@ -57,7 +58,7 @@ typedef struct ConnectionType {
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
- int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler);
+ int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
@@ -144,7 +145,7 @@ static inline int connRead(connection *conn, void *buf, size_t buf_len) {
* If NULL, the existing handler is removed.
*/
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
- return conn->type->set_write_handler(conn, func);
+ return conn->type->set_write_handler(conn, func, 0);
}
/* Register a read handler, to be called when the connection is readable.
@@ -154,6 +155,15 @@ static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc fu
return conn->type->set_read_handler(conn, func);
}
+/* Set a write handler, and possibly enable a write barrier, this flag is
+ * cleared when write handler is changed or removed.
+ * With barroer enabled, we never fire the event if the read handler already
+ * fired in the same event loop iteration. Useful when you want to persist
+ * things to disk before sending replies, and want to do that in a group fashion. */
+static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier) {
+ return conn->type->set_write_handler(conn, func, barrier);
+}
+
static inline void connClose(connection *conn) {
conn->type->close(conn);
}
diff --git a/src/debug.c b/src/debug.c
index a2d61d8ab..c1546c4c1 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -319,6 +319,7 @@ void debugCommand(client *c) {
"SDSLEN <key> -- Show low level SDS string info representing key and value.",
"SEGFAULT -- Crash the server with sigsegv.",
"SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.",
+"AOF-FLUSH-SLEEP <microsec> -- Server will sleep before flushing the AOF, this is used for testing",
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
"STRUCTSIZE -- Return the size of different Redis core C structures.",
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
@@ -595,6 +596,11 @@ NULL
{
server.active_expire_enabled = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"aof-flush-sleep") &&
+ c->argc == 3)
+ {
+ server.aof_flush_sleep = atoi(c->argv[2]->ptr);
+ addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"lua-always-replicate-commands") &&
c->argc == 3)
{
diff --git a/src/networking.c b/src/networking.c
index d52eb1eba..97035c931 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1305,19 +1305,18 @@ int handleClientsWithPendingWrites(void) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
- int ae_flags = AE_WRITABLE;
+ int ae_barrier = 0;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
- * actual fsync of AOF to disk. AE_BARRIER ensures that. */
+ * actual fsync of AOF to disk. the write barrier ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
- ae_flags |= AE_BARRIER;
+ ae_barrier = 1;
}
- /* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */
- if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) {
+ if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
diff --git a/src/server.c b/src/server.c
index f05ce3151..0f5837bc7 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2048,6 +2048,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
+ /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
+ tlsProcessPendingData();
+ /* If tls still has pending unread data don't sleep at all. */
+ aeDontWait(server.el, tlsHasPendingData());
+
/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
@@ -2093,11 +2098,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
- /* TODO: How do i handle write barriers flag */
- tlsProcessPendingData();
- /* If tls already has pending unread data don't sleep at all. */
- aeDontWait(server.el, tlsHasPendingData());
-
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
@@ -2286,6 +2286,7 @@ void initServerConfig(void) {
server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
server.aof_rewrite_base_size = 0;
server.aof_rewrite_scheduled = 0;
+ server.aof_flush_sleep = 0;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
diff --git a/src/server.h b/src/server.h
index 89f219033..df5e42c41 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1192,6 +1192,7 @@ struct redisServer {
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
+ int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
diff --git a/src/tls.c b/src/tls.c
index c939ebf55..68d15ef9e 100644
--- a/src/tls.c
+++ b/src/tls.c
@@ -361,18 +361,47 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
conn->c.conn_handler = NULL;
break;
case CONN_STATE_CONNECTED:
- if ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ)) {
+ {
+ int call_read = ((mask & AE_READABLE) && conn->c.read_handler) ||
+ ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE));
+ int call_write = ((mask & AE_WRITABLE) && conn->c.write_handler) ||
+ ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ));
+
+ /* Normally we execute the readable event first, and the writable
+ * event laster. This is useful as sometimes we may be able
+ * to serve the reply of a query immediately after processing the
+ * query.
+ *
+ * However if WRITE_BARRIER is set in the mask, our application is
+ * asking us to do the reverse: never fire the writable event
+ * after the readable. In such a case, we invert the calls.
+ * This is useful when, for instance, we want to do things
+ * in the beforeSleep() hook, like fsynching a file to disk,
+ * before replying to a client. */
+ int invert = conn->c.flags & CONN_FLAG_WRITE_BARRIER;
+
+ if (!invert && call_read) {
+ conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
+ if (!callHandler((connection *) conn, conn->c.read_handler)) return;
+ }
+
+ /* Fire the writable event. */
+ if (call_write) {
conn->flags &= ~TLS_CONN_FLAG_WRITE_WANT_READ;
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
}
- if ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE)) {
+ /* If we have to invert the call, fire the readable event now
+ * after the writable one. */
+ if (invert && call_read) {
conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
}
- if ((mask & AE_READABLE) && conn->c.read_handler) {
- if (!callHandler((connection *) conn, conn->c.read_handler)) return;
+ /* If SSL has pending that, already read from the socket, we're at
+ * risk of not calling the read handler again, make sure to add it
+ * to a list of pending connection that should be handled anyway. */
+ if ((mask & AE_READABLE)) {
if (SSL_has_pending(conn->ssl)) {
if (!conn->pending_list_node) {
listAddNodeTail(pending_list, conn);
@@ -384,10 +413,8 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
}
}
- if ((mask & AE_WRITABLE) && conn->c.write_handler) {
- if (!callHandler((connection *) conn, conn->c.write_handler)) return;
- }
break;
+ }
default:
break;
}
@@ -535,8 +562,12 @@ static const char *connTLSGetLastError(connection *conn_) {
return NULL;
}
-int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
+int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
conn->write_handler = func;
+ if (barrier)
+ conn->flags |= CONN_FLAG_WRITE_BARRIER;
+ else
+ conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
updateSSLEvent((tls_connection *) conn);
return C_OK;
}