From a2245f8ff146629159d8c52d60713a262fa1b69a Mon Sep 17 00:00:00 2001 From: antirez Date: Sun, 31 Mar 2019 15:58:54 +0200 Subject: Threaded IO: read side WIP 2. --- src/networking.c | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/networking.c b/src/networking.c index fd4e990f4..7d8470489 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2496,13 +2496,16 @@ int processEventsWhileBlocked(void) { int tio_debug = 0; -#define SERVER_MAX_IO_THREADS 32 +#define IO_THREADS_MAX_NUM 128 +#define IO_THREADS_OP_READ 0 +#define IO_THREADS_OP_WRITE 1 -pthread_t io_threads[SERVER_MAX_IO_THREADS]; -pthread_mutex_t io_threads_mutex[SERVER_MAX_IO_THREADS]; -_Atomic unsigned long io_threads_pending[SERVER_MAX_IO_THREADS]; -int io_threads_active; -list *io_threads_list[SERVER_MAX_IO_THREADS]; +pthread_t io_threads[IO_THREADS_MAX_NUM]; +pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; +_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; +int io_threads_active; /* Are the threads currently spinning waiting I/O? */ +int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ +list *io_threads_list[IO_THREADS_MAX_NUM]; void *IOThreadMain(void *myid) { /* The ID is the thread number (from 0 to server.iothreads_num-1), and is @@ -2533,7 +2536,11 @@ void *IOThreadMain(void *myid) { listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); - writeToClient(c->fd,c,0); + if (io_threads_op == IO_THREADS_OP_WRITE) { + writeToClient(c->fd,c,0); + } else { + readQueryFromClient(NULL,c->fd,c,0); + } } listEmpty(io_threads_list[id]); io_threads_pending[id] = 0; @@ -2550,6 +2557,12 @@ void initThreadedIO(void) { * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return; + if (server.io_threads_num > IO_THREADS_MAX_NUM) { + serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " + "The maximum number is %d.", IO_THREADS_MAX_NUM); + exit(1); + } + /* Spawn the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { pthread_t tid; @@ -2684,3 +2697,6 @@ int postponeClientRead(client *c) { return 0; } } + +int handleClientsWithPendingReadsUsingThreads(void) { +} -- cgit v1.2.1