diff options
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index 82c926b39..65a926ae1 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -943,8 +943,8 @@ void xreadCommand(client *c) { streamID *ids = static_ids; streamCG **groups = NULL; int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ - sds groupname = NULL; - sds consumername = NULL; + robj *groupname = NULL; + robj *consumername = NULL; /* Parse arguments. */ for (int i = 1; i < c->argc; i++) { @@ -976,8 +976,8 @@ void xreadCommand(client *c) { "XREADGROUP. You called XREAD instead."); return; } - groupname = c->argv[i+1]->ptr; - consumername = c->argv[i+2]->ptr; + groupname = c->argv[i+1]; + consumername = c->argv[i+2]; i += 2; } else { addReply(c,shared.syntaxerr); @@ -1011,12 +1011,12 @@ void xreadCommand(client *c) { o = lookupKeyRead(c->db,key); if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; if (o == NULL || - (group = streamLookupCG(o->ptr,groupname)) == NULL) + (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) { addReplyErrorFormat(c, "No such key '%s' or consumer " "group '%s' in XREADGROUP with GROUP " "option", - key->ptr,groupname); + key->ptr,groupname->ptr); goto cleanup; } groups[id_idx] = group; @@ -1093,7 +1093,8 @@ void xreadCommand(client *c) { * in case the ID provided is too low, we do not want the server to * block just to serve this client a huge stream of messages. */ c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; - c->bpop.xread_group = NULL; /* Not used for now. */ + c->bpop.xread_group = groupname; + c->bpop.xread_consumer = consumername; goto cleanup; } |