summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-19 16:39:09 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commitccdae09046e05dae4d2e23a8a98a3ab7305c8d76 (patch)
tree6112435b1dfbd42014a05329aa80013265b08f75
parentb8e52321612f3fa7a3a78779700d74df99b154cf (diff)
downloadredis-ccdae09046e05dae4d2e23a8a98a3ab7305c8d76.tar.gz
CG: add & populate group+consumer in the blocking state.
-rw-r--r--src/blocked.c2
-rw-r--r--src/networking.c1
-rw-r--r--src/server.h3
-rw-r--r--src/t_stream.c15
4 files changed, 13 insertions, 8 deletions
diff --git a/src/blocked.c b/src/blocked.c
index f438c3353..d560a8f38 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -442,7 +442,9 @@ void unblockClientWaitingData(client *c) {
}
if (c->bpop.xread_group) {
decrRefCount(c->bpop.xread_group);
+ decrRefCount(c->bpop.xread_consumer);
c->bpop.xread_group = NULL;
+ c->bpop.xread_consumer = NULL;
}
}
diff --git a/src/networking.c b/src/networking.c
index 51c239fc8..c29adc1e0 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -137,6 +137,7 @@ client *createClient(int fd) {
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
+ c->bpop.xread_consumer = NULL;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
diff --git a/src/server.h b/src/server.h
index 72a6563a3..d4f989264 100644
--- a/src/server.h
+++ b/src/server.h
@@ -653,7 +653,8 @@ typedef struct blockingState {
/* BLOCK_STREAM */
size_t xread_count; /* XREAD COUNT option. */
- robj *xread_group; /* XREAD group name. */
+ robj *xread_group; /* XREADGROUP group name. */
+ robj *xread_consumer; /* XREADGROUP consumer name. */
mstime_t xread_retry_time, xread_retry_ttl;
/* BLOCKED_WAIT */
diff --git a/src/t_stream.c b/src/t_stream.c
index 82c926b39..65a926ae1 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -943,8 +943,8 @@ void xreadCommand(client *c) {
streamID *ids = static_ids;
streamCG **groups = NULL;
int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
- sds groupname = NULL;
- sds consumername = NULL;
+ robj *groupname = NULL;
+ robj *consumername = NULL;
/* Parse arguments. */
for (int i = 1; i < c->argc; i++) {
@@ -976,8 +976,8 @@ void xreadCommand(client *c) {
"XREADGROUP. You called XREAD instead.");
return;
}
- groupname = c->argv[i+1]->ptr;
- consumername = c->argv[i+2]->ptr;
+ groupname = c->argv[i+1];
+ consumername = c->argv[i+2];
i += 2;
} else {
addReply(c,shared.syntaxerr);
@@ -1011,12 +1011,12 @@ void xreadCommand(client *c) {
o = lookupKeyRead(c->db,key);
if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
if (o == NULL ||
- (group = streamLookupCG(o->ptr,groupname)) == NULL)
+ (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
{
addReplyErrorFormat(c, "No such key '%s' or consumer "
"group '%s' in XREADGROUP with GROUP "
"option",
- key->ptr,groupname);
+ key->ptr,groupname->ptr);
goto cleanup;
}
groups[id_idx] = group;
@@ -1093,7 +1093,8 @@ void xreadCommand(client *c) {
* in case the ID provided is too low, we do not want the server to
* block just to serve this client a huge stream of messages. */
c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
- c->bpop.xread_group = NULL; /* Not used for now. */
+ c->bpop.xread_group = groupname;
+ c->bpop.xread_consumer = consumername;
goto cleanup;
}