summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-10-17 12:04:03 +0200
committerantirez <antirez@gmail.com>2018-10-17 12:04:06 +0200
commit2e3d403349af7e23a263632cd876479241d5d72e (patch)
treecfcfe49a092dcd339b37d59d21c11c04739efca7 /src/t_stream.c
parentab11c5ebd1adc9c6b794f275b74d5ad697c65ffe (diff)
downloadredis-2e3d403349af7e23a263632cd876479241d5d72e.tar.gz
Process MKSTREAM option of XGROUP CREATE at a later time.
This avoids issues with having to replicate a command that produced errors.
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c45
1 files changed, 28 insertions, 17 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index c366578c5..4c08affcc 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1701,24 +1701,29 @@ NULL
sds grpname = NULL;
streamCG *cg = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */
+ int mkstream = 0;
+ robj *o;
- /* Lookup the key now, this is common for all the subcommands but HELP. */
- if (c->argc >= 4) {
- robj *o = lookupKeyWrite(c->db,c->argv[2]);
-
- /* CREATE has an MKSTREAM option that creates the stream if it
- * does not exist. */
- if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
- if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
- addReplySubcommandSyntaxError(c);
- return;
- }
- if (o == NULL) {
- o = createStreamObject();
- dbAdd(c->db,c->argv[2],o);
- }
+ /* CREATE has an MKSTREAM option that creates the stream if it
+ * does not exist. */
+ if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
+ if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
+ addReplySubcommandSyntaxError(c);
+ return;
}
+ mkstream = 1;
+ grpname = c->argv[3]->ptr;
+ }
+ /* Everything but the "HELP" option requires a key and group name. */
+ if (c->argc > 4) {
+ o = lookupKeyWrite(c->db,c->argv[2]);
+ if (o) s = o->ptr;
+ grpname = c->argv[3]->ptr;
+ }
+
+ /* Check for missing key/group. */
+ if (c->argc >= 4 && !mkstream) {
/* At this point key must exist, or there is an error. */
if (o == NULL) {
addReplyError(c,
@@ -1729,8 +1734,6 @@ NULL
}
if (checkType(c,o,OBJ_STREAM)) return;
- s = o->ptr;
- grpname = c->argv[3]->ptr;
/* Certain subcommands require the group to exist. */
if ((cg = streamLookupCG(s,grpname)) == NULL &&
@@ -1752,6 +1755,14 @@ NULL
} else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {
return;
}
+
+ /* Handle the MKSTREAM option now that the command can no longer fail. */
+ if (s == NULL && mkstream) {
+ robj *o = createStreamObject();
+ dbAdd(c->db,c->argv[2],o);
+ s = o->ptr;
+ }
+
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
if (cg) {
addReply(c,shared.ok);