summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-11-17 10:16:30 +0100
committerantirez <antirez@gmail.com>2017-11-17 10:16:35 +0100
commit8f61d70c8a60f661aef9caf7232ceb097611466e (patch)
tree0d8982f6008fd4173e7e657f9fb186ad8f6975a4
parentc33072404403aaa85f3a5183483ead40b766d744 (diff)
downloadredis-8f61d70c8a60f661aef9caf7232ceb097611466e.tar.gz
Streams: augment stream entries to allow backward scanning.
-rw-r--r--src/t_stream.c47
1 files changed, 35 insertions, 12 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index 61b229a5c..14eba44c0 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -199,9 +199,9 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
*
* The master entry is composed like in the following example:
*
- * +-------+---------+------------+---------+--/--+---------+---------+
- * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |
- * +-------+---------+------------+---------+--/--+---------+---------+
+ * +-------+---------+------------+---------+--/--+---------+---------+-+
+ * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
+ * +-------+---------+------------+---------+--/--+---------+---------+-+
*
* count and deleted just represent respectively the total number of
* entires inside the listpack that are valid, and marked as deleted
@@ -213,7 +213,11 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
* the radix tree node containing the listpack (delta encoding), and
* if the fields of the entry are the same as the master enty fields, the
* entry flags will specify this fact and the entry fields and number
- * of fields will be omitted (see later in the code of this function). */
+ * of fields will be omitted (see later in the code of this function).
+ *
+ * The "0" entry at the end is the same as the 'lp-count' entry in the
+ * regular stream entries (see below), and marks the fact that there are
+ * no more entires, when we scan the stream from right to left. */
int flags = STREAM_ITEM_FLAG_NONE;
if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
@@ -228,6 +232,7 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
sds field = argv[i*2]->ptr;
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
}
+ lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
/* The first entry we insert, has obviously the same fields of the
* master entry. */
@@ -271,20 +276,25 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
/* Populate the listpack with the new entry. We use the following
* encoding:
*
- * +-----+--------+----------+-------+-------+-/-+-------+-------+
- * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|
- * +-----+--------+----------+-------+-------+-/-+-------+-------+
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
+ * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
*
* However if the SAMEFIELD flag is set, we have just to populate
* the entry with the values, so it becomes:
*
- * +-----+--------+-------+-/-+-------+
- * |flags|entry-id|value-1|...|value-N|
- * +-----+--------+-------+-/-+-------+
+ * +-----+--------+-------+-/-+-------+--------+
+ * |flags|entry-id|value-1|...|value-N|lp-count|
+ * +-----+--------+-------+-/-+-------+--------+
*
* The entry-id field is actually two separated fields: the ms
* and seq difference compared to the master entry.
- */
+ *
+ * The lp-count field is a number that states the number of listpack pieces
+ * that compose the entry, so that it's possible to travel the entry
+ * in reverse order: we can just start from the end of the listpack, read
+ * the entry, and jump back N times to seek the "flags" field to read
+ * the stream full entry. */
lp = lpAppendInteger(lp,flags);
lp = lpAppendInteger(lp,id.ms - master_id.ms);
lp = lpAppendInteger(lp,id.seq - master_id.seq);
@@ -296,6 +306,11 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
}
+ /* Compute and store the lp-count field. */
+ int lp_count = numfields;
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) lp_count *= 2;
+ lp_count += 3; /* Add the 3 fixed fileds flags + ms-diff + seq-diff. */
+ lp = lpAppendInteger(lp,lp_count);
/* Insert back into the tree in order to update the listpack pointer. */
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
@@ -361,6 +376,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
p = lpNext(lp,p); /* Seek the first field. */
for (int64_t j = 0; j < master_fields_count; j++)
p = lpNext(lp,p); /* Skip all master fields. */
+ p = lpNext(lp,p); /* Skip the zero master entry terminator. */
/* 'p' is now pointing to the first entry inside the listpack.
* We have to run entry after entry, marking entries as deleted
@@ -389,6 +405,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
}
while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
+ p = lpNext(lp,p); /* Skip the final lp-count field. */
}
/* Here we should perform garbage collection in case at this point
@@ -482,11 +499,17 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
/* Skip master fileds to seek the first entry. */
for (uint64_t i = 0; i < si->master_fields_count; i++)
si->lp_ele = lpNext(si->lp,si->lp_ele);
+ /* We are now pointing the zero term of the master entry. */
}
/* For every radix tree node, iterate the corresponding listpack,
* returning elements when they are within range. */
- while(si->lp_ele) {
+ while(1) {
+ /* Skip the previous entry lp-count field, or in case of the
+ * master entry, the zero term field. */
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ if (si->lp_ele == NULL) break;
+
/* Get the flags entry. */
int flags = lpGetInteger(si->lp_ele);
si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */