summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-09-09 11:10:59 +0200
committerantirez <antirez@gmail.com>2017-11-04 17:39:26 +0100
commit08cc57f6c8e7fd6cff9bfbf46a999b865ddd7b71 (patch)
treeec876356be97d7919ec698850dca6c80a239f41c
parentcd8d39a0aa7cef9d30106816bd0049d8860a2584 (diff)
downloadredis-08cc57f6c8e7fd6cff9bfbf46a999b865ddd7b71.tar.gz
Streams: fix XREAD ready-key signaling.
With lists we need to signal only on key creation, but streams can provide data to clients listening at every new item added. To make this slightly more efficient we now track different classes of blocked clients to avoid signaling keys when there is nobody listening. A typical case is when the stream is used as a time series DB and accessed only by range with XRANGE.
-rw-r--r--src/blocked.c6
-rw-r--r--src/db.c3
-rw-r--r--src/server.c6
-rw-r--r--src/server.h4
-rw-r--r--src/t_stream.c2
5 files changed, 14 insertions, 7 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 84d74f24b..3cf661aa8 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -102,7 +102,8 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
void blockClient(client *c, int btype) {
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
- server.bpop_blocked_clients++;
+ server.blocked_clients++;
+ server.blocked_clients_by_type[btype]++;
}
/* This function is called in the beforeSleep() function of the event loop
@@ -145,9 +146,10 @@ void unblockClient(client *c) {
}
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
+ server.blocked_clients--;
+ server.blocked_clients_by_type[c->btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->btype = BLOCKED_NONE;
- server.bpop_blocked_clients--;
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
diff --git a/src/db.c b/src/db.c
index 942b25722..88b97175f 100644
--- a/src/db.c
+++ b/src/db.c
@@ -162,8 +162,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
int retval = dictAdd(db->dict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
- if (val->type == OBJ_LIST || val->type == OBJ_STREAM)
- signalKeyAsReady(db, key);
+ if (val->type == OBJ_LIST) signalKeyAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
}
diff --git a/src/server.c b/src/server.c
index 101ba52a4..4117ef226 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1423,7 +1423,9 @@ void initServerConfig(void) {
server.active_defrag_running = 0;
server.notify_keyspace_events = 0;
server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
- server.bpop_blocked_clients = 0;
+ server.blocked_clients = 0;
+ memset(server.blocked_clients_by_type,0,
+ sizeof(server.blocked_clients_by_type));
server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
@@ -2910,7 +2912,7 @@ sds genRedisInfoString(char *section) {
"blocked_clients:%d\r\n",
listLength(server.clients)-listLength(server.slaves),
lol, bib,
- server.bpop_blocked_clients);
+ server.blocked_clients);
}
/* Memory */
diff --git a/src/server.h b/src/server.h
index d840916ba..3446f16f3 100644
--- a/src/server.h
+++ b/src/server.h
@@ -257,6 +257,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
#define BLOCKED_STREAM 4 /* XREAD. */
+#define BLOCKED_NUM 5 /* Number of blocked states. */
/* Client request types */
#define PROTO_REQ_INLINE 1
@@ -1130,7 +1131,8 @@ struct redisServer {
unsigned int lfu_log_factor; /* LFU logarithmic counter factor. */
unsigned int lfu_decay_time; /* LFU counter decay factor. */
/* Blocked clients */
- unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
+ unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/
+ unsigned int blocked_clients_by_type[BLOCKED_NUM];
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
diff --git a/src/t_stream.c b/src/t_stream.c
index afa8224cb..c47c5dde1 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -354,6 +354,8 @@ void xaddCommand(client *c) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"xadd",c->argv[1],c->db->id);
server.dirty++;
+ if (server.blocked_clients_by_type[BLOCKED_STREAM])
+ signalKeyAsReady(c->db, c->argv[1]);
}
/* XRANGE key start end [COUNT <n>] */