summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
authorswamp0407 <swamp0407@gmail.com>2020-11-17 19:03:05 +0900
committerGitHub <noreply@github.com>2020-11-17 12:03:05 +0200
commitea7cf737a1ff65524994ce6731eb9320891aa946 (patch)
treef6e491ee1f41765e916597c1d7719717da20353e /src/t_stream.c
parent9812e88959a37ed4aa95bb4d3fb55c56dc49a768 (diff)
downloadredis-ea7cf737a1ff65524994ce6731eb9320891aa946.tar.gz
Add COPY command (#7953)
Syntax: COPY <key> <new-key> [DB <dest-db>] [REPLACE] No support for module keys yet. Co-authored-by: tmgauss Co-authored-by: Itamar Haber <itamar@redislabs.com> Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c104
1 files changed, 104 insertions, 0 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index d6f6a3011..02e4c3242 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -106,6 +106,110 @@ void streamNextID(streamID *last_id, streamID *new_id) {
}
}
+/* This is a helper function for the COPY command.
+ * Duplicate a Stream object, with the guarantee that the returned object
+ * has the same encoding as the original one.
+ *
+ * The resulting object always has refcount set to 1 */
+robj *streamDup(robj *o) {
+ robj *sobj;
+
+ serverAssert(o->type == OBJ_STREAM);
+
+ switch (o->encoding) {
+ case OBJ_ENCODING_STREAM:
+ sobj = createStreamObject();
+ break;
+ default:
+ serverPanic("Wrong encoding.");
+ break;
+ }
+
+ stream *s;
+ stream *new_s;
+ s = o->ptr;
+ new_s = sobj->ptr;
+
+ raxIterator ri;
+ uint64_t rax_key[2];
+ raxStart(&ri, s->rax);
+ raxSeek(&ri, "^", NULL, 0);
+ size_t lp_bytes = 0; /* Total bytes in the listpack. */
+ unsigned char *lp = NULL; /* listpack pointer. */
+ /* Get a reference to the listpack node. */
+ while (raxNext(&ri)) {
+ lp = ri.data;
+ lp_bytes = lpBytes(lp);
+ unsigned char *new_lp = zmalloc(lp_bytes);
+ memcpy(new_lp, lp, lp_bytes);
+ memcpy(rax_key, ri.key, sizeof(rax_key));
+ raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key),
+ new_lp, NULL);
+ }
+ new_s->length = s->length;
+ new_s->last_id = s->last_id;
+ raxStop(&ri);
+
+ if (s->cgroups == NULL) return sobj;
+
+ /* Consumer Groups */
+ raxIterator ri_cgroups;
+ raxStart(&ri_cgroups, s->cgroups);
+ raxSeek(&ri_cgroups, "^", NULL, 0);
+ while (raxNext(&ri_cgroups)) {
+ streamCG *cg = ri_cgroups.data;
+ streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
+ ri_cgroups.key_len, &cg->last_id);
+
+ serverAssert(new_cg != NULL);
+
+ /* Consumer Group PEL */
+ raxIterator ri_cg_pel;
+ raxStart(&ri_cg_pel,cg->pel);
+ raxSeek(&ri_cg_pel,"^",NULL,0);
+ while(raxNext(&ri_cg_pel)){
+ streamNACK *nack = ri_cg_pel.data;
+ streamNACK *new_nack = streamCreateNACK(NULL);
+ new_nack->delivery_time = nack->delivery_time;
+ new_nack->delivery_count = nack->delivery_count;
+ raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);
+ }
+ raxStop(&ri_cg_pel);
+
+ /* Consumers */
+ raxIterator ri_consumers;
+ raxStart(&ri_consumers, cg->consumers);
+ raxSeek(&ri_consumers, "^", NULL, 0);
+ while (raxNext(&ri_consumers)) {
+ streamConsumer *consumer = ri_consumers.data;
+ streamConsumer *new_consumer;
+ new_consumer = zmalloc(sizeof(*new_consumer));
+ new_consumer->name = sdsdup(consumer->name);
+ new_consumer->pel = raxNew();
+ raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
+ sdslen(new_consumer->name), new_consumer, NULL);
+ new_consumer->seen_time = consumer->seen_time;
+
+ /* Consumer PEL */
+ raxIterator ri_cpel;
+ raxStart(&ri_cpel, consumer->pel);
+ raxSeek(&ri_cpel, "^", NULL, 0);
+ while (raxNext(&ri_cpel)) {
+ streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID));
+
+ serverAssert(new_nack != raxNotFound);
+
+ new_nack->consumer = new_consumer;
+ raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL);
+ }
+ raxStop(&ri_cpel);
+ }
+ raxStop(&ri_consumers);
+ }
+ raxStop(&ri_cgroups);
+ return sobj;
+}
+
/* This is just a wrapper for lpAppend() to directly use a 64 bit integer
* instead of a string. */
unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {