summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/networking.c156
-rw-r--r--src/server.c11
-rw-r--r--src/server.h4
3 files changed, 162 insertions, 9 deletions
diff --git a/src/networking.c b/src/networking.c
index ffb435625..3958e4f5e 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1065,9 +1065,17 @@ void freeClient(client *c) {
* a context where calling freeClient() is not possible, because the client
* should be valid for the continuation of the flow of the program. */
void freeClientAsync(client *c) {
+ /* We need to handle concurrent access to the server.clients_to_close list
+ * only in the freeClientAsync() function, since it's the only function that
+ * may access the list while Redis uses I/O threads. All the other accesses
+ * are in the context of the main thread while the other threads are
+ * idle. */
+ static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
c->flags |= CLIENT_CLOSE_ASAP;
+ pthread_mutex_lock(&async_free_queue_mutex);
listAddNodeTail(server.clients_to_close,c);
+ pthread_mutex_unlock(&async_free_queue_mutex);
}
void freeClientsInAsyncFreeQueue(void) {
@@ -1091,7 +1099,12 @@ client *lookupClientByID(uint64_t id) {
}
/* Write data in output buffers to client. Return C_OK if the client
- * is still valid after the call, C_ERR if it was freed. */
+ * is still valid after the call, C_ERR if it was freed because of some
+ * error.
+ *
+ * This function is called by threads, but always with handler_installed
+ * set to 0. So when handler_installed is set to 0 the function must be
+ * thread safe. */
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
@@ -1153,14 +1166,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
+ /* FIXME: Fixme, use atomic var for this. */
server.stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
- serverLog(LL_VERBOSE,
- "Error writing to client: %s", strerror(errno));
- freeClient(c);
+ // serverLog(LL_VERBOSE,
+ // "Error writing to client: %s", strerror(errno));
+ freeClientAsync(c);
return C_ERR;
}
}
@@ -1173,11 +1187,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
}
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
+ /* Note that writeToClient() is called in a threaded way, but
+ * adDeleteFileEvent() is not thread safe: however writeToClient()
+ * is always called with handler_installed set to 0 from threads
+ * so we are fine. */
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
- freeClient(c);
+ freeClientAsync(c);
return C_ERR;
}
}
@@ -2452,3 +2470,131 @@ int processEventsWhileBlocked(void) {
}
return count;
}
+
+/* =============================================================================
+ * Threaded I/O
+ * =========================================================================== */
+
+#define SERVER_MAX_IO_THREADS 32
+
+pthread_t io_threads[SERVER_MAX_IO_THREADS];
+pthread_mutex_t io_threads_done_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t io_threads_done_cond = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t io_threads_idle_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t io_threads_idle_cond = PTHREAD_COND_INITIALIZER;
+pthread_cond_t io_threads_start_cond = PTHREAD_COND_INITIALIZER;
+int io_threads_done = 0; /* Number of threads that completed the work. */
+int io_threads_idle = 0; /* Number of threads in idle state ready to go. */
+list *io_threads_list[SERVER_MAX_IO_THREADS];
+
+void *IOThreadMain(void *myid) {
+ /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
+ * used by the thread to just manipulate a single sub-array of clients. */
+ long id = (unsigned long)myid;
+
+ while(1) {
+ /* ... Wait for start ... */
+ pthread_mutex_lock(&io_threads_idle_mutex);
+ io_threads_idle++;
+ pthread_cond_signal(&io_threads_idle_cond);
+ printf("[%ld] Waiting start...\n", id);
+ pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex);
+ printf("[%ld] Started\n", id);
+ pthread_mutex_unlock(&io_threads_idle_mutex);
+ printf("%d to handle\n", (int)listLength(io_threads_list[id]));
+
+ /* ... Process ... */
+ listIter li;
+ listNode *ln;
+ listRewind(io_threads_list[id],&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ writeToClient(c->fd,c,0);
+ }
+ listEmpty(io_threads_list[id]);
+
+ /* Report success. */
+ pthread_mutex_lock(&io_threads_done_mutex);
+ io_threads_done++;
+ pthread_cond_signal(&io_threads_done_cond);
+ pthread_mutex_unlock(&io_threads_done_mutex);
+ printf("[%ld] Done\n", id);
+ }
+}
+
+/* Initialize the data structures needed for threaded I/O. */
+void initThreadedIO(void) {
+ pthread_t tid;
+
+ server.io_threads_num = 4;
+ for (int i = 0; i < server.io_threads_num; i++) {
+ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
+ serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
+ exit(1);
+ }
+ io_threads[i] = tid;
+ io_threads_list[i] = listCreate();
+ }
+}
+
+int handleClientsWithPendingWritesUsingThreads(void) {
+ int processed = listLength(server.clients_pending_write);
+ if (processed == 0) return 0; /* Return ASAP if there are no clients. */
+
+ printf("%d TOTAL\n", processed);
+
+ /* Wait for all threads to be ready. */
+ pthread_mutex_lock(&io_threads_idle_mutex);
+ while(io_threads_idle < server.io_threads_num) {
+ pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex);
+ }
+ printf("All threads are idle: %d\n", io_threads_idle);
+ io_threads_idle = 0;
+ pthread_mutex_unlock(&io_threads_idle_mutex);
+
+ /* Distribute the clients across N different lists. */
+ listIter li;
+ listNode *ln;
+ listRewind(server.clients_pending_write,&li);
+ int item_id = 0;
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ c->flags &= ~CLIENT_PENDING_WRITE;
+ int target_id = item_id % server.io_threads_num;
+ listAddNodeTail(io_threads_list[target_id],c);
+ item_id++;
+ }
+
+ /* Start all threads. */
+ printf("Send start condition\n");
+ pthread_mutex_lock(&io_threads_done_mutex);
+ io_threads_done = 0;
+ pthread_cond_broadcast(&io_threads_start_cond);
+ pthread_mutex_unlock(&io_threads_done_mutex);
+
+ /* Wait for all threads to end their work. */
+ pthread_mutex_lock(&io_threads_done_mutex);
+ while(io_threads_done < server.io_threads_num) {
+ pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex);
+ }
+ pthread_mutex_unlock(&io_threads_done_mutex);
+ printf("All threads finshed\n");
+
+ /* Run the list of clients again to install the write handler where
+ * needed. */
+ listRewind(server.clients_pending_write,&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+
+ /* Install the write handler if there are pending writes in some
+ * of the clients. */
+ if (clientHasPendingReplies(c) &&
+ aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
+ sendReplyToClient, c) == AE_ERR)
+ {
+ freeClientAsync(c);
+ }
+ }
+ listEmpty(server.clients_pending_write);
+ return processed;
+}
diff --git a/src/server.c b/src/server.c
index fb5d679cd..c437880d5 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1981,9 +1981,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
flushAppendOnlyFile(0);
}
- /* Close clients that need to be closed asynchronous */
- freeClientsInAsyncFreeQueue();
-
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect.*/
@@ -2075,7 +2072,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
- handleClientsWithPendingWrites();
+ /* XXX: Put a condition based on number of waiting clients: if we
+ * have less than a given number of clients, use non threaded code. */
+ handleClientsWithPendingWritesUsingThreads();
+
+ /* Close clients that need to be closed asynchronous */
+ freeClientsInAsyncFreeQueue();
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
@@ -2861,6 +2863,7 @@ void initServer(void) {
slowlogInit();
latencyMonitorInit();
bioInit();
+ initThreadedIO();
server.initial_memory_usage = zmalloc_used_memory();
}
diff --git a/src/server.h b/src/server.h
index dfd9f7698..d2a563c96 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1062,6 +1062,8 @@ struct redisServer {
int protected_mode; /* Don't accept external connections. */
int gopher_enabled; /* If true the server will reply to gopher
queries. Will still serve RESP2 queries. */
+ int io_threads_num; /* Number of IO threads to use. */
+
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
off_t loading_total_bytes;
@@ -1576,12 +1578,14 @@ void pauseClients(mstime_t duration);
int clientsArePaused(void);
int processEventsWhileBlocked(void);
int handleClientsWithPendingWrites(void);
+int handleClientsWithPendingWritesUsingThreads(void);
int clientHasPendingReplies(client *c);
void unlinkClient(client *c);
int writeToClient(int fd, client *c, int handler_installed);
void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
+void initThreadedIO(void);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)