summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2019-03-31 21:59:50 +0200
committerantirez <antirez@gmail.com>2019-05-06 18:02:51 +0200
commit63a0ffd36a99083b909e2110a7604fe335656a8d (patch)
tree362fcae4326f24752dc1556551650044fc0b97a8
parenta2245f8ff146629159d8c52d60713a262fa1b69a (diff)
downloadredis-63a0ffd36a99083b909e2110a7604fe335656a8d.tar.gz
Threaded IO: read side WIP 3.
-rw-r--r--src/networking.c59
-rw-r--r--src/server.c1
-rw-r--r--src/server.h1
3 files changed, 55 insertions, 6 deletions
diff --git a/src/networking.c b/src/networking.c
index 7d8470489..3a36badb8 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1711,12 +1711,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
- freeClient(c);
+ freeClientAsync(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
- freeClient(c);
+ freeClientAsync(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
@@ -1739,7 +1739,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
- freeClient(c);
+ freeClientAsync(c);
return;
}
@@ -2538,8 +2538,10 @@ void *IOThreadMain(void *myid) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c->fd,c,0);
- } else {
+ } else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(NULL,c->fd,c,0);
+ } else {
+ serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
@@ -2632,7 +2634,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO();
- if (tio_debug) printf("%d TOTAL pending clients\n", processed);
+ if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
@@ -2649,6 +2651,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
+ io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 0; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
@@ -2661,7 +2664,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
pending += io_threads_pending[j];
if (pending == 0) break;
}
- if (tio_debug) printf("All threads finshed\n");
+ if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Run the list of clients again to install the write handler where
* needed. */
@@ -2699,4 +2702,48 @@ int postponeClientRead(client *c) {
}
int handleClientsWithPendingReadsUsingThreads(void) {
+ if (!io_threads_active) return 0;
+ int processed = listLength(server.clients_pending_read);
+ if (processed == 0) return 0;
+
+ if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
+
+ /* Distribute the clients across N different lists. */
+ listIter li;
+ listNode *ln;
+ listRewind(server.clients_pending_read,&li);
+ int item_id = 0;
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ int target_id = item_id % server.io_threads_num;
+ listAddNodeTail(io_threads_list[target_id],c);
+ item_id++;
+ }
+
+ /* Give the start condition to the waiting threads, by setting the
+ * start condition atomic var. */
+ io_threads_op = IO_THREADS_OP_READ;
+ for (int j = 0; j < server.io_threads_num; j++) {
+ int count = listLength(io_threads_list[j]);
+ io_threads_pending[j] = count;
+ }
+
+ /* Wait for all threads to end their work. */
+ while(1) {
+ unsigned long pending = 0;
+ for (int j = 0; j < server.io_threads_num; j++)
+ pending += io_threads_pending[j];
+ if (pending == 0) break;
+ }
+ if (tio_debug) printf("I/O READ All threads finshed\n");
+
+ /* Run the list of clients again to process the new buffers. */
+ listRewind(server.clients_pending_read,&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ c->flags &= ~CLIENT_PENDING_READ;
+ processInputBufferAndReplicate(c);
+ }
+ listEmpty(server.clients_pending_read);
+ return processed;
}
diff --git a/src/server.c b/src/server.c
index ef6b85c44..e0c48b097 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2092,6 +2092,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
if (moduleCount()) moduleAcquireGIL();
+ handleClientsWithPendingReadsUsingThreads();
}
/* =========================== Server initialization ======================== */
diff --git a/src/server.h b/src/server.h
index dcfcb55fb..0d7882419 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1578,6 +1578,7 @@ int clientsArePaused(void);
int processEventsWhileBlocked(void);
int handleClientsWithPendingWrites(void);
int handleClientsWithPendingWritesUsingThreads(void);
+int handleClientsWithPendingReadsUsingThreads(void);
int stopThreadedIOIfNeeded(void);
int clientHasPendingReplies(client *c);
void unlinkClient(client *c);