summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-09-27 17:41:32 +0200
committerantirez <antirez@gmail.com>2017-12-01 10:24:24 +0100
commit3f2d7e277e1754a5a421948bfbd45b5725a05148 (patch)
treefd3bd213aebcc8aa865f6c7308a731281bc44727 /src
parent8f00cf85a7ee93da987e8d0b899ada33f2a88505 (diff)
downloadredis-3f2d7e277e1754a5a421948bfbd45b5725a05148.tar.gz
Streams: items compression implemented.
The approach used is to set a fixed header at the start of every listpack blob (that contains many entries). The header contains a "master" ID and fields, that are initially just obtained from the first entry inserted in the listpack, so that the first enty is always well compressed. Later every new entry is checked against these fields, and if it matches, the SAMEFIELD flag is set in the entry so that we know to just use the master entry flags. The IDs are always delta-encoded against the first entry. This approach avoids cascading effects in which entries are encoded depending on the previous entries, in order to avoid complexity and rewritings of the data when data is removed in the middle (which is a planned feature).
Diffstat (limited to 'src')
-rw-r--r--src/stream.h5
-rw-r--r--src/t_stream.c171
2 files changed, 142 insertions, 34 deletions
diff --git a/src/stream.h b/src/stream.h
index e38009321..df29e9e70 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -26,6 +26,11 @@ typedef struct stream {
* rewriting code that also needs to iterate the stream to emit the XADD
* commands. */
typedef struct streamIterator {
+ streamID master_id; /* ID of the master entry at listpack head. */
+ uint64_t master_fields_count; /* Master entries # of fields. */
+ unsigned char *master_fields_start; /* Master entries start in listapck. */
+ unsigned char *master_fields_ptr; /* Master field to emit next. */
+ int entry_flags; /* Flags of entry we are emitting. */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
diff --git a/src/t_stream.c b/src/t_stream.c
index 760050085..5250c36b3 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -27,16 +27,19 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
-/* TODO:
- * - After loading a stream, populate the last ID.
- */
-
#include "server.h"
#include "endianconv.h"
#include "stream.h"
#define STREAM_BYTES_PER_LISTPACK 4096
+/* Every stream item inside the listpack, has a flags field that is used to
+ * mark the entry as deleted, or having the same field as the "master"
+ * entry at the start of the listpack> */
+#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
+#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */
+#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
+
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
* ----------------------------------------------------------------------- */
@@ -95,6 +98,19 @@ int64_t lpGetInteger(unsigned char *ele) {
return v;
}
+/* Debugging function to log the full content of a listpack. Useful
+ * for development and debugging. */
+void streamLogListpackContent(unsigned char *lp) {
+ unsigned char *p = lpFirst(lp);
+ while(p) {
+ unsigned char buf[LP_INTBUF_SIZE];
+ int64_t v;
+ unsigned char *ele = lpGet(p,&v,buf);
+ serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele);
+ p = lpNext(lp,p);
+ }
+}
+
/* Convert the specified stream entry ID as a 128 bit big endian number, so
* that the IDs can be sorted lexicographically. */
void streamEncodeID(void *buf, streamID *id) {
@@ -159,32 +175,82 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
* to do so we consider the ID as a single 128 bit number written in
* big endian, so that the most significant bytes are the first ones. */
uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
- uint64_t entry_id[2]; /* Entry ID of the new item as 128 bit string. */
- streamEncodeID(entry_id,&id);
+ streamID master_id; /* ID of the master entry in the listpack. */
/* Create a new listpack and radix tree node if needed. */
+ int flags = STREAM_ITEM_FLAG_NONE;
if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
+ master_id = id;
+ streamEncodeID(rax_key,&id);
+ /* Create the listpack having the master entry ID and fields. */
lp = lpNew();
- rax_key[0] = entry_id[0];
- rax_key[1] = entry_id[1];
+ lp = lpAppend(lp,(unsigned char*)rax_key,sizeof(rax_key));
+ lp = lpAppendInteger(lp,numfields);
+ for (int i = 0; i < numfields; i++) {
+ sds field = argv[i*2]->ptr;
+ lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+ }
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
+ /* The first entry we insert, has obviously the same fields of the
+ * master entry. */
+ flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
} else {
serverAssert(ri.key_len == sizeof(rax_key));
memcpy(rax_key,ri.key,sizeof(rax_key));
+
+ /* Read the master entry ID. */
+ int64_t e_len;
+ unsigned char *lp_ele = lpFirst(lp);
+ unsigned char buf[LP_INTBUF_SIZE];
+ unsigned char *e = lpGet(lp_ele,&e_len,buf);
+ serverAssert(e_len == sizeof(streamID));
+ streamDecodeID(e,&master_id);
+ lp_ele = lpNext(lp,lp_ele);
+
+ /* Check if the entry we are adding, have the same fields
+ * as the master entry. */
+ int master_fields_count = lpGetInteger(lp_ele);
+ lp_ele = lpNext(lp,lp_ele);
+ if (numfields == master_fields_count) {
+ int i;
+ for (i = 0; i < master_fields_count; i++) {
+ sds field = argv[i*2]->ptr;
+ unsigned char *e = lpGet(lp_ele,&e_len,buf);
+ /* Stop if there is a mismatch. */
+ if (sdslen(field) != (size_t)e_len ||
+ memcmp(e,field,e_len) != 0) break;
+ lp_ele = lpNext(lp,lp_ele);
+ }
+ if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
+ }
}
/* Populate the listpack with the new entry. We use the following
* encoding:
*
- * +--------+----------+-------+-------+-/-+-------+-------+
- * |entry-id|num-fields|field-1|value-1|...|field-N|value-N|
- * +--------+----------+-------+-------+-/-+-------+-------+
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+
+ * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+
+ *
+ * However if the SAMEFIELD flag is set, we have just to populate
+ * the entry with the values, so it becomes:
+ *
+ * +-----+--------+-------+-/-+-------+
+ * |flags|entry-id|value-1|...|value-N|
+ * +-----+--------+-------+-/-+-------+
+ *
+ * The entry-id field is actually two separated fields: the ms
+ * and seq difference compared to the master entry.
*/
- lp = lpAppend(lp,(unsigned char*)entry_id,sizeof(entry_id));
- lp = lpAppendInteger(lp,numfields);
+ lp = lpAppendInteger(lp,flags);
+ lp = lpAppendInteger(lp,id.ms - master_id.ms);
+ lp = lpAppendInteger(lp,id.seq - master_id.seq);
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
+ lp = lpAppendInteger(lp,numfields);
for (int i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
- lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
+ lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
}
@@ -259,35 +325,67 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
if (!raxNext(&si->ri)) return 0;
serverAssert(si->ri.key_len == sizeof(streamID));
si->lp = si->ri.data;
- si->lp_ele = lpFirst(si->lp);
- }
-
- /* For every radix tree node, iterate the corresponding listpack,
- * returning elements when they are within range. */
- while(si->lp_ele) {
+ si->lp_ele = lpFirst(si->lp); /* Seek the master ID. */
+ /* Get the master ID. */
int64_t e_len;
unsigned char buf[LP_INTBUF_SIZE];
unsigned char *e = lpGet(si->lp_ele,&e_len,buf);
serverAssert(e_len == sizeof(streamID));
+ streamDecodeID(e,&si->master_id);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek fields count. */
+ /* Get the master fields count. */
+ si->master_fields_count = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
+ si->master_fields_start = si->lp_ele;
+ /* Skip master fileds to seek the first entry. */
+ for (uint64_t i = 0; i < si->master_fields_count; i++)
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
- /* Go to next field: number of elements. */
+ /* For every radix tree node, iterate the corresponding listpack,
+ * returning elements when they are within range. */
+ while(si->lp_ele) {
+ /* Get the flags entry. */
+ int flags = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
+
+ /* Get the ID: it is encoded as difference between the master
+ * ID and this entry ID. */
+ *id = si->master_id;
+ id->ms += lpGetInteger(si->lp_ele);
si->lp_ele = lpNext(si->lp,si->lp_ele);
+ id->seq += lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ unsigned char buf[sizeof(streamID)];
+ streamEncodeID(buf,id);
- /* If current >= start */
- if (memcmp(e,si->start_key,sizeof(streamID)) >= 0) {
- if (memcmp(e,si->end_key,sizeof(streamID)) > 0)
- return 0; /* We are already out of range. */
- streamDecodeID(e,id);
+ /* The number of entries is here or not depending on the
+ * flags. */
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ *numfields = si->master_fields_count;
+ } else {
*numfields = lpGetInteger(si->lp_ele);
si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
+
+ /* If current >= start, and the entry is not marked as
+ * deleted, emit it. */
+ if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
+ !(flags & STREAM_ITEM_FLAG_DELETED))
+ {
+ if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
+ return 0; /* We are already out of range. */
+ si->entry_flags = flags;
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
+ si->master_fields_ptr = si->master_fields_start;
return 1; /* Valid item returned. */
- } else {
- /* If we do not emit, we have to discard. */
- int64_t numfields = lpGetInteger(si->lp_ele);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- for (int64_t i = 0; i < numfields*2; i++)
- si->lp_ele = lpNext(si->lp,si->lp_ele);
}
+
+ /* If we do not emit, we have to discard. */
+ int to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
+ *numfields : *numfields*2;
+ for (int64_t i = 0; i < to_discard; i++)
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
}
/* End of listpack reached. Try the next radix tree node. */
@@ -301,8 +399,13 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
* lengths by reference, that are valid until the next iterator call, assuming
* no one touches the stream meanwhile. */
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
- *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
- si->lp_ele = lpNext(si->lp,si->lp_ele);
+ if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
+ si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
+ } else {
+ *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
*valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
si->lp_ele = lpNext(si->lp,si->lp_ele);
}