summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2020-05-10 19:13:47 +0300
committerOran Agra <oran@redislabs.com>2020-05-11 11:33:46 +0300
commit905e28ee87eb0bbea448ec28c11dc74991359bf5 (patch)
treed51ac23a7e47caddf0e2281470cad98a6e1d3ae6
parent1750513ac762ef1d2d488e3c7d79a1002a905508 (diff)
downloadredis-905e28ee87eb0bbea448ec28c11dc74991359bf5.tar.gz
fix redis 6.0 not freeing closed connections during loading.
This bug was introduced by a recent change in which readQueryFromClient uses freeClientAsync. Even though freeClientsInAsyncFreeQueue is now called in beforeSleep, that's not enough, since beforeSleep is not called during loading in processEventsWhileBlocked. Furthermore, afterSleep was called in that case but beforeSleep wasn't. This bug also caused slowness, since the level-triggered mode of epoll kept signaling these connections as readable, causing us to keep doing connRead again and again for all of these, which keep accumulating. Now both before and after sleep are called, but not all of their actions are performed during loading; some are reserved for the main loop only. Fixes issue #7215.
-rw-r--r--src/ae.c5
-rw-r--r--src/server.c134
-rw-r--r--tests/integration/rdb.tcl52
3 files changed, 133 insertions, 58 deletions
diff --git a/src/ae.c b/src/ae.c
index 1bf6cbfbf..7aa204250 100644
--- a/src/ae.c
+++ b/src/ae.c
@@ -428,6 +428,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
tvp = &tv;
}
+ if (eventLoop->beforesleep != NULL)
+ eventLoop->beforesleep(eventLoop);
+
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
@@ -522,8 +525,6 @@ int aeWait(int fd, int mask, long long milliseconds) {
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
- if (eventLoop->beforesleep != NULL)
- eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
diff --git a/src/server.c b/src/server.c
index e2b4b6f3d..fd75f25d6 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2081,14 +2081,26 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
return 1000/server.hz;
}
+extern int ProcessingEventsWhileBlocked;
+
/* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep
- * for ready file descriptors. */
+ * for ready file descriptors.
+ * Note: This function is (currently) called from two functions:
+ * 1. aeMain - The main server loop
+ * 2. processEventsWhileBlocked - Process clients during RDB/AOF load
+ * If it was called from processEventsWhileBlocked we don't want
+ * to perform all actions (For example, we don't want to expire
+ * keys), but we do need to perform some actions.
+ * The most important is freeClientsInAsyncFreeQueue but we also
+ * call some other low-risk functions. */
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
- /* Handle precise timeouts of blocked clients. */
- handleBlockedClientsTimeout();
+ if (!ProcessingEventsWhileBlocked) {
+ /* Handle precise timeouts of blocked clients. */
+ handleBlockedClientsTimeout();
+ }
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
@@ -2099,54 +2111,56 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* If tls still has pending unread data don't sleep at all. */
aeSetDontWait(server.el, tlsHasPendingData());
- /* Call the Redis Cluster before sleep function. Note that this function
- * may change the state of Redis Cluster (from ok to fail or vice versa),
- * so it's a good idea to call it before serving the unblocked clients
- * later in this function. */
- if (server.cluster_enabled) clusterBeforeSleep();
-
- /* Run a fast expire cycle (the called function will return
- * ASAP if a fast cycle is not needed). */
- if (server.active_expire_enabled && server.masterhost == NULL)
- activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
-
- /* Unblock all the clients blocked for synchronous replication
- * in WAIT. */
- if (listLength(server.clients_waiting_acks))
- processClientsWaitingReplicas();
-
- /* Check if there are clients unblocked by modules that implement
- * blocking commands. */
- if (moduleCount()) moduleHandleBlockedClients();
-
- /* Try to process pending commands for clients that were just unblocked. */
- if (listLength(server.unblocked_clients))
- processUnblockedClients();
-
- /* Send all the slaves an ACK request if at least one client blocked
- * during the previous event loop iteration. Note that we do this after
- * processUnblockedClients(), so if there are multiple pipelined WAITs
- * and the just unblocked WAIT gets blocked again, we don't have to wait
- * a server cron cycle in absence of other event loop events. See #6623. */
- if (server.get_ack_from_slaves) {
- robj *argv[3];
-
- argv[0] = createStringObject("REPLCONF",8);
- argv[1] = createStringObject("GETACK",6);
- argv[2] = createStringObject("*",1); /* Not used argument. */
- replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
- decrRefCount(argv[0]);
- decrRefCount(argv[1]);
- decrRefCount(argv[2]);
- server.get_ack_from_slaves = 0;
- }
+ if (!ProcessingEventsWhileBlocked) {
+ /* Call the Redis Cluster before sleep function. Note that this function
+ * may change the state of Redis Cluster (from ok to fail or vice versa),
+ * so it's a good idea to call it before serving the unblocked clients
+ * later in this function. */
+ if (server.cluster_enabled) clusterBeforeSleep();
+
+ /* Run a fast expire cycle (the called function will return
+ * ASAP if a fast cycle is not needed). */
+ if (server.active_expire_enabled && server.masterhost == NULL)
+ activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
+
+ /* Unblock all the clients blocked for synchronous replication
+ * in WAIT. */
+ if (listLength(server.clients_waiting_acks))
+ processClientsWaitingReplicas();
+
+ /* Check if there are clients unblocked by modules that implement
+ * blocking commands. */
+ if (moduleCount()) moduleHandleBlockedClients();
+
+ /* Try to process pending commands for clients that were just unblocked. */
+ if (listLength(server.unblocked_clients))
+ processUnblockedClients();
+
+ /* Send all the slaves an ACK request if at least one client blocked
+ * during the previous event loop iteration. Note that we do this after
+ * processUnblockedClients(), so if there are multiple pipelined WAITs
+ * and the just unblocked WAIT gets blocked again, we don't have to wait
+ * a server cron cycle in absence of other event loop events. See #6623. */
+ if (server.get_ack_from_slaves) {
+ robj *argv[3];
+
+ argv[0] = createStringObject("REPLCONF",8);
+ argv[1] = createStringObject("GETACK",6);
+ argv[2] = createStringObject("*",1); /* Not used argument. */
+ replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
+ decrRefCount(argv[0]);
+ decrRefCount(argv[1]);
+ decrRefCount(argv[2]);
+ server.get_ack_from_slaves = 0;
+ }
- /* Send the invalidation messages to clients participating to the
- * client side caching protocol in broadcasting (BCAST) mode. */
- trackingBroadcastInvalidationMessages();
+ /* Send the invalidation messages to clients participating to the
+ * client side caching protocol in broadcasting (BCAST) mode. */
+ trackingBroadcastInvalidationMessages();
- /* Write the AOF buffer on disk */
- flushAppendOnlyFile(0);
+ /* Write the AOF buffer on disk */
+ flushAppendOnlyFile(0);
+ }
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
@@ -2154,10 +2168,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
- /* Before we are going to sleep, let the threads access the dataset by
- * releasing the GIL. Redis main thread will not touch anything at this
- * time. */
- if (moduleCount()) moduleReleaseGIL();
+ if (!ProcessingEventsWhileBlocked) {
+ /* Before we are going to sleep, let the threads access the dataset by
+ * releasing the GIL. Redis main thread will not touch anything at this
+ * time. */
+ if (moduleCount()) moduleReleaseGIL();
+ }
}
/* This function is called immadiately after the event loop multiplexing
@@ -2165,7 +2181,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* the different events callbacks. */
void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
- if (moduleCount()) moduleAcquireGIL();
+
+ if (!ProcessingEventsWhileBlocked) {
+ if (moduleCount()) moduleAcquireGIL();
+ }
}
/* =========================== Server initialization ======================== */
@@ -2873,6 +2892,11 @@ void initServer(void) {
"blocked clients subsystem.");
}
+ /* Register before and after sleep handlers (note this needs to be done
+ * before loading persistence since it is used by processEventsWhileBlocked. */
+ aeSetBeforeSleepProc(server.el,beforeSleep);
+ aeSetAfterSleepProc(server.el,afterSleep);
+
/* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) {
server.aof_fd = open(server.aof_filename,
@@ -5129,8 +5153,6 @@ int main(int argc, char **argv) {
}
redisSetCpuAffinity(server.server_cpulist);
- aeSetBeforeSleepProc(server.el,beforeSleep);
- aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl
index b364291ee..123e9c8b6 100644
--- a/tests/integration/rdb.tcl
+++ b/tests/integration/rdb.tcl
@@ -128,4 +128,56 @@ start_server {} {
# make sure the server is still writable
r set x xx
}
+}
+
+test {client freed during loading} {
+ start_server [list overrides [list key-load-delay 10 rdbcompression no]] {
+ # create a big rdb that will take long to load. it is important
+ # for keys to be big since the server processes events only once in 2mb.
+ # 100mb of rdb, 100k keys will load in more than 1 second
+ r debug populate 100000 key 1000
+
+ catch {
+ r debug restart
+ }
+
+ set stdout [srv 0 stdout]
+ while 1 {
+ # check that the new server actually started and is ready for connections
+ if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} {
+ break
+ }
+ after 10
+ }
+ # make sure it's still loading
+ assert_equal [s loading] 1
+
+ # connect and disconnect 10 clients
+ set clients {}
+ for {set j 0} {$j < 10} {incr j} {
+ lappend clients [redis_deferring_client]
+ }
+ foreach rd $clients {
+ $rd debug log bla
+ }
+ foreach rd $clients {
+ $rd read
+ }
+ foreach rd $clients {
+ $rd close
+ }
+
+ # make sure the server freed the clients
+ wait_for_condition 100 100 {
+ [s connected_clients] < 3
+ } else {
+ fail "clients didn't disconnect"
+ }
+
+ # make sure it's still loading
+ assert_equal [s loading] 1
+
+ # no need to keep waiting for loading to complete
+ exec kill [srv 0 pid]
+ }
} \ No newline at end of file