summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-03-26 13:28:39 +0100
committerantirez <antirez@gmail.com>2020-03-27 16:35:03 +0100
commit324a8c91d05023420c7f15d9ad84ba41ad9eefe3 (patch)
tree0e355a10d6e4afe5d0a31a8bbcab3f087bb0ad6f
parent8d116cc8a2b8168bdf0dd649ca933ba1b3404852 (diff)
downloadredis-324a8c91d05023420c7f15d9ad84ba41ad9eefe3.tar.gz
Precise timeouts: working initial implementation.
-rw-r--r--src/blocked.c1
-rw-r--r--src/server.c135
-rw-r--r--src/server.h2
3 files changed, 110 insertions, 28 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 06aa5850e..c470cba00 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -619,6 +619,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
listAddNodeTail(l,c);
}
blockClient(c,btype);
+ addClientToShortTimeoutTable(c);
}
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
diff --git a/src/server.c b/src/server.c
index db56f6a0d..bd63c4b4e 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1473,34 +1473,7 @@ int allPersistenceDisabled(void) {
return server.saveparamslen == 0 && server.aof_state == AOF_OFF;
}
-/* ======================= Cron: called every 100 ms ======================== */
-
-/* Add a sample to the operations per second array of samples. */
-void trackInstantaneousMetric(int metric, long long current_reading) {
- long long t = mstime() - server.inst_metric[metric].last_sample_time;
- long long ops = current_reading -
- server.inst_metric[metric].last_sample_count;
- long long ops_sec;
-
- ops_sec = t > 0 ? (ops*1000/t) : 0;
-
- server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
- ops_sec;
- server.inst_metric[metric].idx++;
- server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
- server.inst_metric[metric].last_sample_time = mstime();
- server.inst_metric[metric].last_sample_count = current_reading;
-}
-
-/* Return the mean of all the samples. */
-long long getInstantaneousMetric(int metric) {
- int j;
- long long sum = 0;
-
- for (j = 0; j < STATS_METRIC_SAMPLES; j++)
- sum += server.inst_metric[metric].samples[j];
- return sum / STATS_METRIC_SAMPLES;
-}
+/* ========================== Clients timeouts ============================= */
/* Check if this blocked client timedout (does nothing if the client is
* not blocked right now). If so send a reply, unblock it, and return 1.
@@ -1555,6 +1528,107 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
return 0;
}
+/* For short timeouts, less than CLIENT_SHORT_TIMEOUT milliseconds, we
+ * populate a radix tree of 128 bit keys composed as such:
+ *
+ * [8 byte big endian expire time]+[8 byte client ID]
+ *
+ * We don't do any cleanup in the Radix tree: when we run the clients that
+ * reached the timeout already, if they are no longer existing or no longer
+ * blocked with such timeout, we just go forward.
+ *
+ * Every time a client blocks with a short timeout, we add the client in
+ * the tree. In beforeSleep() we call clientsHandleShortTimeout() to run
+ * the tree and unblock the clients.
+ *
+ * Design hint: why do we track only clients with short timeouts? For
+ * frugality: clients blocking for 30 seconds usually don't need to be
+ * unblocked precisely, and anyway, given the nature of Redis, *guaranteeing*
+ * unblock time precision is hard, so we can avoid putting a large number of
+ * clients in the radix tree without a good reason. This idea also plays a
+ * role in memory usage: given that we don't do cleanup, the shorter a client
+ * timeout, the less time it will stay in the radix tree. */
+
+#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
+
+/* Serialize a (timeout, client ID) pair into the CLIENT_ST_KEYLEN bytes key
+ * pointed to by buf: the first 8 bytes hold the timeout converted with
+ * htonu64(), the last 8 bytes hold the client ID copied as-is. */
+void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) {
+    uint64_t be_timeout = htonu64(timeout);
+
+    memcpy(buf,&be_timeout,sizeof(be_timeout));
+    memcpy(buf+sizeof(be_timeout),&id,sizeof(id));
+}
+
+/* Inverse of encodeTimeoutKey(): read the key stored in buf, then write the
+ * timeout (converted back with ntohu64()) into *toptr and the client ID
+ * into *idptr. */
+void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
+    uint64_t raw_timeout;
+
+    memcpy(&raw_timeout,buf,sizeof(raw_timeout));
+    *toptr = ntohu64(raw_timeout);
+    memcpy(idptr,buf+sizeof(raw_timeout),sizeof(*idptr));
+}
+
+/* Register the client in the radix tree used to handle short timeouts, using
+ * a key built from its bpop.timeout and its client ID. Nothing is added when
+ * bpop.timeout is zero, or when it is more than CLIENT_SHORT_TIMEOUT
+ * milliseconds after the current time. */
+void addClientToShortTimeoutTable(client *c) {
+    if (c->bpop.timeout == 0) return;
+    if (c->bpop.timeout - mstime() > CLIENT_SHORT_TIMEOUT) return;
+
+    unsigned char key[CLIENT_ST_KEYLEN];
+    encodeTimeoutKey(key,(uint64_t)c->bpop.timeout,(uint64_t)c->id);
+    raxTryInsert(server.clients_timeout_table,key,sizeof(key),NULL,NULL);
+}
+
+/* This function is called in beforeSleep() in order to unblock ASAP clients
+ * that are waiting in blocking operations with a short timeout set.
+ *
+ * The tree is walked starting from its first key, and the scan stops at the
+ * first entry whose timeout is not in the past. Every entry found expired is
+ * removed from the tree, whether or not its client still exists. */
+void clientsHandleShortTimeout(void) {
+    uint64_t now = mstime();
+    raxIterator ri;
+    raxStart(&ri,server.clients_timeout_table);
+    /* A just-started iterator is not positioned on any element: without
+     * this seek raxNext() reports end-of-iteration immediately and no
+     * client is ever unblocked from here. */
+    raxSeek(&ri,"^",NULL,0);
+
+    while(raxNext(&ri)) {
+        uint64_t id, timeout;
+        decodeTimeoutKey(ri.key,&timeout,&id);
+        if (timeout >= now) break; /* All the timeouts are in the future. */
+        client *c = lookupClientByID(id);
+        if (c) checkBlockedClientTimeout(c,now);
+        raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
+        /* The tree was modified: seek again to the first element. */
+        raxSeek(&ri,"^",NULL,0);
+    }
+    raxStop(&ri); /* Every raxStart() must be paired with raxStop(). */
+}
+
+/* ======================= Cron: called every 100 ms ======================== */
+
+/* Record a new sample for the given metric: the per-second rate is computed
+ * from how much the reading changed since the previous sample and from the
+ * milliseconds elapsed, then stored in the next slot of the samples array
+ * (the slot index wraps around at STATS_METRIC_SAMPLES). */
+void trackInstantaneousMetric(int metric, long long current_reading) {
+    long long elapsed_ms =
+        mstime() - server.inst_metric[metric].last_sample_time;
+    long long delta =
+        current_reading - server.inst_metric[metric].last_sample_count;
+    long long per_sec = 0;
+
+    /* Leave the rate at zero when no time has elapsed. */
+    if (elapsed_ms > 0) per_sec = delta*1000/elapsed_ms;
+
+    server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
+        per_sec;
+    server.inst_metric[metric].idx =
+        (server.inst_metric[metric].idx+1) % STATS_METRIC_SAMPLES;
+    server.inst_metric[metric].last_sample_time = mstime();
+    server.inst_metric[metric].last_sample_count = current_reading;
+}
+
+/* Return the mean of the STATS_METRIC_SAMPLES samples stored for the given
+ * metric (integer division). */
+long long getInstantaneousMetric(int metric) {
+    long long total = 0;
+    int i = 0;
+
+    while (i < STATS_METRIC_SAMPLES) {
+        total += server.inst_metric[metric].samples[i];
+        i++;
+    }
+    return total / STATS_METRIC_SAMPLES;
+}
+
/* The client query buffer is an sds.c string that can end with a lot of
* free space not used, this function reclaims space if needed.
*
@@ -2109,11 +2183,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
+ /* Handle precise timeouts of blocked clients. */
+ clientsHandleShortTimeout();
+
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
tlsProcessPendingData();
+
/* If tls still has pending unread data don't sleep at all. */
aeSetDontWait(server.el, tlsHasPendingData());
@@ -2738,6 +2816,7 @@ void initServer(void) {
server.monitors = listCreate();
server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
+ server.clients_timeout_table = raxNew();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
diff --git a/src/server.h b/src/server.h
index 93552b36e..6f417b730 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1070,6 +1070,7 @@ struct redisServer {
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* Current client executing the command. */
+ rax *clients_timeout_table; /* Radix tree for clients with short timeout. */
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
rax *clients_index; /* Active clients dictionary by client ID. */
int clients_paused; /* True if clients are currently paused */
@@ -2139,6 +2140,7 @@ void disconnectAllBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(redisDb *db, robj *key);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
+void addClientToShortTimeoutTable(client *c);
/* expire.c -- Handling of expired keys */
void activeExpireCycle(int type);