summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-16 15:38:22 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit58f0c000a5630d22c221bc4291b46f2fc1654ead (patch)
treec7eaf5ccfd1beedc2444fdd76ca2d44ddc48540d
parentd8207d09eedb08c8d70bff3ed91e2f5307e9dcf8 (diff)
downloadredis-58f0c000a5630d22c221bc4291b46f2fc1654ead.tar.gz
CG: data structures design + XGROUP CREATE implementation.
-rw-r--r--src/server.c3
-rw-r--r--src/server.h1
-rw-r--r--src/stream.h41
-rw-r--r--src/t_stream.c100
4 files changed, 143 insertions, 2 deletions
diff --git a/src/server.c b/src/server.c
index 85f05f1f9..f718dfdd9 100644
--- a/src/server.c
+++ b/src/server.c
@@ -276,7 +276,7 @@ struct redisCommand redisCommandTable[] = {
{"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
{"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
{"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
- {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
+ {"object",objectCommand,-2,"r",0,NULL,2,2,1,0,0},
{"memory",memoryCommand,-2,"r",0,NULL,0,0,0,0,0},
{"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
{"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
@@ -307,6 +307,7 @@ struct redisCommand redisCommandTable[] = {
{"xrevrange",xrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
{"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0},
+ {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
diff --git a/src/server.h b/src/server.h
index 0668c3752..72a6563a3 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2024,6 +2024,7 @@ void xrangeCommand(client *c);
void xrevrangeCommand(client *c);
void xlenCommand(client *c);
void xreadCommand(client *c);
+void xgroupCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
diff --git a/src/stream.h b/src/stream.h
index 214b6d9a5..75c9c57ef 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -17,6 +17,7 @@ typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
+ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
/* We define an iterator to iterate stream items in an abstract way, without
@@ -44,8 +45,46 @@ typedef struct streamIterator {
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
-/* Prototypes of exported APIs. */
+/* Consumer group. */
+typedef struct streamCG {
+ streamID lastid; /* Last delivered (not acknowledged) ID for this
+ group. Consumers that will just ask for more
+ messages will served with IDs > than this. */
+ rax *pel; /* Pending entries list. This is a radix tree that
+ has every message delivered to consumers (without
+ the NOACK option) that was yet not acknowledged
+ as processed. The key of the radix tree is the
+ ID as a 64 bit big endian number, while the
+ associated value is a streamNotAcked structure.*/
+ rax *consumers; /* A radix tree representing the consumers by name
+ and their associated representation in the form
+ of streamConsumer structures. */
+} streamCG;
+
+/* A specific consumer in a consumer group. */
+typedef struct streamConsumer {
+ mstime_t seen_time; /* Last time this consumer was active. */
+ sds *name; /* Consumer name. This is how the consumer
+ will be identified in the consumer group
+ protocol. Case sensitive. */
+ rax *pel; /* Consumer specific pending entries list: all
+ the pending messages delivered to this
+ consumer not yet acknowledged. Keys are
+ big endian message IDs, while values are
+ the same streamNotAcked structure referenced
+ in the "pel" of the conumser group structure
+ itself, so the value is shared. */
+} streamConsumer;
+/* Pending (yet not acknowledged) message in a consumer group. */
+typedef struct streamNotAcked {
+ mstime_t delivery_time; /* Last time this message was delivered. */
+ uint64_t delivery_count; /* Number of times this message was delivered.*/
+ streamConsumer *consumer; /* The consumer this message was delivered to
+ in the last delivery. */
+} streamNotAcked;
+
+/* Prototypes of exported APIs. */
struct client;
stream *streamNew(void);
diff --git a/src/t_stream.c b/src/t_stream.c
index 1f2e2094d..b9c0c4bd7 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -40,6 +40,8 @@
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
+void streamFreeCG(streamCG *cg);
+
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
* ----------------------------------------------------------------------- */
@@ -51,12 +53,15 @@ stream *streamNew(void) {
s->length = 0;
s->last_id.ms = 0;
s->last_id.seq = 0;
+ s->cgroups = NULL; /* Created on demand to save memory when not used. */
return s;
}
/* Free a stream, including the listpacks stored inside the radix tree. */
void freeStream(stream *s) {
raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
+ if (s->cgroups)
+ raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
zfree(s);
}
@@ -1053,4 +1058,99 @@ cleanup:
if (ids != static_ids) zfree(ids);
}
+/* -----------------------------------------------------------------------
+ * Low level implementation of consumer groups
+ * ----------------------------------------------------------------------- */
+
+void streamNotAckedFree(streamNotAcked *na) {
+ zfree(na);
+}
+
+void streamConsumerFree(streamConsumer *sc) {
+ zfree(sc);
+}
+
+/* Create a new consumer group in the context of the stream 's', having the
+ * specified name and last server ID. If a consumer group with the same name
+ * already existed NULL is returned, otherwise the pointer to the consumer
+ * group is returned. */
+streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) {
+ if (s->cgroups == NULL) s->cgroups = raxNew();
+ if (raxFind(s->cgroups,(unsigned char*)name,namelen)) return NULL;
+
+ streamCG *cg = zmalloc(sizeof(*cg));
+ cg->pel = raxNew();
+ cg->consumers = raxNew();
+ cg->lastid = id;
+ raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
+ return cg;
+}
+
+/* Free a consumer group and all its associated data. */
+void streamFreeCG(streamCG *cg) {
+ raxFreeWithCallback(cg->pel,(void(*)(void*))streamNotAckedFree);
+ raxFreeWithCallback(cg->consumers,(void(*)(void*))streamConsumerFree);
+ zfree(cg);
+}
+
+/* -----------------------------------------------------------------------
+ * Consumer groups commands
+ * ----------------------------------------------------------------------- */
+
+/* XGROUP CREATE <key> <groupname> <id or $>
+ * XGROUP SETID <key> <id or $>
+ * XGROUP DELGROUP <key> <groupname>
+ * XGROUP DELCONSUMER <key> <groupname> <consumername> */
+void xgroupCommand(client *c) {
+ const char *help[] = {
+"CREATE <key> <groupname> <id or $> -- Create a new consumer group.",
+"SETID <key> <groupname> <id or $> -- Set the current group ID.",
+"DELGROUP <key> <groupname> -- Remove the specified group.",
+"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified conusmer.",
+"HELP -- Prints this help.",
+NULL
+ };
+ stream *s = NULL;
+ sds grpname = NULL;
+
+ /* Lookup the key now, this is common for all the subcommands but HELP. */
+ if (c->argc >= 4) {
+ robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
+ if (o == NULL) return;
+ s = o->ptr;
+ grpname = c->argv[3]->ptr;
+ }
+
+ char *opt = c->argv[1]->ptr;
+ if (!strcasecmp(opt,"CREATE") && c->argc == 5) {
+ streamID id;
+ if (!strcmp(c->argv[4]->ptr,"$")) {
+ id = s->last_id;
+ } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
+ return;
+ }
+ streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),id);
+ if (cg) {
+ addReply(c,shared.ok);
+ } else {
+ addReplyError(c,"Consumer Group name already exists");
+ }
+ } else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
+ } else if (!strcasecmp(opt,"DELGROUP") && c->argc == 4) {
+ } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
+ } else if (!strcasecmp(opt,"HELP")) {
+ addReplyHelp(c, help);
+ } else {
+ addReply(c,shared.syntaxerr);
+ }
+}
+
+/* XPENDING <key> [<start> <stop>]. */
+
+/* XCLAIM <key> <group-name> <consumer-name> <min-idle-time> <ID-1> <ID-2> ...*/
+
+/* XACK <stream-key> */
+
+/* XREAD-GROUP will be implemented by xreadGenericCommand() */
+/* XINFO <key> [CONSUMERS|GROUPS|STREAM]. STREAM is the default */