summaryrefslogtreecommitdiff
path: root/src/stream.h
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-01-16 15:38:22 +0100
committerantirez <antirez@gmail.com>2018-03-15 12:54:10 +0100
commit58f0c000a5630d22c221bc4291b46f2fc1654ead (patch)
treec7eaf5ccfd1beedc2444fdd76ca2d44ddc48540d /src/stream.h
parentd8207d09eedb08c8d70bff3ed91e2f5307e9dcf8 (diff)
downloadredis-58f0c000a5630d22c221bc4291b46f2fc1654ead.tar.gz
CG: data structures design + XGROUP CREATE implementation.
Diffstat (limited to 'src/stream.h')
-rw-r--r--src/stream.h41
1 files changed, 40 insertions, 1 deletions
diff --git a/src/stream.h b/src/stream.h
index 214b6d9a5..75c9c57ef 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -17,6 +17,7 @@ typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
+ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
/* We define an iterator to iterate stream items in an abstract way, without
@@ -44,8 +45,46 @@ typedef struct streamIterator {
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
-/* Prototypes of exported APIs. */
+/* Consumer group. */
+typedef struct streamCG {
+ streamID lastid; /* Last delivered (not acknowledged) ID for this
+ group. Consumers that will just ask for more
+ messages will served with IDs > than this. */
+ rax *pel; /* Pending entries list. This is a radix tree that
+ has every message delivered to consumers (without
+ the NOACK option) that was yet not acknowledged
+ as processed. The key of the radix tree is the
+ ID as a 64 bit big endian number, while the
+ associated value is a streamNotAcked structure.*/
+ rax *consumers; /* A radix tree representing the consumers by name
+ and their associated representation in the form
+ of streamConsumer structures. */
+} streamCG;
+
+/* A specific consumer in a consumer group. */
+typedef struct streamConsumer {
+ mstime_t seen_time; /* Last time this consumer was active. */
+ sds *name; /* Consumer name. This is how the consumer
+ will be identified in the consumer group
+ protocol. Case sensitive. */
+ rax *pel; /* Consumer specific pending entries list: all
+ the pending messages delivered to this
+ consumer not yet acknowledged. Keys are
+ big endian message IDs, while values are
+ the same streamNotAcked structure referenced
+ in the "pel" of the conumser group structure
+ itself, so the value is shared. */
+} streamConsumer;
+/* Pending (yet not acknowledged) message in a consumer group. */
+typedef struct streamNotAcked {
+ mstime_t delivery_time; /* Last time this message was delivered. */
+ uint64_t delivery_count; /* Number of times this message was delivered.*/
+ streamConsumer *consumer; /* The consumer this message was delivered to
+ in the last delivery. */
+} streamNotAcked;
+
+/* Prototypes of exported APIs. */
struct client;
stream *streamNew(void);