diff options
-rw-r--r-- | src/module.c | 3 | ||||
-rw-r--r-- | src/stream.h | 2 | ||||
-rw-r--r-- | src/t_stream.c | 93 | ||||
-rw-r--r-- | tests/unit/type/stream.tcl | 37 |
4 files changed, 107 insertions, 28 deletions
diff --git a/src/module.c b/src/module.c index c74cdfc90..7c1cc054b 100644 --- a/src/module.c +++ b/src/module.c @@ -4232,7 +4232,8 @@ int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisM use_id.seq = id->seq; use_id_ptr = &use_id; } - if (streamAppendItem(s, argv, numfields, &added_id, use_id_ptr) == C_ERR) { + + if (streamAppendItem(s,argv,numfields,&added_id,use_id_ptr,1) == C_ERR) { /* Either the ID not greater than all existing IDs in the stream, or * the elements are too large to be stored. either way, errno is already * set by streamAppendItem. */ diff --git a/src/stream.h b/src/stream.h index d1f17c311..724f2c2ad 100644 --- a/src/stream.h +++ b/src/stream.h @@ -129,7 +129,7 @@ robj *streamDup(robj *o); int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep); int streamParseID(const robj *o, streamID *id); robj *createObjectFromStreamID(streamID *id); -int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id); +int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given); int streamDeleteItem(stream *s, streamID *id); int64_t streamTrimByLength(stream *s, long long maxlen, int approx); int64_t streamTrimByID(stream *s, streamID minid, int approx); diff --git a/src/t_stream.c b/src/t_stream.c index 776030c94..af32bc18f 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -56,7 +56,7 @@ void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); -int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); +int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given); int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); /* ----------------------------------------------------------------------- @@ -418,6 +418,10 @@ void streamGetEdgeID(stream *s, int first, streamID *edge_id) * If 'use_id' is not NULL, the ID is not auto-generated by the function, * but instead the passed ID is used to add the new entry. In this case * adding the entry may fail as specified later in this comment. + * + * When 'use_id' is used alongside with a zero 'seq-given', the sequence + * part of the passed ID is ignored and the function will attempt to use an + * auto-generated sequence. * * The function returns C_OK if the item was added, this is always true * if the ID was generated by the function. However the function may return @@ -426,14 +430,31 @@ void streamGetEdgeID(stream *s, int first, streamID *edge_id) * current top ID is greater or equal. errno will be set to EDOM. * 2. If a size of a single element or the sum of the elements is too big to * be stored into the stream. errno will be set to ERANGE. */ -int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) { +int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) { /* Generate the new entry ID. */ streamID id; - if (use_id) - id = *use_id; - else + if (use_id) { + if (seq_given) { + id = *use_id; + } else { + /* The automatically generated sequence can be either zero (new + * timestamps) or the incremented sequence of the last ID. In the + * latter case, we need to prevent an overflow/advancing forward + * in time. */ + if (s->last_id.ms == use_id->ms) { + if (s->last_id.seq == UINT64_MAX) { + return C_ERR; + } + id = s->last_id; + id.seq++; + } else { + id = *use_id; + } + } + } else { streamNextID(&s->last_id,&id); + } /* Check that the new ID is greater than the last entry ID * or return an error. Automatically generated IDs might @@ -652,6 +673,7 @@ typedef struct { /* XADD options */ streamID id; /* User-provided ID, for XADD only. */ int id_given; /* Was an ID different than "*" specified? for XADD only. */ + int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */ int no_mkstream; /* if set to 1 do not create new stream */ /* XADD + XTRIM common options */ @@ -929,7 +951,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i i++; } - if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK) + if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK) return -1; i++; @@ -955,7 +977,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i args->no_mkstream = 1; } else if (xadd) { /* If we are here is a syntax error or a valid ID. */ - if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK) + if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK) return -1; args->id_given = 1; break; @@ -1669,8 +1691,13 @@ robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { * that can be represented. If 'strict' is set to 1, "-" and "+" will be * treated as an invalid ID. * + * The ID form <ms>-* specifies a millisconds-only ID, leaving the sequence part + * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this + * form is accepted and the argument is set to 0 unless the sequence part is + * specified. + * * If 'c' is set to NULL, no reply is sent to the client. */ -int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) { +int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) { char buf[128]; if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; memcpy(buf,o->ptr,sdslen(o->ptr)+1); @@ -1678,6 +1705,10 @@ int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0') goto invalid; + if (seq_given != NULL) { + *seq_given = 1; + } + /* Handle the "-" and "+" special cases. */ if (buf[0] == '-' && buf[1] == '\0') { id->ms = 0; @@ -1690,12 +1721,22 @@ int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t } /* Parse <ms>-<seq> form. */ + unsigned long long ms, seq; char *dot = strchr(buf,'-'); if (dot) *dot = '\0'; - unsigned long long ms, seq; if (string2ull(buf,&ms) == 0) goto invalid; - if (dot && string2ull(dot+1,&seq) == 0) goto invalid; - if (!dot) seq = missing_seq; + if (dot) { + size_t seqlen = strlen(dot+1); + if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') { + /* Handle the <ms>-* form. */ + seq = 0; + *seq_given = 0; + } else if (string2ull(dot+1,&seq) == 0) { + goto invalid; + } + } else { + seq = missing_seq; + } id->ms = ms; id->seq = seq; return C_OK; @@ -1708,20 +1749,20 @@ invalid: /* Wrapper for streamGenericParseIDOrReply() used by module API. */ int streamParseID(const robj *o, streamID *id) { - return streamGenericParseIDOrReply(NULL, o, id, 0, 0); + return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL); } /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to * 0, to be used when - and + are acceptable IDs. */ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { - return streamGenericParseIDOrReply(c,o,id,missing_seq,0); + return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL); } /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to * 1, to be used when we want to return an error if the special IDs + or - * are provided. */ -int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { - return streamGenericParseIDOrReply(c,o,id,missing_seq,1); +int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) { + return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given); } /* Helper for parsing a stream ID that is a range query interval. When the @@ -1738,7 +1779,7 @@ int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, if (exclude != NULL) *exclude = (len > 1 && p[0] == '('); if (exclude != NULL && *exclude) { robj *t = createStringObject(p+1,len-1); - invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR); + invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR); decrRefCount(t); } else invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR); @@ -1785,7 +1826,7 @@ void xaddCommand(client *c) { /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating * a new stream and have streamAppendItem fail, leaving an empty key in the * database. */ - if (parsed_args.id_given && + if (parsed_args.id_given && parsed_args.seq_given && parsed_args.id.ms == 0 && parsed_args.id.seq == 0) { addReplyError(c,"The ID specified in XADD must be greater than 0-0"); @@ -1808,7 +1849,7 @@ void xaddCommand(client *c) { /* Append using the low level function and return the ID. */ streamID id; if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, - &id, parsed_args.id_given ? &parsed_args.id : NULL) == C_ERR) + &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR) { if (errno == EDOM) addReplyError(c,"The ID specified in XADD is equal or smaller than " @@ -1841,7 +1882,7 @@ void xaddCommand(client *c) { /* Let's rewrite the ID argument with the one actually generated for * AOF/replication propagation. */ - if (!parsed_args.id_given) { + if (!parsed_args.id_given || !parsed_args.seq_given) { robj *idarg = createObjectFromStreamID(&id); rewriteClientCommandArgument(c, idpos, idarg); decrRefCount(idarg); @@ -2080,7 +2121,7 @@ void xreadCommand(client *c) { ids[id_idx].seq = UINT64_MAX; continue; } - if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) + if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK) goto cleanup; } @@ -2428,7 +2469,7 @@ NULL id.ms = 0; id.seq = 0; } - } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) { + } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) { return; } @@ -2505,7 +2546,7 @@ void xsetidCommand(client *c) { stream *s = o->ptr; streamID id; - if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK) return; + if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) return; /* If the stream has at least one item, we want to check that the user * is setting a last ID that is equal or greater than the current top @@ -2559,7 +2600,7 @@ void xackCommand(client *c) { if (id_count > STREAMID_STATIC_VECTOR_LEN) ids = zmalloc(sizeof(streamID)*id_count); for (int j = 3; j < c->argc; j++) { - if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK) goto cleanup; + if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup; } int acknowledged = 0; @@ -2873,7 +2914,7 @@ void xclaimCommand(client *c) { if (id_count > STREAMID_STATIC_VECTOR_LEN) ids = zmalloc(sizeof(streamID)*id_count); for (j = 5; j < c->argc; j++) { - if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0) != C_OK) break; + if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break; } int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ @@ -2907,7 +2948,7 @@ void xclaimCommand(client *c) { != C_OK) goto cleanup; } else if (!strcasecmp(opt,"LASTID") && moreargs) { j++; - if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) goto cleanup; + if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup; } else { addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); goto cleanup; @@ -3213,7 +3254,7 @@ void xdelCommand(client *c) { if (id_count > STREAMID_STATIC_VECTOR_LEN) ids = zmalloc(sizeof(streamID)*id_count); for (int j = 2; j < c->argc; j++) { - if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK) goto cleanup; + if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup; } /* Actually apply the command. */ diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 1d2448489..dfd75dc4a 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -85,6 +85,43 @@ start_server { assert_error ERR* {r xadd mystream * c d} } + test {XADD auto-generated sequence is incremented for last ID} { + r DEL mystream + set id1 [r XADD mystream 123-456 item 1 value a] + set id2 [r XADD mystream 123-* item 2 value b] + lassign [split $id2 -] _ seq + assert {$seq == 457} + assert {[streamCompareID $id1 $id2] == -1} + } + + test {XADD auto-generated sequence is zero for future timestamp ID} { + r DEL mystream + set id1 [r XADD mystream 123-456 item 1 value a] + set id2 [r XADD mystream 789-* item 2 value b] + lassign [split $id2 -] _ seq + assert {$seq == 0} + assert {[streamCompareID $id1 $id2] == -1} + } + + test {XADD auto-generated sequence can't be smaller than last ID} { + r DEL mystream + r XADD mystream 123-456 item 1 value a + assert_error ERR* {r XADD mystream 42-* item 2 value b} + } + + test {XADD auto-generated sequence can't overflow} { + r DEL mystream + r xadd mystream 1-18446744073709551615 a b + assert_error ERR* {r xadd mystream 1-* c d} + } + + test {XADD 0-* should succeed} { + r DEL mystream + set id [r xadd mystream 0-* a b] + lassign [split $id -] _ seq + assert {$seq == 1} + } + test {XADD with MAXLEN option} { r DEL mystream for {set j 0} {$j < 1000} {incr j} { |