diff options
author | Oran Agra <oran@redislabs.com> | 2019-08-19 12:18:25 +0300 |
---|---|---|
committer | Yossi Gottlieb <yossigo@gmail.com> | 2019-10-07 21:06:30 +0300 |
commit | 6b6294807c0bca50041da117c1abb35f5114e972 (patch) | |
tree | e13c13d22cab5b36d1ad70c73d4398d30dd6eb15 /src | |
parent | 5a477946065bcf05b335ededd6b794e82882ab73 (diff) | |
download | redis-6b6294807c0bca50041da117c1abb35f5114e972.tar.gz |
TLS: Implement support for write barrier.
Diffstat (limited to 'src')
-rw-r--r-- | src/aof.c | 4 | ||||
-rw-r--r-- | src/cluster.c | 2 | ||||
-rw-r--r-- | src/connection.c | 32 | ||||
-rw-r--r-- | src/connection.h | 14 | ||||
-rw-r--r-- | src/debug.c | 6 | ||||
-rw-r--r-- | src/networking.c | 9 | ||||
-rw-r--r-- | src/server.c | 11 | ||||
-rw-r--r-- | src/server.h | 1 | ||||
-rw-r--r-- | src/tls.c | 47 |
9 files changed, 102 insertions, 24 deletions
@@ -385,6 +385,10 @@ void flushAppendOnlyFile(int force) { * there is much to do about the whole server stopping for power problems * or alike */ + if (server.aof_flush_sleep && sdslen(server.aof_buf)) { + usleep(server.aof_flush_sleep); + } + latencyStartMonitor(latency); nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); latencyEndMonitor(latency); diff --git a/src/cluster.c b/src/cluster.c index 639ab1ea7..1d922e87a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2277,7 +2277,7 @@ void clusterReadHandler(connection *conn) { * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { if (sdslen(link->sndbuf) == 0 && msglen != 0) - connSetWriteHandler(link->conn, clusterWriteHandler); /* TODO: Handle AE_BARRIER in conns */ + connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); diff --git a/src/connection.c b/src/connection.c index 601f175bc..85bf572ad 100644 --- a/src/connection.c +++ b/src/connection.c @@ -194,10 +194,14 @@ static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_hand /* Register a write handler, to be called when the connection is writable. * If NULL, the existing handler is removed. */ -static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func) { +static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { if (func == conn->write_handler) return C_OK; conn->write_handler = func; + if (barrier) + conn->flags |= CONN_FLAG_WRITE_BARRIER; + else + conn->flags &= ~CONN_FLAG_WRITE_BARRIER; if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); else @@ -247,13 +251,35 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD conn->conn_handler = NULL; } + /* Normally we execute the readable event first, and the writable + * event later.
This is useful as sometimes we may be able + * to serve the reply of a query immediately after processing the + * query. + * + * However if WRITE_BARRIER is set in the connection flags, our application is + * asking us to do the reverse: never fire the writable event + * after the readable. In such a case, we invert the calls. + * This is useful when, for instance, we want to do things + * in the beforeSleep() hook, like fsyncing a file to disk, + * before replying to a client. */ + int invert = conn->flags & CONN_FLAG_WRITE_BARRIER; + + int call_write = (mask & AE_WRITABLE) && conn->write_handler; + int call_read = (mask & AE_READABLE) && conn->read_handler; + /* Handle normal I/O flows */ - if ((mask & AE_READABLE) && conn->read_handler) { + if (!invert && call_read) { if (!callHandler(conn, conn->read_handler)) return; } - if ((mask & AE_WRITABLE) && conn->write_handler) { + /* Fire the writable event. */ + if (call_write) { if (!callHandler(conn, conn->write_handler)) return; } + /* If we have to invert the call, fire the readable event now + * after the writable one.
*/ + if (invert && call_read) { + if (!callHandler(conn, conn->read_handler)) return; + } } static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) { diff --git a/src/connection.h b/src/connection.h index 3fdcddebe..97622f8d6 100644 --- a/src/connection.h +++ b/src/connection.h @@ -47,6 +47,7 @@ typedef enum { #define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */ #define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */ +#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */ typedef void (*ConnectionCallbackFunc)(struct connection *conn); @@ -57,7 +58,7 @@ typedef struct ConnectionType { int (*read)(struct connection *conn, void *buf, size_t buf_len); void (*close)(struct connection *conn); int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); - int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler); + int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier); int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler); const char *(*get_last_error)(struct connection *conn); int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout); @@ -144,7 +145,7 @@ static inline int connRead(connection *conn, void *buf, size_t buf_len) { * If NULL, the existing handler is removed. */ static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) { - return conn->type->set_write_handler(conn, func); + return conn->type->set_write_handler(conn, func, 0); } /* Register a read handler, to be called when the connection is readable. @@ -154,6 +155,15 @@ static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc fu return conn->type->set_read_handler(conn, func); } +/* Set a write handler, and possibly enable a write barrier, this flag is + * cleared when write handler is changed or removed. 
+ * With barrier enabled, we never fire the event if the read handler already + * fired in the same event loop iteration. Useful when you want to persist + * things to disk before sending replies, and want to do that in a group fashion. */ +static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier) { + return conn->type->set_write_handler(conn, func, barrier); +} + static inline void connClose(connection *conn) { conn->type->close(conn); } diff --git a/src/debug.c b/src/debug.c index a2d61d8ab..c1546c4c1 100644 --- a/src/debug.c +++ b/src/debug.c @@ -319,6 +319,7 @@ void debugCommand(client *c) { "SDSLEN <key> -- Show low level SDS string info representing key and value.", "SEGFAULT -- Crash the server with sigsegv.", "SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.", +"AOF-FLUSH-SLEEP <microsec> -- Server will sleep before flushing the AOF, this is used for testing", "SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.", "STRUCTSIZE -- Return the size of different Redis core C structures.", "ZIPLIST <key> -- Show low level info about the ziplist encoding.", @@ -595,6 +596,11 @@ NULL { server.active_expire_enabled = atoi(c->argv[2]->ptr); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"aof-flush-sleep") && + c->argc == 3) + { + server.aof_flush_sleep = atoi(c->argv[2]->ptr); + addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"lua-always-replicate-commands") && c->argc == 3) { diff --git a/src/networking.c b/src/networking.c index d52eb1eba..97035c931 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1305,19 +1305,18 @@ int handleClientsWithPendingWrites(void) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler.
*/ if (clientHasPendingReplies(c)) { - int ae_flags = AE_WRITABLE; + int ae_barrier = 0; /* For the fsync=always policy, we want that a given FD is never * served for reading and writing in the same event loop iteration, * so that in the middle of receiving the query, and serving it * to the client, we'll call beforeSleep() that will do the - * actual fsync of AOF to disk. AE_BARRIER ensures that. */ + * actual fsync of AOF to disk. the write barrier ensures that. */ if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) { - ae_flags |= AE_BARRIER; + ae_barrier = 1; } - /* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */ - if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) { + if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { freeClientAsync(c); } } diff --git a/src/server.c b/src/server.c index f05ce3151..0f5837bc7 100644 --- a/src/server.c +++ b/src/server.c @@ -2048,6 +2048,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ + tlsProcessPendingData(); + /* If tls still has pending unread data don't sleep at all. */ + aeDontWait(server.el, tlsHasPendingData()); + /* Call the Redis Cluster before sleep function. Note that this function * may change the state of Redis Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients @@ -2093,11 +2098,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); - /* TODO: How do i handle write barriers flag */ - tlsProcessPendingData(); - /* If tls already has pending unread data don't sleep at all. 
*/ - aeDontWait(server.el, tlsHasPendingData()); - /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); @@ -2286,6 +2286,7 @@ void initServerConfig(void) { server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE; server.aof_rewrite_base_size = 0; server.aof_rewrite_scheduled = 0; + server.aof_flush_sleep = 0; server.aof_last_fsync = time(NULL); server.aof_rewrite_time_last = -1; server.aof_rewrite_time_start = -1; diff --git a/src/server.h b/src/server.h index 89f219033..df5e42c41 100644 --- a/src/server.h +++ b/src/server.h @@ -1192,6 +1192,7 @@ struct redisServer { off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_current_size; /* AOF current size. */ off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */ + int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ pid_t aof_child_pid; /* PID if rewriting process */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ @@ -361,18 +361,47 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { conn->c.conn_handler = NULL; break; case CONN_STATE_CONNECTED: - if ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ)) { + { + int call_read = ((mask & AE_READABLE) && conn->c.read_handler) || + ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE)); + int call_write = ((mask & AE_WRITABLE) && conn->c.write_handler) || + ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ)); + + /* Normally we execute the readable event first, and the writable + * event later. This is useful as sometimes we may be able + * to serve the reply of a query immediately after processing the + * query. + * + * However if WRITE_BARRIER is set in the connection flags, our application is + * asking us to do the reverse: never fire the writable event + * after the readable. In such a case, we invert the calls.
+ * This is useful when, for instance, we want to do things + * in the beforeSleep() hook, like fsyncing a file to disk, + * before replying to a client. */ + int invert = conn->c.flags & CONN_FLAG_WRITE_BARRIER; + + if (!invert && call_read) { + conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE; + if (!callHandler((connection *) conn, conn->c.read_handler)) return; + } + + /* Fire the writable event. */ + if (call_write) { conn->flags &= ~TLS_CONN_FLAG_WRITE_WANT_READ; if (!callHandler((connection *) conn, conn->c.write_handler)) return; } - if ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE)) { + /* If we have to invert the call, fire the readable event now + * after the writable one. */ + if (invert && call_read) { conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE; if (!callHandler((connection *) conn, conn->c.read_handler)) return; } - if ((mask & AE_READABLE) && conn->c.read_handler) { - if (!callHandler((connection *) conn, conn->c.read_handler)) return; + /* If SSL has pending data already read from the socket, we're at + * risk of not calling the read handler again, make sure to add it + * to a list of pending connections that should be handled anyway.
*/ + if ((mask & AE_READABLE)) { if (SSL_has_pending(conn->ssl)) { if (!conn->pending_list_node) { listAddNodeTail(pending_list, conn); @@ -384,10 +413,8 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { } } - if ((mask & AE_WRITABLE) && conn->c.write_handler) { - if (!callHandler((connection *) conn, conn->c.write_handler)) return; - } break; + } default: break; } @@ -535,8 +562,12 @@ static const char *connTLSGetLastError(connection *conn_) { return NULL; } -int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func) { +int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { conn->write_handler = func; + if (barrier) + conn->flags |= CONN_FLAG_WRITE_BARRIER; + else + conn->flags &= ~CONN_FLAG_WRITE_BARRIER; updateSSLEvent((tls_connection *) conn); return C_OK; } |