summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2018-03-23 17:21:31 +0100
committerantirez <antirez@gmail.com>2018-03-23 17:21:31 +0100
commit1392c83fb829d8586af8cdd1ef778b211be40e4a (patch)
tree8b51d67f5db38f42447e542cc31320c1d8c7b65e /src
parent6c4cb1670a1f89ec3f38b67638cd76e161195358 (diff)
downloadredis-1392c83fb829d8586af8cdd1ef778b211be40e4a.tar.gz
CG: AOF rewriting implemented.
Diffstat (limited to 'src')
-rw-r--r--src/aof.c89
-rw-r--r--src/stream.h1
2 files changed, 86 insertions, 4 deletions
diff --git a/src/aof.c b/src/aof.c
index 4a7d749d0..8b735e241 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1074,14 +1074,52 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
return 1;
}
+/* Helper for rewriteStreamObject() that generates a bulk string into the
+ * AOF representing the ID 'id'. */
+int rioWriteBulkStreamID(rio *r,streamID *id) {
+ int retval;
+
+ sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
+ if ((retval = rioWriteBulkString(r,replyid,sdslen(replyid))) == 0) return 0;
+ sdsfree(replyid);
+ return retval;
+}
+
+/* Helper for rewriteStreamObject(): emit the XCLAIM needed in order to
+ * add the message described by 'nack' having the id 'rawid', into the pending
+ * list of the specified consumer. All this in the context of the specified
+ * key and group. */
+int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
+ /* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
+ RETRYCOUNT <count> JUSTID FORCE. */
+ streamID id;
+ streamDecodeID(rawid,&id);
+ if (rioWriteBulkCount(r,'*',12) == 0) return 0;
+ if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
+ if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
+ if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
+ if (rioWriteBulkString(r,"0",1) == 0) return 0;
+ if (rioWriteBulkStreamID(r,&id) == 0) return 0;
+ if (rioWriteBulkString(r,"TIME",4) == 0) return 0;
+ if (rioWriteBulkLongLong(r,nack->delivery_time) == 0) return 0;
+ if (rioWriteBulkString(r,"RETRYCOUNT",10) == 0) return 0;
+ if (rioWriteBulkLongLong(r,nack->delivery_count) == 0) return 0;
+ if (rioWriteBulkString(r,"JUSTID",6) == 0) return 0;
+ if (rioWriteBulkString(r,"FORCE",5) == 0) return 0;
+ return 1;
+}
+
/* Emit the commands needed to rebuild a stream object.
* The function returns 0 on error, 1 on success. */
int rewriteStreamObject(rio *r, robj *key, robj *o) {
+ stream *s = o->ptr;
streamIterator si;
- streamIteratorStart(&si,o->ptr,NULL,NULL,0);
+ streamIteratorStart(&si,s,NULL,NULL,0);
streamID id;
int64_t numfields;
+ /* Reconstruct the stream data using XADD commands. */
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
@@ -1090,9 +1128,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
- sds replyid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
- if (rioWriteBulkString(r,replyid,sdslen(replyid)) == 0) return 0;
- sdsfree(replyid);
+ if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
@@ -1101,6 +1137,51 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
}
}
+
+ /* Create all the stream consumer groups. */
+ if (s->cgroups) {
+ raxIterator ri;
+ raxStart(&ri,s->cgroups);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ streamCG *group = ri.data;
+ /* Emit the XGROUP CREATE in order to create the group. */
+ if (rioWriteBulkCount(r,'*',5) == 0) return 0;
+ if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
+ if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
+ if (rioWriteBulkObject(r,key) == 0) return 0;
+ if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0;
+ if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0;
+
+ /* Generate XCLAIMs for each consumer that happens to
+ * have pending entries. Empty consumers have no semantical
+ * value so they are discarded. */
+ raxIterator ri_cons;
+ raxStart(&ri_cons,group->consumers);
+ raxSeek(&ri_cons,"^",NULL,0);
+ while(raxNext(&ri_cons)) {
+ streamConsumer *consumer = ri_cons.data;
+ /* For the current consumer, iterate all the PEL entries
+ * to emit the XCLAIM protocol. */
+ raxIterator ri_pel;
+ raxStart(&ri_pel,consumer->pel);
+ raxSeek(&ri_pel,"^",NULL,0);
+ while(raxNext(&ri_pel)) {
+ streamNACK *nack = ri_pel.data;
+ if (rioWriteStreamPendingEntry(r,key,(char*)ri.key,
+ ri.key_len,consumer,
+ ri_pel.key,nack) == 0)
+ {
+ return 0;
+ }
+ }
+ raxStop(&ri_pel);
+ }
+ raxStop(&ri_cons);
+ }
+ raxStop(&ri);
+ }
+
streamIteratorStop(&si);
return 1;
}
diff --git a/src/stream.h b/src/stream.h
index 7cc44ae76..8a019e93c 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -105,5 +105,6 @@ streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer);
+void streamDecodeID(void *buf, streamID *id);
#endif