summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rax.c5
-rw-r--r--src/rax.h1
-rw-r--r--src/rdb.c22
-rw-r--r--src/stream.h1
-rw-r--r--src/t_stream.c2
5 files changed, 29 insertions, 2 deletions
diff --git a/src/rax.c b/src/rax.c
index b4f5ae05d..3ead27ed7 100644
--- a/src/rax.c
+++ b/src/rax.c
@@ -1655,6 +1655,11 @@ int raxEOF(raxIterator *it) {
return it->flags & RAX_ITER_EOF;
}
+/* Return the number of elements inside the radix tree. */
+uint64_t raxSize(rax *rax) {
+ return rax->numele;
+}
+
/* ----------------------------- Introspection ------------------------------ */
/* This function is mostly used for debugging and learning purposes.
diff --git a/src/rax.h b/src/rax.h
index f6985c373..e22b6e699 100644
--- a/src/rax.h
+++ b/src/rax.h
@@ -157,5 +157,6 @@ int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key
void raxStop(raxIterator *it);
int raxEOF(raxIterator *it);
void raxShow(rax *rax);
+uint64_t raxSize(rax *rax);
#endif
diff --git a/src/rdb.c b/src/rdb.c
index 19ba59ab8..c79bfa8d4 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -31,6 +31,7 @@
#include "lzf.h" /* LZF compression library */
#include "zipmap.h"
#include "endianconv.h"
+#include "stream.h"
#include <math.h>
#include <sys/types.h>
@@ -622,6 +623,8 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
+ case OBJ_STREAM:
+ return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
@@ -762,7 +765,26 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
} else {
serverPanic("Unknown hash encoding");
}
+ } else if (o->type == OBJ_STREAM) {
+ /* Store how many listpacks we have inside the radix tree. */
+ stream *s = o->ptr;
+ rax *rax = s->rax;
+ if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
+ nwritten += n;
+ /* Serialize all the listpacks inside the radix tree as they are,
+ * when loading back, we'll use the first entry of each listpack
+ * to insert it back into the radix tree. */
+ raxIterator ri;
+ raxStart(&ri,rax);
+ raxSeek(&ri,"^",NULL,0);
+ while (raxNext(&ri)) {
+ unsigned char *lp = ri.data;
+ size_t lp_bytes = lpBytes(lp);
+ if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
+ nwritten += n;
+ }
+ raxStop(&ri);
} else if (o->type == OBJ_MODULE) {
/* Save a module-specific value. */
RedisModuleIO io;
diff --git a/src/stream.h b/src/stream.h
index 065c328eb..e78af5bc5 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -2,6 +2,7 @@
#define STREAM_H
#include "rax.h"
+#include "listpack.h"
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
diff --git a/src/t_stream.c b/src/t_stream.c
index dcf9fccee..9ca001d71 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -32,7 +32,6 @@
*/
#include "server.h"
-#include "listpack.h"
#include "endianconv.h"
#include "stream.h"
@@ -169,7 +168,6 @@ void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id)
s->length++;
s->last_id = id;
if (added_id) *added_id = id;
- raxShow(s->rax);
}
/* Send the specified range to the client 'c'. The range the client will