summaryrefslogtreecommitdiff
path: root/src/networking.c
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2020-01-13 12:46:40 +0100
committerGitHub <noreply@github.com>2020-01-13 12:46:40 +0100
commit24896427fc88e944d466874954503b1100263b65 (patch)
tree274e641c4d4c0d7d81f0cd6d992e7b16da5e06e3 /src/networking.c
parent8105f91a0202829c685da7967c2f60d477c2d1e9 (diff)
parent1398fac3f1bd190c572a695aa898d71d1573f7e5 (diff)
downloadredis-24896427fc88e944d466874954503b1100263b65.tar.gz
Merge pull request #6110 from soloestoy/enhance-io-threaded
Enhance IO Threaded: use main thread to handle read/write work
Diffstat (limited to 'src/networking.c')
-rw-r--r--src/networking.c21
1 files changed, 18 insertions, 3 deletions
diff --git a/src/networking.c b/src/networking.c
index a558ae91a..f1a6b9910 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -2656,7 +2656,7 @@ pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_active; /* Are the threads currently spinning waiting I/O? */
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
-list *io_threads_list[IO_THREADS_MAX_NUM];
+list *io_threads_list[IO_THREADS_MAX_NUM+1];
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
@@ -2729,6 +2729,7 @@ void initThreadedIO(void) {
}
io_threads[i] = tid;
}
+ io_threads_list[server.io_threads_num] = listCreate();
}
void startThreadedIO(void) {
@@ -2800,7 +2801,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
- int target_id = item_id % server.io_threads_num;
+ int target_id = item_id % (server.io_threads_num+1);
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
@@ -2813,6 +2814,13 @@ int handleClientsWithPendingWritesUsingThreads(void) {
io_threads_pending[j] = count;
}
+ listRewind(io_threads_list[server.io_threads_num],&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ writeToClient(c->fd,c,0);
+ }
+ listEmpty(io_threads_list[server.io_threads_num]);
+
/* Wait for all threads to end their work. */
while(1) {
unsigned long pending = 0;
@@ -2877,7 +2885,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
- int target_id = item_id % server.io_threads_num;
+ int target_id = item_id % (server.io_threads_num+1);
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
@@ -2890,6 +2898,13 @@ int handleClientsWithPendingReadsUsingThreads(void) {
io_threads_pending[j] = count;
}
+ listRewind(io_threads_list[server.io_threads_num],&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ readQueryFromClient(NULL,c->fd,c,0);
+ }
+ listEmpty(io_threads_list[server.io_threads_num]);
+
/* Wait for all threads to end their work. */
while(1) {
unsigned long pending = 0;