summaryrefslogtreecommitdiff
path: root/src/stream.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.h')
-rw-r--r--src/stream.h41
1 files changed, 40 insertions, 1 deletions
diff --git a/src/stream.h b/src/stream.h
index 214b6d9a5..75c9c57ef 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -17,6 +17,7 @@ typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
+ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
/* We define an iterator to iterate stream items in an abstract way, without
@@ -44,8 +45,46 @@ typedef struct streamIterator {
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
-/* Prototypes of exported APIs. */
+/* Consumer group. */
+typedef struct streamCG {
+ streamID lastid; /* Last delivered (not acknowledged) ID for this
+ group. Consumers that will just ask for more
+ messages will served with IDs > than this. */
+ rax *pel; /* Pending entries list. This is a radix tree that
+ has every message delivered to consumers (without
+ the NOACK option) that was yet not acknowledged
+ as processed. The key of the radix tree is the
+ ID as a 64 bit big endian number, while the
+ associated value is a streamNotAcked structure.*/
+ rax *consumers; /* A radix tree representing the consumers by name
+ and their associated representation in the form
+ of streamConsumer structures. */
+} streamCG;
+
+/* A specific consumer in a consumer group. */
+typedef struct streamConsumer {
+ mstime_t seen_time; /* Last time this consumer was active. */
+ sds *name; /* Consumer name. This is how the consumer
+ will be identified in the consumer group
+ protocol. Case sensitive. */
+ rax *pel; /* Consumer specific pending entries list: all
+ the pending messages delivered to this
+ consumer not yet acknowledged. Keys are
+ big endian message IDs, while values are
+ the same streamNotAcked structure referenced
+ in the "pel" of the conumser group structure
+ itself, so the value is shared. */
+} streamConsumer;
+/* Pending (yet not acknowledged) message in a consumer group. */
+typedef struct streamNotAcked {
+ mstime_t delivery_time; /* Last time this message was delivered. */
+ uint64_t delivery_count; /* Number of times this message was delivered.*/
+ streamConsumer *consumer; /* The consumer this message was delivered to
+ in the last delivery. */
+} streamNotAcked;
+
+/* Prototypes of exported APIs. */
struct client;
stream *streamNew(void);