summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-05-14 11:28:08 +0200
committerantirez <antirez@gmail.com>2020-05-14 11:28:08 +0200
commitc38fd1f66152109a2d2801b1d181cdc1901d6b3b (patch)
treea8ebb2dce4cd47799789934f5b4895f28d750de6
parentdec6fd3adc59ecb6e3dc8320c448e9f20ea00b9d (diff)
parentbc4667acbcbeced700d0f64735e8ba8c7ffb8357 (diff)
downloadredis-c38fd1f66152109a2d2801b1d181cdc1901d6b3b.tar.gz
Merge branch 'free_clients_during_loading' into unstable
-rw-r--r--src/ae.c10
-rw-r--r--src/ae.h9
-rw-r--r--src/connection.h2
-rw-r--r--src/networking.c22
-rw-r--r--src/server.c43
-rw-r--r--src/server.h5
-rw-r--r--src/tls.c7
-rw-r--r--tests/integration/rdb.tcl52
8 files changed, 126 insertions, 24 deletions
diff --git a/src/ae.c b/src/ae.c
index 1bf6cbfbf..379cfd1e6 100644
--- a/src/ae.c
+++ b/src/ae.c
@@ -370,6 +370,7 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
+ * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
@@ -428,6 +429,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
tvp = &tv;
}
+ if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
+ eventLoop->beforesleep(eventLoop);
+
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
@@ -522,9 +526,9 @@ int aeWait(int fd, int mask, long long milliseconds) {
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
- if (eventLoop->beforesleep != NULL)
- eventLoop->beforesleep(eventLoop);
- aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
+ aeProcessEvents(eventLoop, AE_ALL_EVENTS|
+ AE_CALL_BEFORE_SLEEP|
+ AE_CALL_AFTER_SLEEP);
}
}
diff --git a/src/ae.h b/src/ae.h
index 9acd72434..63b306615 100644
--- a/src/ae.h
+++ b/src/ae.h
@@ -47,11 +47,12 @@
things to disk before sending replies, and want
to do that in a group fashion. */
-#define AE_FILE_EVENTS 1
-#define AE_TIME_EVENTS 2
+#define AE_FILE_EVENTS (1<<0)
+#define AE_TIME_EVENTS (1<<1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
-#define AE_DONT_WAIT 4
-#define AE_CALL_AFTER_SLEEP 8
+#define AE_DONT_WAIT (1<<2)
+#define AE_CALL_BEFORE_SLEEP (1<<3)
+#define AE_CALL_AFTER_SLEEP (1<<4)
#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1
diff --git a/src/connection.h b/src/connection.h
index db09dfd83..0fd6c5f24 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -222,6 +222,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len);
/* Helpers for tls special considerations */
int tlsHasPendingData();
-void tlsProcessPendingData();
+int tlsProcessPendingData();
#endif /* __REDIS_CONNECTION_H */
diff --git a/src/networking.c b/src/networking.c
index 75c0c16b1..671e374f4 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1257,7 +1257,10 @@ void freeClientAsync(client *c) {
pthread_mutex_unlock(&async_free_queue_mutex);
}
-void freeClientsInAsyncFreeQueue(void) {
+/* Free the clietns marked as CLOSE_ASAP, return the number of clients
+ * freed. */
+int freeClientsInAsyncFreeQueue(void) {
+ int freed = listLength(server.clients_to_close);
while (listLength(server.clients_to_close)) {
listNode *ln = listFirst(server.clients_to_close);
client *c = listNodeValue(ln);
@@ -1266,6 +1269,7 @@ void freeClientsInAsyncFreeQueue(void) {
freeClient(c);
listDelNode(server.clients_to_close,ln);
}
+ return freed;
}
/* Return a client by ID, or NULL if the client ID is not in the set
@@ -2852,9 +2856,8 @@ int clientsArePaused(void) {
* write, close sequence needed to serve a client.
*
* The function returns the total number of events processed. */
-int processEventsWhileBlocked(void) {
+void processEventsWhileBlocked(void) {
int iterations = 4; /* See the function top-comment. */
- int count = 0;
/* Note: when we are processing events while blocked (for instance during
* busy Lua scripts), we set a global flag. When such flag is set, we
@@ -2862,14 +2865,17 @@ int processEventsWhileBlocked(void) {
* See https://github.com/antirez/redis/issues/6988 for more info. */
ProcessingEventsWhileBlocked = 1;
while (iterations--) {
- int events = 0;
- events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
- events += handleClientsWithPendingWrites();
+ long long startval = server.events_processed_while_blocked;
+ long long ae_events = aeProcessEvents(server.el,
+ AE_FILE_EVENTS|AE_DONT_WAIT|
+ AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
+ /* Note that server.events_processed_while_blocked will also get
+ * incremeted by callbacks called by the event loop handlers. */
+ server.events_processed_while_blocked += ae_events;
+ long long events = server.events_processed_while_blocked - startval;
if (!events) break;
- count += events;
}
ProcessingEventsWhileBlocked = 0;
- return count;
}
/* ==========================================================================
diff --git a/src/server.c b/src/server.c
index 847a6a95a..5bc4666ee 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2087,12 +2087,40 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
return 1000/server.hz;
}
+extern int ProcessingEventsWhileBlocked;
+
/* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep
- * for ready file descriptors. */
+ * for ready file descriptors.
+ *
+ * Note: This function is (currently) called from two functions:
+ * 1. aeMain - The main server loop
+ * 2. processEventsWhileBlocked - Process clients during RDB/AOF load
+ *
+ * If it was called from processEventsWhileBlocked we don't want
+ * to perform all actions (For example, we don't want to expire
+ * keys), but we do need to perform some actions.
+ *
+ * The most important is freeClientsInAsyncFreeQueue but we also
+ * call some other low-risk functions. */
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
+ /* Just call a subset of vital functions in case we are re-entering
+ * the event loop from processEventsWhileBlocked(). Note that in this
+ * case we keep track of the number of events we are processing, since
+ * processEventsWhileBlocked() wants to stop ASAP if there are no longer
+ * events to handle. */
+ if (ProcessingEventsWhileBlocked) {
+ uint64_t processed = 0;
+ processed += handleClientsWithPendingReadsUsingThreads();
+ processed += tlsProcessPendingData();
+ processed += handleClientsWithPendingWrites();
+ processed += freeClientsInAsyncFreeQueue();
+ server.events_processed_while_blocked += processed;
+ return;
+ }
+
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
@@ -2171,7 +2199,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* the different events callbacks. */
void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
- if (moduleCount()) moduleAcquireGIL();
+
+ if (!ProcessingEventsWhileBlocked) {
+ if (moduleCount()) moduleAcquireGIL();
+ }
}
/* =========================== Server initialization ======================== */
@@ -2737,6 +2768,7 @@ void initServer(void) {
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
+ server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size();
if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) {
@@ -2879,6 +2911,11 @@ void initServer(void) {
"blocked clients subsystem.");
}
+ /* Register before and after sleep handlers (note this needs to be done
+ * before loading persistence since it is used by processEventsWhileBlocked. */
+ aeSetBeforeSleepProc(server.el,beforeSleep);
+ aeSetAfterSleepProc(server.el,afterSleep);
+
/* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) {
server.aof_fd = open(server.aof_filename,
@@ -5135,8 +5172,6 @@ int main(int argc, char **argv) {
}
redisSetCpuAffinity(server.server_cpulist);
- aeSetBeforeSleepProc(server.el,beforeSleep);
- aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
diff --git a/src/server.h b/src/server.h
index 59cf1370e..55ee2d300 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1095,6 +1095,7 @@ struct redisServer {
queries. Will still serve RESP2 queries. */
int io_threads_num; /* Number of IO threads to use. */
int io_threads_do_reads; /* Read and parse from IO threads? */
+ long long events_processed_while_blocked; /* processEventsWhileBlocked() */
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
@@ -1652,7 +1653,7 @@ void rewriteClientCommandVector(client *c, int argc, ...);
void rewriteClientCommandArgument(client *c, int i, robj *newval);
void replaceClientCommandVector(client *c, int argc, robj **argv);
unsigned long getClientOutputBufferMemoryUsage(client *c);
-void freeClientsInAsyncFreeQueue(void);
+int freeClientsInAsyncFreeQueue(void);
void asyncCloseClientOnOutputBufferLimitReached(client *c);
int getClientType(client *c);
int getClientTypeByName(char *name);
@@ -1662,7 +1663,7 @@ void disconnectSlaves(void);
int listenToPort(int port, int *fds, int *count);
void pauseClients(mstime_t duration);
int clientsArePaused(void);
-int processEventsWhileBlocked(void);
+void processEventsWhileBlocked(void);
int handleClientsWithPendingWrites(void);
int handleClientsWithPendingWritesUsingThreads(void);
int handleClientsWithPendingReadsUsingThreads(void);
diff --git a/src/tls.c b/src/tls.c
index c18aafebe..ee85bd302 100644
--- a/src/tls.c
+++ b/src/tls.c
@@ -768,15 +768,17 @@ int tlsHasPendingData() {
return listLength(pending_list) > 0;
}
-void tlsProcessPendingData() {
+int tlsProcessPendingData() {
listIter li;
listNode *ln;
+ int processed = listLength(pending_list);
listRewind(pending_list,&li);
while((ln = listNext(&li))) {
tls_connection *conn = listNodeValue(ln);
tlsHandleEvent(conn, AE_READABLE);
}
+ return processed;
}
#else /* USE_OPENSSL */
@@ -804,7 +806,8 @@ int tlsHasPendingData() {
return 0;
}
-void tlsProcessPendingData() {
+int tlsProcessPendingData() {
+ return 0;
}
#endif
diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl
index b364291ee..123e9c8b6 100644
--- a/tests/integration/rdb.tcl
+++ b/tests/integration/rdb.tcl
@@ -128,4 +128,56 @@ start_server {} {
# make sure the server is still writable
r set x xx
}
+}
+
+test {client freed during loading} {
+ start_server [list overrides [list key-load-delay 10 rdbcompression no]] {
+ # create a big rdb that will take long to load. it is important
+ # for keys to be big since the server processes events only once in 2mb.
+ # 100mb of rdb, 100k keys will load in more than 1 second
+ r debug populate 100000 key 1000
+
+ catch {
+ r debug restart
+ }
+
+ set stdout [srv 0 stdout]
+ while 1 {
+ # check that the new server actually started and is ready for connections
+ if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} {
+ break
+ }
+ after 10
+ }
+ # make sure it's still loading
+ assert_equal [s loading] 1
+
+ # connect and disconnect 10 clients
+ set clients {}
+ for {set j 0} {$j < 10} {incr j} {
+ lappend clients [redis_deferring_client]
+ }
+ foreach rd $clients {
+ $rd debug log bla
+ }
+ foreach rd $clients {
+ $rd read
+ }
+ foreach rd $clients {
+ $rd close
+ }
+
+ # make sure the server freed the clients
+ wait_for_condition 100 100 {
+ [s connected_clients] < 3
+ } else {
+ fail "clients didn't disconnect"
+ }
+
+ # make sure it's still loading
+ assert_equal [s loading] 1
+
+ # no need to keep waiting for loading to complete
+ exec kill [srv 0 pid]
+ }
} \ No newline at end of file