summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-10-24 08:35:05 +0200
committerantirez <antirez@gmail.com>2019-04-30 15:16:06 +0200
commit387c7cda18500bbb8423b563e32cbb961ebf81e0 (patch)
treef4debb595f19a4e2e6c477b046563786f5b5d6bc
parent843de8b786562d8d77c78d83a971060adc61f77a (diff)
downloadredis-387c7cda18500bbb8423b563e32cbb961ebf81e0.tar.gz
Threaded IO: implement handleClientsWithPendingWritesUsingThreads().
This is just an experiment for now, there are a couple of race conditions, mostly harmless for the performance gain experiment that this commit represents so far. The general idea here is to take Redis single threaded and instead fan-out on expansive kernel calls: write(2) in this case, but the same concept could be easily implemented for read(2) and protcol parsing. However just threading writes like in this commit, is enough to evaluate if the approach is sounding.
-rw-r--r--src/networking.c156
-rw-r--r--src/server.c11
-rw-r--r--src/server.h4
3 files changed, 162 insertions, 9 deletions
diff --git a/src/networking.c b/src/networking.c
index ffb435625..3958e4f5e 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1065,9 +1065,17 @@ void freeClient(client *c) {
* a context where calling freeClient() is not possible, because the client
* should be valid for the continuation of the flow of the program. */
void freeClientAsync(client *c) {
+ /* We need to handle concurrent access to the server.clients_to_close list
+ * only in the freeClientAsync() function, since it's the only function that
+ * may access the list while Redis uses I/O threads. All the other accesses
+ * are in the context of the main thread while the other threads are
+ * idle. */
+ static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
c->flags |= CLIENT_CLOSE_ASAP;
+ pthread_mutex_lock(&async_free_queue_mutex);
listAddNodeTail(server.clients_to_close,c);
+ pthread_mutex_unlock(&async_free_queue_mutex);
}
void freeClientsInAsyncFreeQueue(void) {
@@ -1091,7 +1099,12 @@ client *lookupClientByID(uint64_t id) {
}
/* Write data in output buffers to client. Return C_OK if the client
- * is still valid after the call, C_ERR if it was freed. */
+ * is still valid after the call, C_ERR if it was freed because of some
+ * error.
+ *
+ * This function is called by threads, but always with handler_installed
+ * set to 0. So when handler_installed is set to 0 the function must be
+ * thread safe. */
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
@@ -1153,14 +1166,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
+ /* FIXME: Fixme, use atomic var for this. */
server.stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
- serverLog(LL_VERBOSE,
- "Error writing to client: %s", strerror(errno));
- freeClient(c);
+ // serverLog(LL_VERBOSE,
+ // "Error writing to client: %s", strerror(errno));
+ freeClientAsync(c);
return C_ERR;
}
}
@@ -1173,11 +1187,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
}
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
+ /* Note that writeToClient() is called in a threaded way, but
+ * adDeleteFileEvent() is not thread safe: however writeToClient()
+ * is always called with handler_installed set to 0 from threads
+ * so we are fine. */
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
- freeClient(c);
+ freeClientAsync(c);
return C_ERR;
}
}
@@ -2452,3 +2470,131 @@ int processEventsWhileBlocked(void) {
}
return count;
}
+
+/* =============================================================================
+ * Threaded I/O
+ * =========================================================================== */
+
+#define SERVER_MAX_IO_THREADS 32
+
+pthread_t io_threads[SERVER_MAX_IO_THREADS];
+pthread_mutex_t io_threads_done_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t io_threads_done_cond = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t io_threads_idle_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t io_threads_idle_cond = PTHREAD_COND_INITIALIZER;
+pthread_cond_t io_threads_start_cond = PTHREAD_COND_INITIALIZER;
+int io_threads_done = 0; /* Number of threads that completed the work. */
+int io_threads_idle = 0; /* Number of threads in idle state ready to go. */
+list *io_threads_list[SERVER_MAX_IO_THREADS];
+
+void *IOThreadMain(void *myid) {
+ /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
+ * used by the thread to just manipulate a single sub-array of clients. */
+ long id = (unsigned long)myid;
+
+ while(1) {
+ /* ... Wait for start ... */
+ pthread_mutex_lock(&io_threads_idle_mutex);
+ io_threads_idle++;
+ pthread_cond_signal(&io_threads_idle_cond);
+ printf("[%ld] Waiting start...\n", id);
+ pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex);
+ printf("[%ld] Started\n", id);
+ pthread_mutex_unlock(&io_threads_idle_mutex);
+ printf("%d to handle\n", (int)listLength(io_threads_list[id]));
+
+ /* ... Process ... */
+ listIter li;
+ listNode *ln;
+ listRewind(io_threads_list[id],&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ writeToClient(c->fd,c,0);
+ }
+ listEmpty(io_threads_list[id]);
+
+ /* Report success. */
+ pthread_mutex_lock(&io_threads_done_mutex);
+ io_threads_done++;
+ pthread_cond_signal(&io_threads_done_cond);
+ pthread_mutex_unlock(&io_threads_done_mutex);
+ printf("[%ld] Done\n", id);
+ }
+}
+
+/* Initialize the data structures needed for threaded I/O. */
+void initThreadedIO(void) {
+ pthread_t tid;
+
+ server.io_threads_num = 4;
+ for (int i = 0; i < server.io_threads_num; i++) {
+ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
+ serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
+ exit(1);
+ }
+ io_threads[i] = tid;
+ io_threads_list[i] = listCreate();
+ }
+}
+
+int handleClientsWithPendingWritesUsingThreads(void) {
+ int processed = listLength(server.clients_pending_write);
+ if (processed == 0) return 0; /* Return ASAP if there are no clients. */
+
+ printf("%d TOTAL\n", processed);
+
+ /* Wait for all threads to be ready. */
+ pthread_mutex_lock(&io_threads_idle_mutex);
+ while(io_threads_idle < server.io_threads_num) {
+ pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex);
+ }
+ printf("All threads are idle: %d\n", io_threads_idle);
+ io_threads_idle = 0;
+ pthread_mutex_unlock(&io_threads_idle_mutex);
+
+ /* Distribute the clients across N different lists. */
+ listIter li;
+ listNode *ln;
+ listRewind(server.clients_pending_write,&li);
+ int item_id = 0;
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ c->flags &= ~CLIENT_PENDING_WRITE;
+ int target_id = item_id % server.io_threads_num;
+ listAddNodeTail(io_threads_list[target_id],c);
+ item_id++;
+ }
+
+ /* Start all threads. */
+ printf("Send start condition\n");
+ pthread_mutex_lock(&io_threads_done_mutex);
+ io_threads_done = 0;
+ pthread_cond_broadcast(&io_threads_start_cond);
+ pthread_mutex_unlock(&io_threads_done_mutex);
+
+ /* Wait for all threads to end their work. */
+ pthread_mutex_lock(&io_threads_done_mutex);
+ while(io_threads_done < server.io_threads_num) {
+ pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex);
+ }
+ pthread_mutex_unlock(&io_threads_done_mutex);
+ printf("All threads finshed\n");
+
+ /* Run the list of clients again to install the write handler where
+ * needed. */
+ listRewind(server.clients_pending_write,&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+
+ /* Install the write handler if there are pending writes in some
+ * of the clients. */
+ if (clientHasPendingReplies(c) &&
+ aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
+ sendReplyToClient, c) == AE_ERR)
+ {
+ freeClientAsync(c);
+ }
+ }
+ listEmpty(server.clients_pending_write);
+ return processed;
+}
diff --git a/src/server.c b/src/server.c
index fb5d679cd..c437880d5 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1981,9 +1981,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
flushAppendOnlyFile(0);
}
- /* Close clients that need to be closed asynchronous */
- freeClientsInAsyncFreeQueue();
-
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect.*/
@@ -2075,7 +2072,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
- handleClientsWithPendingWrites();
+ /* XXX: Put a condition based on number of waiting clients: if we
+ * have less than a given number of clients, use non threaded code. */
+ handleClientsWithPendingWritesUsingThreads();
+
+ /* Close clients that need to be closed asynchronous */
+ freeClientsInAsyncFreeQueue();
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
@@ -2861,6 +2863,7 @@ void initServer(void) {
slowlogInit();
latencyMonitorInit();
bioInit();
+ initThreadedIO();
server.initial_memory_usage = zmalloc_used_memory();
}
diff --git a/src/server.h b/src/server.h
index dfd9f7698..d2a563c96 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1062,6 +1062,8 @@ struct redisServer {
int protected_mode; /* Don't accept external connections. */
int gopher_enabled; /* If true the server will reply to gopher
queries. Will still serve RESP2 queries. */
+ int io_threads_num; /* Number of IO threads to use. */
+
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
off_t loading_total_bytes;
@@ -1576,12 +1578,14 @@ void pauseClients(mstime_t duration);
int clientsArePaused(void);
int processEventsWhileBlocked(void);
int handleClientsWithPendingWrites(void);
+int handleClientsWithPendingWritesUsingThreads(void);
int clientHasPendingReplies(client *c);
void unlinkClient(client *c);
int writeToClient(int fd, client *c, int handler_installed);
void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
+void initThreadedIO(void);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)