diff options
author | Itamar Haber <itamar@redislabs.com> | 2017-12-05 18:14:59 +0200 |
---|---|---|
committer | Itamar Haber <itamar@redislabs.com> | 2017-12-05 18:14:59 +0200 |
commit | 8b51121998bd5b0f3f4992548ad5f4a929c2a9d7 (patch) | |
tree | 4b48521968561f0f3e02a1da5923bbb1f8702567 /src/stream.h | |
parent | 51eb6cb39513188001bd24e693868451ae267340 (diff) | |
parent | 62a4b817c6e83eedf96a451f45dd943099258fd0 (diff) | |
download | redis-8b51121998bd5b0f3f4992548ad5f4a929c2a9d7.tar.gz |
Merge remote-tracking branch 'upstream/unstable' into help_subcommands
Diffstat (limited to 'src/stream.h')
-rw-r--r-- | src/stream.h | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/src/stream.h b/src/stream.h new file mode 100644 index 000000000..214b6d9a5 --- /dev/null +++ b/src/stream.h @@ -0,0 +1,59 @@ +#ifndef STREAM_H +#define STREAM_H + +#include "rax.h" +#include "listpack.h" + +/* Stream item ID: a 128 bit number composed of a milliseconds time and + * a sequence counter. IDs generated in the same millisecond (or in a past + * millisecond if the clock jumped backward) will use the millisecond time + * of the latest generated ID and an incremented sequence. */ +typedef struct streamID { + uint64_t ms; /* Unix time in milliseconds. */ + uint64_t seq; /* Sequence number. */ +} streamID; + +typedef struct stream { + rax *rax; /* The radix tree holding the stream. */ + uint64_t length; /* Number of elements inside this stream. */ + streamID last_id; /* Zero if there are yet no items. */ +} stream; + +/* We define an iterator to iterate stream items in an abstract way, without + * caring about the radix tree + listpack representation. Technically speaking + * the iterator is only used inside streamReplyWithRange(), so could just + * be implemented inside the function, but practically there is the AOF + * rewriting code that also needs to iterate the stream to emit the XADD + * commands. */ +typedef struct streamIterator { + streamID master_id; /* ID of the master entry at listpack head. */ + uint64_t master_fields_count; /* Master entries # of fields. */ + unsigned char *master_fields_start; /* Master entries start in listpack. */ + unsigned char *master_fields_ptr; /* Master field to emit next. */ + int entry_flags; /* Flags of entry we are emitting. */ + int rev; /* True if iterating end to start (reverse). */ + uint64_t start_key[2]; /* Start key as 128 bit big endian. */ + uint64_t end_key[2]; /* End key as 128 bit big endian. */ + raxIterator ri; /* Rax iterator. */ + unsigned char *lp; /* Current listpack. */ + unsigned char *lp_ele; /* Current listpack cursor. */ + /* Buffers used to hold the string of lpGet() when the element is + * integer encoded, so that there is no string representation of the + * element inside the listpack itself. */ + unsigned char field_buf[LP_INTBUF_SIZE]; + unsigned char value_buf[LP_INTBUF_SIZE]; +} streamIterator; + +/* Prototypes of exported APIs. */ + +struct client; + +stream *streamNew(void); +void freeStream(stream *s); +size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev); +void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); +int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); +void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); +void streamIteratorStop(streamIterator *si); + +#endif |