summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-09-07 17:45:34 +0200
committerantirez <antirez@gmail.com>2017-11-04 17:39:26 +0100
commitb6028f287bdae798ee606a8708eafefd9a11adc4 (patch)
tree0c08d7e6f2b7fb864ae1f55ed0b39c148c79a5ef
parentfdd0977aeeaa5f29b7266ed4ad58e5df48fe951b (diff)
downloadredis-b6028f287bdae798ee606a8708eafefd9a11adc4.tar.gz
Streams: XREAD, first draft. Handling of blocked clients still missing.
-rw-r--r--src/t_stream.c56
1 files changed, 46 insertions, 10 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 485ea29aa..0820a7438 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -398,10 +398,10 @@ void xlenCommand(client *c) {
* [RETRY <milliseconds> <ttl>] STREAMS key_1 ID_1 key_2 ID_2 ...
* key_N ID_N */
void xreadCommand(client *c) {
- long long block = 0;
+ long long timeout = 0;
long long count = 0;
int streams_count = 0;
- int streams_argc = 0;
+ int streams_arg = 0;
#define STREAMID_STATIC_VECTOR_LEN 8
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
@@ -412,17 +412,17 @@ void xreadCommand(client *c) {
char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) {
i++;
- if (getLongLongFromObjectOrReply(c,c->argv[i],&block,NULL) != C_OK)
- return;
- if (block < 0) block = 0;
+ if (getLongLongFromObjectOrReply(c,c->argv[i],&timeout,NULL)
+ != C_OK) return;
+ if (timeout < 0) timeout = 0;
} else if (!strcasecmp(o,"COUNT") && moreargs) {
i++;
if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
return;
if (count < 0) count = 0;
} else if (!strcasecmp(o,"STREAMS") && moreargs) {
- streams_argc = i+1;
- streams_count = (c->argc-streams_argc);
+ streams_arg = i+1;
+ streams_count = (c->argc-streams_arg);
if ((streams_count % 2) != 0) {
addReplyError(c,"Unbalanced XREAD list of streams: "
"for each stream key an ID or '$' must be "
@@ -438,7 +438,7 @@ void xreadCommand(client *c) {
}
/* STREAMS option is mandatory. */
- if (streams_argc == 0) {
+ if (streams_arg == 0) {
addReply(c,shared.syntaxerr);
return;
}
@@ -447,8 +447,7 @@ void xreadCommand(client *c) {
if (streams_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*streams_count);
- /* Try to serve the client synchronously. */
- for (int i = streams_argc + streams_count; i < c->argc; i++) {
+ for (int i = streams_arg + streams_count; i < c->argc; i++) {
/* Specifying "$" as last-known-id means that the client wants to be
* served with just the messages that will arrive into the stream
* starting from now. */
@@ -466,6 +465,43 @@ void xreadCommand(client *c) {
if (streamParseIDOrReply(c,c->argv[i],ids+i,0) != C_OK) goto cleanup;
}
+ /* Try to serve the client synchronously. */
+ for (int i = 0; i < streams_count; i++) {
+ robj *o = lookupKeyRead(c->db,c->argv[i+streams_arg]);
+ if (o == NULL) continue;
+ stream *s = o->ptr;
+ streamID *gt = ids+i; /* ID must be greater than this. */
+ if (s->last_id.ms > gt->ms ||
+ (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
+ {
+ /* streamReplyWithRange() handles the 'start' ID as inclusive,
+ * so start from the next ID, since we want only messages with
+ * IDs greater than start. */
+ streamID start = *gt;
+ start.seq++; /* Can't overflow, it's an uint64_t */
+ streamReplyWithRange(c,s,&start,NULL,count);
+ goto cleanup;
+ }
+ }
+
+ /* Block if needed. */
+ if (timeout) {
+ /* If we are inside a MULTI/EXEC and the list is empty the only thing
+ * we can do is treating it as a timeout (even with timeout 0). */
+ if (c->flags & CLIENT_MULTI) {
+ addReply(c,shared.nullmultibulk);
+ goto cleanup;
+ }
+ blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
+ timeout, NULL, ids);
+ goto cleanup;
+ }
+
+ /* No BLOCK option, nor any stream we can serve. Reply as with a
+ * timeout happened. */
+ addReply(c,shared.nullmultibulk);
+ /* Continue to cleanup... */
+
cleanup:
/* Cleanup. */
if (ids != static_ids) zfree(ids);