summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-09-28 16:55:46 +0200
committerantirez <antirez@gmail.com>2017-11-04 17:39:27 +0100
commite0638635c48ef868bfb35a3439ad64623a8024d4 (patch)
tree58f039b53d8e557bb6fdb88329641350163c2f66
parent804bde981a1d33aae0dec2c5b1422ae22129ff33 (diff)
downloadredis-e0638635c48ef868bfb35a3439ad64623a8024d4.tar.gz
Streams: delta encode IDs based on key. Add count + deleted fields.
We used to have the master ID stored at the start of the listpack, however using the key directly makes more sense in order to create a space efficient representation: anyway the key at the radix tree is very unlikely to change because of how the stream is implemented. Moreover, on node merging, rewriting the merged listpacks is anyway the most sensible operation, and we can use the iterator and the append-to-stream function in order to avoid re-implementing the code needed for merging. This commit also adds two items at the start of the listpack: the number of valid items inside the listpack, and the number of items marked as deleted. This means that there is no need to scan a listpack in order to understand if it's a good candidate for garbage collection, that is, if the ratio between valid/deleted items triggers the GC.
-rw-r--r--src/rdb.c28
-rw-r--r--src/t_stream.c72
2 files changed, 61 insertions, 39 deletions
diff --git a/src/rdb.c b/src/rdb.c
index a1da62e87..0bc9043bd 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -780,6 +780,8 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
while (raxNext(&ri)) {
unsigned char *lp = ri.data;
size_t lp_bytes = lpBytes(lp);
+ if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
+ nwritten += n;
if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
nwritten += n;
}
@@ -1433,27 +1435,31 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
uint64_t listpacks = rdbLoadLen(rdb,NULL);
while(listpacks--) {
+ /* Get the master ID, the one we'll use as key of the radix tree
+ * node: the entries inside the listpack itself are delta-encoded
+ * relatively to this ID. */
+ sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
+ if (sdslen(nodekey) != sizeof(streamID)) {
+ rdbExitReportCorruptRDB("Stream node key entry is not the "
+ "size of a stream ID");
+ }
+
+ /* Load the listpack. */
unsigned char *lp =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
if (lp == NULL) return NULL;
unsigned char *first = lpFirst(lp);
if (first == NULL) {
- /* Serialized listpacks should never be free, since on
+ /* Serialized listpacks should never be empty, since on
* deletion we should remove the radix tree key if the
* resulting listpack is emtpy. */
rdbExitReportCorruptRDB("Empty listpack inside stream");
}
- /* Get the ID of the first entry: we'll use it as key to add the
- * listpack into the radix tree. */
- int64_t e_len;
- unsigned char buf[LP_INTBUF_SIZE];
- unsigned char *e = lpGet(first,&e_len,buf);
- if (e_len != sizeof(streamID)) {
- rdbExitReportCorruptRDB("Listpack first entry is not the "
- "size of a stream ID");
- }
- int retval = raxInsert(s->rax,e,sizeof(streamID),lp,NULL);
+ /* Insert the key in the radix tree. */
+ int retval = raxInsert(s->rax,
+ (unsigned char*)nodekey,sizeof(streamID),lp,NULL);
+ sdsfree(nodekey);
if (!retval)
rdbExitReportCorruptRDB("Listpack re-added with existing key");
}
diff --git a/src/t_stream.c b/src/t_stream.c
index bfc6e4c9a..00d07ac57 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -83,6 +83,16 @@ unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
return lpAppend(lp,(unsigned char*)buf,slen);
}
+/* This is just a wrapper for lpReplace() to directly use a 64 bit integer
+ * instead of a string to replace the current element. The function returns
+ * the new listpack as return value, and also updates the current cursor
+ * by updating '*pos'.
+ *
+ * 'value' is first converted to its string representation with ll2string()
+ * and the resulting bytes are then handed to lpInsert() using the
+ * LP_REPLACE mode, with '*pos' as the target element and 'pos' as the
+ * place where the new cursor is stored.
+ *
+ * NOTE(review): presumably lpInsert() may reallocate the listpack, which
+ * would be why callers have to keep using the returned pointer and the
+ * updated '*pos' instead of the old ones -- confirm against listpack.c. */
+unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) {
+ char buf[LONG_STR_SIZE];
+ int slen = ll2string(buf,sizeof(buf),value);
+ return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE, pos);
+}
+
/* This is a wrapper function for lpGet() to directly get an integer value
* from the listpack (that may store numbers as a string), converting
* the string if needed. */
@@ -179,26 +189,31 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
/* Create a new listpack and radix tree node if needed. Note that when
* a new listpack is created, we populate it with a "master entry". This
- * is just an ID and a set of fields that is taken as refernce in order
- * to compress the stream entries that we'll add inside the listpack.
+ * is just a set of fields that is taken as reference in order to compress
+ * the stream entries that we'll add inside the listpack.
*
- * Note that while we use the first added entry ID and fields to create
+ * Note that while we use the first added entry fields to create
* the master entry, the first added entry is NOT represented in the master
* entry, which is a stand alone object. But of course, the first entry
* will compress well because it's used as reference.
*
- * The master entry is composed of just: an ID and a set of fields, like:
+ * The master entry is composed like in the following example:
+ *
+ * +-------+---------+------------+---------+--/--+---------+---------+
+ * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |
+ * +-------+---------+------------+---------+--/--+---------+---------+
*
- * +------------+------------+---------+---------+--/--+---------+
- * | 128 bit ID | num-fields | field_1 | field_2 | ... | field_N |
- * +------------+------------+---------+---------+--/--+---------+
+ * count and deleted just represent respectively the total number of
+ * entries inside the listpack that are valid, and marked as deleted
+ * (deleted flag in the entry flags set). So the total number of items
+ * actually inside the listpack (both deleted and not) is count+deleted.
*
* The real entries will be encoded with an ID that is just the
- * millisecond and sequence difference compared to the master entry
- * (delta encoding), and if the fields of the entry are the same as
- * the master enty fields, the entry flags will specify this fact
- * and the entry fields and number of fields will be omitted (see later
- * in the code of this function). */
+ * millisecond and sequence difference compared to the key stored at
+ * the radix tree node containing the listpack (delta encoding), and
+ * if the fields of the entry are the same as the master entry fields, the
+ * entry flags will specify this fact and the entry fields and number
+ * of fields will be omitted (see later in the code of this function). */
int flags = STREAM_ITEM_FLAG_NONE;
if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
@@ -206,7 +221,8 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
streamEncodeID(rax_key,&id);
/* Create the listpack having the master entry ID and fields. */
lp = lpNew();
- lp = lpAppend(lp,(unsigned char*)rax_key,sizeof(rax_key));
+ lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
+ lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
lp = lpAppendInteger(lp,numfields);
for (int i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr;
@@ -220,14 +236,15 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
serverAssert(ri.key_len == sizeof(rax_key));
memcpy(rax_key,ri.key,sizeof(rax_key));
- /* Read the master entry ID. */
- int64_t e_len;
+ /* Read the master ID from the radix tree key. */
+ streamDecodeID(rax_key,&master_id);
unsigned char *lp_ele = lpFirst(lp);
- unsigned char buf[LP_INTBUF_SIZE];
- unsigned char *e = lpGet(lp_ele,&e_len,buf);
- serverAssert(e_len == sizeof(streamID));
- streamDecodeID(e,&master_id);
- lp_ele = lpNext(lp,lp_ele);
+
+ /* Update count and skip the deleted fields. */
+ int64_t count = lpGetInteger(lp_ele);
+ lp = lpReplaceInteger(lp,&lp_ele,count+1);
+ lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
+ lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
/* Check if the entry we are adding, have the same fields
* as the master entry. */
@@ -237,6 +254,8 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
int i;
for (i = 0; i < master_fields_count; i++) {
sds field = argv[i*2]->ptr;
+ int64_t e_len;
+ unsigned char buf[LP_INTBUF_SIZE];
unsigned char *e = lpGet(lp_ele,&e_len,buf);
/* Stop if there is a mismatch. */
if (sdslen(field) != (size_t)e_len ||
@@ -348,16 +367,13 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
if (si->lp == NULL || si->lp_ele == NULL) {
if (!raxNext(&si->ri)) return 0;
serverAssert(si->ri.key_len == sizeof(streamID));
- si->lp = si->ri.data;
- si->lp_ele = lpFirst(si->lp); /* Seek the master ID. */
/* Get the master ID. */
- int64_t e_len;
- unsigned char buf[LP_INTBUF_SIZE];
- unsigned char *e = lpGet(si->lp_ele,&e_len,buf);
- serverAssert(e_len == sizeof(streamID));
- streamDecodeID(e,&si->master_id);
- si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek fields count. */
+ streamDecodeID(si->ri.key,&si->master_id);
/* Get the master fields count. */
+ si->lp = si->ri.data;
+ si->lp_ele = lpFirst(si->lp); /* Seek items count */
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
si->master_fields_count = lpGetInteger(si->lp_ele);
si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
si->master_fields_start = si->lp_ele;