diff options
author | swamp0407 <swamp0407@gmail.com> | 2020-11-17 19:03:05 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-17 12:03:05 +0200 |
commit | ea7cf737a1ff65524994ce6731eb9320891aa946 (patch) | |
tree | f6e491ee1f41765e916597c1d7719717da20353e /src/t_stream.c | |
parent | 9812e88959a37ed4aa95bb4d3fb55c56dc49a768 (diff) | |
download | redis-ea7cf737a1ff65524994ce6731eb9320891aa946.tar.gz |
Add COPY command (#7953)
Syntax:
COPY <key> <new-key> [DB <dest-db>] [REPLACE]
No support for module keys yet.
Co-authored-by: tmgauss
Co-authored-by: Itamar Haber <itamar@redislabs.com>
Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index d6f6a3011..02e4c3242 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -106,6 +106,110 @@ void streamNextID(streamID *last_id, streamID *new_id) { } } +/* This is a helper function for the COPY command. + * Duplicate a Stream object, with the guarantee that the returned object + * has the same encoding as the original one. + * + * The resulting object always has refcount set to 1 */ +robj *streamDup(robj *o) { + robj *sobj; + + serverAssert(o->type == OBJ_STREAM); + + switch (o->encoding) { + case OBJ_ENCODING_STREAM: + sobj = createStreamObject(); + break; + default: + serverPanic("Wrong encoding."); + break; + } + + stream *s; + stream *new_s; + s = o->ptr; + new_s = sobj->ptr; + + raxIterator ri; + uint64_t rax_key[2]; + raxStart(&ri, s->rax); + raxSeek(&ri, "^", NULL, 0); + size_t lp_bytes = 0; /* Total bytes in the listpack. */ + unsigned char *lp = NULL; /* listpack pointer. */ + /* Get a reference to the listpack node. */ + while (raxNext(&ri)) { + lp = ri.data; + lp_bytes = lpBytes(lp); + unsigned char *new_lp = zmalloc(lp_bytes); + memcpy(new_lp, lp, lp_bytes); + memcpy(rax_key, ri.key, sizeof(rax_key)); + raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key), + new_lp, NULL); + } + new_s->length = s->length; + new_s->last_id = s->last_id; + raxStop(&ri); + + if (s->cgroups == NULL) return sobj; + + /* Consumer Groups */ + raxIterator ri_cgroups; + raxStart(&ri_cgroups, s->cgroups); + raxSeek(&ri_cgroups, "^", NULL, 0); + while (raxNext(&ri_cgroups)) { + streamCG *cg = ri_cgroups.data; + streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key, + ri_cgroups.key_len, &cg->last_id); + + serverAssert(new_cg != NULL); + + /* Consumer Group PEL */ + raxIterator ri_cg_pel; + raxStart(&ri_cg_pel,cg->pel); + raxSeek(&ri_cg_pel,"^",NULL,0); + while(raxNext(&ri_cg_pel)){ + streamNACK *nack = ri_cg_pel.data; + streamNACK *new_nack = streamCreateNACK(NULL); + new_nack->delivery_time = nack->delivery_time; + new_nack->delivery_count = nack->delivery_count; + raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL); + } + raxStop(&ri_cg_pel); + + /* Consumers */ + raxIterator ri_consumers; + raxStart(&ri_consumers, cg->consumers); + raxSeek(&ri_consumers, "^", NULL, 0); + while (raxNext(&ri_consumers)) { + streamConsumer *consumer = ri_consumers.data; + streamConsumer *new_consumer; + new_consumer = zmalloc(sizeof(*new_consumer)); + new_consumer->name = sdsdup(consumer->name); + new_consumer->pel = raxNew(); + raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name, + sdslen(new_consumer->name), new_consumer, NULL); + new_consumer->seen_time = consumer->seen_time; + + /* Consumer PEL */ + raxIterator ri_cpel; + raxStart(&ri_cpel, consumer->pel); + raxSeek(&ri_cpel, "^", NULL, 0); + while (raxNext(&ri_cpel)) { + streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID)); + + serverAssert(new_nack != raxNotFound); + + new_nack->consumer = new_consumer; + raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL); + } + raxStop(&ri_cpel); + } + raxStop(&ri_consumers); + } + raxStop(&ri_cgroups); + return sobj; +} + /* This is just a wrapper for lpAppend() to directly use a 64 bit integer * instead of a string. */ unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) { |