diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/stream.h | 5 | ||||
-rw-r--r-- | src/t_stream.c | 171 |
2 files changed, 142 insertions, 34 deletions
diff --git a/src/stream.h b/src/stream.h index e38009321..df29e9e70 100644 --- a/src/stream.h +++ b/src/stream.h @@ -26,6 +26,11 @@ typedef struct stream { * rewriting code that also needs to iterate the stream to emit the XADD * commands. */ typedef struct streamIterator { + streamID master_id; /* ID of the master entry at listpack head. */ + uint64_t master_fields_count; /* Master entries # of fields. */ + unsigned char *master_fields_start; /* Master entries start in listapck. */ + unsigned char *master_fields_ptr; /* Master field to emit next. */ + int entry_flags; /* Flags of entry we are emitting. */ uint64_t start_key[2]; /* Start key as 128 bit big endian. */ uint64_t end_key[2]; /* End key as 128 bit big endian. */ raxIterator ri; /* Rax iterator. */ diff --git a/src/t_stream.c b/src/t_stream.c index 760050085..5250c36b3 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -27,16 +27,19 @@ * POSSIBILITY OF SUCH DAMAGE. */ -/* TODO: - * - After loading a stream, populate the last ID. - */ - #include "server.h" #include "endianconv.h" #include "stream.h" #define STREAM_BYTES_PER_LISTPACK 4096 +/* Every stream item inside the listpack, has a flags field that is used to + * mark the entry as deleted, or having the same field as the "master" + * entry at the start of the listpack> */ +#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */ +#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */ +#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ + /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. * ----------------------------------------------------------------------- */ @@ -95,6 +98,19 @@ int64_t lpGetInteger(unsigned char *ele) { return v; } +/* Debugging function to log the full content of a listpack. Useful + * for development and debugging. */ +void streamLogListpackContent(unsigned char *lp) { + unsigned char *p = lpFirst(lp); + while(p) { + unsigned char buf[LP_INTBUF_SIZE]; + int64_t v; + unsigned char *ele = lpGet(p,&v,buf); + serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele); + p = lpNext(lp,p); + } +} + /* Convert the specified stream entry ID as a 128 bit big endian number, so * that the IDs can be sorted lexicographically. */ void streamEncodeID(void *buf, streamID *id) { @@ -159,32 +175,82 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, * to do so we consider the ID as a single 128 bit number written in * big endian, so that the most significant bytes are the first ones. */ uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/ - uint64_t entry_id[2]; /* Entry ID of the new item as 128 bit string. */ - streamEncodeID(entry_id,&id); + streamID master_id; /* ID of the master entry in the listpack. */ /* Create a new listpack and radix tree node if needed. */ + int flags = STREAM_ITEM_FLAG_NONE; if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) { + master_id = id; + streamEncodeID(rax_key,&id); + /* Create the listpack having the master entry ID and fields. */ lp = lpNew(); - rax_key[0] = entry_id[0]; - rax_key[1] = entry_id[1]; + lp = lpAppend(lp,(unsigned char*)rax_key,sizeof(rax_key)); + lp = lpAppendInteger(lp,numfields); + for (int i = 0; i < numfields; i++) { + sds field = argv[i*2]->ptr; + lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); + } raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); + /* The first entry we insert, has obviously the same fields of the + * master entry. */ + flags |= STREAM_ITEM_FLAG_SAMEFIELDS; } else { serverAssert(ri.key_len == sizeof(rax_key)); memcpy(rax_key,ri.key,sizeof(rax_key)); + + /* Read the master entry ID. */ + int64_t e_len; + unsigned char *lp_ele = lpFirst(lp); + unsigned char buf[LP_INTBUF_SIZE]; + unsigned char *e = lpGet(lp_ele,&e_len,buf); + serverAssert(e_len == sizeof(streamID)); + streamDecodeID(e,&master_id); + lp_ele = lpNext(lp,lp_ele); + + /* Check if the entry we are adding, have the same fields + * as the master entry. */ + int master_fields_count = lpGetInteger(lp_ele); + lp_ele = lpNext(lp,lp_ele); + if (numfields == master_fields_count) { + int i; + for (i = 0; i < master_fields_count; i++) { + sds field = argv[i*2]->ptr; + unsigned char *e = lpGet(lp_ele,&e_len,buf); + /* Stop if there is a mismatch. */ + if (sdslen(field) != (size_t)e_len || + memcmp(e,field,e_len) != 0) break; + lp_ele = lpNext(lp,lp_ele); + } + if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS; + } } /* Populate the listpack with the new entry. We use the following * encoding: * - * +--------+----------+-------+-------+-/-+-------+-------+ - * |entry-id|num-fields|field-1|value-1|...|field-N|value-N| - * +--------+----------+-------+-------+-/-+-------+-------+ + * +-----+--------+----------+-------+-------+-/-+-------+-------+ + * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N| + * +-----+--------+----------+-------+-------+-/-+-------+-------+ + * + * However if the SAMEFIELD flag is set, we have just to populate + * the entry with the values, so it becomes: + * + * +-----+--------+-------+-/-+-------+ + * |flags|entry-id|value-1|...|value-N| + * +-----+--------+-------+-/-+-------+ + * + * The entry-id field is actually two separated fields: the ms + * and seq difference compared to the master entry. */ - lp = lpAppend(lp,(unsigned char*)entry_id,sizeof(entry_id)); - lp = lpAppendInteger(lp,numfields); + lp = lpAppendInteger(lp,flags); + lp = lpAppendInteger(lp,id.ms - master_id.ms); + lp = lpAppendInteger(lp,id.seq - master_id.seq); + if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) + lp = lpAppendInteger(lp,numfields); for (int i = 0; i < numfields; i++) { sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr; - lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); + if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) + lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); } @@ -259,35 +325,67 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { if (!raxNext(&si->ri)) return 0; serverAssert(si->ri.key_len == sizeof(streamID)); si->lp = si->ri.data; - si->lp_ele = lpFirst(si->lp); - } - - /* For every radix tree node, iterate the corresponding listpack, - * returning elements when they are within range. */ - while(si->lp_ele) { + si->lp_ele = lpFirst(si->lp); /* Seek the master ID. */ + /* Get the master ID. */ int64_t e_len; unsigned char buf[LP_INTBUF_SIZE]; unsigned char *e = lpGet(si->lp_ele,&e_len,buf); serverAssert(e_len == sizeof(streamID)); + streamDecodeID(e,&si->master_id); + si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek fields count. */ + /* Get the master fields count. */ + si->master_fields_count = lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */ + si->master_fields_start = si->lp_ele; + /* Skip master fileds to seek the first entry. */ + for (uint64_t i = 0; i < si->master_fields_count; i++) + si->lp_ele = lpNext(si->lp,si->lp_ele); + } - /* Go to next field: number of elements. */ + /* For every radix tree node, iterate the corresponding listpack, + * returning elements when they are within range. */ + while(si->lp_ele) { + /* Get the flags entry. */ + int flags = lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ + + /* Get the ID: it is encoded as difference between the master + * ID and this entry ID. */ + *id = si->master_id; + id->ms += lpGetInteger(si->lp_ele); si->lp_ele = lpNext(si->lp,si->lp_ele); + id->seq += lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); + unsigned char buf[sizeof(streamID)]; + streamEncodeID(buf,id); - /* If current >= start */ - if (memcmp(e,si->start_key,sizeof(streamID)) >= 0) { - if (memcmp(e,si->end_key,sizeof(streamID)) > 0) - return 0; /* We are already out of range. */ - streamDecodeID(e,id); + /* The number of entries is here or not depending on the + * flags. */ + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) { + *numfields = si->master_fields_count; + } else { *numfields = lpGetInteger(si->lp_ele); si->lp_ele = lpNext(si->lp,si->lp_ele); + } + + /* If current >= start, and the entry is not marked as + * deleted, emit it. */ + if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && + !(flags & STREAM_ITEM_FLAG_DELETED)) + { + if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) + return 0; /* We are already out of range. */ + si->entry_flags = flags; + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) + si->master_fields_ptr = si->master_fields_start; return 1; /* Valid item returned. */ - } else { - /* If we do not emit, we have to discard. */ - int64_t numfields = lpGetInteger(si->lp_ele); - si->lp_ele = lpNext(si->lp,si->lp_ele); - for (int64_t i = 0; i < numfields*2; i++) - si->lp_ele = lpNext(si->lp,si->lp_ele); } + + /* If we do not emit, we have to discard. */ + int to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ? + *numfields : *numfields*2; + for (int64_t i = 0; i < to_discard; i++) + si->lp_ele = lpNext(si->lp,si->lp_ele); } /* End of listpack reached. Try the next radix tree node. */ @@ -301,8 +399,13 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { * lengths by reference, that are valid until the next iterator call, assuming * no one touches the stream meanwhile. */ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) { - *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); - si->lp_ele = lpNext(si->lp,si->lp_ele); + if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) { + *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf); + si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr); + } else { + *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); + si->lp_ele = lpNext(si->lp,si->lp_ele); + } *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf); si->lp_ele = lpNext(si->lp,si->lp_ele); } |