summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-09-11 11:20:36 +0200
committerantirez <antirez@gmail.com>2017-11-04 17:39:26 +0100
commit2829a057f6f734f328943eb029bb3714263a545f (patch)
tree0b4db8b6e669a9b152c4b1f5f8babf0e4c62731b
parent3636bb6e9afd39fef4a76f98b2a3095e16e6313c (diff)
downloadredis-2829a057f6f734f328943eb029bb3714263a545f.tar.gz
Streams: When XREAD blocks without COUNT, set a default one.
A client may lose a lot of time between invocations of blocking XREAD, for example because it is processing the messages or for any other cause. When it returns back, it may provide a low enough message ID that the server will block to send an unreasonable number of messages in a single call. For this reason we set a COUNT when the client is blocked with XREAD calls, even if no COUNT is given. This is arbitrarily set to 1000 because it's enough to avoid slowing down the reception of many messages, but low enough to avoid to block.
-rw-r--r--src/t_stream.c6
1 files changed, 6 insertions, 0 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index c47c5dde1..1836ae735 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -399,6 +399,7 @@ void xlenCommand(client *c) {
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
* [RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N
* ID_1 ID_2 ... ID_N */
+#define XREAD_BLOCKED_DEFAULT_COUNT 1000
void xreadCommand(client *c) {
long long timeout = -1; /* -1 means, no BLOCK argument given. */
long long count = 0;
@@ -510,6 +511,11 @@ void xreadCommand(client *c) {
}
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
timeout, NULL, ids);
+ /* If no COUNT is given and we block, set a relatively small count:
+ * in case the ID provided is too low, we do not want the server to
+ * block just to serve this client a huge stream of messages. */
+ c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
+ c->bpop.xread_group = NULL; /* Not used for now. */
goto cleanup;
}