summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-11-17 13:24:20 +0100
committerantirez <antirez@gmail.com>2017-12-01 10:24:25 +0100
commitee3490ec481c7f1ef89fe685b03c2b5f171d335b (patch)
tree9b5121006eb613090b3f7e2b4f8e227679d5f1ee
parent3c5d773f82eede4497cb3695d2cd32eec3e10382 (diff)
downloadredis-ee3490ec481c7f1ef89fe685b03c2b5f171d335b.tar.gz
Streams: state machine for reverse iteration WIP 1.
-rw-r--r--src/aof.c2
-rw-r--r--src/blocked.c2
-rw-r--r--src/stream.h7
-rw-r--r--src/t_stream.c129
4 files changed, 98 insertions, 42 deletions
diff --git a/src/aof.c b/src/aof.c
index 5fbfdd695..79962fd0a 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1035,7 +1035,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
* The function returns 0 on error, 1 on success. */
int rewriteStreamObject(rio *r, robj *key, robj *o) {
streamIterator si;
- streamIteratorStart(&si,o->ptr,NULL,NULL);
+ streamIteratorStart(&si,o->ptr,NULL,NULL,0);
streamID id;
int64_t numfields;
diff --git a/src/blocked.c b/src/blocked.c
index 734e6ffd6..f438c3353 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -326,7 +326,7 @@ void handleClientsBlockedOnKeys(void) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,rl->key);
streamReplyWithRange(receiver,s,&start,NULL,
- receiver->bpop.xread_count);
+ receiver->bpop.xread_count,0);
}
}
}
diff --git a/src/stream.h b/src/stream.h
index df29e9e70..214b6d9a5 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -28,9 +28,10 @@ typedef struct stream {
typedef struct streamIterator {
streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */
- unsigned char *master_fields_start; /* Master entries start in listapck. */
+ unsigned char *master_fields_start; /* Master entries start in listpack. */
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
+ int rev; /* True if iterating end to start (reverse). */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
@@ -49,8 +50,8 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
-size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count);
-void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end);
+size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev);
+void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si);
diff --git a/src/t_stream.c b/src/t_stream.c
index 14eba44c0..945fc28c0 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -426,7 +426,9 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
/* Initialize the stream iterator, so that we can call iterating functions
* to get the next items. This requires a corresponding streamIteratorStop()
- * at the end.
+ * at the end. The 'rev' parameter controls the direction. If it's zero the
+ * iteration is from the start to the end element (inclusive), otherwise
+ * if rev is non-zero, the iteration is reversed.
*
* Once the iterator is initalized, we iterate like this:
*
@@ -443,7 +445,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
* }
* }
* streamIteratorStop(&myiterator); */
-void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end) {
+void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
/* Intialize the iterator and translates the iteration start/stop
* elements into a 128 big big-endian number. */
if (start) {
@@ -462,17 +464,26 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
/* Seek the correct node in the radix tree. */
raxStart(&si->ri,s->rax);
- if (start && (start->ms || start->seq)) {
- raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
- sizeof(si->start_key));
- if (raxEOF(&si->ri))
- raxSeek(&si->ri,">",(unsigned char*)si->start_key,
+ if (!rev) {
+ if (start && (start->ms || start->seq)) {
+ raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
sizeof(si->start_key));
+ if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
+ } else {
+ raxSeek(&si->ri,"^",NULL,0);
+ }
} else {
- raxSeek(&si->ri,"^",NULL,0);
+ if (end && (end->ms || end->seq)) {
+ raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
+ sizeof(si->end_key));
+ if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
+ } else {
+ raxSeek(&si->ri,"$",NULL,0);
+ }
}
si->lp = NULL; /* There is no current listpack right now. */
si->lp_ele = NULL; /* Current listpack cursor. */
+ si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
}
/* Return 1 and store the current item ID at 'id' if there are still
@@ -484,7 +495,8 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
* iteration or the previous listpack was completely iterated.
* Go to the next node. */
if (si->lp == NULL || si->lp_ele == NULL) {
- if (!raxNext(&si->ri)) return 0;
+ if (!si->rev && !raxNext(&si->ri)) return 0;
+ else if (si->rev && !raxPrev(&si->ri)) return 0;
serverAssert(si->ri.key_len == sizeof(streamID));
/* Get the master ID. */
streamDecodeID(si->ri.key,&si->master_id);
@@ -499,16 +511,38 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
/* Skip master fileds to seek the first entry. */
for (uint64_t i = 0; i < si->master_fields_count; i++)
si->lp_ele = lpNext(si->lp,si->lp_ele);
- /* We are now pointing the zero term of the master entry. */
+ /* We are now pointing the zero term of the master entry. If
+ * we are iterating in reverse order, we need to seek the
+ * end of the listpack. */
+ if (si->rev) si->lp_ele = lpLast(si->lp);
+ } else if (si->rev) {
+ /* If we are itereating in the reverse order, and this is not
+ * the first entry emitted for this listpack, then we already
+ * emitted the current entry, and have to go back to the previous
+ * one. */
+ int lp_count = lpGetInteger(si->lp_ele);
+ while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ /* Seek lp-count of prev entry. */
+ si->lp_ele = lpPrev(si->lp,si->lp_ele);
}
/* For every radix tree node, iterate the corresponding listpack,
* returning elements when they are within range. */
while(1) {
- /* Skip the previous entry lp-count field, or in case of the
- * master entry, the zero term field. */
- si->lp_ele = lpNext(si->lp,si->lp_ele);
- if (si->lp_ele == NULL) break;
+ if (!si->rev) {
+ /* If we are going forward, skip the previous entry
+ * lp-count field (or in case of the master entry, the zero
+ * term field) */
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ if (si->lp_ele == NULL) break;
+ } else {
+ /* If we are going backward, read the number of elements this
+ * entry is composed of, and jump backward N times to seek
+ * its start. */
+ int lp_count = lpGetInteger(si->lp_ele);
+ if (lp_count == 0) break; /* We reached the master entry. */
+ while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ }
/* Get the flags entry. */
int flags = lpGetInteger(si->lp_ele);
@@ -535,15 +569,28 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
/* If current >= start, and the entry is not marked as
* deleted, emit it. */
- if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
- !(flags & STREAM_ITEM_FLAG_DELETED))
- {
- if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
- return 0; /* We are already out of range. */
- si->entry_flags = flags;
- if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
- si->master_fields_ptr = si->master_fields_start;
- return 1; /* Valid item returned. */
+ if (!si->rev) {
+ if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
+ !(flags & STREAM_ITEM_FLAG_DELETED))
+ {
+ if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
+ return 0; /* We are already out of range. */
+ si->entry_flags = flags;
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
+ si->master_fields_ptr = si->master_fields_start;
+ return 1; /* Valid item returned. */
+ }
+ } else {
+ if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
+ !(flags & STREAM_ITEM_FLAG_DELETED))
+ {
+ if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
+ return 0; /* We are already out of range. */
+ si->entry_flags = flags;
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
+ si->master_fields_ptr = si->master_fields_start;
+ return 1; /* Valid item returned. */
+ }
}
/* If we do not emit, we have to discard. */
@@ -553,7 +600,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
si->lp_ele = lpNext(si->lp,si->lp_ele);
}
- /* End of listpack reached. Try the next radix tree node. */
+ /* End of listpack reached. Try the next/prev radix tree node. */
}
}
@@ -585,15 +632,16 @@ void streamIteratorStop(streamIterator *si) {
/* Send the specified range to the client 'c'. The range the client will
* receive is between start and end inclusive, if 'count' is non zero, no more
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
- * we want all the elements from 'start' till the end of the stream. */
-size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) {
+ * we want all the elements from 'start' till the end of the stream. If 'rev'
+ * is non zero, elements are produced in reversed order from end to start. */
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev) {
void *arraylen_ptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0;
streamIterator si;
int64_t numfields;
streamID id;
- streamIteratorStart(&si,s,start,end);
+ streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
@@ -797,25 +845,32 @@ void xaddCommand(client *c) {
signalKeyAsReady(c->db, c->argv[1]);
}
-/* XRANGE key start end [COUNT <n>] */
+/* XRANGE key start end [COUNT <n>] [REV] */
void xrangeCommand(client *c) {
robj *o;
stream *s;
streamID startid, endid;
long long count = 0;
+ int rev = 0;
if (streamParseIDOrReply(c,c->argv[2],&startid,0) == C_ERR) return;
if (streamParseIDOrReply(c,c->argv[3],&endid,UINT64_MAX) == C_ERR) return;
/* Parse the COUNT option if any. */
- if (c->argc > 5) {
- if (strcasecmp(c->argv[4]->ptr,"COUNT") == 0) {
- if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK)
+ if (c->argc > 4) {
+ for (int j = 4; j < c->argc; j++) {
+ int additional = c->argc-j-1;
+ if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
+ if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)
+ != C_OK) return;
+ if (count < 0) count = 0;
+ j++; /* Consume additional arg. */
+ } else if (strcasecmp(c->argv[j]->ptr,"REV") == 0) {
+ rev = 1;
+ } else {
+ addReply(c,shared.syntaxerr);
return;
- if (count < 0) count = 0;
- } else {
- addReply(c,shared.syntaxerr);
- return;
+ }
}
}
@@ -823,7 +878,7 @@ void xrangeCommand(client *c) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
- streamReplyWithRange(c,s,&startid,&endid,count);
+ streamReplyWithRange(c,s,&startid,&endid,count,rev);
}
/* XLEN */
@@ -931,7 +986,7 @@ void xreadCommand(client *c) {
* of the stream and the data we extracted from it. */
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[i+streams_arg]);
- streamReplyWithRange(c,s,&start,NULL,count);
+ streamReplyWithRange(c,s,&start,NULL,count,0);
}
}