summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-03-27 16:34:45 +0100
committerantirez <antirez@gmail.com>2020-03-27 16:35:03 +0100
commitdd7e61d77f8e74b2d502965cf43df8034cd7923e (patch)
treef5d1ac7e99e3d50f360752fcfaa4cafb4f9dda11
parent0e22cb2680db9f87fd232bc54419d538629edc2d (diff)
downloadredis-dd7e61d77f8e74b2d502965cf43df8034cd7923e.tar.gz
timeout.c created: move client timeouts code there.
-rw-r--r--src/Makefile2
-rw-r--r--src/blocked.c39
-rw-r--r--src/server.c128
-rw-r--r--src/server.h4
-rw-r--r--src/timeout.c192
5 files changed, 198 insertions, 167 deletions
diff --git a/src/Makefile b/src/Makefile
index bbfb06440..3f982cc8e 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -206,7 +206,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crc64.o siphash.o crc16.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/blocked.c b/src/blocked.c
index 795985ea1..e3a803ae3 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -31,9 +31,6 @@
*
* API:
*
- * getTimeoutFromObjectOrReply() is just an utility function to parse a
- * timeout argument since blocking operations usually require a timeout.
- *
* blockClient() set the CLIENT_BLOCKED flag in the client, and set the
* specified block type 'btype' filed to one of BLOCKED_* macros.
*
@@ -67,42 +64,6 @@
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
-/* Get a timeout value from an object and store it into 'timeout'.
- * The final timeout is always stored as milliseconds as a time where the
- * timeout will expire, however the parsing is performed according to
- * the 'unit' that can be seconds or milliseconds.
- *
- * Note that if the timeout is zero (usually from the point of view of
- * commands API this means no timeout) the value stored into 'timeout'
- * is zero. */
-int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
- long long tval;
- long double ftval;
-
- if (unit == UNIT_SECONDS) {
- if (getLongDoubleFromObjectOrReply(c,object,&ftval,
- "timeout is not an float or out of range") != C_OK)
- return C_ERR;
- tval = (long long) (ftval * 1000.0);
- } else {
- if (getLongLongFromObjectOrReply(c,object,&tval,
- "timeout is not an integer or out of range") != C_OK)
- return C_ERR;
- }
-
- if (tval < 0) {
- addReplyError(c,"timeout is negative");
- return C_ERR;
- }
-
- if (tval > 0) {
- tval += mstime();
- }
- *timeout = tval;
-
- return C_OK;
-}
-
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
* flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */
diff --git a/src/server.c b/src/server.c
index cfed2f91b..c1f7436a1 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1473,132 +1473,6 @@ int allPersistenceDisabled(void) {
return server.saveparamslen == 0 && server.aof_state == AOF_OFF;
}
-/* ========================== Clients timeouts ============================= */
-
-/* Check if this blocked client timedout (does nothing if the client is
- * not blocked right now). If so send a reply, unblock it, and return 1.
- * Otherwise 0 is returned and no operation is performed. */
-int checkBlockedClientTimeout(client *c, mstime_t now) {
- if (c->flags & CLIENT_BLOCKED &&
- c->bpop.timeout != 0
- && c->bpop.timeout < now)
- {
- /* Handle blocking operation specific timeout. */
- replyToBlockedClientTimedOut(c);
- unblockClient(c);
- return 1;
- } else {
- return 0;
- }
-}
-
-/* Check for timeouts. Returns non-zero if the client was terminated.
- * The function gets the current time in milliseconds as argument since
- * it gets called multiple times in a loop, so calling gettimeofday() for
- * each iteration would be costly without any actual gain. */
-int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
- time_t now = now_ms/1000;
-
- if (server.maxidletime &&
- /* This handles the idle clients connection timeout if set. */
- !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */
- !(c->flags & CLIENT_MASTER) && /* No timeout for masters */
- !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
- !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
- (now - c->lastinteraction > server.maxidletime))
- {
- serverLog(LL_VERBOSE,"Closing idle client");
- freeClient(c);
- return 1;
- } else if (c->flags & CLIENT_BLOCKED) {
- /* Cluster: handle unblock & redirect of clients blocked
- * into keys no longer served by this server. */
- if (server.cluster_enabled) {
- if (clusterRedirectBlockedClientIfNeeded(c))
- unblockClient(c);
- }
- }
- return 0;
-}
-
-/* For blocked clients timeouts we populate a radix tree of 128 bit keys
- * composed as such:
- *
- * [8 byte big endian expire time]+[8 byte client ID]
- *
- * We don't do any cleanup in the Radix tree: when we run the clients that
- * reached the timeout already, if they are no longer existing or no longer
- * blocked with such timeout, we just go forward.
- *
- * Every time a client blocks with a timeout, we add the client in
- * the tree. In beforeSleep() we call clientsHandleTimeout() to run
- * the tree and unblock the clients. */
-
-#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
-
-/* Given client ID and timeout, write the resulting radix tree key in buf. */
-void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) {
- timeout = htonu64(timeout);
- memcpy(buf,&timeout,sizeof(timeout));
- memcpy(buf+8,&id,sizeof(id));
-}
-
-/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
- * the timeout into *toptr and the client ID into *idptr. */
-void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
- memcpy(toptr,buf,sizeof(*toptr));
- *toptr = ntohu64(*toptr);
- memcpy(idptr,buf+8,sizeof(*idptr));
-}
-
-/* Add the specified client id / timeout as a key in the radix tree we use
- * to handle blocked clients timeouts. The client is not added to the list
- * if its timeout is zero (block forever). */
-void addClientToTimeoutTable(client *c) {
- if (c->bpop.timeout == 0) return;
- uint64_t timeout = c->bpop.timeout;
- uint64_t id = c->id;
- unsigned char buf[CLIENT_ST_KEYLEN];
- encodeTimeoutKey(buf,timeout,id);
- if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
- c->flags |= CLIENT_IN_TO_TABLE;
-}
-
-/* Remove the client from the table when it is unblocked for reasons
- * different than timing out. */
-void removeClientFromTimeoutTable(client *c) {
- if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
- c->flags &= ~CLIENT_IN_TO_TABLE;
- uint64_t timeout = c->bpop.timeout;
- uint64_t id = c->id;
- unsigned char buf[CLIENT_ST_KEYLEN];
- encodeTimeoutKey(buf,timeout,id);
- raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
-}
-
-/* This function is called in beforeSleep() in order to unblock clients
- * that are waiting in blocking operations with a timeout set. */
-void clientsHandleTimeout(void) {
- if (raxSize(server.clients_timeout_table) == 0) return;
- uint64_t now = mstime();
- raxIterator ri;
- raxStart(&ri,server.clients_timeout_table);
- raxSeek(&ri,"^",NULL,0);
-
- while(raxNext(&ri)) {
- uint64_t id, timeout;
- decodeTimeoutKey(ri.key,&timeout,&id);
- if (timeout >= now) break; /* All the timeouts are in the future. */
- client *c = lookupClientByID(id);
- if (c) {
- c->flags &= ~CLIENT_IN_TO_TABLE;
- checkBlockedClientTimeout(c,now);
- }
- raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
- raxSeek(&ri,"^",NULL,0);
- }
-}
-
/* ======================= Cron: called every 100 ms ======================== */
/* Add a sample to the operations per second array of samples. */
@@ -2183,7 +2057,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
/* Handle precise timeouts of blocked clients. */
- clientsHandleTimeout();
+ handleBlockedClientsTimeout();
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
diff --git a/src/server.h b/src/server.h
index a3d91a09e..691f47c48 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2138,8 +2138,12 @@ void disconnectAllBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(redisDb *db, robj *key);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
+
+/* timeout.c -- Blocked clients timeout and connections timeout. */
void addClientToTimeoutTable(client *c);
void removeClientFromTimeoutTable(client *c);
+void handleBlockedClientsTimeout(void);
+int clientsCronHandleTimeout(client *c, mstime_t now_ms);
/* expire.c -- Handling of expired keys */
void activeExpireCycle(int type);
diff --git a/src/timeout.c b/src/timeout.c
new file mode 100644
index 000000000..ea2032e2a
--- /dev/null
+++ b/src/timeout.c
@@ -0,0 +1,192 @@
+/* Copyright (c) 2009-2020, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "cluster.h"
+
+/* ========================== Clients timeouts ============================= */
+
+/* Check if this blocked client timedout (does nothing if the client is
+ * not blocked right now). If so send a reply, unblock it, and return 1.
+ * Otherwise 0 is returned and no operation is performed. */
+int checkBlockedClientTimeout(client *c, mstime_t now) {
+ if (c->flags & CLIENT_BLOCKED &&
+ c->bpop.timeout != 0
+ && c->bpop.timeout < now)
+ {
+ /* Handle blocking operation specific timeout. */
+ replyToBlockedClientTimedOut(c);
+ unblockClient(c);
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+/* Check for timeouts. Returns non-zero if the client was terminated.
+ * The function gets the current time in milliseconds as argument since
+ * it gets called multiple times in a loop, so calling gettimeofday() for
+ * each iteration would be costly without any actual gain. */
+int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
+ time_t now = now_ms/1000;
+
+ if (server.maxidletime &&
+ /* This handles the idle clients connection timeout if set. */
+ !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */
+ !(c->flags & CLIENT_MASTER) && /* No timeout for masters */
+ !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
+ !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
+ (now - c->lastinteraction > server.maxidletime))
+ {
+ serverLog(LL_VERBOSE,"Closing idle client");
+ freeClient(c);
+ return 1;
+ } else if (c->flags & CLIENT_BLOCKED) {
+ /* Cluster: handle unblock & redirect of clients blocked
+ * into keys no longer served by this server. */
+ if (server.cluster_enabled) {
+ if (clusterRedirectBlockedClientIfNeeded(c))
+ unblockClient(c);
+ }
+ }
+ return 0;
+}
+
+/* For blocked clients timeouts we populate a radix tree of 128 bit keys
+ * composed as such:
+ *
+ * [8 byte big endian expire time]+[8 byte client ID]
+ *
+ * We don't do any cleanup in the Radix tree: when we run the clients that
+ * reached the timeout already, if they are no longer existing or no longer
+ * blocked with such timeout, we just go forward.
+ *
+ * Every time a client blocks with a timeout, we add the client in
+ * the tree. In beforeSleep() we call handleBlockedClientsTimeout() to run
+ * the tree and unblock the clients. */
+
+#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
+
+/* Given client ID and timeout, write the resulting radix tree key in buf. */
+void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) {
+ timeout = htonu64(timeout);
+ memcpy(buf,&timeout,sizeof(timeout));
+ memcpy(buf+8,&id,sizeof(id));
+}
+
+/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
+ * the timeout into *toptr and the client ID into *idptr. */
+void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
+ memcpy(toptr,buf,sizeof(*toptr));
+ *toptr = ntohu64(*toptr);
+ memcpy(idptr,buf+8,sizeof(*idptr));
+}
+
+/* Add the specified client id / timeout as a key in the radix tree we use
+ * to handle blocked clients timeouts. The client is not added to the list
+ * if its timeout is zero (block forever). */
+void addClientToTimeoutTable(client *c) {
+ if (c->bpop.timeout == 0) return;
+ uint64_t timeout = c->bpop.timeout;
+ uint64_t id = c->id;
+ unsigned char buf[CLIENT_ST_KEYLEN];
+ encodeTimeoutKey(buf,timeout,id);
+ if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
+ c->flags |= CLIENT_IN_TO_TABLE;
+}
+
+/* Remove the client from the table when it is unblocked for reasons
+ * different than timing out. */
+void removeClientFromTimeoutTable(client *c) {
+ if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
+ c->flags &= ~CLIENT_IN_TO_TABLE;
+ uint64_t timeout = c->bpop.timeout;
+ uint64_t id = c->id;
+ unsigned char buf[CLIENT_ST_KEYLEN];
+ encodeTimeoutKey(buf,timeout,id);
+ raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
+}
+
+/* This function is called in beforeSleep() in order to unblock clients
+ * that are waiting in blocking operations with a timeout set. */
+void handleBlockedClientsTimeout(void) {
+ if (raxSize(server.clients_timeout_table) == 0) return;
+ uint64_t now = mstime();
+ raxIterator ri;
+ raxStart(&ri,server.clients_timeout_table);
+ raxSeek(&ri,"^",NULL,0);
+
+ while(raxNext(&ri)) {
+ uint64_t id, timeout;
+ decodeTimeoutKey(ri.key,&timeout,&id);
+ if (timeout >= now) break; /* All the timeouts are in the future. */
+ client *c = lookupClientByID(id);
+ if (c) {
+ c->flags &= ~CLIENT_IN_TO_TABLE;
+ checkBlockedClientTimeout(c,now);
+ }
+ raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
+ raxSeek(&ri,"^",NULL,0);
+ }
+}
+
+/* Get a timeout value from an object and store it into 'timeout'.
+ * The final timeout is always stored as milliseconds as a time where the
+ * timeout will expire, however the parsing is performed according to
+ * the 'unit' that can be seconds or milliseconds.
+ *
+ * Note that if the timeout is zero (usually from the point of view of
+ * commands API this means no timeout) the value stored into 'timeout'
+ * is zero. */
+int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
+ long long tval;
+ long double ftval;
+
+ if (unit == UNIT_SECONDS) {
+ if (getLongDoubleFromObjectOrReply(c,object,&ftval,
+ "timeout is not an float or out of range") != C_OK)
+ return C_ERR;
+ tval = (long long) (ftval * 1000.0);
+ } else {
+ if (getLongLongFromObjectOrReply(c,object,&tval,
+ "timeout is not an integer or out of range") != C_OK)
+ return C_ERR;
+ }
+
+ if (tval < 0) {
+ addReplyError(c,"timeout is negative");
+ return C_ERR;
+ }
+
+ if (tval > 0) {
+ tval += mstime();
+ }
+ *timeout = tval;
+
+ return C_OK;
+}