summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/stream.h5
-rw-r--r--src/t_stream.c171
2 files changed, 142 insertions, 34 deletions
diff --git a/src/stream.h b/src/stream.h
index e38009321..df29e9e70 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -26,6 +26,11 @@ typedef struct stream {
* rewriting code that also needs to iterate the stream to emit the XADD
* commands. */
typedef struct streamIterator {
+ streamID master_id; /* ID of the master entry at listpack head. */
+ uint64_t master_fields_count; /* Master entries # of fields. */
+ unsigned char *master_fields_start; /* Master entries start in listapck. */
+ unsigned char *master_fields_ptr; /* Master field to emit next. */
+ int entry_flags; /* Flags of entry we are emitting. */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
diff --git a/src/t_stream.c b/src/t_stream.c
index 760050085..5250c36b3 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -27,16 +27,19 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
-/* TODO:
- * - After loading a stream, populate the last ID.
- */
-
#include "server.h"
#include "endianconv.h"
#include "stream.h"
#define STREAM_BYTES_PER_LISTPACK 4096
+/* Every stream item inside the listpack, has a flags field that is used to
+ * mark the entry as deleted, or having the same field as the "master"
+ * entry at the start of the listpack> */
+#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
+#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */
+#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
+
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
* ----------------------------------------------------------------------- */
@@ -95,6 +98,19 @@ int64_t lpGetInteger(unsigned char *ele) {
return v;
}
+/* Debugging function to log the full content of a listpack. Useful
+ * for development and debugging. */
+void streamLogListpackContent(unsigned char *lp) {
+ unsigned char *p = lpFirst(lp);
+ while(p) {
+ unsigned char buf[LP_INTBUF_SIZE];
+ int64_t v;
+ unsigned char *ele = lpGet(p,&v,buf);
+ serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele);
+ p = lpNext(lp,p);
+ }
+}
+
/* Convert the specified stream entry ID as a 128 bit big endian number, so
* that the IDs can be sorted lexicographically. */
void streamEncodeID(void *buf, streamID *id) {
@@ -159,32 +175,82 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
* to do so we consider the ID as a single 128 bit number written in
* big endian, so that the most significant bytes are the first ones. */
uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
- uint64_t entry_id[2]; /* Entry ID of the new item as 128 bit string. */
- streamEncodeID(entry_id,&id);
+ streamID master_id; /* ID of the master entry in the listpack. */
/* Create a new listpack and radix tree node if needed. */
+ int flags = STREAM_ITEM_FLAG_NONE;
if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
+ master_id = id;
+ streamEncodeID(rax_key,&id);
+ /* Create the listpack having the master entry ID and fields. */
lp = lpNew();
- rax_key[0] = entry_id[0];
- rax_key[1] = entry_id[1];
+ lp = lpAppend(lp,(unsigned char*)rax_key,sizeof(rax_key));
+ lp = lpAppendInteger(lp,numfields);
+ for (int i = 0; i < numfields; i++) {
+ sds field = argv[i*2]->ptr;
+ lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+ }
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
+ /* The first entry we insert, has obviously the same fields of the
+ * master entry. */
+ flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
} else {
serverAssert(ri.key_len == sizeof(rax_key));
memcpy(rax_key,ri.key,sizeof(rax_key));
+
+ /* Read the master entry ID. */
+ int64_t e_len;
+ unsigned char *lp_ele = lpFirst(lp);
+ unsigned char buf[LP_INTBUF_SIZE];
+ unsigned char *e = lpGet(lp_ele,&e_len,buf);
+ serverAssert(e_len == sizeof(streamID));
+ streamDecodeID(e,&master_id);
+ lp_ele = lpNext(lp,lp_ele);
+
+ /* Check if the entry we are adding, have the same fields
+ * as the master entry. */
+ int master_fields_count = lpGetInteger(lp_ele);
+ lp_ele = lpNext(lp,lp_ele);
+ if (numfields == master_fields_count) {
+ int i;
+ for (i = 0; i < master_fields_count; i++) {
+ sds field = argv[i*2]->ptr;
+ unsigned char *e = lpGet(lp_ele,&e_len,buf);
+ /* Stop if there is a mismatch. */
+ if (sdslen(field) != (size_t)e_len ||
+ memcmp(e,field,e_len) != 0) break;
+ lp_ele = lpNext(lp,lp_ele);
+ }
+ if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
+ }
}
/* Populate the listpack with the new entry. We use the following
* encoding:
*
- * +--------+----------+-------+-------+-/-+-------+-------+
- * |entry-id|num-fields|field-1|value-1|...|field-N|value-N|
- * +--------+----------+-------+-------+-/-+-------+-------+
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+
+ * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+
+ *
+ * However if the SAMEFIELD flag is set, we have just to populate
+ * the entry with the values, so it becomes:
+ *
+ * +-----+--------+-------+-/-+-------+
+ * |flags|entry-id|value-1|...|value-N|
+ * +-----+--------+-------+-/-+-------+
+ *
+ * The entry-id field is actually two separated fields: the ms
+ * and seq difference compared to the master entry.
*/
- lp = lpAppend(lp,(unsigned char*)entry_id,sizeof(entry_id));
- lp = lpAppendInteger(lp,numfields);
+ lp = lpAppendInteger(lp,flags);
+ lp = lpAppendInteger(lp,id.ms - master_id.ms);
+ lp = lpAppendInteger(lp,id.seq - master_id.seq);
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
+ lp = lpAppendInteger(lp,numfields);
for (int i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
- lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
+ lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
}
@@ -259,35 +325,67 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
if (!raxNext(&si->ri)) return 0;
serverAssert(si->ri.key_len == sizeof(streamID));
si->lp = si->ri.data;
- si->lp_ele = lpFirst(si->lp);
- }
-
- /* For every radix tree node, iterate the corresponding listpack,
- * returning elements when they are within range. */
- while(si->lp_ele) {
+ si->lp_ele = lpFirst(si->lp); /* Seek the master ID. */
+ /* Get the master ID. */
int64_t e_len;
unsigned char buf[LP_INTBUF_SIZE];
unsigned char *e = lpGet(si->lp_ele,&e_len,buf);
serverAssert(e_len == sizeof(streamID));
+ streamDecodeID(e,&si->master_id);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek fields count. */
+ /* Get the master fields count. */
+ si->master_fields_count = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
+ si->master_fields_start = si->lp_ele;
+ /* Skip master fileds to seek the first entry. */
+ for (uint64_t i = 0; i < si->master_fields_count; i++)
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
- /* Go to next field: number of elements. */
+ /* For every radix tree node, iterate the corresponding listpack,
+ * returning elements when they are within range. */
+ while(si->lp_ele) {
+ /* Get the flags entry. */
+ int flags = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
+
+ /* Get the ID: it is encoded as difference between the master
+ * ID and this entry ID. */
+ *id = si->master_id;
+ id->ms += lpGetInteger(si->lp_ele);
si->lp_ele = lpNext(si->lp,si->lp_ele);
+ id->seq += lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ unsigned char buf[sizeof(streamID)];
+ streamEncodeID(buf,id);
- /* If current >= start */
- if (memcmp(e,si->start_key,sizeof(streamID)) >= 0) {
- if (memcmp(e,si->end_key,sizeof(streamID)) > 0)
- return 0; /* We are already out of range. */
- streamDecodeID(e,id);
+ /* The number of entries is here or not depending on the
+ * flags. */
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ *numfields = si->master_fields_count;
+ } else {
*numfields = lpGetInteger(si->lp_ele);
si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
+
+ /* If current >= start, and the entry is not marked as
+ * deleted, emit it. */
+ if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
+ !(flags & STREAM_ITEM_FLAG_DELETED))
+ {
+ if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
+ return 0; /* We are already out of range. */
+ si->entry_flags = flags;
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
+ si->master_fields_ptr = si->master_fields_start;
return 1; /* Valid item returned. */
- } else {
- /* If we do not emit, we have to discard. */
- int64_t numfields = lpGetInteger(si->lp_ele);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- for (int64_t i = 0; i < numfields*2; i++)
- si->lp_ele = lpNext(si->lp,si->lp_ele);
}
+
+ /* If we do not emit, we have to discard. */
+ int to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
+ *numfields : *numfields*2;
+ for (int64_t i = 0; i < to_discard; i++)
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
}
/* End of listpack reached. Try the next radix tree node. */
@@ -301,8 +399,13 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
* lengths by reference, that are valid until the next iterator call, assuming
* no one touches the stream meanwhile. */
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
- *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
+ if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
+ si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
+ } else {
+ *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
*valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
si->lp_ele = lpNext(si->lp,si->lp_ele);
}