summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c28
1 files changed, 14 insertions, 14 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 9e7d3d126..2fa59fa21 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1057,7 +1057,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
/* Look the stream at 'key' and return the corresponding stream object.
* The function creates a key setting it to an empty stream if needed. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
- robj *o = lookupKeyWrite(c->db,key);
+ robj *o = lookupKeyWrite(c->db,key,NULL);
if (o == NULL) {
o = createStreamObject();
dbAdd(c->db,key,o);
@@ -1287,8 +1287,8 @@ void xrangeGenericCommand(client *c, int rev) {
}
/* Return the specified range to the user. */
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL ||
- checkType(c,o,OBJ_STREAM)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.emptyarray))
+ == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
@@ -1313,7 +1313,7 @@ void xrevrangeCommand(client *c) {
/* XLEN */
void xlenCommand(client *c) {
robj *o;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
addReplyLongLong(c,s->length);
@@ -1411,7 +1411,7 @@ void xreadCommand(client *c) {
* starting from now. */
int id_idx = i - streams_arg - streams_count;
robj *key = c->argv[i-streams_count];
- robj *o = lookupKeyRead(c->db,key);
+ robj *o = lookupKeyRead(c->db,key,NULL);
if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
streamCG *group = NULL;
@@ -1469,7 +1469,7 @@ void xreadCommand(client *c) {
size_t arraylen = 0;
void *arraylen_ptr = NULL;
for (int i = 0; i < streams_count; i++) {
- robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
+ robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i],NULL);
if (o == NULL) continue;
stream *s = o->ptr;
streamID *gt = ids+i; /* ID must be greater than this. */
@@ -1736,7 +1736,7 @@ NULL
/* Everything but the "HELP" option requires a key and group name. */
if (c->argc >= 4) {
- o = lookupKeyWrite(c->db,c->argv[2]);
+ o = lookupKeyWrite(c->db,c->argv[2],NULL);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
@@ -1840,7 +1840,7 @@ NULL
*
* Set the internal "last ID" of a stream. */
void xsetidCommand(client *c) {
- robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
+ robj *o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
@@ -1881,7 +1881,7 @@ void xsetidCommand(client *c) {
*/
void xackCommand(client *c) {
streamCG *group = NULL;
- robj *o = lookupKeyRead(c->db,c->argv[1]);
+ robj *o = lookupKeyRead(c->db,c->argv[1],NULL);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
@@ -1952,7 +1952,7 @@ void xpendingCommand(client *c) {
}
/* Lookup the key and the group inside the stream. */
- robj *o = lookupKeyRead(c->db,c->argv[1]);
+ robj *o = lookupKeyRead(c->db,c->argv[1],NULL);
streamCG *group;
if (o && checkType(c,o,OBJ_STREAM)) return;
@@ -2131,7 +2131,7 @@ void xpendingCommand(client *c) {
* what messages it is now in charge of. */
void xclaimCommand(client *c) {
streamCG *group = NULL;
- robj *o = lookupKeyRead(c->db,c->argv[1]);
+ robj *o = lookupKeyRead(c->db,c->argv[1],NULL);
long long minidle; /* Minimum idle time argument. */
long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
@@ -2323,7 +2323,7 @@ void xclaimCommand(client *c) {
void xdelCommand(client *c) {
robj *o;
- if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
@@ -2368,7 +2368,7 @@ void xtrimCommand(client *c) {
/* If the key does not exist, we are ok returning zero, that is, the
* number of elements removed from the stream. */
- if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL
+ if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL,shared.czero)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
@@ -2460,7 +2460,7 @@ NULL
key = c->argv[2];
/* Lookup the key now, this is common for all the subcommands but HELP. */
- robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr);
+ robj *o = lookupKeyWriteOrReply(c,key,NULL,shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;