summaryrefslogtreecommitdiff
path: root/src/stream.h
diff options
context:
space:
mode:
authorItamar Haber <itamar@redislabs.com>2017-12-05 18:14:59 +0200
committerItamar Haber <itamar@redislabs.com>2017-12-05 18:14:59 +0200
commit8b51121998bd5b0f3f4992548ad5f4a929c2a9d7 (patch)
tree4b48521968561f0f3e02a1da5923bbb1f8702567 /src/stream.h
parent51eb6cb39513188001bd24e693868451ae267340 (diff)
parent62a4b817c6e83eedf96a451f45dd943099258fd0 (diff)
downloadredis-8b51121998bd5b0f3f4992548ad5f4a929c2a9d7.tar.gz
Merge remote-tracking branch 'upstream/unstable' into help_subcommands
Diffstat (limited to 'src/stream.h')
-rw-r--r--src/stream.h59
1 files changed, 59 insertions, 0 deletions
diff --git a/src/stream.h b/src/stream.h
new file mode 100644
index 000000000..214b6d9a5
--- /dev/null
+++ b/src/stream.h
@@ -0,0 +1,59 @@
+#ifndef STREAM_H
+#define STREAM_H
+
+#include "rax.h"
+#include "listpack.h"
+
+/* Stream item ID: a 128 bit number composed of a milliseconds time and
+ * a sequence counter. IDs generated in the same millisecond (or in a past
+ * millisecond if the clock jumped backward) will use the millisecond time
+ * of the latest generated ID and an incremented sequence. */
+typedef struct streamID {
+ uint64_t ms; /* Unix time in milliseconds. */
+ uint64_t seq; /* Sequence number. */
+} streamID;
+
+typedef struct stream {
+ rax *rax; /* The radix tree holding the stream. */
+ uint64_t length; /* Number of elements inside this stream. */
+ streamID last_id; /* Zero if there are yet no items. */
+} stream;
+
+/* We define an iterator to iterate stream items in an abstract way, without
+ * caring about the radix tree + listpack representation. Technically speaking
+ * the iterator is only used inside streamReplyWithRange(), so could just
+ * be implemented inside the function, but practically there is the AOF
+ * rewriting code that also needs to iterate the stream to emit the XADD
+ * commands. */
+typedef struct streamIterator {
+ streamID master_id; /* ID of the master entry at listpack head. */
+ uint64_t master_fields_count; /* Master entries # of fields. */
+ unsigned char *master_fields_start; /* Master entries start in listpack. */
+ unsigned char *master_fields_ptr; /* Master field to emit next. */
+ int entry_flags; /* Flags of entry we are emitting. */
+ int rev; /* True if iterating end to start (reverse). */
+ uint64_t start_key[2]; /* Start key as 128 bit big endian. */
+ uint64_t end_key[2]; /* End key as 128 bit big endian. */
+ raxIterator ri; /* Rax iterator. */
+ unsigned char *lp; /* Current listpack. */
+ unsigned char *lp_ele; /* Current listpack cursor. */
+ /* Buffers used to hold the string of lpGet() when the element is
+ * integer encoded, so that there is no string representation of the
+ * element inside the listpack itself. */
+ unsigned char field_buf[LP_INTBUF_SIZE];
+ unsigned char value_buf[LP_INTBUF_SIZE];
+} streamIterator;
+
+/* Prototypes of exported APIs. */
+
+struct client;
+
+stream *streamNew(void);
+void freeStream(stream *s);
+size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev);
+void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
+int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
+void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
+void streamIteratorStop(streamIterator *si);
+
+#endif