summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-03-31 15:58:54 +0200
committerantirez <antirez@gmail.com>2019-05-06 18:02:51 +0200
commita2245f8ff146629159d8c52d60713a262fa1b69a (patch)
tree952d4c6e92d1418b1f7e6b096a5f7d69b5e1c6ae
parentdd5b105c73a02389987e457cebbeaa801ba16977 (diff)
downloadredis-a2245f8ff146629159d8c52d60713a262fa1b69a.tar.gz
Threaded IO: read side WIP 2.
-rw-r--r--src/networking.c30
1 files changed, 23 insertions, 7 deletions
diff --git a/src/networking.c b/src/networking.c
index fd4e990f4..7d8470489 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -2496,13 +2496,16 @@ int processEventsWhileBlocked(void) {
int tio_debug = 0;
-#define SERVER_MAX_IO_THREADS 32
+#define IO_THREADS_MAX_NUM 128
+#define IO_THREADS_OP_READ 0
+#define IO_THREADS_OP_WRITE 1
-pthread_t io_threads[SERVER_MAX_IO_THREADS];
-pthread_mutex_t io_threads_mutex[SERVER_MAX_IO_THREADS];
-_Atomic unsigned long io_threads_pending[SERVER_MAX_IO_THREADS];
-int io_threads_active;
-list *io_threads_list[SERVER_MAX_IO_THREADS];
+pthread_t io_threads[IO_THREADS_MAX_NUM];
+pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
+_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
+int io_threads_active; /* Are the threads currently spinning waiting I/O? */
+int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
+list *io_threads_list[IO_THREADS_MAX_NUM];
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
@@ -2533,7 +2536,11 @@ void *IOThreadMain(void *myid) {
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
- writeToClient(c->fd,c,0);
+ if (io_threads_op == IO_THREADS_OP_WRITE) {
+ writeToClient(c->fd,c,0);
+ } else {
+ readQueryFromClient(NULL,c->fd,c,0);
+ }
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
@@ -2550,6 +2557,12 @@ void initThreadedIO(void) {
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
+ if (server.io_threads_num > IO_THREADS_MAX_NUM) {
+ serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
+ "The maximum number is %d.", IO_THREADS_MAX_NUM);
+ exit(1);
+ }
+
/* Spawn the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
pthread_t tid;
@@ -2684,3 +2697,6 @@ int postponeClientRead(client *c) {
return 0;
}
}
+
+int handleClientsWithPendingReadsUsingThreads(void) {
+}