summaryrefslogtreecommitdiff
path: root/src/stream.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.h')
-rw-r--r--src/stream.h17
1 files changed, 15 insertions, 2 deletions
diff --git a/src/stream.h b/src/stream.h
index 724f2c2ad..2d4997919 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -15,8 +15,11 @@ typedef struct streamID {
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
- uint64_t length; /* Number of elements inside this stream. */
+ uint64_t length; /* Current number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
+ streamID first_id; /* The first non-tombstone entry, zero if empty. */
+ streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
+ uint64_t entries_added; /* All time count of elements added. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
@@ -34,6 +37,7 @@ typedef struct streamIterator {
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
+ int skip_tombstones; /* True if not emitting tombstone entries. */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
@@ -52,6 +56,11 @@ typedef struct streamCG {
streamID last_id; /* Last delivered (not acknowledged) ID for this
group. Consumers that will just ask for more
messages will served with IDs > than this. */
+ long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no
+ XGROUP SETID, ...), this is the total number of
+ group reads. In the real world, the reasoning behind
+ this value is detailed at the top comment of
+ streamEstimateDistanceFromFirstEverEntry(). */
rax *pel; /* Pending entries list. This is a radix tree that
has every message delivered to consumers (without
the NOACK option) that was yet not acknowledged
@@ -105,6 +114,8 @@ struct client;
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
+#define SCG_INVALID_ENTRIES_READ -1
+
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
@@ -117,7 +128,7 @@ void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
-streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
+streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
@@ -131,6 +142,8 @@ int streamParseID(const robj *o, streamID *id);
robj *createObjectFromStreamID(streamID *id);
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
int streamDeleteItem(stream *s, streamID *id);
+void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
+long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);