diff options
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index 9e7d3d126..2fa59fa21 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1057,7 +1057,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start /* Look the stream at 'key' and return the corresponding stream object. * The function creates a key setting it to an empty stream if needed. */ robj *streamTypeLookupWriteOrCreate(client *c, robj *key) { - robj *o = lookupKeyWrite(c->db,key); + robj *o = lookupKeyWrite(c->db,key,NULL); if (o == NULL) { o = createStreamObject(); dbAdd(c->db,key,o); @@ -1287,8 +1287,8 @@ void xrangeGenericCommand(client *c, int rev) { } /* Return the specified range to the user. */ - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL || - checkType(c,o,OBJ_STREAM)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyarray)) + == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; @@ -1313,7 +1313,7 @@ void xrevrangeCommand(client *c) { /* XLEN */ void xlenCommand(client *c) { robj *o; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL + if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STREAM)) return; stream *s = o->ptr; addReplyLongLong(c,s->length); @@ -1411,7 +1411,7 @@ void xreadCommand(client *c) { * starting from now. */ int id_idx = i - streams_arg - streams_count; robj *key = c->argv[i-streams_count]; - robj *o = lookupKeyRead(c->db,key); + robj *o = lookupKeyRead(c->db,key,NULL); if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; streamCG *group = NULL; @@ -1469,7 +1469,7 @@ void xreadCommand(client *c) { size_t arraylen = 0; void *arraylen_ptr = NULL; for (int i = 0; i < streams_count; i++) { - robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); + robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i],NULL); if (o == NULL) continue; stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ @@ -1736,7 +1736,7 @@ NULL /* Everything but the "HELP" option requires a key and group name. */ if (c->argc >= 4) { - o = lookupKeyWrite(c->db,c->argv[2]); + o = lookupKeyWrite(c->db,c->argv[2],NULL); if (o) { if (checkType(c,o,OBJ_STREAM)) return; s = o->ptr; @@ -1840,7 +1840,7 @@ NULL * * Set the internal "last ID" of a stream. */ void xsetidCommand(client *c) { - robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + robj *o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_STREAM)) return; stream *s = o->ptr; @@ -1881,7 +1881,7 @@ void xsetidCommand(client *c) { */ void xackCommand(client *c) { streamCG *group = NULL; - robj *o = lookupKeyRead(c->db,c->argv[1]); + robj *o = lookupKeyRead(c->db,c->argv[1],NULL); if (o) { if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */ group = streamLookupCG(o->ptr,c->argv[2]->ptr); @@ -1952,7 +1952,7 @@ void xpendingCommand(client *c) { } /* Lookup the key and the group inside the stream. */ - robj *o = lookupKeyRead(c->db,c->argv[1]); + robj *o = lookupKeyRead(c->db,c->argv[1],NULL); streamCG *group; if (o && checkType(c,o,OBJ_STREAM)) return; @@ -2131,7 +2131,7 @@ void xpendingCommand(client *c) { * what messages it is now in charge of. */ void xclaimCommand(client *c) { streamCG *group = NULL; - robj *o = lookupKeyRead(c->db,c->argv[1]); + robj *o = lookupKeyRead(c->db,c->argv[1],NULL); long long minidle; /* Minimum idle time argument. */ long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ @@ -2323,7 +2323,7 @@ void xclaimCommand(client *c) { void xdelCommand(client *c) { robj *o; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL + if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STREAM)) return; stream *s = o->ptr; @@ -2368,7 +2368,7 @@ void xtrimCommand(client *c) { /* If the key does not exist, we are ok returning zero, that is, the * number of elements removed from the stream. */ - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL + if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL || checkType(c,o,OBJ_STREAM)) return; stream *s = o->ptr; @@ -2460,7 +2460,7 @@ NULL key = c->argv[2]; /* Lookup the key now, this is common for all the subcommands but HELP. */ - robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr); + robj *o = lookupKeyWriteOrReply(c,key,NULL,shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; |