summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValentino Geron <valentino@redislabs.com>2020-03-25 21:54:14 +0200
committerValentino Geron <valentino@redislabs.com>2020-03-26 15:40:23 +0200
commit1547d72cf3ec4ef5b67cbba7cf50726983be31e4 (patch)
tree9a0e721bc0f16af5d8e72351c4ffd30240d3de86
parentc4d7f30e250b4b291e6d5893d9a3323df4a57c7c (diff)
downloadredis-1547d72cf3ec4ef5b67cbba7cf50726983be31e4.tar.gz
XACK should be executed in a "all or nothing" fashion.
First, we must parse the IDs, so that we abort ASAP. The return value of this command cannot be an error if the client successfully acknowledged some messages, so it should be executed in a "all or nothing" fashion.
-rw-r--r--src/t_stream.c12
-rw-r--r--tests/unit/type/stream-cgroups.tcl12
2 files changed, 23 insertions, 1 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 557d1d642..2bf6a26a7 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1921,11 +1921,21 @@ void xackCommand(client *c) {
return;
}
+ /* Start parsing the IDs, so that we abort ASAP if there is a syntax
+ * error: the return value of this command cannot be an error in case
+ * the client successfully acknowledged some messages, so it should be
+ * executed in a "all or nothing" fashion. */
+ for (int j = 3; j < c->argc; j++) {
+ streamID id;
+ if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
+ }
+
int acknowledged = 0;
for (int j = 3; j < c->argc; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
- if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
+ if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
+ serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL: it will have a reference to the
diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl
index 072ed14d6..2fc0c51cd 100644
--- a/tests/unit/type/stream-cgroups.tcl
+++ b/tests/unit/type/stream-cgroups.tcl
@@ -93,6 +93,18 @@ start_server {
assert {[r XACK mystream mygroup $id1 $id2] eq 1}
}
+ test {XACK should fail if got at least one invalid ID} {
+ r del mystream
+ r xgroup create s g $ MKSTREAM
+ r xadd s * f1 v1
+ set c [llength [lindex [r xreadgroup group g c streams s >] 0 1]]
+ assert {$c == 1}
+ set pending [r xpending s g - + 10 c]
+ set id1 [lindex $pending 0 0]
+ assert_error "*Invalid stream ID specified*" {r xack s g $id1 invalid-id}
+ assert {[r xack s g $id1] eq 1}
+ }
+
test {PEL NACK reassignment after XGROUP SETID event} {
r del events
r xadd events * f1 v1