summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-09-15 12:17:25 +0200
committerantirez <antirez@gmail.com>2017-11-04 17:39:26 +0100
commit0e496bfbd430bf782779ebef2ef05d7a2f9bbe79 (patch)
treec1473165fd38f302c3aea5a2d72ba2083f83256c
parentb26320b5b411d8d39ad4e2c0313624323c7daba8 (diff)
downloadredis-0e496bfbd430bf782779ebef2ef05d7a2f9bbe79.tar.gz
Streams: export iteration API.
-rw-r--r--src/server.h5
-rw-r--r--src/stream.h31
-rw-r--r--src/t_stream.c19
3 files changed, 31 insertions, 24 deletions
diff --git a/src/server.h b/src/server.h
index d2b51dee4..41603a6da 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1425,11 +1425,6 @@ void listTypeConvert(robj *subject, int enc);
void unblockClientWaitingData(client *c);
void popGenericCommand(client *c, int where);
-/* Stream data type. */
-stream *streamNew(void);
-void freeStream(stream *s);
-size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count);
-
/* MULTI/EXEC/WATCH... */
void unwatchAllKeys(client *c);
void initClientMultiState(client *c);
diff --git a/src/stream.h b/src/stream.h
index e78af5bc5..e38009321 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -19,4 +19,35 @@ typedef struct stream {
streamID last_id; /* Zero if there are yet no items. */
} stream;
+/* We define an iterator to iterate stream items in an abstract way, without
+ * caring about the radix tree + listpack representation. Technically speaking
+ * the iterator is only used inside streamReplyWithRange(), so could just
+ * be implemented inside the function, but practically there is the AOF
+ * rewriting code that also needs to iterate the stream to emit the XADD
+ * commands. */
+typedef struct streamIterator {
+ uint64_t start_key[2]; /* Start key as 128 bit big endian. */
+ uint64_t end_key[2]; /* End key as 128 bit big endian. */
+ raxIterator ri; /* Rax iterator. */
+ unsigned char *lp; /* Current listpack. */
+ unsigned char *lp_ele; /* Current listpack cursor. */
+ /* Buffers used to hold the string of lpGet() when the element is
+ * integer encoded, so that there is no string representation of the
+ * element inside the listpack itself. */
+ unsigned char field_buf[LP_INTBUF_SIZE];
+ unsigned char value_buf[LP_INTBUF_SIZE];
+} streamIterator;
+
+/* Prototypes of exported APIs. */
+
+struct client;
+
+stream *streamNew(void);
+void freeStream(stream *s);
+size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count);
+void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end);
+int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
+void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
+void streamIteratorStop(streamIterator *si);
+
#endif
diff --git a/src/t_stream.c b/src/t_stream.c
index de9561a51..3144adc7c 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -196,25 +196,6 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
return C_OK;
}
-/* We define an iterator to iterate stream items in an abstract way, without
- * caring about the radix tree + listpack representation. Technically speaking
- * the iterator is only used inside streamReplyWithRange(), so could just
- * be implemented inside the function, but practically there is the AOF
- * rewriting code that also needs to iterate the stream to emit the XADD
- * commands. */
-typedef struct streamIterator {
- uint64_t start_key[2]; /* Start key as 128 bit big endian. */
- uint64_t end_key[2]; /* End key as 128 bit big endian. */
- raxIterator ri; /* Rax iterator. */
- unsigned char *lp; /* Current listpack. */
- unsigned char *lp_ele; /* Current listpack cursor. */
- /* Buffers used to hold the string of lpGet() when the element is
- * integer encoded, so that there is no string representation of the
- * element inside the listpack itself. */
- unsigned char field_buf[LP_INTBUF_SIZE];
- unsigned char value_buf[LP_INTBUF_SIZE];
-} streamIterator;
-
/* Initialize the stream iterator, so that we can call iterating functions
* to get the next items. This requires a corresponding streamIteratorStop()
* at the end.