summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/module.c3
-rw-r--r--src/stream.h2
-rw-r--r--src/t_stream.c93
-rw-r--r--tests/unit/type/stream.tcl37
4 files changed, 107 insertions, 28 deletions
diff --git a/src/module.c b/src/module.c
index c74cdfc90..7c1cc054b 100644
--- a/src/module.c
+++ b/src/module.c
@@ -4232,7 +4232,8 @@ int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisM
use_id.seq = id->seq;
use_id_ptr = &use_id;
}
- if (streamAppendItem(s, argv, numfields, &added_id, use_id_ptr) == C_ERR) {
+
+ if (streamAppendItem(s,argv,numfields,&added_id,use_id_ptr,1) == C_ERR) {
/* Either the ID not greater than all existing IDs in the stream, or
* the elements are too large to be stored. either way, errno is already
* set by streamAppendItem. */
diff --git a/src/stream.h b/src/stream.h
index d1f17c311..724f2c2ad 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -129,7 +129,7 @@ robj *streamDup(robj *o);
int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
int streamParseID(const robj *o, streamID *id);
robj *createObjectFromStreamID(streamID *id);
-int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id);
+int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
int streamDeleteItem(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
diff --git a/src/t_stream.c b/src/t_stream.c
index 776030c94..af32bc18f 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -56,7 +56,7 @@
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
-int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
+int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
/* -----------------------------------------------------------------------
@@ -418,6 +418,10 @@ void streamGetEdgeID(stream *s, int first, streamID *edge_id)
* If 'use_id' is not NULL, the ID is not auto-generated by the function,
* but instead the passed ID is used to add the new entry. In this case
* adding the entry may fail as specified later in this comment.
+ *
+ * When 'use_id' is used alongside with a zero 'seq-given', the sequence
+ * part of the passed ID is ignored and the function will attempt to use an
+ * auto-generated sequence.
*
* The function returns C_OK if the item was added, this is always true
* if the ID was generated by the function. However the function may return
@@ -426,14 +430,31 @@ void streamGetEdgeID(stream *s, int first, streamID *edge_id)
* current top ID is greater or equal. errno will be set to EDOM.
* 2. If a size of a single element or the sum of the elements is too big to
* be stored into the stream. errno will be set to ERANGE. */
-int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
+int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) {
/* Generate the new entry ID. */
streamID id;
- if (use_id)
- id = *use_id;
- else
+ if (use_id) {
+ if (seq_given) {
+ id = *use_id;
+ } else {
+ /* The automatically generated sequence can be either zero (new
+ * timestamps) or the incremented sequence of the last ID. In the
+ * latter case, we need to prevent an overflow/advancing forward
+ * in time. */
+ if (s->last_id.ms == use_id->ms) {
+ if (s->last_id.seq == UINT64_MAX) {
+ return C_ERR;
+ }
+ id = s->last_id;
+ id.seq++;
+ } else {
+ id = *use_id;
+ }
+ }
+ } else {
streamNextID(&s->last_id,&id);
+ }
/* Check that the new ID is greater than the last entry ID
* or return an error. Automatically generated IDs might
@@ -652,6 +673,7 @@ typedef struct {
/* XADD options */
streamID id; /* User-provided ID, for XADD only. */
int id_given; /* Was an ID different than "*" specified? for XADD only. */
+ int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
int no_mkstream; /* if set to 1 do not create new stream */
/* XADD + XTRIM common options */
@@ -929,7 +951,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
i++;
}
- if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK)
+ if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK)
return -1;
i++;
@@ -955,7 +977,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
args->no_mkstream = 1;
} else if (xadd) {
/* If we are here is a syntax error or a valid ID. */
- if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK)
+ if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK)
return -1;
args->id_given = 1;
break;
@@ -1669,8 +1691,13 @@ robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
* that can be represented. If 'strict' is set to 1, "-" and "+" will be
* treated as an invalid ID.
*
+ * The ID form <ms>-* specifies a millisconds-only ID, leaving the sequence part
+ * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this
+ * form is accepted and the argument is set to 0 unless the sequence part is
+ * specified.
+ *
* If 'c' is set to NULL, no reply is sent to the client. */
-int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) {
+int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) {
char buf[128];
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
memcpy(buf,o->ptr,sdslen(o->ptr)+1);
@@ -1678,6 +1705,10 @@ int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t
if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
goto invalid;
+ if (seq_given != NULL) {
+ *seq_given = 1;
+ }
+
/* Handle the "-" and "+" special cases. */
if (buf[0] == '-' && buf[1] == '\0') {
id->ms = 0;
@@ -1690,12 +1721,22 @@ int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t
}
/* Parse <ms>-<seq> form. */
+ unsigned long long ms, seq;
char *dot = strchr(buf,'-');
if (dot) *dot = '\0';
- unsigned long long ms, seq;
if (string2ull(buf,&ms) == 0) goto invalid;
- if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
- if (!dot) seq = missing_seq;
+ if (dot) {
+ size_t seqlen = strlen(dot+1);
+ if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') {
+ /* Handle the <ms>-* form. */
+ seq = 0;
+ *seq_given = 0;
+ } else if (string2ull(dot+1,&seq) == 0) {
+ goto invalid;
+ }
+ } else {
+ seq = missing_seq;
+ }
id->ms = ms;
id->seq = seq;
return C_OK;
@@ -1708,20 +1749,20 @@ invalid:
/* Wrapper for streamGenericParseIDOrReply() used by module API. */
int streamParseID(const robj *o, streamID *id) {
- return streamGenericParseIDOrReply(NULL, o, id, 0, 0);
+ return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL);
}
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 0, to be used when - and + are acceptable IDs. */
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
- return streamGenericParseIDOrReply(c,o,id,missing_seq,0);
+ return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL);
}
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 1, to be used when we want to return an error if the special IDs + or -
* are provided. */
-int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
- return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
+int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) {
+ return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given);
}
/* Helper for parsing a stream ID that is a range query interval. When the
@@ -1738,7 +1779,7 @@ int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude,
if (exclude != NULL) *exclude = (len > 1 && p[0] == '(');
if (exclude != NULL && *exclude) {
robj *t = createStringObject(p+1,len-1);
- invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR);
+ invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR);
decrRefCount(t);
} else
invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR);
@@ -1785,7 +1826,7 @@ void xaddCommand(client *c) {
/* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
* a new stream and have streamAppendItem fail, leaving an empty key in the
* database. */
- if (parsed_args.id_given &&
+ if (parsed_args.id_given && parsed_args.seq_given &&
parsed_args.id.ms == 0 && parsed_args.id.seq == 0)
{
addReplyError(c,"The ID specified in XADD must be greater than 0-0");
@@ -1808,7 +1849,7 @@ void xaddCommand(client *c) {
/* Append using the low level function and return the ID. */
streamID id;
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
- &id, parsed_args.id_given ? &parsed_args.id : NULL) == C_ERR)
+ &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR)
{
if (errno == EDOM)
addReplyError(c,"The ID specified in XADD is equal or smaller than "
@@ -1841,7 +1882,7 @@ void xaddCommand(client *c) {
/* Let's rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
- if (!parsed_args.id_given) {
+ if (!parsed_args.id_given || !parsed_args.seq_given) {
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c, idpos, idarg);
decrRefCount(idarg);
@@ -2080,7 +2121,7 @@ void xreadCommand(client *c) {
ids[id_idx].seq = UINT64_MAX;
continue;
}
- if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
+ if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK)
goto cleanup;
}
@@ -2428,7 +2469,7 @@ NULL
id.ms = 0;
id.seq = 0;
}
- } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {
+ } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) {
return;
}
@@ -2505,7 +2546,7 @@ void xsetidCommand(client *c) {
stream *s = o->ptr;
streamID id;
- if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK) return;
+ if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) return;
/* If the stream has at least one item, we want to check that the user
* is setting a last ID that is equal or greater than the current top
@@ -2559,7 +2600,7 @@ void xackCommand(client *c) {
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (int j = 3; j < c->argc; j++) {
- if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK) goto cleanup;
+ if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup;
}
int acknowledged = 0;
@@ -2873,7 +2914,7 @@ void xclaimCommand(client *c) {
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (j = 5; j < c->argc; j++) {
- if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0) != C_OK) break;
+ if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break;
}
int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
@@ -2907,7 +2948,7 @@ void xclaimCommand(client *c) {
!= C_OK) goto cleanup;
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++;
- if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) goto cleanup;
+ if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup;
} else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
goto cleanup;
@@ -3213,7 +3254,7 @@ void xdelCommand(client *c) {
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (int j = 2; j < c->argc; j++) {
- if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK) goto cleanup;
+ if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup;
}
/* Actually apply the command. */
diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
index 1d2448489..dfd75dc4a 100644
--- a/tests/unit/type/stream.tcl
+++ b/tests/unit/type/stream.tcl
@@ -85,6 +85,43 @@ start_server {
assert_error ERR* {r xadd mystream * c d}
}
+ test {XADD auto-generated sequence is incremented for last ID} {
+ r DEL mystream
+ set id1 [r XADD mystream 123-456 item 1 value a]
+ set id2 [r XADD mystream 123-* item 2 value b]
+ lassign [split $id2 -] _ seq
+ assert {$seq == 457}
+ assert {[streamCompareID $id1 $id2] == -1}
+ }
+
+ test {XADD auto-generated sequence is zero for future timestamp ID} {
+ r DEL mystream
+ set id1 [r XADD mystream 123-456 item 1 value a]
+ set id2 [r XADD mystream 789-* item 2 value b]
+ lassign [split $id2 -] _ seq
+ assert {$seq == 0}
+ assert {[streamCompareID $id1 $id2] == -1}
+ }
+
+ test {XADD auto-generated sequence can't be smaller than last ID} {
+ r DEL mystream
+ r XADD mystream 123-456 item 1 value a
+ assert_error ERR* {r XADD mystream 42-* item 2 value b}
+ }
+
+ test {XADD auto-generated sequence can't overflow} {
+ r DEL mystream
+ r xadd mystream 1-18446744073709551615 a b
+ assert_error ERR* {r xadd mystream 1-* c d}
+ }
+
+ test {XADD 0-* should succeed} {
+ r DEL mystream
+ set id [r xadd mystream 0-* a b]
+ lassign [split $id -] _ seq
+ assert {$seq == 1}
+ }
+
test {XADD with MAXLEN option} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {