diff options
Diffstat (limited to 'src/stream.h')
-rw-r--r-- | src/stream.h | 41 |
1 files changed, 40 insertions, 1 deletions
diff --git a/src/stream.h b/src/stream.h index 214b6d9a5..75c9c57ef 100644 --- a/src/stream.h +++ b/src/stream.h @@ -17,6 +17,7 @@ typedef struct stream { rax *rax; /* The radix tree holding the stream. */ uint64_t length; /* Number of elements inside this stream. */ streamID last_id; /* Zero if there are yet no items. */ + rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ } stream; /* We define an iterator to iterate stream items in an abstract way, without @@ -44,8 +45,46 @@ typedef struct streamIterator { unsigned char value_buf[LP_INTBUF_SIZE]; } streamIterator; -/* Prototypes of exported APIs. */ +/* Consumer group. */ +typedef struct streamCG { + streamID lastid; /* Last delivered (not acknowledged) ID for this + group. Consumers that will just ask for more + messages will served with IDs > than this. */ + rax *pel; /* Pending entries list. This is a radix tree that + has every message delivered to consumers (without + the NOACK option) that was yet not acknowledged + as processed. The key of the radix tree is the + ID as a 64 bit big endian number, while the + associated value is a streamNotAcked structure.*/ + rax *consumers; /* A radix tree representing the consumers by name + and their associated representation in the form + of streamConsumer structures. */ +} streamCG; + +/* A specific consumer in a consumer group. */ +typedef struct streamConsumer { + mstime_t seen_time; /* Last time this consumer was active. */ + sds *name; /* Consumer name. This is how the consumer + will be identified in the consumer group + protocol. Case sensitive. */ + rax *pel; /* Consumer specific pending entries list: all + the pending messages delivered to this + consumer not yet acknowledged. Keys are + big endian message IDs, while values are + the same streamNotAcked structure referenced + in the "pel" of the conumser group structure + itself, so the value is shared. */ +} streamConsumer; +/* Pending (yet not acknowledged) message in a consumer group. */ +typedef struct streamNotAcked { + mstime_t delivery_time; /* Last time this message was delivered. */ + uint64_t delivery_count; /* Number of times this message was delivered.*/ + streamConsumer *consumer; /* The consumer this message was delivered to + in the last delivery. */ +} streamNotAcked; + +/* Prototypes of exported APIs. */ struct client; stream *streamNew(void); |