author    Itamar Haber <itamar@redislabs.com>    2017-12-05 18:14:59 +0200
committer Itamar Haber <itamar@redislabs.com>    2017-12-05 18:14:59 +0200
commit    8b51121998bd5b0f3f4992548ad5f4a929c2a9d7 (patch)
tree      4b48521968561f0f3e02a1da5923bbb1f8702567
parent    51eb6cb39513188001bd24e693868451ae267340 (diff)
parent    62a4b817c6e83eedf96a451f45dd943099258fd0 (diff)
download  redis-8b51121998bd5b0f3f4992548ad5f4a929c2a9d7.tar.gz
Merge remote-tracking branch 'upstream/unstable' into help_subcommands
-rw-r--r--  README.md                              2
-rw-r--r--  redis.conf                            12
-rw-r--r--  src/Makefile                           2
-rw-r--r--  src/adlist.c                           2
-rw-r--r--  src/anet.c                             2
-rw-r--r--  src/aof.c                             33
-rw-r--r--  src/blocked.c                        292
-rw-r--r--  src/cluster.c                          2
-rw-r--r--  src/config.c                           8
-rw-r--r--  src/db.c                              55
-rw-r--r--  src/debug.c                           36
-rw-r--r--  src/defrag.c                           2
-rw-r--r--  src/dict.c                            22
-rw-r--r--  src/dict.h                             4
-rw-r--r--  src/evict.c                           27
-rw-r--r--  src/listpack.c                       783
-rw-r--r--  src/listpack.h                        61
-rw-r--r--  src/listpack_malloc.h                 45
-rw-r--r--  src/lzfP.h                             6
-rw-r--r--  src/networking.c                      23
-rw-r--r--  src/notify.c                           2
-rw-r--r--  src/object.c                          63
-rw-r--r--  src/quicklist.c                        2
-rw-r--r--  src/quicklist.h                        6
-rw-r--r--  src/rax.c                             47
-rw-r--r--  src/rax.h                              3
-rw-r--r--  src/rdb.c                             95
-rw-r--r--  src/rdb.h                              8
-rw-r--r--  src/redis-check-rdb.c                  9
-rw-r--r--  src/redis-cli.c                      146
-rw-r--r--  src/replication.c                      2
-rw-r--r--  src/scripting.c                       90
-rw-r--r--  src/server.c                          54
-rw-r--r--  src/server.h                          55
-rw-r--r--  src/setproctitle.c                     6
-rw-r--r--  src/stream.h                          59
-rw-r--r--  src/t_hash.c                           4
-rw-r--r--  src/t_list.c                         208
-rw-r--r--  src/t_set.c                           14
-rw-r--r--  src/t_stream.c                      1053
-rw-r--r--  tests/instances.tcl                    2
-rw-r--r--  tests/integration/psync2.tcl          65
-rw-r--r--  tests/integration/replication-3.tcl   23
-rw-r--r--  tests/test_helper.tcl                  1
-rw-r--r--  tests/unit/type/stream.tcl           256
45 files changed, 3316 insertions, 376 deletions
diff --git a/README.md b/README.md
index 70a15790f..42ab47853 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-This README is just a fast *quick start* document. You can find more detailed documentation at http://redis.io.
+This README is just a fast *quick start* document. You can find more detailed documentation at [redis.io](https://redis.io).
What is Redis?
--------------
diff --git a/redis.conf b/redis.conf
index c54dba392..7eb692a8d 100644
--- a/redis.conf
+++ b/redis.conf
@@ -59,7 +59,7 @@
# internet, binding to all the interfaces is dangerous and will expose the
# instance to everybody on the internet. So by default we uncomment the
# following bind directive, that will force Redis to listen only into
-# the IPv4 lookback interface address (this means Redis will be able to
+# the IPv4 loopback interface address (this means Redis will be able to
# accept connections only from clients running into the same computer it
# is running).
#
@@ -296,7 +296,9 @@ dir ./
#
# 2) if slave-serve-stale-data is set to 'no' the slave will reply with
# an error "SYNC with master in progress" to all the kind of commands
-# but to INFO and SLAVEOF.
+# but to INFO, SLAVEOF, AUTH, PING, SHUTDOWN, REPLCONF, ROLE, CONFIG,
+# SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBLISH, PUBSUB,
+# COMMAND, POST, HOST: and LATENCY.
#
slave-serve-stale-data yes
@@ -606,7 +608,7 @@ slave-priority 100
# deletion of the object. It means that the server stops processing new commands
# in order to reclaim all the memory associated with an object in a synchronous
# way. If the key deleted is associated with a small object, the time needed
-# in order to execute th DEL command is very small and comparable to most other
+# in order to execute the DEL command is very small and comparable to most other
# O(1) or O(log_N) commands in Redis. However if the key is associated with an
# aggregated value containing millions of elements, the server can block for
# a long time (even seconds) in order to complete the operation.
@@ -621,7 +623,7 @@ slave-priority 100
# It's up to the design of the application to understand when it is a good
# idea to use one or the other. However the Redis server sometimes has to
# delete keys or flush the whole database as a side effect of other operations.
-# Specifically Redis deletes objects independently of an user call in the
+# Specifically Redis deletes objects independently of a user call in the
# following scenarios:
#
# 1) On eviction, because of the maxmemory and maxmemory policy configurations,
@@ -914,7 +916,7 @@ lua-time-limit 5000
# Docker and other containers).
#
# In order to make Redis Cluster working in such environments, a static
-# configuration where each node known its public address is needed. The
+# configuration where each node knows its public address is needed. The
# following two options are used for this scope, and are:
#
# * cluster-announce-ip
diff --git a/src/Makefile b/src/Makefile
index 86e0b3fe0..b896b1263 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -144,7 +144,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/adlist.c b/src/adlist.c
index e87d25cee..ec5f8bbf4 100644
--- a/src/adlist.c
+++ b/src/adlist.c
@@ -353,7 +353,7 @@ void listJoin(list *l, list *o) {
else
l->head = o->head;
- l->tail = o->tail;
+ if (o->tail) l->tail = o->tail;
l->len += o->len;
/* Setup other as an empty list. */
diff --git a/src/anet.c b/src/anet.c
index 53a56b0d2..e9530398d 100644
--- a/src/anet.c
+++ b/src/anet.c
@@ -237,7 +237,7 @@ int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len) {
static int anetSetReuseAddr(char *err, int fd) {
int yes = 1;
- /* Make sure connection-intensive things like the redis benckmark
+ /* Make sure connection-intensive things like the redis benchmark
* will be able to close/open sockets a zillion of times */
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) {
anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));
diff --git a/src/aof.c b/src/aof.c
index 0593b2707..79962fd0a 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1031,6 +1031,37 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
return 1;
}
+/* Emit the commands needed to rebuild a stream object.
+ * The function returns 0 on error, 1 on success. */
+int rewriteStreamObject(rio *r, robj *key, robj *o) {
+ streamIterator si;
+ streamIteratorStart(&si,o->ptr,NULL,NULL,0);
+ streamID id;
+ int64_t numfields;
+
+ while(streamIteratorGetID(&si,&id,&numfields)) {
+ /* Emit a two elements array for each item. The first is
+ * the ID, the second is an array of field-value pairs. */
+
+ /* Emit the XADD <key> <id> ...fields... command. */
+ if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
+ if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
+ if (rioWriteBulkObject(r,key) == 0) return 0;
+ sds replyid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
+ if (rioWriteBulkString(r,replyid,sdslen(replyid)) == 0) return 0;
+ sdsfree(replyid);
+ while(numfields--) {
+ unsigned char *field, *value;
+ int64_t field_len, value_len;
+ streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
+ if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
+ if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
+ }
+ }
+ streamIteratorStop(&si);
+ return 1;
+}
+
/* Call the module type callback in order to rewrite a data type
* that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
@@ -1111,6 +1142,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
+ } else if (o->type == OBJ_STREAM) {
+ if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
} else {
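For illustration of the AOF rewrite format produced by rewriteStreamObject() above: assuming a hypothetical key "mystream" holding a single entry with ID 1507564011-5 and one field "temp" with value "23", the emitted RESP fragment is a 5-element command (3 + numfields*2), with the ID rendered as "ms.seq" by the sdscatfmt() call:

    *5
    $4
    XADD
    $8
    mystream
    $12
    1507564011.5
    $4
    temp
    $2
    23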
diff --git a/src/blocked.c b/src/blocked.c
index 54b26b713..f438c3353 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -65,6 +65,8 @@
#include "server.h"
+int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
+
/* Get a timeout value from an object and store it into 'timeout'.
* The final timeout is always stored as milliseconds as a time where the
* timeout will expire, however the parsing is performed according to
@@ -100,7 +102,8 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
void blockClient(client *c, int btype) {
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
- server.bpop_blocked_clients++;
+ server.blocked_clients++;
+ server.blocked_clients_by_type[btype]++;
}
/* This function is called in the beforeSleep() function of the event loop
@@ -132,7 +135,7 @@ void processUnblockedClients(void) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
- if (c->btype == BLOCKED_LIST) {
+ if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
@@ -143,9 +146,10 @@ void unblockClient(client *c) {
}
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
+ server.blocked_clients--;
+ server.blocked_clients_by_type[c->btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->btype = BLOCKED_NONE;
- server.bpop_blocked_clients--;
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
@@ -158,7 +162,7 @@ void unblockClient(client *c) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
- if (c->btype == BLOCKED_LIST) {
+ if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
@@ -193,3 +197,283 @@ void disconnectAllBlockedClients(void) {
}
}
}
+
+/* This function should be called by Redis every time a single command,
+ * a MULTI/EXEC block, or a Lua script, terminated its execution after
+ * being called by a client.
+ *
+ * All the keys with at least one client blocked that received at least
+ * one new element via some PUSH/XADD operation are accumulated into
+ * the server.ready_keys list. This function will run the list and will
+ * serve clients accordingly. Note that the function will iterate again and
+ * again, since as a result of serving BRPOPLPUSH we can have new blocking
+ * clients to serve because of the PUSH side of BRPOPLPUSH. */
+void handleClientsBlockedOnKeys(void) {
+ while(listLength(server.ready_keys) != 0) {
+ list *l;
+
+ /* Point server.ready_keys to a fresh list and save the current one
+ * locally. This way as we run the old list we are free to call
+ * signalKeyAsReady() that may push new elements in server.ready_keys
+ * when handling clients blocked into BRPOPLPUSH. */
+ l = server.ready_keys;
+ server.ready_keys = listCreate();
+
+ while(listLength(l) != 0) {
+ listNode *ln = listFirst(l);
+ readyList *rl = ln->value;
+
+ /* First of all remove this key from db->ready_keys so that
+ * we can safely call signalKeyAsReady() against this key. */
+ dictDelete(rl->db->ready_keys,rl->key);
+
+ /* Serve clients blocked on list key. */
+ robj *o = lookupKeyWrite(rl->db,rl->key);
+ if (o != NULL && o->type == OBJ_LIST) {
+ dictEntry *de;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ client *receiver = clientnode->value;
+
+ if (receiver->btype != BLOCKED_LIST) {
+ /* Put on the tail, so that at the next call
+ * we'll not run into it again. */
+ listDelNode(clients,clientnode);
+ listAddNodeTail(clients,receiver);
+ continue;
+ }
+
+ robj *dstkey = receiver->bpop.target;
+ int where = (receiver->lastcmd &&
+ receiver->lastcmd->proc == blpopCommand) ?
+ LIST_HEAD : LIST_TAIL;
+ robj *value = listTypePop(o,where);
+
+ if (value) {
+ /* Protect receiver->bpop.target, that will be
+ * freed by the next unblockClient()
+ * call. */
+ if (dstkey) incrRefCount(dstkey);
+ unblockClient(receiver);
+
+ if (serveClientBlockedOnList(receiver,
+ rl->key,dstkey,rl->db,value,
+ where) == C_ERR)
+ {
+ /* If we failed serving the client we need
+ * to also undo the POP operation. */
+ listTypePush(o,value,where);
+ }
+
+ if (dstkey) decrRefCount(dstkey);
+ decrRefCount(value);
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (listTypeLength(o) == 0) {
+ dbDelete(rl->db,rl->key);
+ }
+ /* We don't call signalModifiedKey() as it was already called
+ * when an element was pushed on the list. */
+ }
+
+ /* Serve clients blocked on stream key. */
+ else if (o != NULL && o->type == OBJ_STREAM) {
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+ stream *s = o->ptr;
+
+ /* We need to provide the new data arrived on the stream
+ * to all the clients that are waiting for an offset smaller
+ * than the current top item. */
+ if (de) {
+ list *clients = dictGetVal(de);
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
+
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_STREAM) continue;
+ streamID *gt = dictFetchValue(receiver->bpop.keys,
+ rl->key);
+ if (s->last_id.ms > gt->ms ||
+ (s->last_id.ms == gt->ms &&
+ s->last_id.seq > gt->seq))
+ {
+ streamID start = *gt;
+ start.seq++; /* Can't overflow, it's an uint64_t */
+ /* Note that after we unblock the client, 'gt'
+ * is no longer valid, so we must do it after
+ * we copied the ID into the 'start' variable. */
+ unblockClient(receiver);
+
+ /* Emit the two elements sub-array consisting of
+ * the name of the stream and the data we
+ * extracted from it. Wrapped in a single-item
+ * array, since we have just one key. */
+ addReplyMultiBulkLen(receiver,1);
+ addReplyMultiBulkLen(receiver,2);
+ addReplyBulk(receiver,rl->key);
+ streamReplyWithRange(receiver,s,&start,NULL,
+ receiver->bpop.xread_count,0);
+ }
+ }
+ }
+ }
+
+ /* Free this item. */
+ decrRefCount(rl->key);
+ zfree(rl);
+ listDelNode(l,ln);
+ }
+ listRelease(l); /* We have the new list on place at this point. */
+ }
+}
+
+/* This is how the current blocking lists/streams work, we use BLPOP as
+ * example, but the concept is the same for other list ops and XREAD.
+ * - If the user calls BLPOP and the key exists and contains a non empty list
+ * then LPOP is called instead. So BLPOP is semantically the same as LPOP
+ * if blocking is not required.
+ * - If instead BLPOP is called and the key does not exist or the list is
+ * empty we need to block. In order to do so we remove the notification for
+ * new data to read in the client socket (so that we'll not serve new
+ * requests if the blocking request is not served). Also we put the client
+ * in a dictionary (db->blocking_keys) mapping keys to a list of clients
+ * blocked on those keys.
+ * - If a PUSH operation against a key with blocked clients waiting is
+ * performed, we mark this key as "ready", and after the current command,
+ * MULTI/EXEC block, or script, is executed, we serve all the clients waiting
+ * for this list, from the one that blocked first to the last, according
+ * to the number of elements we have in the ready list.
+ */
+
+/* Set a client in blocking mode for the specified key (list or stream), with
+ * the specified timeout. The 'btype' argument is BLOCKED_LIST or BLOCKED_STREAM,
+ * depending on the kind of operation we are waiting for on an empty key in
+ * order to awake the client. The client is blocked for all the 'numkeys'
+ * keys as in the 'keys' argument. When we block for stream keys, we also
+ * provide an array of streamID structures: clients will be unblocked only
+ * when items with an ID greater or equal to the specified one are appended
+ * to the stream. */
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
+ dictEntry *de;
+ list *l;
+ int j;
+
+ c->bpop.timeout = timeout;
+ c->bpop.target = target;
+
+ if (target != NULL) incrRefCount(target);
+
+ for (j = 0; j < numkeys; j++) {
+ /* The value associated with the key name in the bpop.keys dictionary
+ * is NULL for lists, or the stream ID for streams. */
+ void *key_data = NULL;
+ if (btype == BLOCKED_STREAM) {
+ key_data = zmalloc(sizeof(streamID));
+ memcpy(key_data,ids+j,sizeof(streamID));
+ }
+
+ /* If the key already exists in the dictionary ignore it. */
+ if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) {
+ zfree(key_data);
+ continue;
+ }
+ incrRefCount(keys[j]);
+
+ /* And in the other "side", to map keys -> clients */
+ de = dictFind(c->db->blocking_keys,keys[j]);
+ if (de == NULL) {
+ int retval;
+
+ /* For every key we take a list of clients blocked for it */
+ l = listCreate();
+ retval = dictAdd(c->db->blocking_keys,keys[j],l);
+ incrRefCount(keys[j]);
+ serverAssertWithInfo(c,keys[j],retval == DICT_OK);
+ } else {
+ l = dictGetVal(de);
+ }
+ listAddNodeTail(l,c);
+ }
+ blockClient(c,btype);
+}
+
+/* Unblock a client that's waiting in a blocking operation such as BLPOP.
+ * You should never call this function directly, but unblockClient() instead. */
+void unblockClientWaitingData(client *c) {
+ dictEntry *de;
+ dictIterator *di;
+ list *l;
+
+ serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
+ di = dictGetIterator(c->bpop.keys);
+ /* The client may wait for multiple keys, so unblock it for every key. */
+ while((de = dictNext(di)) != NULL) {
+ robj *key = dictGetKey(de);
+
+ /* Remove this client from the list of clients waiting for this key. */
+ l = dictFetchValue(c->db->blocking_keys,key);
+ serverAssertWithInfo(c,key,l != NULL);
+ listDelNode(l,listSearchKey(l,c));
+ /* If the list is empty we need to remove it to avoid wasting memory */
+ if (listLength(l) == 0)
+ dictDelete(c->db->blocking_keys,key);
+ }
+ dictReleaseIterator(di);
+
+ /* Cleanup the client structure */
+ dictEmpty(c->bpop.keys,NULL);
+ if (c->bpop.target) {
+ decrRefCount(c->bpop.target);
+ c->bpop.target = NULL;
+ }
+ if (c->bpop.xread_group) {
+ decrRefCount(c->bpop.xread_group);
+ c->bpop.xread_group = NULL;
+ }
+}
+
+/* If the specified key has clients blocked waiting for list pushes, this
+ * function will put the key reference into the server.ready_keys list.
+ * Note that db->ready_keys is a hash table that allows us to avoid putting
+ * the same key again and again in the list in case of multiple pushes
+ * made by a script or in the context of MULTI/EXEC.
+ *
+ * The list will be finally processed by handleClientsBlockedOnKeys() */
+void signalKeyAsReady(redisDb *db, robj *key) {
+ readyList *rl;
+
+ /* No clients blocking for this key? No need to queue it. */
+ if (dictFind(db->blocking_keys,key) == NULL) return;
+
+ /* Key was already signaled? No need to queue it again. */
+ if (dictFind(db->ready_keys,key) != NULL) return;
+
+ /* Ok, we need to queue this key into server.ready_keys. */
+ rl = zmalloc(sizeof(*rl));
+ rl->key = key;
+ rl->db = db;
+ incrRefCount(key);
+ listAddNodeTail(server.ready_keys,rl);
+
+ /* We also add the key in the db->ready_keys dictionary in order
+ * to avoid adding it multiple times into a list with a simple O(1)
+ * check. */
+ incrRefCount(key);
+ serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
+}
+
+
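The stream branch of handleClientsBlockedOnKeys() above wakes a blocked XREAD client only when the stream's last_id is strictly greater than the ID the client asked to read past, comparing the millisecond part first and the sequence part as a tie-breaker. A minimal self-contained C sketch of that comparison (the struct and names mirror the ones used above; illustration only):

    #include <stdint.h>
    #include <stdio.h>

    typedef struct streamID {
        uint64_t ms;   /* Unix time in milliseconds. */
        uint64_t seq;  /* Sequence number within the same millisecond. */
    } streamID;

    /* Return non-zero if 'a' is strictly greater than 'b', i.e. data newer
     * than the client's last-seen ID has arrived. */
    static int streamIDGreater(const streamID *a, const streamID *b) {
        return a->ms > b->ms || (a->ms == b->ms && a->seq > b->seq);
    }

    int main(void) {
        streamID last = {1507564011, 2};  /* stream's last_id */
        streamID gt   = {1507564011, 1};  /* ID the blocked client waits past */
        printf("wake client: %d\n", streamIDGreater(&last, &gt)); /* prints 1 */
        return 0;
    }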
diff --git a/src/cluster.c b/src/cluster.c
index 2c9866d0f..4e695822b 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -653,7 +653,7 @@ unsigned int keyHashSlot(char *key, int keylen) {
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
- /* No '}' or nothing betweeen {} ? Hash the whole key. */
+ /* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
diff --git a/src/config.c b/src/config.c
index 4089ef063..0324c4b7c 100644
--- a/src/config.c
+++ b/src/config.c
@@ -330,13 +330,13 @@ void loadServerConfigFromString(char *config) {
}
} else if (!strcasecmp(argv[0],"lfu-log-factor") && argc == 2) {
server.lfu_log_factor = atoi(argv[1]);
- if (server.maxmemory_samples < 0) {
+ if (server.lfu_log_factor < 0) {
err = "lfu-log-factor must be 0 or greater";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"lfu-decay-time") && argc == 2) {
server.lfu_decay_time = atoi(argv[1]);
- if (server.maxmemory_samples < 1) {
+ if (server.lfu_decay_time < 0) {
err = "lfu-decay-time must be 0 or greater";
goto loaderr;
}
@@ -1221,6 +1221,8 @@ void configGetCommand(client *c) {
/* Numerical values */
config_get_numerical_field("maxmemory",server.maxmemory);
config_get_numerical_field("maxmemory-samples",server.maxmemory_samples);
+ config_get_numerical_field("lfu-log-factor",server.lfu_log_factor);
+ config_get_numerical_field("lfu-decay-time",server.lfu_decay_time);
config_get_numerical_field("timeout",server.maxidletime);
config_get_numerical_field("active-defrag-threshold-lower",server.active_defrag_threshold_lower);
config_get_numerical_field("active-defrag-threshold-upper",server.active_defrag_threshold_upper);
@@ -1992,6 +1994,8 @@ int rewriteConfig(char *path) {
rewriteConfigBytesOption(state,"maxmemory",server.maxmemory,CONFIG_DEFAULT_MAXMEMORY);
rewriteConfigEnumOption(state,"maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum,CONFIG_DEFAULT_MAXMEMORY_POLICY);
rewriteConfigNumericalOption(state,"maxmemory-samples",server.maxmemory_samples,CONFIG_DEFAULT_MAXMEMORY_SAMPLES);
+ rewriteConfigNumericalOption(state,"lfu-log-factor",server.lfu_log_factor,CONFIG_DEFAULT_LFU_LOG_FACTOR);
+ rewriteConfigNumericalOption(state,"lfu-decay-time",server.lfu_decay_time,CONFIG_DEFAULT_LFU_DECAY_TIME);
rewriteConfigNumericalOption(state,"active-defrag-threshold-lower",server.active_defrag_threshold_lower,CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER);
rewriteConfigNumericalOption(state,"active-defrag-threshold-upper",server.active_defrag_threshold_upper,CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER);
rewriteConfigBytesOption(state,"active-defrag-ignore-bytes",server.active_defrag_ignore_bytes,CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES);
diff --git a/src/db.c b/src/db.c
index 71c642d00..0ded55586 100644
--- a/src/db.c
+++ b/src/db.c
@@ -38,6 +38,15 @@
* C-level DB API
*----------------------------------------------------------------------------*/
+/* Update LFU when an object is accessed.
+ * Firstly, decrement the counter if the decrement time is reached.
+ * Then logarithmically increment the counter, and update the access time. */
+void updateLFU(robj *val) {
+ unsigned long counter = LFUDecrAndReturn(val);
+ counter = LFULogIncr(counter);
+ val->lru = (LFUGetTimeInMinutes()<<8) | counter;
+}
+
/* Low level key lookup API, not actually called directly from commands
* implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
@@ -54,9 +63,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
!(flags & LOOKUP_NOTOUCH))
{
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
- unsigned long ldt = val->lru >> 8;
- unsigned long counter = LFULogIncr(val->lru & 255);
- val->lru = (ldt << 8) | counter;
+ updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
@@ -162,9 +169,9 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
int retval = dictAdd(db->dict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
- if (val->type == OBJ_LIST) signalListAsReady(db, key);
+ if (val->type == OBJ_LIST) signalKeyAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
- }
+}
/* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller.
@@ -180,6 +187,9 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
int saved_lru = old->lru;
dictReplace(db->dict, key->ptr, val);
val->lru = saved_lru;
+ /* LFU should be not only copied but also updated
+ * when a key is overwritten. */
+ updateLFU(val);
} else {
dictReplace(db->dict, key->ptr, val);
}
@@ -788,6 +798,7 @@ void typeCommand(client *c) {
case OBJ_SET: type = "set"; break;
case OBJ_ZSET: type = "zset"; break;
case OBJ_HASH: type = "hash"; break;
+ case OBJ_STREAM: type = "stream"; break;
case OBJ_MODULE: {
moduleValue *mv = o->ptr;
type = mv->type->name;
@@ -941,8 +952,8 @@ void scanDatabaseForReadyLists(redisDb *db) {
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
- if (value && value->type == OBJ_LIST)
- signalListAsReady(db, key);
+ if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM))
+ signalKeyAsReady(db, key);
}
dictReleaseIterator(di);
}
@@ -1352,6 +1363,36 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk
return keys;
}
+/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
+ * [RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N
+ * ID_1 ID_2 ... ID_N */
+int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
+ int i, num, *keys;
+ UNUSED(cmd);
+
+ /* We need to seek the last argument that contains "STREAMS", because other
+ * arguments before may contain it (for example the group name). */
+ int streams_pos = -1;
+ for (i = 1; i < argc; i++) {
+ char *arg = argv[i]->ptr;
+ if (!strcasecmp(arg, "streams")) streams_pos = i;
+ }
+ if (streams_pos != -1) num = argc - streams_pos - 1;
+
+ /* Syntax error. */
+ if (streams_pos == -1 || num % 2 != 0) {
+ *numkeys = 0;
+ return NULL;
+ }
+ num /= 2; /* We have half the keys as there are arguments because
+ there are also the IDs, one per key. */
+
+ keys = zmalloc(sizeof(int) * num);
+ for (i = streams_pos+1; i < argc; i++) keys[i-streams_pos-1] = i;
+ *numkeys = num;
+ return keys;
+}
+
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
* a fast way a key that belongs to a specified hash slot. This is useful
* while rehashing the cluster and in other conditions when we need to
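For illustration of the key extraction performed by xreadGetKeys() above: the last "STREAMS" token splits the remaining arguments into keys followed by an equal number of IDs, so half of what follows it are key positions. A self-contained sketch of the same arithmetic on a hypothetical XREAD call (no Redis internals involved):

    #include <stdio.h>
    #include <strings.h>

    int main(void) {
        /* Hypothetical client argv for: XREAD COUNT 2 STREAMS s1 s2 0 0 */
        const char *argv[] = {"XREAD","COUNT","2","STREAMS","s1","s2","0","0"};
        int argc = 8, streams_pos = -1;
        for (int i = 1; i < argc; i++)
            if (!strcasecmp(argv[i],"streams")) streams_pos = i;
        int num = argc - streams_pos - 1;   /* 4 arguments after STREAMS */
        if (streams_pos == -1 || num % 2) return 1; /* syntax error */
        num /= 2;                           /* half are keys, half are IDs */
        for (int i = 0; i < num; i++)       /* prints positions 4 and 5 */
            printf("key at argv[%d] = %s\n",
                   streams_pos+1+i, argv[streams_pos+1+i]);
        return 0;
    }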
diff --git a/src/debug.c b/src/debug.c
index 9236d806c..c6aea6cb6 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -239,6 +239,27 @@ void computeDatasetDigest(unsigned char *final) {
xorDigest(digest,eledigest,20);
}
hashTypeReleaseIterator(hi);
+ } else if (o->type == OBJ_STREAM) {
+ streamIterator si;
+ streamIteratorStart(&si,o->ptr,NULL,NULL,0);
+ streamID id;
+ int64_t numfields;
+
+ while(streamIteratorGetID(&si,&id,&numfields)) {
+ sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
+ mixDigest(digest,itemid,sdslen(itemid));
+ sdsfree(itemid);
+
+ while(numfields--) {
+ unsigned char *field, *value;
+ int64_t field_len, value_len;
+ streamIteratorGetField(&si,&field,&value,
+ &field_len,&value_len);
+ mixDigest(digest,field,field_len);
+ mixDigest(digest,value,value_len);
+ }
+ }
+ streamIteratorStop(&si);
} else if (o->type == OBJ_MODULE) {
RedisModuleDigest md;
moduleValue *mv = o->ptr;
@@ -262,14 +283,10 @@ void computeDatasetDigest(unsigned char *final) {
}
void debugCommand(client *c) {
- if (c->argc == 1) {
- addReplyError(c,"You must specify a subcommand for DEBUG. Try DEBUG HELP for info.");
- return;
- }
-
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
"assert -- Crash by assertion failed.",
+ "change-repl-id -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.",
"crash-and-recovery <milliseconds> -- Hard crash and restart after <milliseconds> delay.",
"digest -- Outputs an hex signature representing the current DB content.",
"htstats <dbid> -- Return hash table statistics of the specified Redis database.",
@@ -351,13 +368,13 @@ void debugCommand(client *c) {
val = dictGetVal(de);
strenc = strEncoding(val->encoding);
- char extra[128] = {0};
+ char extra[138] = {0};
if (val->encoding == OBJ_ENCODING_QUICKLIST) {
char *nextra = extra;
int remaining = sizeof(extra);
quicklist *ql = val->ptr;
/* Add number of quicklist nodes */
- int used = snprintf(nextra, remaining, " ql_nodes:%u", ql->len);
+ int used = snprintf(nextra, remaining, " ql_nodes:%lu", ql->len);
nextra += used;
remaining -= used;
/* Add average quicklist fill factor */
@@ -530,6 +547,11 @@ void debugCommand(client *c) {
stats = sdscat(stats,buf);
addReplyBulkSds(c,stats);
+ } else if (!strcasecmp(c->argv[1]->ptr,"change-repl-id") && c->argc == 2) {
+ serverLog(LL_WARNING,"Changing replication IDs after receiving DEBUG change-repl-id");
+ changeReplicationId();
+ clearReplicationId2();
+ addReply(c,shared.ok);
} else {
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try DEBUG help",
(char*)c->argv[1]->ptr);
diff --git a/src/defrag.c b/src/defrag.c
index 4a1dcefe4..3f0e6627c 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -289,7 +289,7 @@ int defragKey(redisDb *db, dictEntry *de) {
/* Dirty code:
* I can't search in db->expires for that key after i already released
* the pointer it holds it won't be able to do the string compare */
- unsigned int hash = dictGetHash(db->dict, de->key);
+ uint64_t hash = dictGetHash(db->dict, de->key);
replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
}
diff --git a/src/dict.c b/src/dict.c
index 210d50dcd..97e636805 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -66,7 +66,7 @@ static unsigned int dict_force_resize_ratio = 5;
static int _dictExpandIfNeeded(dict *ht);
static unsigned long _dictNextPower(unsigned long size);
-static int _dictKeyIndex(dict *ht, const void *key, unsigned int hash, dictEntry **existing);
+static long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing);
static int _dictInit(dict *ht, dictType *type, void *privDataPtr);
/* -------------------------- hash functions -------------------------------- */
@@ -202,7 +202,7 @@ int dictRehash(dict *d, int n) {
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
- unsigned int h;
+ uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
@@ -291,7 +291,7 @@ int dictAdd(dict *d, void *key, void *val)
*/
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
- int index;
+ long index;
dictEntry *entry;
dictht *ht;
@@ -362,7 +362,7 @@ dictEntry *dictAddOrFind(dict *d, void *key) {
* dictDelete() and dictUnlink(), please check the top comment
* of those functions. */
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
- unsigned int h, idx;
+ uint64_t h, idx;
dictEntry *he, *prevHe;
int table;
@@ -476,7 +476,7 @@ void dictRelease(dict *d)
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
- unsigned int h, idx, table;
+ uint64_t h, idx, table;
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
@@ -610,7 +610,7 @@ void dictReleaseIterator(dictIterator *iter)
dictEntry *dictGetRandomKey(dict *d)
{
dictEntry *he, *orighe;
- unsigned int h;
+ unsigned long h;
int listlen, listele;
if (dictSize(d) == 0) return NULL;
@@ -955,9 +955,9 @@ static unsigned long _dictNextPower(unsigned long size)
*
* Note that if we are in the process of rehashing the hash table, the
* index is always returned in the context of the second (new) hash table. */
-static int _dictKeyIndex(dict *d, const void *key, unsigned int hash, dictEntry **existing)
+static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
- unsigned int idx, table;
+ unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
@@ -995,7 +995,7 @@ void dictDisableResize(void) {
dict_can_resize = 0;
}
-unsigned int dictGetHash(dict *d, const void *key) {
+uint64_t dictGetHash(dict *d, const void *key) {
return dictHashKey(d, key);
}
@@ -1004,9 +1004,9 @@ unsigned int dictGetHash(dict *d, const void *key) {
* the hash value should be provided using dictGetHash.
* no string / key comparison is performed.
* return value is the reference to the dictEntry if found, or NULL if not found. */
-dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, unsigned int hash) {
+dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) {
dictEntry *he, **heref;
- unsigned int idx, table;
+ unsigned long idx, table;
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
for (table = 0; table <= 1; table++) {
diff --git a/src/dict.h b/src/dict.h
index bf316a00f..62018cc44 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -178,8 +178,8 @@ int dictRehashMilliseconds(dict *d, int ms);
void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
-unsigned int dictGetHash(dict *d, const void *key);
-dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, unsigned int hash);
+uint64_t dictGetHash(dict *d, const void *key);
+dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
/* Hash table types */
extern dictType dictTypeHeapStringCopyKey;
diff --git a/src/evict.c b/src/evict.c
index 5ce5ca07f..bf485ddc5 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -60,8 +60,6 @@ struct evictionPoolEntry {
static struct evictionPoolEntry *EvictionPoolLRU;
-unsigned long LFUDecrAndReturn(robj *o);
-
/* ----------------------------------------------------------------------------
* Implementation of eviction, aging and LRU
* --------------------------------------------------------------------------*/
@@ -302,8 +300,8 @@ unsigned long LFUGetTimeInMinutes(void) {
return (server.unixtime/60) & 65535;
}
-/* Given an object last decrement time, compute the minimum number of minutes
- * that elapsed since the last decrement. Handle overflow (ldt greater than
+/* Given an object last access time, compute the minimum number of minutes
+ * that elapsed since the last access. Handle overflow (ldt greater than
* the current 16 bits minutes time) considering the time as wrapping
* exactly once. */
unsigned long LFUTimeElapsed(unsigned long ldt) {
@@ -324,25 +322,22 @@ uint8_t LFULogIncr(uint8_t counter) {
return counter;
}
-/* If the object decrement time is reached, decrement the LFU counter and
- * update the decrement time field. Return the object frequency counter.
+/* If the object decrement time is reached, decrement the LFU counter but
+ * do not update the LFU fields of the object: we update the access time
+ * and counter in an explicit way when the object is really accessed.
+ * The counter is decremented by one for every whole server.lfu_decay_time
+ * period elapsed since the stored access time, saturating at zero.
+ * Return the object frequency counter.
*
* This function is used in order to scan the dataset for the best object
* to fit: as we check for the candidate, we incrementally decrement the
* counter of the scanned objects if needed. */
-#define LFU_DECR_INTERVAL 1
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
- if (LFUTimeElapsed(ldt) >= server.lfu_decay_time && counter) {
- if (counter > LFU_INIT_VAL*2) {
- counter /= 2;
- if (counter < LFU_INIT_VAL*2) counter = LFU_INIT_VAL*2;
- } else {
- counter--;
- }
- o->lru = (LFUGetTimeInMinutes()<<8) | counter;
- }
+ unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
+ if (num_periods)
+ counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
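A note on the new LFU behavior above: LFUDecrAndReturn() no longer halves the counter on a fixed interval, it subtracts one unit per elapsed server.lfu_decay_time period (saturating at zero) and leaves the object fields untouched; updateLFU() in db.c then performs the explicit decrement-plus-logarithmic-increment on real accesses. A self-contained sketch of the decay arithmetic, with hypothetical stand-ins for the server fields:

    #include <stdio.h>

    static unsigned long lfu_decay_time = 1;  /* stand-in for server.lfu_decay_time */

    /* Decrement 'counter' by the number of decay periods in 'elapsed_minutes'. */
    static unsigned long lfu_decr(unsigned long counter, unsigned long elapsed_minutes) {
        unsigned long num_periods =
            lfu_decay_time ? elapsed_minutes / lfu_decay_time : 0;
        if (num_periods)
            counter = (num_periods > counter) ? 0 : counter - num_periods;
        return counter;
    }

    int main(void) {
        printf("%lu\n", lfu_decr(10, 3));   /* 7: three periods elapsed */
        printf("%lu\n", lfu_decr(2, 100));  /* 0: saturates at zero */
        return 0;
    }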
diff --git a/src/listpack.c b/src/listpack.c
new file mode 100644
index 000000000..6db4086e9
--- /dev/null
+++ b/src/listpack.c
@@ -0,0 +1,783 @@
+/* Listpack -- A list of strings serialization format
+ *
+ * This file implements the specification you can find at:
+ *
+ * https://github.com/antirez/listpack
+ *
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdint.h>
+#include <limits.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "listpack.h"
+#include "listpack_malloc.h"
+
+#define LP_HDR_SIZE 6 /* 32 bit total len + 16 bit number of elements. */
+#define LP_HDR_NUMELE_UNKNOWN UINT16_MAX
+#define LP_MAX_INT_ENCODING_LEN 9
+#define LP_MAX_BACKLEN_SIZE 5
+#define LP_MAX_ENTRY_BACKLEN 34359738367ULL
+#define LP_ENCODING_INT 0
+#define LP_ENCODING_STRING 1
+
+#define LP_ENCODING_7BIT_UINT 0
+#define LP_ENCODING_7BIT_UINT_MASK 0x80
+#define LP_ENCODING_IS_7BIT_UINT(byte) (((byte)&LP_ENCODING_7BIT_UINT_MASK)==LP_ENCODING_7BIT_UINT)
+
+#define LP_ENCODING_6BIT_STR 0x80
+#define LP_ENCODING_6BIT_STR_MASK 0xC0
+#define LP_ENCODING_IS_6BIT_STR(byte) (((byte)&LP_ENCODING_6BIT_STR_MASK)==LP_ENCODING_6BIT_STR)
+
+#define LP_ENCODING_13BIT_INT 0xC0
+#define LP_ENCODING_13BIT_INT_MASK 0xE0
+#define LP_ENCODING_IS_13BIT_INT(byte) (((byte)&LP_ENCODING_13BIT_INT_MASK)==LP_ENCODING_13BIT_INT)
+
+#define LP_ENCODING_12BIT_STR 0xE0
+#define LP_ENCODING_12BIT_STR_MASK 0xF0
+#define LP_ENCODING_IS_12BIT_STR(byte) (((byte)&LP_ENCODING_12BIT_STR_MASK)==LP_ENCODING_12BIT_STR)
+
+#define LP_ENCODING_16BIT_INT 0xF1
+#define LP_ENCODING_16BIT_INT_MASK 0xFF
+#define LP_ENCODING_IS_16BIT_INT(byte) (((byte)&LP_ENCODING_16BIT_INT_MASK)==LP_ENCODING_16BIT_INT)
+
+#define LP_ENCODING_24BIT_INT 0xF2
+#define LP_ENCODING_24BIT_INT_MASK 0xFF
+#define LP_ENCODING_IS_24BIT_INT(byte) (((byte)&LP_ENCODING_24BIT_INT_MASK)==LP_ENCODING_24BIT_INT)
+
+#define LP_ENCODING_32BIT_INT 0xF3
+#define LP_ENCODING_32BIT_INT_MASK 0xFF
+#define LP_ENCODING_IS_32BIT_INT(byte) (((byte)&LP_ENCODING_32BIT_INT_MASK)==LP_ENCODING_32BIT_INT)
+
+#define LP_ENCODING_64BIT_INT 0xF4
+#define LP_ENCODING_64BIT_INT_MASK 0xFF
+#define LP_ENCODING_IS_64BIT_INT(byte) (((byte)&LP_ENCODING_64BIT_INT_MASK)==LP_ENCODING_64BIT_INT)
+
+#define LP_ENCODING_32BIT_STR 0xF0
+#define LP_ENCODING_32BIT_STR_MASK 0xFF
+#define LP_ENCODING_IS_32BIT_STR(byte) (((byte)&LP_ENCODING_32BIT_STR_MASK)==LP_ENCODING_32BIT_STR)
+
+#define LP_EOF 0xFF
+
+#define LP_ENCODING_6BIT_STR_LEN(p) ((p)[0] & 0x3F)
+#define LP_ENCODING_12BIT_STR_LEN(p) ((((p)[0] & 0xF) << 8) | (p)[1])
+#define LP_ENCODING_32BIT_STR_LEN(p) (((uint32_t)(p)[1]<<0) | \
+ ((uint32_t)(p)[2]<<8) | \
+ ((uint32_t)(p)[3]<<16) | \
+ ((uint32_t)(p)[4]<<24))
+
+#define lpGetTotalBytes(p) (((uint32_t)(p)[0]<<0) | \
+ ((uint32_t)(p)[1]<<8) | \
+ ((uint32_t)(p)[2]<<16) | \
+ ((uint32_t)(p)[3]<<24))
+
+#define lpGetNumElements(p) (((uint32_t)(p)[4]<<0) | \
+ ((uint32_t)(p)[5]<<8))
+#define lpSetTotalBytes(p,v) do { \
+ (p)[0] = (v)&0xff; \
+ (p)[1] = ((v)>>8)&0xff; \
+ (p)[2] = ((v)>>16)&0xff; \
+ (p)[3] = ((v)>>24)&0xff; \
+} while(0)
+
+#define lpSetNumElements(p,v) do { \
+ (p)[4] = (v)&0xff; \
+ (p)[5] = ((v)>>8)&0xff; \
+} while(0)
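The listpack header written by the macros above is 6 bytes, little endian: a 32-bit total-bytes field followed by a 16-bit element count (LP_HDR_NUMELE_UNKNOWN meaning "too many, scan to count"). A tiny self-contained demonstration of the same byte layout, independent of listpack.c:

    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
        unsigned char hdr[6];
        uint32_t total = 1234; uint16_t numele = 7;
        /* Write: same byte order as lpSetTotalBytes()/lpSetNumElements(). */
        hdr[0] = total & 0xff;       hdr[1] = (total>>8) & 0xff;
        hdr[2] = (total>>16) & 0xff; hdr[3] = (total>>24) & 0xff;
        hdr[4] = numele & 0xff;      hdr[5] = (numele>>8) & 0xff;
        /* Read back: same as lpGetTotalBytes()/lpGetNumElements(). */
        uint32_t t = (uint32_t)hdr[0] | ((uint32_t)hdr[1]<<8) |
                     ((uint32_t)hdr[2]<<16) | ((uint32_t)hdr[3]<<24);
        uint32_t n = (uint32_t)hdr[4] | ((uint32_t)hdr[5]<<8);
        printf("%u %u\n", t, n); /* prints "1234 7" */
        return 0;
    }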
+
+/* Convert a string into a signed 64 bit integer.
+ * The function returns 1 if the string could be parsed into a (non-overflowing)
+ * signed 64 bit int, 0 otherwise. The 'value' will be set to the parsed value
+ * when the function returns success.
+ *
+ * Note that this function demands that the string strictly represents
+ * an int64 value: no spaces or other characters before or after the string
+ * representing the number are accepted, nor zeroes at the start if not
+ * for the string "0" representing the zero number.
+ *
+ * Because of its strictness, it is safe to use this function to check if
+ * you can convert a string into a long long, and obtain back the string
+ * from the number without any loss in the string representation.
+ *
+ * -----------------------------------------------------------------------------
+ *
+ * Credits: this function was adapted from the Redis source code, file
+ * "utils.c", function string2ll(), and is copyright:
+ *
+ * Copyright(C) 2011, Pieter Noordhuis
+ * Copyright(C) 2011, Salvatore Sanfilippo
+ *
+ * The function is released under the BSD 3-clause license.
+ */
+int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) {
+ const char *p = s;
+ unsigned long plen = 0;
+ int negative = 0;
+ uint64_t v;
+
+ if (plen == slen)
+ return 0;
+
+ /* Special case: first and only digit is 0. */
+ if (slen == 1 && p[0] == '0') {
+ if (value != NULL) *value = 0;
+ return 1;
+ }
+
+ if (p[0] == '-') {
+ negative = 1;
+ p++; plen++;
+
+ /* Abort on only a negative sign. */
+ if (plen == slen)
+ return 0;
+ }
+
+ /* First digit should be 1-9, otherwise the string should just be 0. */
+ if (p[0] >= '1' && p[0] <= '9') {
+ v = p[0]-'0';
+ p++; plen++;
+ } else if (p[0] == '0' && slen == 1) {
+ *value = 0;
+ return 1;
+ } else {
+ return 0;
+ }
+
+ while (plen < slen && p[0] >= '0' && p[0] <= '9') {
+ if (v > (UINT64_MAX / 10)) /* Overflow. */
+ return 0;
+ v *= 10;
+
+ if (v > (UINT64_MAX - (p[0]-'0'))) /* Overflow. */
+ return 0;
+ v += p[0]-'0';
+
+ p++; plen++;
+ }
+
+ /* Return if not all bytes were used. */
+ if (plen < slen)
+ return 0;
+
+ if (negative) {
+ if (v > ((uint64_t)(-(INT64_MIN+1))+1)) /* Overflow. */
+ return 0;
+ if (value != NULL) *value = -v;
+ } else {
+ if (v > INT64_MAX) /* Overflow. */
+ return 0;
+ if (value != NULL) *value = v;
+ }
+ return 1;
+}
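A minimal usage sketch of the strict parser above; the prototype is declared locally just for the example, assuming the sketch is linked against this new listpack.c:

    #include <stdint.h>
    #include <stdio.h>

    int lpStringToInt64(const char *s, unsigned long slen, int64_t *value);

    int main(void) {
        int64_t v;
        printf("%d\n", lpStringToInt64("1234", 4, &v)); /* 1, v == 1234 */
        printf("%d\n", lpStringToInt64("0123", 4, &v)); /* 0: leading zero rejected */
        printf("%d\n", lpStringToInt64(" 12", 3, &v));  /* 0: no surrounding spaces */
        return 0;
    }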
+
+/* Create a new, empty listpack.
+ * On success the new listpack is returned, otherwise an error is returned. */
+unsigned char *lpNew(void) {
+ unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
+ if (lp == NULL) return NULL;
+ lpSetTotalBytes(lp,LP_HDR_SIZE+1);
+ lpSetNumElements(lp,0);
+ lp[LP_HDR_SIZE] = LP_EOF;
+ return lp;
+}
+
+/* Free the specified listpack. */
+void lpFree(unsigned char *lp) {
+ lp_free(lp);
+}
+
+/* Given an element 'ele' of size 'size', determine if the element can be
+ * represented inside the listpack encoded as integer, and returns
+ * LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STRING if no integer
+ * encoding is possible.
+ *
+ * If the LP_ENCODING_INT is returned, the function stores the integer encoded
+ * representation of the element in the 'intenc' buffer.
+ *
+ * Regardless of the returned encoding, 'enclen' is populated by reference to
+ * the number of bytes that the string or integer encoded element will require
+ * in order to be represented. */
+int lpEncodeGetType(unsigned char *ele, uint32_t size, unsigned char *intenc, uint64_t *enclen) {
+ int64_t v;
+ if (lpStringToInt64((const char*)ele, size, &v)) {
+ if (v >= 0 && v <= 127) {
+ /* Single byte 0-127 integer. */
+ intenc[0] = v;
+ *enclen = 1;
+ } else if (v >= -4096 && v <= 4095) {
+ /* 13 bit integer. */
+ if (v < 0) v = ((int64_t)1<<13)+v;
+ intenc[0] = (v>>8)|LP_ENCODING_13BIT_INT;
+ intenc[1] = v&0xff;
+ *enclen = 2;
+ } else if (v >= -32768 && v <= 32767) {
+ /* 16 bit integer. */
+ if (v < 0) v = ((int64_t)1<<16)+v;
+ intenc[0] = LP_ENCODING_16BIT_INT;
+ intenc[1] = v&0xff;
+ intenc[2] = v>>8;
+ *enclen = 3;
+ } else if (v >= -8388608 && v <= 8388607) {
+ /* 24 bit integer. */
+ if (v < 0) v = ((int64_t)1<<24)+v;
+ intenc[0] = LP_ENCODING_24BIT_INT;
+ intenc[1] = v&0xff;
+ intenc[2] = (v>>8)&0xff;
+ intenc[3] = v>>16;
+ *enclen = 4;
+ } else if (v >= -2147483648 && v <= 2147483647) {
+ /* 32 bit integer. */
+ if (v < 0) v = ((int64_t)1<<32)+v;
+ intenc[0] = LP_ENCODING_32BIT_INT;
+ intenc[1] = v&0xff;
+ intenc[2] = (v>>8)&0xff;
+ intenc[3] = (v>>16)&0xff;
+ intenc[4] = v>>24;
+ *enclen = 5;
+ } else {
+ /* 64 bit integer. */
+ uint64_t uv = v;
+ intenc[0] = LP_ENCODING_64BIT_INT;
+ intenc[1] = uv&0xff;
+ intenc[2] = (uv>>8)&0xff;
+ intenc[3] = (uv>>16)&0xff;
+ intenc[4] = (uv>>24)&0xff;
+ intenc[5] = (uv>>32)&0xff;
+ intenc[6] = (uv>>40)&0xff;
+ intenc[7] = (uv>>48)&0xff;
+ intenc[8] = uv>>56;
+ *enclen = 9;
+ }
+ return LP_ENCODING_INT;
+ } else {
+ if (size < 64) *enclen = 1+size;
+ else if (size < 4096) *enclen = 2+size;
+ else *enclen = 5+size;
+ return LP_ENCODING_STRING;
+ }
+}
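To make the encoding selection above concrete, a few worked examples (the byte values are what lpEncodeGetType() leaves in 'intenc', or what lpEncodeString() will later write for strings):

    - "100" fits the 7-bit unsigned form: one byte, 0x64, enclen = 1.
    - "300" needs the 16-bit form: 0xF1 0x2C 0x01 (opcode, then little-endian payload), enclen = 3.
    - "-5" uses the 13-bit form: v becomes 8192-5 = 8187, giving bytes 0xDF 0xFB, enclen = 2.
    - a 10-byte string that is not a valid integer gets the 6-bit string form: header byte 0x8A followed by the 10 raw bytes, enclen = 11.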
+
+/* Store a reverse-encoded variable length field, representing the length
+ * of the previous element of size 'l', in the target buffer 'buf'.
+ * The function returns the number of bytes used to encode it, from
+ * 1 to 5. If 'buf' is NULL the function just returns the number of bytes
+ * needed in order to encode the backlen. */
+unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) {
+ if (l <= 127) {
+ if (buf) buf[0] = l;
+ return 1;
+ } else if (l < 16383) {
+ if (buf) {
+ buf[0] = l>>7;
+ buf[1] = (l&127)|128;
+ }
+ return 2;
+ } else if (l < 2097151) {
+ if (buf) {
+ buf[0] = l>>14;
+ buf[1] = ((l>>7)&127)|128;
+ buf[2] = (l&127)|128;
+ }
+ return 3;
+ } else if (l < 268435455) {
+ if (buf) {
+ buf[0] = l>>21;
+ buf[1] = ((l>>14)&127)|128;
+ buf[2] = ((l>>7)&127)|128;
+ buf[3] = (l&127)|128;
+ }
+ return 4;
+ } else {
+ if (buf) {
+ buf[0] = l>>28;
+ buf[1] = ((l>>21)&127)|128;
+ buf[2] = ((l>>14)&127)|128;
+ buf[3] = ((l>>7)&127)|128;
+ buf[4] = (l&127)|128;
+ }
+ return 5;
+ }
+}
+
+/* Decode the backlen and returns it. If the encoding looks invalid (more than
+ * 5 bytes are used), UINT64_MAX is returned to report the problem. */
+uint64_t lpDecodeBacklen(unsigned char *p) {
+ uint64_t val = 0;
+ uint64_t shift = 0;
+ do {
+ val |= (uint64_t)(p[0] & 127) << shift;
+ if (!(p[0] & 128)) break;
+ shift += 7;
+ p--;
+ if (shift > 28) return UINT64_MAX;
+ } while(1);
+ return val;
+}
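A worked example of the backlen encoding above: for an entry whose encoded length is 500 bytes, lpEncodeBacklen() emits two bytes, buf[0] = 500>>7 = 3 and buf[1] = (500&127)|128 = 0xF4. Since every byte after the first has the high bit set, lpDecodeBacklen() can start from the last byte and walk left, reconstructing (0xF4 & 127) | (3 << 7) = 116 + 384 = 500.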
+
+/* Encode the string element pointed by 's' of size 'len' in the target
+ * buffer 'buf'. The function should be called with 'buf' always having enough
+ * space for encoding the string. This is done by calling lpEncodeGetType()
+ * before calling this function. */
+void lpEncodeString(unsigned char *buf, unsigned char *s, uint32_t len) {
+ if (len < 64) {
+ buf[0] = len | LP_ENCODING_6BIT_STR;
+ memcpy(buf+1,s,len);
+ } else if (len < 4096) {
+ buf[0] = (len >> 8) | LP_ENCODING_12BIT_STR;
+ buf[1] = len & 0xff;
+ memcpy(buf+2,s,len);
+ } else {
+ buf[0] = LP_ENCODING_32BIT_STR;
+ buf[1] = len & 0xff;
+ buf[2] = (len >> 8) & 0xff;
+ buf[3] = (len >> 16) & 0xff;
+ buf[4] = (len >> 24) & 0xff;
+ memcpy(buf+5,s,len);
+ }
+}
+
+/* Return the encoded length of the listpack element pointed by 'p'. If the
+ * element encoding is wrong then 0 is returned. */
+uint32_t lpCurrentEncodedSize(unsigned char *p) {
+ if (LP_ENCODING_IS_7BIT_UINT(p[0])) return 1;
+ if (LP_ENCODING_IS_6BIT_STR(p[0])) return 1+LP_ENCODING_6BIT_STR_LEN(p);
+ if (LP_ENCODING_IS_13BIT_INT(p[0])) return 2;
+ if (LP_ENCODING_IS_16BIT_INT(p[0])) return 3;
+ if (LP_ENCODING_IS_24BIT_INT(p[0])) return 4;
+ if (LP_ENCODING_IS_32BIT_INT(p[0])) return 5;
+ if (LP_ENCODING_IS_64BIT_INT(p[0])) return 9;
+ if (LP_ENCODING_IS_12BIT_STR(p[0])) return 2+LP_ENCODING_12BIT_STR_LEN(p);
+ if (LP_ENCODING_IS_32BIT_STR(p[0])) return 5+LP_ENCODING_32BIT_STR_LEN(p);
+ if (p[0] == LP_EOF) return 1;
+ return 0;
+}
+
+/* Skip the current entry returning the next. It is invalid to call this
+ * function if the current element is the EOF element at the end of the
+ * listpack, however, while this function is used to implement lpNext(),
+ * it does not return NULL when the EOF element is encountered. */
+unsigned char *lpSkip(unsigned char *p) {
+ unsigned long entrylen = lpCurrentEncodedSize(p);
+ entrylen += lpEncodeBacklen(NULL,entrylen);
+ p += entrylen;
+ return p;
+}
+
+/* If 'p' points to an element of the listpack, calling lpNext() will return
+ * the pointer to the next element (the one on the right), or NULL if 'p'
+ * already pointed to the last element of the listpack. */
+unsigned char *lpNext(unsigned char *lp, unsigned char *p) {
+ ((void) lp); /* lp is not used for now. However lpPrev() uses it. */
+ p = lpSkip(p);
+ if (p[0] == LP_EOF) return NULL;
+ return p;
+}
+
+/* If 'p' points to an element of the listpack, calling lpPrev() will return
+ * the pointer to the previous element (the one on the left), or NULL if 'p'
+ * already pointed to the first element of the listpack. */
+unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
+ if (p-lp == LP_HDR_SIZE) return NULL;
+ p--; /* Seek the first backlen byte of the last element. */
+ uint64_t prevlen = lpDecodeBacklen(p);
+ prevlen += lpEncodeBacklen(NULL,prevlen);
+ return p-prevlen+1; /* Seek the first byte of the previous entry. */
+}
+
+/* Return a pointer to the first element of the listpack, or NULL if the
+ * listpack has no elements. */
+unsigned char *lpFirst(unsigned char *lp) {
+ lp += LP_HDR_SIZE; /* Skip the header. */
+ if (lp[0] == LP_EOF) return NULL;
+ return lp;
+}
+
+/* Return a pointer to the last element of the listpack, or NULL if the
+ * listpack has no elements. */
+unsigned char *lpLast(unsigned char *lp) {
+ unsigned char *p = lp+lpGetTotalBytes(lp)-1; /* Seek EOF element. */
+ return lpPrev(lp,p); /* Will return NULL if EOF is the only element. */
+}
+
+/* Return the number of elements inside the listpack. This function attempts
+ * to use the cached value when within range, otherwise a full scan is
+ * needed. As a side effect of calling this function, the listpack header
+ * could be modified, because if the count is found to be already within
+ * the 'numele' header field range, the new value is set. */
+uint32_t lpLength(unsigned char *lp) {
+ uint32_t numele = lpGetNumElements(lp);
+ if (numele != LP_HDR_NUMELE_UNKNOWN) return numele;
+
+ /* Too many elements inside the listpack. We need to scan in order
+ * to get the total number. */
+ uint32_t count = 0;
+ unsigned char *p = lpFirst(lp);
+ while(p) {
+ count++;
+ p = lpNext(lp,p);
+ }
+
+ /* If the count is again within range of the header numele field,
+ * set it. */
+ if (count < LP_HDR_NUMELE_UNKNOWN) lpSetNumElements(lp,count);
+ return count;
+}
+
+/* Return the listpack element pointed by 'p'.
+ *
+ * The function changes behavior depending on the passed 'intbuf' value.
+ * Specifically, if 'intbuf' is NULL:
+ *
+ * If the element is internally encoded as an integer, the function returns
+ * NULL and populates the integer value by reference in 'count'. Otherwise if
+ * the element is encoded as a string a pointer to the string (pointing inside
+ * the listpack itself) is returned, and 'count' is set to the length of the
+ * string.
+ *
+ * If instead 'intbuf' points to a buffer passed by the caller, that must be
+ * at least LP_INTBUF_SIZE bytes, the function always returns the element as
+ * it was a string (returning the pointer to the string and setting the
+ * 'count' argument to the string length by reference). However if the element
+ * is encoded as an integer, the 'intbuf' buffer is used in order to store
+ * the string representation.
+ *
+ * The user should use one or the other form depending on what the value will
+ * be used for. If there is immediate usage for an integer value returned
+ * by the function, then passing a buffer (and converting it back to a number)
+ * is of course useless.
+ *
+ * If the function is called against a badly encoded listpack, so that there
+ * is no valid way to parse it, the function returns as if there was an
+ * integer encoded with value 12345678900000000 + <unrecognized byte>; this may
+ * be a hint to understand that something is wrong. To crash in this case is
+ * not sensible because of the different requirements of the application using
+ * this lib.
+ *
+ * Similarly, there is no error returned since the listpack normally can be
+ * assumed to be valid, so that would be a very high API cost. However a function
+ * in order to check the integrity of the listpack at load time is provided,
+ * check lpIsValid(). */
+unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) {
+ int64_t val;
+ uint64_t uval, negstart, negmax;
+
+ if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
+ negstart = UINT64_MAX; /* 7 bit ints are always positive. */
+ negmax = 0;
+ uval = p[0] & 0x7f;
+ } else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
+ *count = LP_ENCODING_6BIT_STR_LEN(p);
+ return p+1;
+ } else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
+ uval = ((p[0]&0x1f)<<8) | p[1];
+ negstart = (uint64_t)1<<12;
+ negmax = 8191;
+ } else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
+ uval = (uint64_t)p[1] |
+ (uint64_t)p[2]<<8;
+ negstart = (uint64_t)1<<15;
+ negmax = UINT16_MAX;
+ } else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
+ uval = (uint64_t)p[1] |
+ (uint64_t)p[2]<<8 |
+ (uint64_t)p[3]<<16;
+ negstart = (uint64_t)1<<23;
+ negmax = UINT32_MAX>>8;
+ } else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
+ uval = (uint64_t)p[1] |
+ (uint64_t)p[2]<<8 |
+ (uint64_t)p[3]<<16 |
+ (uint64_t)p[4]<<24;
+ negstart = (uint64_t)1<<31;
+ negmax = UINT32_MAX;
+ } else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
+ uval = (uint64_t)p[1] |
+ (uint64_t)p[2]<<8 |
+ (uint64_t)p[3]<<16 |
+ (uint64_t)p[4]<<24 |
+ (uint64_t)p[5]<<32 |
+ (uint64_t)p[6]<<40 |
+ (uint64_t)p[7]<<48 |
+ (uint64_t)p[8]<<56;
+ negstart = (uint64_t)1<<63;
+ negmax = UINT64_MAX;
+ } else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
+ *count = LP_ENCODING_12BIT_STR_LEN(p);
+ return p+2;
+ } else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
+ *count = LP_ENCODING_32BIT_STR_LEN(p);
+ return p+5;
+ } else {
+ uval = 12345678900000000ULL + p[0];
+ negstart = UINT64_MAX;
+ negmax = 0;
+ }
+
+ /* We reach this code path only for integer encodings.
+ * Convert the unsigned value to the signed one using two's complement
+ * rule. */
+ if (uval >= negstart) {
+ /* This three steps conversion should avoid undefined behaviors
+ * in the unsigned -> signed conversion. */
+ uval = negmax-uval;
+ val = uval;
+ val = -val-1;
+ } else {
+ val = uval;
+ }
+
+ /* Return the string representation of the integer or the value itself
+ * depending on intbuf being NULL or not. */
+ if (intbuf) {
+ *count = snprintf((char*)intbuf,LP_INTBUF_SIZE,"%lld",val);
+ return intbuf;
+ } else {
+ *count = val;
+ return NULL;
+ }
+}
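A usage sketch of the two lpGet() calling conventions described above. It assumes the lpNew()/lpAppend() API and the LP_INTBUF_SIZE constant provided by the rest of this new file and by listpack.h (lpAppend() is not shown in this hunk); illustration only:

    #include <stdint.h>
    #include <stdio.h>
    #include "listpack.h"   /* from this commit */

    int main(void) {
        unsigned char *lp = lpNew();
        lp = lpAppend(lp, (unsigned char*)"12345", 5);
        unsigned char intbuf[LP_INTBUF_SIZE];
        int64_t count;
        unsigned char *p = lpFirst(lp);
        /* Form 1: NULL intbuf, integer elements come back by value in 'count'. */
        if (lpGet(p, &count, NULL) == NULL)
            printf("as integer: %lld\n", (long long)count);
        /* Form 2: with intbuf, every element comes back as a string. */
        unsigned char *s = lpGet(p, &count, intbuf);
        printf("as string: %.*s\n", (int)count, s);
        lpFree(lp);
        return 0;
    }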
+
+/* Insert, delete or replace the specified element 'ele' of length 'size' at
+ * the specified position 'p', with 'p' being a listpack element pointer
+ * obtained with lpFirst(), lpLast(), lpIndex(), lpNext(), lpPrev() or
+ * lpSeek().
+ *
+ * The element is inserted before, after, or replaces the element pointed
+ * by 'p' depending on the 'where' argument, that can be LP_BEFORE, LP_AFTER
+ * or LP_REPLACE.
+ *
+ * If 'ele' is set to NULL, the function removes the element pointed by 'p'
+ * instead of inserting one.
+ *
+ * Returns NULL on out of memory or when the listpack total length would exceed
+ * the max allowed size of 2^32-1, otherwise the new pointer to the listpack
+ * holding the new element is returned (and the old pointer passed is no longer
+ * considered valid).
+ *
+ * If 'newp' is not NULL, at the end of a successful call '*newp' will be set
+ * to the address of the element just added, so that it will be possible to
+ * continue an iteration with lpNext() and lpPrev().
+ *
+ * For deletion operations ('ele' set to NULL) 'newp' is set to the next
+ * element, on the right of the deleted one, or to NULL if the deleted element
+ * was the last one. */
+unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
+ unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
+ unsigned char backlen[LP_MAX_BACKLEN_SIZE];
+
+ uint64_t enclen; /* The length of the encoded element. */
+
+ /* An element pointer set to NULL means deletion, which is conceptually
+ * replacing the element with a zero-length element. So whatever we
+ * get passed as 'where', set it to LP_REPLACE. */
+ if (ele == NULL) where = LP_REPLACE;
+
+ /* If we need to insert after the current element, we just jump to the
+ * next element (that could be the EOF one) and handle the case of
+ * inserting before. So the function will actually deal with just two
+ * cases: LP_BEFORE and LP_REPLACE. */
+ if (where == LP_AFTER) {
+ p = lpSkip(p);
+ where = LP_BEFORE;
+ }
+
+ /* Store the offset of the element 'p', so that we can obtain its
+ * address again after a reallocation. */
+ unsigned long poff = p-lp;
+
+ /* Calling lpEncodeGetType() stores the encoded version of the element
+ * into 'intenc' in case it is representable as an integer: in that
+ * case, the function returns LP_ENCODING_INT. Otherwise, if
+ * LP_ENCODING_STR is returned, we'll have to call lpEncodeString()
+ * to actually write the encoded string in place later.
+ *
+ * Whatever the returned encoding is, 'enclen' is populated with the
+ * length of the encoded element. */
+ int enctype;
+ if (ele) {
+ enctype = lpEncodeGetType(ele,size,intenc,&enclen);
+ } else {
+ enctype = -1;
+ enclen = 0;
+ }
+
+ /* We need to also encode the backward-parsable length of the element
+ * and append it to the end: this allows traversing the listpack from
+ * the end to the start. */
+ unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
+ uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
+ uint32_t replaced_len = 0;
+ if (where == LP_REPLACE) {
+ replaced_len = lpCurrentEncodedSize(p);
+ replaced_len += lpEncodeBacklen(NULL,replaced_len);
+ }
+
+ uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
+ - replaced_len;
+ if (new_listpack_bytes > UINT32_MAX) return NULL;
+
+ /* We now need to reallocate in order to make space or shrink the
+ * allocation (in case the 'where' value is LP_REPLACE and the new element is
+ * smaller). However we do that before memmoving the memory to
+ * make room for the new element if the final allocation will get
+ * larger, or we do it after if the final allocation will get smaller. */
+
+ unsigned char *dst = lp + poff; /* May be updated after reallocation. */
+
+ /* Realloc before: we need more room. */
+ if (new_listpack_bytes > old_listpack_bytes) {
+ if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
+ dst = lp + poff;
+ }
+
+ /* Setup the listpack relocating the elements to make the exact room
+ * we need to store the new one. */
+ if (where == LP_BEFORE) {
+ memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
+ } else { /* LP_REPLACE. */
+ long lendiff = (enclen+backlen_size)-replaced_len;
+ memmove(dst+replaced_len+lendiff,
+ dst+replaced_len,
+ old_listpack_bytes-poff-replaced_len);
+ }
+
+ /* Realloc after: we need to free space. */
+ if (new_listpack_bytes < old_listpack_bytes) {
+ if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
+ dst = lp + poff;
+ }
+
+ /* Store the entry. */
+ if (newp) {
+ *newp = dst;
+ /* In case of deletion, set 'newp' to NULL if the next element is
+ * the EOF element. */
+ if (!ele && dst[0] == LP_EOF) *newp = NULL;
+ }
+ if (ele) {
+ if (enctype == LP_ENCODING_INT) {
+ memcpy(dst,intenc,enclen);
+ } else {
+ lpEncodeString(dst,ele,size);
+ }
+ dst += enclen;
+ memcpy(dst,backlen,backlen_size);
+ dst += backlen_size;
+ }
+
+ /* Update header. */
+ if (where != LP_REPLACE || ele == NULL) {
+ uint32_t num_elements = lpGetNumElements(lp);
+ if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
+ if (ele)
+ lpSetNumElements(lp,num_elements+1);
+ else
+ lpSetNumElements(lp,num_elements-1);
+ }
+ }
+ lpSetTotalBytes(lp,new_listpack_bytes);
+ return lp;
+}
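A minimal sketch of the insert/replace/delete semantics documented above (illustrative only); note that the returned pointer always supersedes the one passed in, since lpInsert() may reallocate the listpack:

    #include <assert.h>
    #include "listpack.h"

    void lpinsert_demo(void) {
        unsigned char *lp = lpNew();
        lp = lpAppend(lp,(unsigned char*)"a",1);
        lp = lpAppend(lp,(unsigned char*)"c",1);

        /* Insert "b" after the first element; '*newp' points at it. */
        unsigned char *newp = NULL;
        lp = lpInsert(lp,(unsigned char*)"b",1,lpFirst(lp),LP_AFTER,&newp);
        assert(lp && newp);

        /* Replace the element just inserted. */
        lp = lpInsert(lp,(unsigned char*)"B",1,newp,LP_REPLACE,NULL);

        /* Delete the last element: a NULL 'ele' means deletion. */
        lp = lpInsert(lp,NULL,0,lpLast(lp),LP_REPLACE,NULL);
        assert(lpLength(lp) == 2); /* "a", "B" */

        lpFree(lp);
    }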
+
+/* Append the specified element 'ele' of length 'size' at the end of the
+ * listpack. It is implemented in terms of lpInsert(), so the return value is
+ * the same as lpInsert(). */
+unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) {
+ uint64_t listpack_bytes = lpGetTotalBytes(lp);
+ unsigned char *eofptr = lp + listpack_bytes - 1;
+ return lpInsert(lp,ele,size,eofptr,LP_BEFORE,NULL);
+}
+
+/* Remove the element pointed by 'p', and return the resulting listpack.
+ * If 'newp' is not NULL, the next element pointer (to the right of the
+ * deleted one) is returned by reference. If the deleted element was the
+ * last one, '*newp' is set to NULL. */
+unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) {
+ return lpInsert(lp,NULL,0,p,LP_REPLACE,newp);
+}
+
+/* Return the total number of bytes the listpack is composed of. */
+uint32_t lpBytes(unsigned char *lp) {
+ return lpGetTotalBytes(lp);
+}
+
+/* Seek the specified element and return a pointer to the seeked element.
+ * Positive indexes specify the zero-based element to seek from the head to
+ * the tail, negative indexes specify elements starting from the tail, where
+ * -1 means the last element, -2 the penultimate and so forth. If the index
+ * is out of range, NULL is returned. */
+unsigned char *lpSeek(unsigned char *lp, long index) {
+ int forward = 1; /* Seek forward by default. */
+
+ /* We want to seek from left to right or the other way around
+ * depending on the listpack length and the element position.
+ * However if the listpack length cannot be obtained in constant time,
+ * we always seek from left to right. */
+ uint32_t numele = lpGetNumElements(lp);
+ if (numele != LP_HDR_NUMELE_UNKNOWN) {
+ if (index < 0) index = (long)numele+index;
+ if (index < 0) return NULL; /* Index still < 0 means out of range. */
+ if (index >= numele) return NULL; /* Out of range the other side. */
+ /* We want to scan right-to-left if the element we are looking for
+ * is past the half of the listpack. */
+ if (index > numele/2) {
+ forward = 0;
+ /* Right to left scanning always expects a negative index. Convert
+ * our index to negative form. */
+ index -= numele;
+ }
+ } else {
+ /* If the listpack length is unspecified, for negative indexes we
+ * want to always scan right-to-left, from the tail. */
+ if (index < 0) forward = 0;
+ }
+
+ /* Forward and backward scanning is trivially based on lpNext()/lpPrev(). */
+ if (forward) {
+ unsigned char *ele = lpFirst(lp);
+ while (index > 0 && ele) {
+ ele = lpNext(lp,ele);
+ index--;
+ }
+ return ele;
+ } else {
+ unsigned char *ele = lpLast(lp);
+ while (index < -1 && ele) {
+ ele = lpPrev(lp,ele);
+ index++;
+ }
+ return ele;
+ }
+}
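The index convention can be summarized with a small sketch (illustrative only): index 0 is the head, -1 the tail, and anything out of range yields NULL.

    #include <assert.h>
    #include "listpack.h"

    void lpseek_demo(void) {
        unsigned char *lp = lpNew();
        lp = lpAppend(lp,(unsigned char*)"x",1);
        lp = lpAppend(lp,(unsigned char*)"y",1);
        lp = lpAppend(lp,(unsigned char*)"z",1);

        assert(lpSeek(lp,0)  == lpFirst(lp)); /* Head. */
        assert(lpSeek(lp,-1) == lpLast(lp));  /* Tail. */
        assert(lpSeek(lp,3)  == NULL);        /* Past the tail. */
        assert(lpSeek(lp,-4) == NULL);        /* Before the head. */

        lpFree(lp);
    }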
+
diff --git a/src/listpack.h b/src/listpack.h
new file mode 100644
index 000000000..af67b4b41
--- /dev/null
+++ b/src/listpack.h
@@ -0,0 +1,61 @@
+/* Listpack -- A list of strings serialization format
+ *
+ * This file implements the specification you can find at:
+ *
+ * https://github.com/antirez/listpack
+ *
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __LISTPACK_H
+#define __LISTPACK_H
+
+#include <stdint.h>
+
+#define LP_INTBUF_SIZE 21 /* 20 digits of -2^63 + 1 null term = 21. */
+
+/* lpInsert() where argument possible values: */
+#define LP_BEFORE 0
+#define LP_AFTER 1
+#define LP_REPLACE 2
+
+unsigned char *lpNew(void);
+void lpFree(unsigned char *lp);
+unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);
+unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size);
+unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);
+uint32_t lpLength(unsigned char *lp);
+unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf);
+unsigned char *lpFirst(unsigned char *lp);
+unsigned char *lpLast(unsigned char *lp);
+unsigned char *lpNext(unsigned char *lp, unsigned char *p);
+unsigned char *lpPrev(unsigned char *lp, unsigned char *p);
+uint32_t lpBytes(unsigned char *lp);
+unsigned char *lpSeek(unsigned char *lp, long index);
+
+#endif
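Putting the API above together, a forward scan of a whole listpack looks roughly like the following sketch (illustrative only):

    #include <stdio.h>
    #include "listpack.h"

    void lpscan_demo(unsigned char *lp) {
        unsigned char intbuf[LP_INTBUF_SIZE];
        unsigned char *p = lpFirst(lp);
        while (p) {
            int64_t len;
            unsigned char *ele = lpGet(p,&len,intbuf);
            printf("%.*s\n",(int)len,ele);
            p = lpNext(lp,p); /* NULL once the EOF terminator is reached. */
        }
    }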
diff --git a/src/listpack_malloc.h b/src/listpack_malloc.h
new file mode 100644
index 000000000..401ab6f74
--- /dev/null
+++ b/src/listpack_malloc.h
@@ -0,0 +1,45 @@
+/* Listpack -- A list of strings serialization format
+ * https://github.com/antirez/listpack
+ *
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/* Allocator selection.
+ *
+ * This file is used in order to change the listpack allocator at compile
+ * time. Just define the following macros to what you want to use. Also add
+ * the include of your alternate allocator if needed (not needed in order
+ * to use the default libc allocator). */
+
+#ifndef LISTPACK_ALLOC_H
+#define LISTPACK_ALLOC_H
+#include "zmalloc.h"
+#define lp_malloc zmalloc
+#define lp_realloc zrealloc
+#define lp_free zfree
+#endif
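Only the three macros above need to change in order to embed listpack with a different allocator; a hypothetical variant of this file using the plain libc allocator would be:

    #ifndef LISTPACK_ALLOC_H
    #define LISTPACK_ALLOC_H
    #include <stdlib.h>
    #define lp_malloc malloc
    #define lp_realloc realloc
    #define lp_free free
    #endif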
diff --git a/src/lzfP.h b/src/lzfP.h
index c6d2e096c..93c27b42d 100644
--- a/src/lzfP.h
+++ b/src/lzfP.h
@@ -79,7 +79,11 @@
* Unconditionally aligning does not cost very much, so do it if unsure
*/
#ifndef STRICT_ALIGN
-# define STRICT_ALIGN !(defined(__i386) || defined (__amd64))
+# if !(defined(__i386) || defined (__amd64))
+# define STRICT_ALIGN 1
+# else
+# define STRICT_ALIGN 0
+# endif
#endif
/*
diff --git a/src/networking.c b/src/networking.c
index fc484b676..8fba0a660 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -67,6 +67,16 @@ int listMatchObjects(void *a, void *b) {
return equalStringObjects(a,b);
}
+/* This function links the client to the global linked list of clients.
+ * unlinkClient() does the opposite, among other things. */
+void linkClient(client *c) {
+ listAddNodeTail(server.clients,c);
+ /* Note that we remember the linked list node where the client is stored,
+ * this way removing the client in unlinkClient() will not require
+ * a linear scan, but just a constant time operation. */
+ c->client_list_node = listLast(server.clients);
+}
+
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
@@ -124,8 +134,9 @@ client *createClient(int fd) {
listSetDupMethod(c->reply,dupClientReplyValue);
c->btype = BLOCKED_NONE;
c->bpop.timeout = 0;
- c->bpop.keys = dictCreate(&objectKeyPointerValueDictType,NULL);
+ c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL;
+ c->bpop.xread_group = NULL;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
@@ -133,9 +144,10 @@ client *createClient(int fd) {
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
c->pubsub_patterns = listCreate();
c->peerid = NULL;
+ c->client_list_node = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
- if (fd != -1) listAddNodeTail(server.clients,c);
+ if (fd != -1) linkClient(c);
initClientMultiState(c);
return c;
}
@@ -767,9 +779,10 @@ void unlinkClient(client *c) {
* fd is already set to -1. */
if (c->fd != -1) {
/* Remove from the list of active clients. */
- ln = listSearchKey(server.clients,c);
- serverAssert(ln != NULL);
- listDelNode(server.clients,ln);
+ if (c->client_list_node) {
+ listDelNode(server.clients,c->client_list_node);
+ c->client_list_node = NULL;
+ }
/* Unregister async I/O handlers and close the socket. */
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
diff --git a/src/notify.c b/src/notify.c
index 94a1f2e79..9bbeb1423 100644
--- a/src/notify.c
+++ b/src/notify.c
@@ -54,6 +54,7 @@ int keyspaceEventsStringToFlags(char *classes) {
case 'e': flags |= NOTIFY_EVICTED; break;
case 'K': flags |= NOTIFY_KEYSPACE; break;
case 'E': flags |= NOTIFY_KEYEVENT; break;
+ case 't': flags |= NOTIFY_STREAM; break;
default: return -1;
}
}
@@ -79,6 +80,7 @@ sds keyspaceEventsFlagsToString(int flags) {
if (flags & NOTIFY_ZSET) res = sdscatlen(res,"z",1);
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
+ if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
}
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
diff --git a/src/object.c b/src/object.c
index 2b9403a49..4e031768b 100644
--- a/src/object.c
+++ b/src/object.c
@@ -232,6 +232,13 @@ robj *createZsetZiplistObject(void) {
return o;
}
+robj *createStreamObject(void) {
+ stream *s = streamNew();
+ robj *o = createObject(OBJ_STREAM,s);
+ o->encoding = OBJ_ENCODING_STREAM;
+ return o;
+}
+
robj *createModuleObject(moduleType *mt, void *value) {
moduleValue *mv = zmalloc(sizeof(*mv));
mv->type = mt;
@@ -303,6 +310,10 @@ void freeModuleObject(robj *o) {
zfree(mv);
}
+void freeStreamObject(robj *o) {
+ freeStream(o->ptr);
+}
+
void incrRefCount(robj *o) {
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
}
@@ -316,6 +327,7 @@ void decrRefCount(robj *o) {
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
+ case OBJ_STREAM: freeStreamObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);
@@ -788,6 +800,49 @@ size_t objectComputeSize(robj *o, size_t sample_size) {
} else {
serverPanic("Unknown hash encoding");
}
+ } else if (o->type == OBJ_STREAM) {
+ stream *s = o->ptr;
+ /* Note: to guess the size of the radix tree is not trivial, so we
+ * approximate it considering 64 bytes of data overhead for each
+ * key (the ID), and then adding the number of bare nodes, plus some
+ * overhead due to the data and child pointers. This secret recipe
+ * was obtained by checking the average radix tree created by real
+ * workloads, and then adjusting the constants to get numbers that
+ * more or less match the real memory usage.
+ *
+ * Actually the number of nodes and keys may be different depending
+ * on the insertion speed and thus the ability of the radix tree
+ * to compress prefixes. */
+ asize = sizeof(*o);
+ asize += s->rax->numele * 64;
+ asize += s->rax->numnodes * sizeof(raxNode);
+ asize += s->rax->numnodes * 32*7; /* Add a few child pointers... */
+
+ /* Now we have to add the listpacks. The last listpack is often not
+ * complete, so we sample the size of the first N listpacks, use the
+ * average to estimate the size of the first N-1 listpacks, and
+ * finally add the real size of the last node. */
+ raxIterator ri;
+ raxStart(&ri,s->rax);
+ raxSeek(&ri,"^",NULL,0);
+ size_t lpsize = 0, samples = 0;
+ while(samples < sample_size && raxNext(&ri)) {
+ unsigned char *lp = ri.data;
+ lpsize += lpBytes(lp);
+ samples++;
+ }
+ if (s->rax->numele <= samples) {
+ asize += lpsize;
+ } else {
+ if (samples) lpsize /= samples; /* Compute the average. */
+ asize += lpsize * (s->rax->numele-1);
+ /* No need to check if the seek succeeded: we enter this branch only
+ * when the radix tree has at least one element. */
+ raxSeek(&ri,"$",NULL,0);
+ raxNext(&ri);
+ asize += lpBytes(ri.data);
+ }
+ raxStop(&ri);
} else if (o->type == OBJ_MODULE) {
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
@@ -1045,10 +1100,14 @@ void objectCommand(client *c) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
== NULL) return;
if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
- addReplyError(c,"A non-LFU maxmemory policy is selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
+ addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
}
- addReplyLongLong(c,o->lru&255);
+ /* LFUDecrAndReturn should be called
+ * in case the key has not been accessed for a long time,
+ * because we update the access time only
+ * when the key is read or overwritten. */
+ addReplyLongLong(c,LFUDecrAndReturn(o));
} else {
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try OBJECT help",
(char *)c->argv[1]->ptr);
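To make the stream memory estimate in objectComputeSize() above concrete with made-up numbers: a stream whose radix tree holds 100 listpack macro nodes spread over 120 rax nodes, with the sampled listpacks averaging 500 bytes, would be estimated at roughly 6400 bytes of per-key ID overhead (100*64), about 120 * (sizeof(raxNode) + 224) bytes of node and child-pointer overhead, 49500 bytes of estimated listpack payload (500 * 99), plus the exact size of the last listpack. As the comment notes, this is intentionally a ballpark figure, not an exact accounting.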
diff --git a/src/quicklist.c b/src/quicklist.c
index c8b72743c..faa08c65f 100644
--- a/src/quicklist.c
+++ b/src/quicklist.c
@@ -149,7 +149,7 @@ REDIS_STATIC quicklistNode *quicklistCreateNode(void) {
}
/* Return cached quicklist count */
-unsigned int quicklistCount(const quicklist *ql) { return ql->count; }
+unsigned long quicklistCount(const quicklist *ql) { return ql->count; }
/* Free entire quicklist. */
void quicklistRelease(quicklist *quicklist) {
diff --git a/src/quicklist.h b/src/quicklist.h
index 8f3875900..955a22cfa 100644
--- a/src/quicklist.h
+++ b/src/quicklist.h
@@ -64,7 +64,7 @@ typedef struct quicklistLZF {
char compressed[];
} quicklistLZF;
-/* quicklist is a 32 byte struct (on 64-bit systems) describing a quicklist.
+/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
@@ -74,7 +74,7 @@ typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
- unsigned int len; /* number of quicklistNodes */
+ unsigned long len; /* number of quicklistNodes */
int fill : 16; /* fill factor for individual nodes */
unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;
@@ -154,7 +154,7 @@ int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
void *(*saver)(unsigned char *data, unsigned int sz));
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *slong);
-unsigned int quicklistCount(const quicklist *ql);
+unsigned long quicklistCount(const quicklist *ql);
int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len);
size_t quicklistGetLzf(const quicklistNode *node, void **data);
diff --git a/src/rax.c b/src/rax.c
index dda008dff..442e7bfef 100644
--- a/src/rax.c
+++ b/src/rax.c
@@ -131,7 +131,7 @@ static inline void raxStackFree(raxStack *ts) {
}
/* ----------------------------------------------------------------------------
- * Radis tree implementation
+ * Radix tree implementation
* --------------------------------------------------------------------------*/
/* Allocate a new non compressed node with the specified number of children.
@@ -873,7 +873,8 @@ raxNode *raxRemoveChild(raxNode *parent, raxNode *child) {
memmove(((char*)cp)-1,cp,(parent->size-taillen-1)*sizeof(raxNode**));
/* Move the remaining "tail" pointer at the right position as well. */
- memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+parent->iskey*sizeof(void*));
+ size_t valuelen = (parent->iskey && !parent->isnull) ? sizeof(void*) : 0;
+ memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+valuelen);
/* 4. Update size. */
parent->size--;
@@ -1092,28 +1093,36 @@ int raxRemove(rax *rax, unsigned char *s, size_t len, void **old) {
/* This is the core of raxFree(): performs a depth-first scan of the
* tree and releases all the nodes found. */
-void raxRecursiveFree(rax *rax, raxNode *n) {
+void raxRecursiveFree(rax *rax, raxNode *n, void (*free_callback)(void*)) {
debugnode("free traversing",n);
int numchildren = n->iscompr ? 1 : n->size;
raxNode **cp = raxNodeLastChildPtr(n);
while(numchildren--) {
raxNode *child;
memcpy(&child,cp,sizeof(child));
- raxRecursiveFree(rax,child);
+ raxRecursiveFree(rax,child,free_callback);
cp--;
}
debugnode("free depth-first",n);
+ if (free_callback && n->iskey && !n->isnull)
+ free_callback(raxGetData(n));
rax_free(n);
rax->numnodes--;
}
-/* Free a whole radix tree. */
-void raxFree(rax *rax) {
- raxRecursiveFree(rax,rax->head);
+/* Free a whole radix tree, calling the specified callback in order to
+ * free the auxiliary data. */
+void raxFreeWithCallback(rax *rax, void (*free_callback)(void*)) {
+ raxRecursiveFree(rax,rax->head,free_callback);
assert(rax->numnodes == 0);
rax_free(rax);
}
+/* Free a whole radix tree. */
+void raxFree(rax *rax) {
+ raxFreeWithCallback(rax,NULL);
+}
+
/* ------------------------------- Iterator --------------------------------- */
/* Initialize a Rax iterator. This call should be performed a single time
@@ -1175,7 +1184,7 @@ void raxIteratorDelChars(raxIterator *it, size_t count) {
* The function returns 1 on success or 0 on out of memory. */
int raxIteratorNextStep(raxIterator *it, int noup) {
if (it->flags & RAX_ITER_EOF) {
- return 0;
+ return 1;
} else if (it->flags & RAX_ITER_JUST_SEEKED) {
it->flags &= ~RAX_ITER_JUST_SEEKED;
return 1;
@@ -1187,10 +1196,6 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
size_t orig_stack_items = it->stack.items;
raxNode *orig_node = it->node;
- /* Clear the EOF flag: it will be set again if the EOF condition
- * is still valid. */
- it->flags &= ~RAX_ITER_EOF;
-
while(1) {
int children = it->node->iscompr ? 1 : it->node->size;
if (!noup && children) {
@@ -1291,7 +1296,7 @@ int raxSeekGreatest(raxIterator *it) {
* effect to the one of raxIteratorPrevSte(). */
int raxIteratorPrevStep(raxIterator *it, int noup) {
if (it->flags & RAX_ITER_EOF) {
- return 0;
+ return 1;
} else if (it->flags & RAX_ITER_JUST_SEEKED) {
it->flags &= ~RAX_ITER_JUST_SEEKED;
return 1;
@@ -1412,6 +1417,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
it->node = it->rt->head;
if (!raxSeekGreatest(it)) return 0;
assert(it->node->iskey);
+ it->data = raxGetData(it->node);
return 1;
}
@@ -1430,6 +1436,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
/* We found our node, since the key matches and we have an
* "equal" condition. */
if (!raxIteratorAddChars(it,ele,len)) return 0; /* OOM. */
+ it->data = raxGetData(it->node);
} else if (lt || gt) {
/* Exact key not found or eq flag not set. We have to set as current
* key the one represented by the node we stopped at, and perform
@@ -1502,6 +1509,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
* the previous sub-tree. */
if (nodechar < keychar) {
if (!raxSeekGreatest(it)) return 0;
+ it->data = raxGetData(it->node);
} else {
if (!raxIteratorAddChars(it,it->node->data,it->node->size))
return 0;
@@ -1647,6 +1655,19 @@ void raxStop(raxIterator *it) {
raxStackFree(&it->stack);
}
+/* Return if the iterator is in an EOF state. This happens when raxSeek()
+ * failed to seek an appropriate element, so that raxNext() or raxPrev()
+ * will return zero, or when an EOF condition was reached while iterating
+ * with raxNext() and raxPrev(). */
+int raxEOF(raxIterator *it) {
+ return it->flags & RAX_ITER_EOF;
+}
+
+/* Return the number of elements inside the radix tree. */
+uint64_t raxSize(rax *rax) {
+ return rax->numele;
+}
+
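A small sketch of how the two new helpers combine with the existing iterator API (illustrative only):

    #include "rax.h"

    /* Count the keys of 'rt' by iteration and cross check the result
     * against the O(1) raxSize() counter. */
    uint64_t rax_walk_demo(rax *rt) {
        uint64_t seen = 0;
        raxIterator it;
        raxStart(&it,rt);
        raxSeek(&it,"^",NULL,0);          /* Seek the smallest key. */
        while (raxNext(&it)) seen++;      /* Returns 0 once we hit EOF. */
        int eof = raxEOF(&it);            /* Non-zero after the walk above. */
        raxStop(&it);
        return (eof && seen == raxSize(rt)) ? seen : 0;
    }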
/* ----------------------------- Introspection ------------------------------ */
/* This function is mostly used for debugging and learning purposes.
diff --git a/src/rax.h b/src/rax.h
index 6f91f4c1b..b4e2fd91e 100644
--- a/src/rax.h
+++ b/src/rax.h
@@ -148,6 +148,7 @@ int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old);
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
void *raxFind(rax *rax, unsigned char *s, size_t len);
void raxFree(rax *rax);
+void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));
void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);
@@ -155,6 +156,8 @@ int raxPrev(raxIterator *it);
int raxRandomWalk(raxIterator *it, size_t steps);
int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key_len);
void raxStop(raxIterator *it);
+int raxEOF(raxIterator *it);
void raxShow(rax *rax);
+uint64_t raxSize(rax *rax);
#endif
diff --git a/src/rdb.c b/src/rdb.c
index 00106cac4..28985b2a6 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -31,6 +31,7 @@
#include "lzf.h" /* LZF compression library */
#include "zipmap.h"
#include "endianconv.h"
+#include "stream.h"
#include <math.h>
#include <sys/types.h>
@@ -622,6 +623,8 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
+ case OBJ_STREAM:
+ return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
@@ -762,7 +765,39 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
} else {
serverPanic("Unknown hash encoding");
}
+ } else if (o->type == OBJ_STREAM) {
+ /* Store how many listpacks we have inside the radix tree. */
+ stream *s = o->ptr;
+ rax *rax = s->rax;
+ if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
+ nwritten += n;
+
+ /* Serialize all the listpacks inside the radix tree as they are,
+ * together with their keys: when loading back, each saved key is
+ * used to insert its listpack back into the radix tree. */
+ raxIterator ri;
+ raxStart(&ri,rax);
+ raxSeek(&ri,"^",NULL,0);
+ while (raxNext(&ri)) {
+ unsigned char *lp = ri.data;
+ size_t lp_bytes = lpBytes(lp);
+ if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
+ nwritten += n;
+ if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
+ nwritten += n;
+ }
+ raxStop(&ri);
+ /* Save the number of elements inside the stream. We cannot obtain
+ * this easily later, since we would have to scan every macro node
+ * and count its items: not a great CPU / space tradeoff. */
+ if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
+ nwritten += n;
+ /* Save the last entry ID. */
+ if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
+ nwritten += n;
+ if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
+ nwritten += n;
} else if (o->type == OBJ_MODULE) {
/* Save a module-specific value. */
RedisModuleIO io;
@@ -943,6 +978,20 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
}
di = NULL; /* So that we don't release it again on error. */
+ /* If we are storing the replication information on disk, persist
+ * the script cache as well: on successful PSYNC after a restart, we need
+ * to be able to process any EVALSHA inside the replication backlog the
+ * master will send us. */
+ if (rsi && dictSize(server.lua_scripts)) {
+ di = dictGetIterator(server.lua_scripts);
+ while((de = dictNext(di)) != NULL) {
+ robj *body = dictGetVal(de);
+ if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
+ goto werr;
+ }
+ dictReleaseIterator(di);
+ }
+
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
@@ -1395,6 +1444,45 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
break;
}
+ } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
+ o = createStreamObject();
+ stream *s = o->ptr;
+ uint64_t listpacks = rdbLoadLen(rdb,NULL);
+
+ while(listpacks--) {
+ /* Get the master ID, the one we'll use as key of the radix tree
+ * node: the entries inside the listpack itself are delta-encoded
+ * relative to this ID. */
+ sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
+ if (sdslen(nodekey) != sizeof(streamID)) {
+ rdbExitReportCorruptRDB("Stream node key entry is not the "
+ "size of a stream ID");
+ }
+
+ /* Load the listpack. */
+ unsigned char *lp =
+ rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
+ if (lp == NULL) return NULL;
+ unsigned char *first = lpFirst(lp);
+ if (first == NULL) {
+ /* Serialized listpacks should never be empty, since on
+ * deletion we should remove the radix tree key if the
+ * resulting listpack is empty. */
+ rdbExitReportCorruptRDB("Empty listpack inside stream");
+ }
+
+ /* Insert the key in the radix tree. */
+ int retval = raxInsert(s->rax,
+ (unsigned char*)nodekey,sizeof(streamID),lp,NULL);
+ sdsfree(nodekey);
+ if (!retval)
+ rdbExitReportCorruptRDB("Listpack re-added with existing key");
+ }
+ /* Load total number of items inside the stream. */
+ s->length = rdbLoadLen(rdb,NULL);
+ /* Load the last entry ID. */
+ s->last_id.ms = rdbLoadLen(rdb,NULL);
+ s->last_id.seq = rdbLoadLen(rdb,NULL);
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid = rdbLoadLen(rdb,NULL);
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
@@ -1589,6 +1677,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) {
}
} else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
+ } else if (!strcasecmp(auxkey->ptr,"lua")) {
+ /* Load the script back in memory. */
+ if (luaCreateFunction(NULL,server.lua,auxval) == NULL) {
+ rdbExitReportCorruptRDB(
+ "Can't load Lua script from RDB file! "
+ "BODY: %s", auxval->ptr);
+ }
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
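Pieced together from the save and load paths above, the value payload of a stream key saved as RDB_TYPE_STREAM_LISTPACKS is laid out roughly as follows (informal sketch):

    <len>    number of listpacks (the radix tree keys)
    then, for each listpack:
        <string>  the master entry ID used as rax key (sizeof(streamID) bytes)
        <string>  the raw listpack blob, entries delta-encoded against that ID
    <len>    total number of entries in the stream (s->length)
    <len>    last entry ID, milliseconds part
    <len>    last entry ID, sequence part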
diff --git a/src/rdb.h b/src/rdb.h
index 62a13f444..ecb066fb0 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -69,8 +69,9 @@
#define RDB_ENC_INT32 2 /* 32 bit signed integer */
#define RDB_ENC_LZF 3 /* string compressed with FASTLZ */
-/* Dup object types to RDB object types. Only reason is readability (are we
- * dealing with RDB types or with in-memory object types?). */
+/* Map object types to RDB object types. Macros starting with OBJ_ are for
+ * memory storage and may change. Instead RDB types must be fixed because
+ * we store them on disk. */
#define RDB_TYPE_STRING 0
#define RDB_TYPE_LIST 1
#define RDB_TYPE_SET 2
@@ -89,10 +90,11 @@
#define RDB_TYPE_ZSET_ZIPLIST 12
#define RDB_TYPE_HASH_ZIPLIST 13
#define RDB_TYPE_LIST_QUICKLIST 14
+#define RDB_TYPE_STREAM_LISTPACKS 15
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Test if a type is an object type. */
-#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 14))
+#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15))
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_AUX 250
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index 4027536e5..71ac50d03 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -193,12 +193,12 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) {
rdbCheckError("Wrong signature trying to load DB from file");
- return 1;
+ goto err;
}
rdbver = atoi(buf+5);
if (rdbver < 1 || rdbver > RDB_VERSION) {
rdbCheckError("Can't handle RDB format version %d",rdbver);
- return 1;
+ goto err;
}
startLoading(fp);
@@ -270,7 +270,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
} else {
if (!rdbIsObjectType(type)) {
rdbCheckError("Invalid object type: %d", type);
- return 1;
+ goto err;
}
rdbstate.key_type = type;
}
@@ -307,6 +307,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
rdbCheckInfo("RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
rdbCheckError("RDB CRC error");
+ goto err;
} else {
rdbCheckInfo("Checksum OK");
}
@@ -321,6 +322,8 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
} else {
rdbCheckError("Unexpected EOF reading RDB file");
}
+err:
+ if (closefile) fclose(fp);
return 1;
}
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 84eabf391..1f80bc615 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -107,6 +107,7 @@ static struct config {
char *pattern;
char *rdb_filename;
int bigkeys;
+ int hotkeys;
int stdinarg; /* get last arg from stdin. (-x option) */
char *auth;
int output; /* output mode, see OUTPUT_* defines */
@@ -710,7 +711,7 @@ int isColorTerm(void) {
return t != NULL && strstr(t,"xterm") != NULL;
}
-/* Helpe function for sdsCatColorizedLdbReply() appending colorize strings
+/* Helper function for sdsCatColorizedLdbReply() appending colorize strings
* to an SDS string. */
sds sdscatcolor(sds o, char *s, size_t len, char *color) {
if (!isColorTerm()) return sdscatlen(o,s,len);
@@ -1129,6 +1130,8 @@ static int parseOptions(int argc, char **argv) {
config.pipe_timeout = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--bigkeys")) {
config.bigkeys = 1;
+ } else if (!strcmp(argv[i],"--hotkeys")) {
+ config.hotkeys = 1;
} else if (!strcmp(argv[i],"--eval") && !lastarg) {
config.eval = argv[++i];
} else if (!strcmp(argv[i],"--ldb")) {
@@ -1229,6 +1232,8 @@ static void usage(void) {
" no reply is received within <n> seconds.\n"
" Default timeout: %d. Use 0 to wait forever.\n"
" --bigkeys Sample Redis keys looking for big keys.\n"
+" --hotkeys Sample Redis keys looking for hot keys.\n"
+" only works when maxmemory-policy is *lfu.\n"
" --scan List all keys using the SCAN command.\n"
" --pattern <pat> Useful with --scan to specify a SCAN pattern.\n"
" --intrinsic-latency <sec> Run a test to measure intrinsic system latency.\n"
@@ -2069,7 +2074,8 @@ static void pipeMode(void) {
#define TYPE_SET 2
#define TYPE_HASH 3
#define TYPE_ZSET 4
-#define TYPE_NONE 5
+#define TYPE_STREAM 5
+#define TYPE_NONE 6
static redisReply *sendScan(unsigned long long *it) {
redisReply *reply = redisCommand(context, "SCAN %llu", *it);
@@ -2128,6 +2134,8 @@ static int toIntType(char *key, char *type) {
return TYPE_HASH;
} else if(!strcmp(type, "zset")) {
return TYPE_ZSET;
+ } else if(!strcmp(type, "stream")) {
+ return TYPE_STREAM;
} else if(!strcmp(type, "none")) {
return TYPE_NONE;
} else {
@@ -2216,7 +2224,7 @@ static void findBigKeys(void) {
unsigned long long biggest[5] = {0}, counts[5] = {0}, totalsize[5] = {0};
unsigned long long sampled = 0, total_keys, totlen=0, *sizes=NULL, it=0;
sds maxkeys[5] = {0};
- char *typename[] = {"string","list","set","hash","zset"};
+ char *typename[] = {"string","list","set","hash","zset","stream"};
char *typeunit[] = {"bytes","items","members","fields","members"};
redisReply *reply, *keys;
unsigned int arrsize=0, i;
@@ -2343,6 +2351,129 @@ static void findBigKeys(void) {
exit(0);
}
+static void getKeyFreqs(redisReply *keys, unsigned long long *freqs) {
+ redisReply *reply;
+ unsigned int i;
+
+ /* Pipeline OBJECT freq commands */
+ for(i=0;i<keys->elements;i++) {
+ redisAppendCommand(context, "OBJECT freq %s", keys->element[i]->str);
+ }
+
+ /* Retrieve freqs */
+ for(i=0;i<keys->elements;i++) {
+ if(redisGetReply(context, (void**)&reply)!=REDIS_OK) {
+ fprintf(stderr, "Error getting freq for key '%s' (%d: %s)\n",
+ keys->element[i]->str, context->err, context->errstr);
+ exit(1);
+ } else if(reply->type != REDIS_REPLY_INTEGER) {
+ if(reply->type == REDIS_REPLY_ERROR) {
+ fprintf(stderr, "Error: %s\n", reply->str);
+ exit(1);
+ } else {
+ fprintf(stderr, "Warning: OBJECT freq on '%s' failed (may have been deleted)\n", keys->element[i]->str);
+ freqs[i] = 0;
+ }
+ } else {
+ freqs[i] = reply->integer;
+ }
+ freeReplyObject(reply);
+ }
+}
+
+#define HOTKEYS_SAMPLE 16
+static void findHotKeys(void) {
+ redisReply *keys, *reply;
+ unsigned long long counters[HOTKEYS_SAMPLE] = {0};
+ sds hotkeys[HOTKEYS_SAMPLE] = {NULL};
+ unsigned long long sampled = 0, total_keys, *freqs = NULL, it = 0;
+ unsigned int arrsize = 0, i, k;
+ double pct;
+
+ /* Total keys pre scanning */
+ total_keys = getDbSize();
+
+ /* Status message */
+ printf("\n# Scanning the entire keyspace to find hot keys (the keys\n");
+ printf("# accessed most frequently). You can use -i 0.1 to sleep 0.1 sec\n");
+ printf("# per 100 SCAN commands (not usually needed).\n\n");
+
+ /* SCAN loop */
+ do {
+ /* Calculate approximate percentage completion */
+ pct = 100 * (double)sampled/total_keys;
+
+ /* Grab some keys and point to the keys array */
+ reply = sendScan(&it);
+ keys = reply->element[1];
+
+ /* Reallocate our freqs array if we need to */
+ if(keys->elements > arrsize) {
+ freqs = zrealloc(freqs, sizeof(unsigned long long)*keys->elements);
+
+ if(!freqs) {
+ fprintf(stderr, "Failed to allocate storage for keys!\n");
+ exit(1);
+ }
+
+ arrsize = keys->elements;
+ }
+
+ getKeyFreqs(keys, freqs);
+
+ /* Now update our stats */
+ for(i=0;i<keys->elements;i++) {
+ sampled++;
+ /* Update overall progress */
+ if(sampled % 1000000 == 0) {
+ printf("[%05.2f%%] Sampled %llu keys so far\n", pct, sampled);
+ }
+
+ /* Keep the top HOTKEYS_SAMPLE frequencies, eviction-pool style. */
+ k = 0;
+ while (k < HOTKEYS_SAMPLE && freqs[i] > counters[k]) k++;
+ if (k == 0) continue;
+ k--;
+ if (k == 0 || counters[k] == 0) {
+ sdsfree(hotkeys[k]);
+ } else {
+ sdsfree(hotkeys[0]);
+ memmove(counters,counters+1,sizeof(counters[0])*k);
+ memmove(hotkeys,hotkeys+1,sizeof(hotkeys[0])*k);
+ }
+ counters[k] = freqs[i];
+ hotkeys[k] = sdsnew(keys->element[i]->str);
+ printf(
+ "[%05.2f%%] Hot key '%s' found so far with counter %llu\n",
+ pct, keys->element[i]->str, freqs[i]);
+ }
+
+ /* Sleep if we've been directed to do so */
+ if(sampled && (sampled %100) == 0 && config.interval) {
+ usleep(config.interval);
+ }
+
+ freeReplyObject(reply);
+ } while(it != 0);
+
+ if (freqs) zfree(freqs);
+
+ /* We're done */
+ printf("\n-------- summary -------\n\n");
+
+ printf("Sampled %llu keys in the keyspace!\n", sampled);
+
+ for (i=1; i<= HOTKEYS_SAMPLE; i++) {
+ k = HOTKEYS_SAMPLE - i;
+ if(counters[k]>0) {
+ printf("hot key found with counter: %llu\tkeyname: %s\n", counters[k], hotkeys[k]);
+ sdsfree(hotkeys[k]);
+ }
+ }
+
+ exit(0);
+}
+
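The inner loop above keeps the HOTKEYS_SAMPLE hottest keys in the 'counters'/'hotkeys' arrays, sorted by ascending frequency, much like the server's eviction pool. A standalone sketch of just the insertion step, with simplified names (illustrative only):

    #include <string.h>

    #define TOPK 16

    /* Try to place 'freq' into 'counters' (TOPK slots, sorted ascending).
     * Returns the slot used, or -1 if 'freq' does not qualify. */
    int topk_insert(unsigned long long *counters, unsigned long long freq) {
        int k = 0;
        while (k < TOPK && freq > counters[k]) k++;
        if (k == 0) return -1;            /* Smaller than everything kept. */
        k--;
        if (!(k == 0 || counters[k] == 0)) {
            /* Drop the coldest entry and shift the rest down one slot. */
            memmove(counters,counters+1,sizeof(counters[0])*k);
        }
        counters[k] = freq;
        return k;
    }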
/*------------------------------------------------------------------------------
* Stats mode
*--------------------------------------------------------------------------- */
@@ -2453,7 +2584,7 @@ static void statMode(void) {
sprintf(buf,"%ld",aux);
printf("%-8s",buf);
- /* Requets */
+ /* Requests */
aux = getLongInfoField(reply->str,"total_commands_processed");
sprintf(buf,"%ld (+%ld)",aux,requests == 0 ? 0 : aux-requests);
printf("%-19s",buf);
@@ -2720,6 +2851,7 @@ int main(int argc, char **argv) {
config.pipe_mode = 0;
config.pipe_timeout = REDIS_CLI_DEFAULT_PIPE_TIMEOUT;
config.bigkeys = 0;
+ config.hotkeys = 0;
config.stdinarg = 0;
config.auth = NULL;
config.eval = NULL;
@@ -2780,6 +2912,12 @@ int main(int argc, char **argv) {
findBigKeys();
}
+ /* Find hot keys */
+ if (config.hotkeys) {
+ if (cliConnect(0) == REDIS_ERR) exit(1);
+ findHotKeys();
+ }
+
/* Stat mode */
if (config.stat_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1);
diff --git a/src/replication.c b/src/replication.c
index cf4db3e3a..064d2bece 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -2205,7 +2205,7 @@ void replicationResurrectCachedMaster(int newfd) {
server.repl_state = REPL_STATE_CONNECTED;
/* Re-add to the list of clients. */
- listAddNodeTail(server.clients,server.master);
+ linkClient(server.master);
if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
readQueryFromClient, server.master)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
diff --git a/src/scripting.c b/src/scripting.c
index 3c6cc0786..5cbe03e4a 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -1141,18 +1141,38 @@ int redis_math_randomseed (lua_State *L) {
* EVAL and SCRIPT commands implementation
* ------------------------------------------------------------------------- */
-/* Define a lua function with the specified function name and body.
- * The function name musts be a 42 characters long string, since all the
- * functions we defined in the Lua context are in the form:
+/* Define a Lua function with the specified body.
+ * The function name will be generated in the following form:
*
* f_<hex sha1 sum>
*
- * On success C_OK is returned, and nothing is left on the Lua stack.
- * On error C_ERR is returned and an appropriate error is set in the
- * client context. */
-int luaCreateFunction(client *c, lua_State *lua, char *funcname, robj *body) {
- sds funcdef = sdsempty();
+ * The function increments the reference count of the 'body' object as a
+ * side effect of a successful call.
+ *
+ * On success a pointer to an SDS string representing the function SHA1 of the
+ * just added function is returned (and will be valid until the next call
+ * to scriptingReset()), otherwise NULL is returned.
+ *
+ * If the function is called with a script that already exists, it behaves
+ * as in the success case.
+ *
+ * If 'c' is not NULL, on error the client is informed with an appropriate
+ * error describing the nature of the problem and the Lua interpreter error. */
+sds luaCreateFunction(client *c, lua_State *lua, robj *body) {
+ char funcname[43];
+ dictEntry *de;
+
+ funcname[0] = 'f';
+ funcname[1] = '_';
+ sha1hex(funcname+2,body->ptr,sdslen(body->ptr));
+
+ sds sha = sdsnewlen(funcname+2,40);
+ if ((de = dictFind(server.lua_scripts,sha)) != NULL) {
+ sdsfree(sha);
+ return dictGetKey(de);
+ }
+ sds funcdef = sdsempty();
funcdef = sdscat(funcdef,"function ");
funcdef = sdscatlen(funcdef,funcname,42);
funcdef = sdscatlen(funcdef,"() ",3);
@@ -1160,30 +1180,35 @@ int luaCreateFunction(client *c, lua_State *lua, char *funcname, robj *body) {
funcdef = sdscatlen(funcdef,"\nend",4);
if (luaL_loadbuffer(lua,funcdef,sdslen(funcdef),"@user_script")) {
- addReplyErrorFormat(c,"Error compiling script (new function): %s\n",
- lua_tostring(lua,-1));
+ if (c != NULL) {
+ addReplyErrorFormat(c,
+ "Error compiling script (new function): %s\n",
+ lua_tostring(lua,-1));
+ }
lua_pop(lua,1);
+ sdsfree(sha);
sdsfree(funcdef);
- return C_ERR;
+ return NULL;
}
sdsfree(funcdef);
+
if (lua_pcall(lua,0,0,0)) {
- addReplyErrorFormat(c,"Error running script (new function): %s\n",
- lua_tostring(lua,-1));
+ if (c != NULL) {
+ addReplyErrorFormat(c,"Error running script (new function): %s\n",
+ lua_tostring(lua,-1));
+ }
lua_pop(lua,1);
- return C_ERR;
+ sdsfree(sha);
+ return NULL;
}
/* We also save a SHA1 -> Original script map in a dictionary
* so that we can replicate / write in the AOF all the
* EVALSHA commands as EVAL using the original script. */
- {
- int retval = dictAdd(server.lua_scripts,
- sdsnewlen(funcname+2,40),body);
- serverAssertWithInfo(c,NULL,retval == DICT_OK);
- incrRefCount(body);
- }
- return C_OK;
+ int retval = dictAdd(server.lua_scripts,sha,body);
+ serverAssertWithInfo(c ? c : server.lua_client,NULL,retval == DICT_OK);
+ incrRefCount(body);
+ return sha;
}
/* This is the Lua script "count" hook that we use to detect scripts timeout. */
@@ -1282,10 +1307,10 @@ void evalGenericCommand(client *c, int evalsha) {
addReply(c, shared.noscripterr);
return;
}
- if (luaCreateFunction(c,lua,funcname,c->argv[1]) == C_ERR) {
+ if (luaCreateFunction(c,lua,c->argv[1]) == NULL) {
lua_pop(lua,1); /* remove the error handler from the stack. */
/* The error is sent to the client by luaCreateFunction()
- * itself when it returns C_ERR. */
+ * itself when it returns NULL. */
return;
}
/* Now the following is guaranteed to return non nil */
@@ -1456,22 +1481,9 @@ void scriptCommand(client *c) {
addReply(c,shared.czero);
}
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"load")) {
- char funcname[43];
- sds sha;
-
- funcname[0] = 'f';
- funcname[1] = '_';
- sha1hex(funcname+2,c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
- sha = sdsnewlen(funcname+2,40);
- if (dictFind(server.lua_scripts,sha) == NULL) {
- if (luaCreateFunction(c,server.lua,funcname,c->argv[2])
- == C_ERR) {
- sdsfree(sha);
- return;
- }
- }
- addReplyBulkCBuffer(c,funcname+2,40);
- sdsfree(sha);
+ sds sha = luaCreateFunction(c,server.lua,c->argv[2]);
+ if (sha == NULL) return; /* The error was sent by luaCreateFunction(). */
+ addReplyBulkCBuffer(c,sha,40);
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) {
if (server.lua_caller == NULL) {
diff --git a/src/server.c b/src/server.c
index 457cf8f2e..d6f5f9e76 100644
--- a/src/server.c
+++ b/src/server.c
@@ -258,7 +258,7 @@ struct redisCommand redisCommandTable[] = {
{"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
{"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
{"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
- {"debug",debugCommand,-1,"as",0,NULL,0,0,0,0,0},
+ {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
{"config",configCommand,-2,"lat",0,NULL,0,0,0,0,0},
{"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
@@ -302,6 +302,11 @@ struct redisCommand redisCommandTable[] = {
{"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
+ {"xadd",xaddCommand,-5,"wmF",0,NULL,1,1,1,0,0},
+ {"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
+ {"xrevrange",xrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
+ {"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0},
+ {"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
@@ -547,10 +552,21 @@ dictType objectKeyPointerValueDictType = {
NULL, /* key dup */
NULL, /* val dup */
dictEncObjKeyCompare, /* key compare */
- dictObjectDestructor, /* key destructor */
+ dictObjectDestructor, /* key destructor */
NULL /* val destructor */
};
+/* Like objectKeyPointerValueDictType, but values, when not NULL, are
+ * destroyed by calling zfree(). */
+dictType objectKeyHeapPointerValueDictType = {
+ dictEncObjHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictEncObjKeyCompare, /* key compare */
+ dictObjectDestructor, /* key destructor */
+ dictVanillaFree /* val destructor */
+};
+
/* Set dictionary type. Keys are SDS strings, values are ot used. */
dictType setDictType = {
dictSdsHash, /* hash function */
@@ -1411,7 +1427,9 @@ void initServerConfig(void) {
server.active_defrag_running = 0;
server.notify_keyspace_events = 0;
server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
- server.bpop_blocked_clients = 0;
+ server.blocked_clients = 0;
+ memset(server.blocked_clients_by_type,0,
+ sizeof(server.blocked_clients_by_type));
server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
@@ -1549,16 +1567,29 @@ int restartServer(int flags, mstime_t delay) {
/* Check if we still have accesses to the executable that started this
* server instance. */
- if (access(server.executable,X_OK) == -1) return C_ERR;
+ if (access(server.executable,X_OK) == -1) {
+ serverLog(LL_WARNING,"Can't restart: this process has no "
+ "permissions to execute %s", server.executable);
+ return C_ERR;
+ }
/* Config rewriting. */
if (flags & RESTART_SERVER_CONFIG_REWRITE &&
server.configfile &&
- rewriteConfig(server.configfile) == -1) return C_ERR;
+ rewriteConfig(server.configfile) == -1)
+ {
+ serverLog(LL_WARNING,"Can't restart: configuration rewrite process "
+ "failed");
+ return C_ERR;
+ }
/* Perform a proper shutdown. */
if (flags & RESTART_SERVER_GRACEFULLY &&
- prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) return C_ERR;
+ prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK)
+ {
+ serverLog(LL_WARNING,"Can't restart: error preparing for shutdown");
+ return C_ERR;
+ }
/* Close all file descriptors, with the exception of stdin, stdout, strerr
* which are useful if we restart a Redis server which is not daemonized. */
@@ -1570,6 +1601,8 @@ int restartServer(int flags, mstime_t delay) {
/* Execute the server with the original command line. */
if (delay) usleep(delay*1000);
+ zfree(server.exec_argv[0]);
+ server.exec_argv[0] = zstrdup(server.executable);
execve(server.executable,server.exec_argv,environ);
/* If an error occurred here, there is nothing we can do, but exit. */
@@ -2445,8 +2478,9 @@ int processCommand(client *c) {
return C_OK;
}
- /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
- * we are a slave with a broken link with master. */
+ /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
+ * when slave-serve-stale-data is no and we are a slave with a broken
+ * link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
@@ -2490,7 +2524,7 @@ int processCommand(client *c) {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
- handleClientsBlockedOnLists();
+ handleClientsBlockedOnKeys();
}
return C_OK;
}
@@ -2909,7 +2943,7 @@ sds genRedisInfoString(char *section) {
"blocked_clients:%d\r\n",
listLength(server.clients)-listLength(server.slaves),
lol, bib,
- server.bpop_blocked_clients);
+ server.blocked_clients);
}
/* Memory */
diff --git a/src/server.h b/src/server.h
index 9b26221f0..ee3b7df5c 100644
--- a/src/server.h
+++ b/src/server.h
@@ -59,6 +59,7 @@ typedef long long mstime_t; /* millisecond time type. */
#include "anet.h" /* Networking the easy way */
#include "ziplist.h" /* Compact list data structure */
#include "intset.h" /* Compact integer set structure */
+#include "stream.h" /* Stream data type header file. */
#include "version.h" /* Version macro */
#include "util.h" /* Misc functions useful in many places */
#include "latency.h" /* Latency monitor API */
@@ -255,6 +256,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_LIST 1 /* BLPOP & co. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
+#define BLOCKED_STREAM 4 /* XREAD. */
+#define BLOCKED_NUM 5 /* Number of blocked states. */
/* Client request types */
#define PROTO_REQ_INLINE 1
@@ -424,7 +427,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define NOTIFY_ZSET (1<<7) /* z */
#define NOTIFY_EXPIRED (1<<8) /* x */
#define NOTIFY_EVICTED (1<<9) /* e */
-#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED) /* A */
+#define NOTIFY_STREAM (1<<10) /* t */
+#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */
/* Get the first bind addr or NULL */
#define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL)
@@ -446,11 +450,11 @@ typedef long long mstime_t; /* millisecond time type. */
/* A redis object, that is a type able to hold a string / list / set */
/* The actual Redis Object */
-#define OBJ_STRING 0
-#define OBJ_LIST 1
-#define OBJ_SET 2
-#define OBJ_ZSET 3
-#define OBJ_HASH 4
+#define OBJ_STRING 0 /* String object. */
+#define OBJ_LIST 1 /* List object. */
+#define OBJ_SET 2 /* Set object. */
+#define OBJ_ZSET 3 /* Sorted set object. */
+#define OBJ_HASH 4 /* Hash object. */
/* The "module" object type is a special one that signals that the object
* is one directly managed by a Redis module. In this case the value points
@@ -463,7 +467,8 @@ typedef long long mstime_t; /* millisecond time type. */
* by a 64 bit module type ID, which has a 54 bits module-specific signature
* in order to dispatch the loading to the right module, plus a 10 bits
* encoding version. */
-#define OBJ_MODULE 5
+#define OBJ_MODULE 5 /* Module object. */
+#define OBJ_STREAM 6 /* Stream object. */
/* Extract encver / signature from a module type ID. */
#define REDISMODULE_TYPE_ENCVER_BITS 10
@@ -575,6 +580,7 @@ typedef struct RedisModuleDigest {
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
+#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
@@ -586,7 +592,7 @@ typedef struct redisObject {
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
- * and most significant 16 bits decreas time). */
+ * and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
@@ -638,12 +644,17 @@ typedef struct blockingState {
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */
- /* BLOCKED_LIST */
+ /* BLOCKED_LIST and BLOCKED_STREAM */
dict *keys; /* The keys we are waiting to terminate a blocking
- * operation such as BLPOP. Otherwise NULL. */
+ * operation such as BLPOP or XREAD. Or NULL. */
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */
+ /* BLOCKED_STREAM */
+ size_t xread_count; /* XREAD COUNT option. */
+ robj *xread_group; /* XREAD group name. */
+ mstime_t xread_retry_time, xread_retry_ttl;
+
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
@@ -722,6 +733,7 @@ typedef struct client {
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
+ listNode *client_list_node; /* list node in client list */
/* Response buffer */
int bufpos;
@@ -1118,10 +1130,11 @@ struct redisServer {
unsigned long long maxmemory; /* Max number of memory bytes to use */
int maxmemory_policy; /* Policy for key eviction */
int maxmemory_samples; /* Precision of random sampling */
- unsigned int lfu_log_factor; /* LFU logarithmic counter factor. */
- unsigned int lfu_decay_time; /* LFU counter decay factor. */
+ int lfu_log_factor; /* LFU logarithmic counter factor. */
+ int lfu_decay_time; /* LFU counter decay factor. */
/* Blocked clients */
- unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
+ unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/
+ unsigned int blocked_clients_by_type[BLOCKED_NUM];
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
@@ -1288,6 +1301,7 @@ typedef struct {
extern struct redisServer server;
extern struct sharedObjectsStruct shared;
extern dictType objectKeyPointerValueDictType;
+extern dictType objectKeyHeapPointerValueDictType;
extern dictType setDictType;
extern dictType zsetDictType;
extern dictType clusterNodesDictType;
@@ -1386,6 +1400,7 @@ int handleClientsWithPendingWrites(void);
int clientHasPendingReplies(client *c);
void unlinkClient(client *c);
int writeToClient(int fd, client *c, int handler_installed);
+void linkClient(client *c);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)
@@ -1411,9 +1426,7 @@ int listTypeEqual(listTypeEntry *entry, robj *o);
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
void listTypeConvert(robj *subject, int enc);
void unblockClientWaitingData(client *c);
-void handleClientsBlockedOnLists(void);
void popGenericCommand(client *c, int where);
-void signalListAsReady(redisDb *db, robj *key);
/* MULTI/EXEC/WATCH... */
void unwatchAllKeys(client *c);
@@ -1456,6 +1469,7 @@ robj *createIntsetObject(void);
robj *createHashObject(void);
robj *createZsetObject(void);
robj *createZsetZiplistObject(void);
+robj *createStreamObject(void);
robj *createModuleObject(moduleType *mt, void *value);
int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg);
int checkType(client *c, robj *o, int type);
@@ -1755,6 +1769,7 @@ int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
+int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
/* Cluster */
void clusterInit(void);
@@ -1782,6 +1797,7 @@ void scriptingInit(int setup);
int ldbRemoveChild(pid_t pid);
void ldbKillForkedSessions(void);
int ldbPendingChildren(void);
+sds luaCreateFunction(client *c, lua_State *lua, robj *body);
/* Blocked clients */
void processUnblockedClients(void);
@@ -1790,6 +1806,9 @@ void unblockClient(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
+void handleClientsBlockedOnKeys(void);
+void signalKeyAsReady(redisDb *db, robj *key);
+void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
/* expire.c -- Handling of expired keys */
void activeExpireCycle(int type);
@@ -1803,6 +1822,7 @@ void evictionPoolAlloc(void);
#define LFU_INIT_VAL 5
unsigned long LFUGetTimeInMinutes(void);
uint8_t LFULogIncr(uint8_t value);
+unsigned long LFUDecrAndReturn(robj *o);
/* Keys hashing / comparison functions for dict.c hash tables. */
uint64_t dictSdsHash(const void *key);
@@ -1991,6 +2011,11 @@ void pfdebugCommand(client *c);
void latencyCommand(client *c);
void moduleCommand(client *c);
void securityWarningCommand(client *c);
+void xaddCommand(client *c);
+void xrangeCommand(client *c);
+void xrevrangeCommand(client *c);
+void xlenCommand(client *c);
+void xreadCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
diff --git a/src/setproctitle.c b/src/setproctitle.c
index f44253e16..6563242de 100644
--- a/src/setproctitle.c
+++ b/src/setproctitle.c
@@ -39,7 +39,11 @@
#include <errno.h> /* errno program_invocation_name program_invocation_short_name */
#if !defined(HAVE_SETPROCTITLE)
-#define HAVE_SETPROCTITLE (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
+#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
+#define HAVE_SETPROCTITLE 1
+#else
+#define HAVE_SETPROCTITLE 0
+#endif
#endif
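A note on the change above: expanding a macro whose body contains `defined` inside a later `#if` is undefined behavior in standard C, and recent compilers warn about it (e.g. -Wexpansion-to-defined), so the check is now evaluated where the macro is defined. A minimal standalone sketch of the same pattern, using hypothetical macro names rather than the real ones:

    /* Hypothetical macro names; only the pattern mirrors the change above. */
    #if !defined(FEATURE_X)
    #if (defined __SOME_OS__ || defined __OTHER_OS__)
    #define FEATURE_X 1
    #else
    #define FEATURE_X 0
    #endif
    #endif

    int main(void) { return FEATURE_X; } /* 0 unless one of the OS macros is set. */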
diff --git a/src/stream.h b/src/stream.h
new file mode 100644
index 000000000..214b6d9a5
--- /dev/null
+++ b/src/stream.h
@@ -0,0 +1,59 @@
+#ifndef STREAM_H
+#define STREAM_H
+
+#include "rax.h"
+#include "listpack.h"
+
+/* Stream item ID: a 128 bit number composed of a milliseconds time and
+ * a sequence counter. IDs generated in the same millisecond (or in a past
+ * millisecond if the clock jumped backward) will use the millisecond time
+ * of the latest generated ID and an incremented sequence. */
+typedef struct streamID {
+ uint64_t ms; /* Unix time in milliseconds. */
+ uint64_t seq; /* Sequence number. */
+} streamID;
+
+typedef struct stream {
+ rax *rax; /* The radix tree holding the stream. */
+ uint64_t length; /* Number of elements inside this stream. */
+ streamID last_id; /* Zero if there are no items yet. */
+} stream;
+
+/* We define an iterator to iterate stream items in an abstract way, without
+ * caring about the radix tree + listpack representation. Technically speaking
+ * the iterator is only used inside streamReplyWithRange(), so could just
+ * be implemented inside the function, but practically there is the AOF
+ * rewriting code that also needs to iterate the stream to emit the XADD
+ * commands. */
+typedef struct streamIterator {
+ streamID master_id; /* ID of the master entry at listpack head. */
+ uint64_t master_fields_count; /* Master entries # of fields. */
+ unsigned char *master_fields_start; /* Master entries start in listpack. */
+ unsigned char *master_fields_ptr; /* Master field to emit next. */
+ int entry_flags; /* Flags of entry we are emitting. */
+ int rev; /* True if iterating end to start (reverse). */
+ uint64_t start_key[2]; /* Start key as 128 bit big endian. */
+ uint64_t end_key[2]; /* End key as 128 bit big endian. */
+ raxIterator ri; /* Rax iterator. */
+ unsigned char *lp; /* Current listpack. */
+ unsigned char *lp_ele; /* Current listpack cursor. */
+ /* Buffers used to hold the string of lpGet() when the element is
+ * integer encoded, so that there is no string representation of the
+ * element inside the listpack itself. */
+ unsigned char field_buf[LP_INTBUF_SIZE];
+ unsigned char value_buf[LP_INTBUF_SIZE];
+} streamIterator;
+
+/* Prototypes of exported APIs. */
+
+struct client;
+
+stream *streamNew(void);
+void freeStream(stream *s);
+size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev);
+void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
+int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
+void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
+void streamIteratorStop(streamIterator *si);
+
+#endif
diff --git a/src/t_hash.c b/src/t_hash.c
index 700a6233a..be73932c5 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -287,8 +287,8 @@ int hashTypeDelete(robj *o, sds field) {
if (fptr != NULL) {
fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1);
if (fptr != NULL) {
- zl = ziplistDelete(zl,&fptr);
- zl = ziplistDelete(zl,&fptr);
+ zl = ziplistDelete(zl,&fptr); /* Delete the key. */
+ zl = ziplistDelete(zl,&fptr); /* Delete the value. */
o->ptr = zl;
deleted = 1;
}
diff --git a/src/t_list.c b/src/t_list.c
index a0a30998d..c7e6aac00 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -603,119 +603,6 @@ void rpoplpushCommand(client *c) {
* Blocking POP operations
*----------------------------------------------------------------------------*/
-/* This is how the current blocking POP works, we use BLPOP as example:
- * - If the user calls BLPOP and the key exists and contains a non empty list
- * then LPOP is called instead. So BLPOP is semantically the same as LPOP
- * if blocking is not required.
- * - If instead BLPOP is called and the key does not exists or the list is
- * empty we need to block. In order to do so we remove the notification for
- * new data to read in the client socket (so that we'll not serve new
- * requests if the blocking request is not served). Also we put the client
- * in a dictionary (db->blocking_keys) mapping keys to a list of clients
- * blocking for this keys.
- * - If a PUSH operation against a key with blocked clients waiting is
- * performed, we mark this key as "ready", and after the current command,
- * MULTI/EXEC block, or script, is executed, we serve all the clients waiting
- * for this list, from the one that blocked first, to the last, accordingly
- * to the number of elements we have in the ready list.
- */
-
-/* Set a client in blocking mode for the specified key, with the specified
- * timeout */
-void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
- dictEntry *de;
- list *l;
- int j;
-
- c->bpop.timeout = timeout;
- c->bpop.target = target;
-
- if (target != NULL) incrRefCount(target);
-
- for (j = 0; j < numkeys; j++) {
- /* If the key already exists in the dict ignore it. */
- if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
- incrRefCount(keys[j]);
-
- /* And in the other "side", to map keys -> clients */
- de = dictFind(c->db->blocking_keys,keys[j]);
- if (de == NULL) {
- int retval;
-
- /* For every key we take a list of clients blocked for it */
- l = listCreate();
- retval = dictAdd(c->db->blocking_keys,keys[j],l);
- incrRefCount(keys[j]);
- serverAssertWithInfo(c,keys[j],retval == DICT_OK);
- } else {
- l = dictGetVal(de);
- }
- listAddNodeTail(l,c);
- }
- blockClient(c,BLOCKED_LIST);
-}
-
-/* Unblock a client that's waiting in a blocking operation such as BLPOP.
- * You should never call this function directly, but unblockClient() instead. */
-void unblockClientWaitingData(client *c) {
- dictEntry *de;
- dictIterator *di;
- list *l;
-
- serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
- di = dictGetIterator(c->bpop.keys);
- /* The client may wait for multiple keys, so unblock it for every key. */
- while((de = dictNext(di)) != NULL) {
- robj *key = dictGetKey(de);
-
- /* Remove this client from the list of clients waiting for this key. */
- l = dictFetchValue(c->db->blocking_keys,key);
- serverAssertWithInfo(c,key,l != NULL);
- listDelNode(l,listSearchKey(l,c));
- /* If the list is empty we need to remove it to avoid wasting memory */
- if (listLength(l) == 0)
- dictDelete(c->db->blocking_keys,key);
- }
- dictReleaseIterator(di);
-
- /* Cleanup the client structure */
- dictEmpty(c->bpop.keys,NULL);
- if (c->bpop.target) {
- decrRefCount(c->bpop.target);
- c->bpop.target = NULL;
- }
-}
-
-/* If the specified key has clients blocked waiting for list pushes, this
- * function will put the key reference into the server.ready_keys list.
- * Note that db->ready_keys is a hash table that allows us to avoid putting
- * the same key again and again in the list in case of multiple pushes
- * made by a script or in the context of MULTI/EXEC.
- *
- * The list will be finally processed by handleClientsBlockedOnLists() */
-void signalListAsReady(redisDb *db, robj *key) {
- readyList *rl;
-
- /* No clients blocking for this key? No need to queue it. */
- if (dictFind(db->blocking_keys,key) == NULL) return;
-
- /* Key was already signaled? No need to queue it again. */
- if (dictFind(db->ready_keys,key) != NULL) return;
-
- /* Ok, we need to queue this key into server.ready_keys. */
- rl = zmalloc(sizeof(*rl));
- rl->key = key;
- rl->db = db;
- incrRefCount(key);
- listAddNodeTail(server.ready_keys,rl);
-
- /* We also add the key in the db->ready_keys dictionary in order
- * to avoid adding it multiple times into a list with a simple O(1)
- * check. */
- incrRefCount(key);
- serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
-}
-
/* This is a helper function for handleClientsBlockedOnLists(). Its work
* is to serve a specific client (receiver) that is blocked on 'key'
* in the context of the specified 'db', doing the following:
@@ -785,97 +672,6 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
return C_OK;
}
-/* This function should be called by Redis every time a single command,
- * a MULTI/EXEC block, or a Lua script, terminated its execution after
- * being called by a client.
- *
- * All the keys with at least one client blocked that received at least
- * one new element via some PUSH operation are accumulated into
- * the server.ready_keys list. This function will run the list and will
- * serve clients accordingly. Note that the function will iterate again and
- * again as a result of serving BRPOPLPUSH we can have new blocking clients
- * to serve because of the PUSH side of BRPOPLPUSH. */
-void handleClientsBlockedOnLists(void) {
- while(listLength(server.ready_keys) != 0) {
- list *l;
-
- /* Point server.ready_keys to a fresh list and save the current one
- * locally. This way as we run the old list we are free to call
- * signalListAsReady() that may push new elements in server.ready_keys
- * when handling clients blocked into BRPOPLPUSH. */
- l = server.ready_keys;
- server.ready_keys = listCreate();
-
- while(listLength(l) != 0) {
- listNode *ln = listFirst(l);
- readyList *rl = ln->value;
-
- /* First of all remove this key from db->ready_keys so that
- * we can safely call signalListAsReady() against this key. */
- dictDelete(rl->db->ready_keys,rl->key);
-
- /* If the key exists and it's a list, serve blocked clients
- * with data. */
- robj *o = lookupKeyWrite(rl->db,rl->key);
- if (o != NULL && o->type == OBJ_LIST) {
- dictEntry *de;
-
- /* We serve clients in the same order they blocked for
- * this key, from the first blocked to the last. */
- de = dictFind(rl->db->blocking_keys,rl->key);
- if (de) {
- list *clients = dictGetVal(de);
- int numclients = listLength(clients);
-
- while(numclients--) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
- robj *dstkey = receiver->bpop.target;
- int where = (receiver->lastcmd &&
- receiver->lastcmd->proc == blpopCommand) ?
- LIST_HEAD : LIST_TAIL;
- robj *value = listTypePop(o,where);
-
- if (value) {
- /* Protect receiver->bpop.target, that will be
- * freed by the next unblockClient()
- * call. */
- if (dstkey) incrRefCount(dstkey);
- unblockClient(receiver);
-
- if (serveClientBlockedOnList(receiver,
- rl->key,dstkey,rl->db,value,
- where) == C_ERR)
- {
- /* If we failed serving the client we need
- * to also undo the POP operation. */
- listTypePush(o,value,where);
- }
-
- if (dstkey) decrRefCount(dstkey);
- decrRefCount(value);
- } else {
- break;
- }
- }
- }
-
- if (listTypeLength(o) == 0) {
- dbDelete(rl->db,rl->key);
- }
- /* We don't call signalModifiedKey() as it was already called
- * when an element was pushed on the list. */
- }
-
- /* Free this item. */
- decrRefCount(rl->key);
- zfree(rl);
- listDelNode(l,ln);
- }
- listRelease(l); /* We have the new list on place at this point. */
- }
-}
-
/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
robj *o;
@@ -930,7 +726,7 @@ void blockingPopGenericCommand(client *c, int where) {
}
/* If the list is empty or the key does not exist we must block */
- blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
+ blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
}
void blpopCommand(client *c) {
@@ -956,7 +752,7 @@ void brpoplpushCommand(client *c) {
addReply(c, shared.nullbulk);
} else {
/* The list is empty and the client blocks. */
- blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
+ blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
}
} else {
if (key->type != OBJ_LIST) {
diff --git a/src/t_set.c b/src/t_set.c
index d5a801e11..8f21f71b1 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -407,7 +407,7 @@ void spopWithCountCommand(client *c) {
/* Get the count argument */
if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
if (l >= 0) {
- count = (unsigned) l;
+ count = (unsigned long) l;
} else {
addReply(c,shared.outofrangeerr);
return;
@@ -626,7 +626,7 @@ void srandmemberWithCountCommand(client *c) {
if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
if (l >= 0) {
- count = (unsigned) l;
+ count = (unsigned long) l;
} else {
/* A negative count means: return the same elements multiple times
* (i.e. don't remove the extracted element after every extraction). */
@@ -774,15 +774,21 @@ void srandmemberCommand(client *c) {
}
int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
- return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2);
+ if (setTypeSize(*(robj**)s1) > setTypeSize(*(robj**)s2)) return 1;
+ if (setTypeSize(*(robj**)s1) < setTypeSize(*(robj**)s2)) return -1;
+ return 0;
}
/* This is used by SDIFF and in this case we can receive NULL that should
* be handled as empty sets. */
int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {
robj *o1 = *(robj**)s1, *o2 = *(robj**)s2;
+ unsigned long first = o1 ? setTypeSize(o1) : 0;
+ unsigned long second = o2 ? setTypeSize(o2) : 0;
- return (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : 0);
+ if (first < second) return 1;
+ if (first > second) return -1;
+ return 0;
}
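For context on the comparator changes above: setTypeSize() returns an unsigned value, so returning the plain difference can produce a number that no longer fits in an int, and the sign qsort() sees may then be wrong. A standalone sketch (not part of the patch) of the failure mode and of the explicit-comparison fix:

    #include <stdio.h>

    static int cmp_by_diff(unsigned long a, unsigned long b) {
        return a - b;              /* Truncated to int: the sign may flip. */
    }

    static int cmp_explicit(unsigned long a, unsigned long b) {
        if (a > b) return 1;       /* Explicit comparisons cannot overflow. */
        if (a < b) return -1;
        return 0;
    }

    int main(void) {
        unsigned long big = 3000000000UL, small = 0;
        /* big > small, yet 3000000000 does not fit in an int, so the
         * subtraction comparator typically reports a negative value. */
        printf("diff comparator: %d\n", cmp_by_diff(big, small));
        printf("explicit comparator: %d\n", cmp_explicit(big, small));
        return 0;
    }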
void sinterGenericCommand(client *c, robj **setkeys,
diff --git a/src/t_stream.c b/src/t_stream.c
new file mode 100644
index 000000000..da9939588
--- /dev/null
+++ b/src/t_stream.c
@@ -0,0 +1,1053 @@
+/*
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "endianconv.h"
+#include "stream.h"
+
+#define STREAM_BYTES_PER_LISTPACK 2048
+
+/* Every stream item inside the listpack has a flags field that is used to
+ * mark the entry as deleted, or as having the same fields as the "master"
+ * entry at the start of the listpack. */
+#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
+#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
+#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
+
+/* -----------------------------------------------------------------------
+ * Low level stream encoding: a radix tree of listpacks.
+ * ----------------------------------------------------------------------- */
+
+/* Create a new stream data structure. */
+stream *streamNew(void) {
+ stream *s = zmalloc(sizeof(*s));
+ s->rax = raxNew();
+ s->length = 0;
+ s->last_id.ms = 0;
+ s->last_id.seq = 0;
+ return s;
+}
+
+/* Free a stream, including the listpacks stored inside the radix tree. */
+void freeStream(stream *s) {
+ raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
+ zfree(s);
+}
+
+/* Generate the next stream item ID given the previous one. If the current
+ * milliseconds Unix time is greater than the previous one, just use this
+ * as time part and start with sequence part of zero. Otherwise we use the
+ * previous time (and never go backward) and increment the sequence. */
+void streamNextID(streamID *last_id, streamID *new_id) {
+ uint64_t ms = mstime();
+ if (ms > last_id->ms) {
+ new_id->ms = ms;
+ new_id->seq = 0;
+ } else {
+ new_id->ms = last_id->ms;
+ new_id->seq = last_id->seq+1;
+ }
+}
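The allocation rule implemented by streamNextID() above guarantees that IDs never move backward even if the system clock does. A standalone sketch of the same rule (hypothetical demo types, not part of the patch):

    #include <stdint.h>
    #include <stdio.h>

    typedef struct { uint64_t ms, seq; } demo_id;

    static demo_id next_id(demo_id last, uint64_t now_ms) {
        demo_id n;
        if (now_ms > last.ms) {
            n.ms = now_ms;          /* Clock moved forward: restart sequence. */
            n.seq = 0;
        } else {
            n.ms = last.ms;         /* Clock stalled or jumped back: keep the */
            n.seq = last.seq + 1;   /* old time part and bump the sequence.   */
        }
        return n;
    }

    int main(void) {
        demo_id last = { 1512489000000ULL, 3 };
        demo_id a = next_id(last, 1512489000005ULL);  /* -> 1512489000005-0 */
        demo_id b = next_id(last, 1512488999990ULL);  /* -> 1512489000000-4 */
        printf("%llu-%llu\n", (unsigned long long)a.ms, (unsigned long long)a.seq);
        printf("%llu-%llu\n", (unsigned long long)b.ms, (unsigned long long)b.seq);
        return 0;
    }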
+
+/* This is just a wrapper for lpAppend() to directly use a 64 bit integer
+ * instead of a string. */
+unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
+ char buf[LONG_STR_SIZE];
+ int slen = ll2string(buf,sizeof(buf),value);
+ return lpAppend(lp,(unsigned char*)buf,slen);
+}
+
+/* This is just a wrapper for lpReplace() to directly use a 64 bit integer
+ * instead of a string to replace the current element. The function returns
+ * the new listpack as return value, and also updates the current cursor
+ * by updating '*pos'. */
+unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) {
+ char buf[LONG_STR_SIZE];
+ int slen = ll2string(buf,sizeof(buf),value);
+ return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE, pos);
+}
+
+/* This is a wrapper function for lpGet() to directly get an integer value
+ * from the listpack (that may store numbers as a string), converting
+ * the string if needed. */
+int64_t lpGetInteger(unsigned char *ele) {
+ int64_t v;
+ unsigned char *e = lpGet(ele,&v,NULL);
+ if (e == NULL) return v;
+ /* The following code path should never be used given how listpacks work:
+ * they should always be able to store an int64_t value in integer
+ * encoded form. However the implementation may change. */
+ long long ll;
+ int retval = string2ll((char*)e,v,&ll);
+ serverAssert(retval != 0);
+ v = ll;
+ return v;
+}
+
+/* Debugging function to log the full content of a listpack. Useful
+ * for development and debugging. */
+void streamLogListpackContent(unsigned char *lp) {
+ unsigned char *p = lpFirst(lp);
+ while(p) {
+ unsigned char buf[LP_INTBUF_SIZE];
+ int64_t v;
+ unsigned char *ele = lpGet(p,&v,buf);
+ serverLog(LL_WARNING,"- [%d] '%.*s'", (int)v, (int)v, ele);
+ p = lpNext(lp,p);
+ }
+}
+
+/* Convert the specified stream entry ID to a 128 bit big endian number, so
+ * that the IDs can be sorted lexicographically. */
+void streamEncodeID(void *buf, streamID *id) {
+ uint64_t e[2];
+ e[0] = htonu64(id->ms);
+ e[1] = htonu64(id->seq);
+ memcpy(buf,e,sizeof(e));
+}
+
+/* This is the reverse of streamEncodeID(): the decoded ID will be stored
+ * in the 'id' structure passed by reference. The buffer 'buf' must point
+ * to a 128 bit big-endian encoded ID. */
+void streamDecodeID(void *buf, streamID *id) {
+ uint64_t e[2];
+ memcpy(e,buf,sizeof(e));
+ id->ms = ntohu64(e[0]);
+ id->seq = ntohu64(e[1]);
+}
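The point of the big-endian encoding above is that a plain memcmp() over the 16 byte buffer orders IDs exactly like their numeric value, which is what keeps the radix tree keys sorted by ID. A standalone sketch (not part of the patch; the helper below only imitates what htonu64() achieves):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    static void encode_be64(unsigned char *dst, uint64_t v) {
        for (int i = 7; i >= 0; i--) { dst[i] = (unsigned char)(v & 0xff); v >>= 8; }
    }

    int main(void) {
        unsigned char a[16], b[16];
        /* ID 1000-5 versus ID 1000-9: same ms part, smaller seq part. */
        encode_be64(a, 1000); encode_be64(a + 8, 5);
        encode_be64(b, 1000); encode_be64(b + 8, 9);
        /* Because the most significant bytes come first, memcmp() orders the
         * buffers exactly like the numeric IDs. */
        printf("a < b: %d\n", memcmp(a, b, 16) < 0);  /* Prints 1. */
        return 0;
    }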
+
+/* Adds a new item into the stream 's' having the specified number of
+ * field-value pairs as specified in 'numfields' and stored into 'argv'.
+ * Returns the new entry ID populating the 'added_id' structure.
+ *
+ * If 'use_id' is not NULL, the ID is not auto-generated by the function,
+ * but instead the passed ID is used to add the new entry. In this case
+ * adding the entry may fail as specified later in this comment.
+ *
+ * The function returns C_OK if the item was added, this is always true
+ * if the ID was generated by the function. However the function may return
+ * C_ERR if an ID was given via 'use_id', but adding it failed since the
+ * current top ID is greater or equal. */
+int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, streamID *use_id) {
+ /* If an ID was given, check that it's greater than the last entry ID
+ * or return an error. */
+ if (use_id && (use_id->ms < s->last_id.ms ||
+ (use_id->ms == s->last_id.ms &&
+ use_id->seq <= s->last_id.seq))) return C_ERR;
+
+ /* Add the new entry. */
+ raxIterator ri;
+ raxStart(&ri,s->rax);
+ raxSeek(&ri,"$",NULL,0);
+
+ size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
+ unsigned char *lp = NULL; /* Tail listpack pointer. */
+
+ /* Get a reference to the tail node listpack. */
+ if (raxNext(&ri)) {
+ lp = ri.data;
+ lp_bytes = lpBytes(lp);
+ }
+ raxStop(&ri);
+
+ /* Generate the new entry ID. */
+ streamID id;
+ if (use_id)
+ id = *use_id;
+ else
+ streamNextID(&s->last_id,&id);
+
+ /* We have to add the key into the radix tree in lexicographic order,
+ * to do so we consider the ID as a single 128 bit number written in
+ * big endian, so that the most significant bytes are the first ones. */
+ uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
+ streamID master_id; /* ID of the master entry in the listpack. */
+
+ /* Create a new listpack and radix tree node if needed. Note that when
+ * a new listpack is created, we populate it with a "master entry". This
+ * is just a set of fields that is taken as reference in order to compress
+ * the stream entries that we'll add inside the listpack.
+ *
+ * Note that while we use the first added entry fields to create
+ * the master entry, the first added entry is NOT represented in the master
+ * entry, which is a stand alone object. But of course, the first entry
+ * will compress well because it's used as reference.
+ *
+ * The master entry is composed like in the following example:
+ *
+ * +-------+---------+------------+---------+--/--+---------+---------+-+
+ * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
+ * +-------+---------+------------+---------+--/--+---------+---------+-+
+ *
+ * count and deleted just represent respectively the total number of
+ * entries inside the listpack that are valid, and marked as deleted
+ * (deleted flag in the entry flags set). So the total number of items
+ * actually inside the listpack (both deleted and not) is count+deleted.
+ *
+ * The real entries will be encoded with an ID that is just the
+ * millisecond and sequence difference compared to the key stored at
+ * the radix tree node containing the listpack (delta encoding), and
+ * if the fields of the entry are the same as the master entry fields, the
+ * entry flags will specify this fact and the entry fields and number
+ * of fields will be omitted (see later in the code of this function).
+ *
+ * The "0" entry at the end is the same as the 'lp-count' entry in the
+ * regular stream entries (see below), and marks the fact that there are
+ * no more entries when we scan the stream from right to left. */
+
+ int flags = STREAM_ITEM_FLAG_NONE;
+ if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
+ master_id = id;
+ streamEncodeID(rax_key,&id);
+ /* Create the listpack having the master entry ID and fields. */
+ lp = lpNew();
+ lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
+ lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
+ lp = lpAppendInteger(lp,numfields);
+ for (int i = 0; i < numfields; i++) {
+ sds field = argv[i*2]->ptr;
+ lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+ }
+ lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
+ raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
+ /* The first entry we insert obviously has the same fields as the
+ * master entry. */
+ flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
+ } else {
+ serverAssert(ri.key_len == sizeof(rax_key));
+ memcpy(rax_key,ri.key,sizeof(rax_key));
+
+ /* Read the master ID from the radix tree key. */
+ streamDecodeID(rax_key,&master_id);
+ unsigned char *lp_ele = lpFirst(lp);
+
+ /* Update count and skip the deleted fields. */
+ int64_t count = lpGetInteger(lp_ele);
+ lp = lpReplaceInteger(lp,&lp_ele,count+1);
+ lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
+ lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
+
+ /* Check if the entry we are adding has the same fields
+ * as the master entry. */
+ int master_fields_count = lpGetInteger(lp_ele);
+ lp_ele = lpNext(lp,lp_ele);
+ if (numfields == master_fields_count) {
+ int i;
+ for (i = 0; i < master_fields_count; i++) {
+ sds field = argv[i*2]->ptr;
+ int64_t e_len;
+ unsigned char buf[LP_INTBUF_SIZE];
+ unsigned char *e = lpGet(lp_ele,&e_len,buf);
+ /* Stop if there is a mismatch. */
+ if (sdslen(field) != (size_t)e_len ||
+ memcmp(e,field,e_len) != 0) break;
+ lp_ele = lpNext(lp,lp_ele);
+ }
+ /* All fields are the same! We can compress the field names
+ * setting a single bit in the flags. */
+ if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
+ }
+ }
+
+ /* Populate the listpack with the new entry. We use the following
+ * encoding:
+ *
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
+ * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
+ * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
+ *
+ * However if the SAMEFIELD flag is set, we have just to populate
+ * the entry with the values, so it becomes:
+ *
+ * +-----+--------+-------+-/-+-------+--------+
+ * |flags|entry-id|value-1|...|value-N|lp-count|
+ * +-----+--------+-------+-/-+-------+--------+
+ *
+ * The entry-id field is actually two separated fields: the ms
+ * and seq difference compared to the master entry.
+ *
+ * The lp-count field is a number that states the number of listpack pieces
+ * that compose the entry, so that it's possible to traverse the entry
+ * in reverse order: we can just start from the end of the listpack, read
+ * the entry, and jump back N times to seek the "flags" field to read
+ * the full stream entry. */
+ lp = lpAppendInteger(lp,flags);
+ lp = lpAppendInteger(lp,id.ms - master_id.ms);
+ lp = lpAppendInteger(lp,id.seq - master_id.seq);
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
+ lp = lpAppendInteger(lp,numfields);
+ for (int i = 0; i < numfields; i++) {
+ sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
+ lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+ lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
+ }
+ /* Compute and store the lp-count field. */
+ int lp_count = numfields;
+ lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
+ if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
+ /* If the item is not compressed, it also has the fields other than
+ * the values, and an additional num-fields field. */
+ lp_count += numfields+1;
+ }
+ lp = lpAppendInteger(lp,lp_count);
+
+ /* Insert back into the tree in order to update the listpack pointer. */
+ raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
+ s->length++;
+ s->last_id = id;
+ if (added_id) *added_id = id;
+ return C_OK;
+}
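To make the listpack layout described in the comments above concrete, here is a standalone sketch (not part of the patch) that simply prints the sequence of listpack elements a hypothetical stream node could contain: a master entry for the fields ("temp","hum"), one SAMEFIELDS entry, and one entry with a different field set. The field names and values are invented for illustration.

    #include <stdio.h>

    int main(void) {
        const char *elements[] = {
            /* Master entry: count, deleted, num-fields, fields..., 0 terminator. */
            "2", "0", "2", "temp", "hum", "0",
            /* Entry 1 (flags=SAMEFIELDS=2): flags, ms-diff, seq-diff,
             * value-1, value-2, lp-count (5 pieces precede it). */
            "2", "0", "0", "21", "55", "5",
            /* Entry 2 (flags=0, one field "pressure"): flags, ms-diff,
             * seq-diff, num-fields, field, value, lp-count (6 pieces). */
            "0", "10", "0", "1", "pressure", "1013", "6"
        };
        for (size_t i = 0; i < sizeof(elements)/sizeof(elements[0]); i++)
            printf("%s\n", elements[i]);
        return 0;
    }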
+
+/* Trim the stream 's' to have no more than maxlen elements, and return the
+ * number of elements removed from the stream. The 'approx' option, if non-zero,
+ * specifies that the trimming must be performed in an approximated way in
+ * order to maximize performance. This means that the stream may contain
+ * more elements than 'maxlen', and elements are only removed if we can remove
+ * a *whole* node of the radix tree. The elements are removed from the head
+ * of the stream (older elements).
+ *
+ * The function may return zero if:
+ *
+ * 1) The stream is already shorter or equal to the specified max length.
+ * 2) The 'approx' option is true and the head node did not have enough elements
+ * to be deleted, leaving the stream with a number of elements >= maxlen.
+ */
+int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
+ if (s->length <= maxlen) return 0;
+
+ raxIterator ri;
+ raxStart(&ri,s->rax);
+ raxSeek(&ri,"^",NULL,0);
+
+ int64_t deleted = 0;
+ while(s->length > maxlen && raxNext(&ri)) {
+ unsigned char *lp = ri.data, *p = lpFirst(lp);
+ int64_t entries = lpGetInteger(p);
+
+ /* Check if we can remove the whole node, and still have at
+ * least maxlen elements. */
+ if (s->length - entries >= maxlen) {
+ lpFree(lp);
+ raxRemove(s->rax,ri.key,ri.key_len,NULL);
+ raxSeek(&ri,">=",ri.key,ri.key_len);
+ s->length -= entries;
+ deleted += entries;
+ continue;
+ }
+
+ /* If we cannot remove a whole node, and approx is true,
+ * stop here. */
+ if (approx) break;
+
+ /* Otherwise, we have to mark single entries inside the listpack
+ * as deleted. We start by updating the entries/deleted counters. */
+ int64_t to_delete = s->length - maxlen;
+ serverAssert(to_delete < entries);
+ lp = lpReplaceInteger(lp,&p,entries-to_delete);
+ p = lpNext(lp,p); /* Seek deleted field. */
+ int64_t marked_deleted = lpGetInteger(p);
+ lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
+ p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */
+
+ /* Skip all the master fields. */
+ int64_t master_fields_count = lpGetInteger(p);
+ p = lpNext(lp,p); /* Seek the first field. */
+ for (int64_t j = 0; j < master_fields_count; j++)
+ p = lpNext(lp,p); /* Skip all master fields. */
+ p = lpNext(lp,p); /* Skip the zero master entry terminator. */
+
+ /* 'p' is now pointing to the first entry inside the listpack.
+ * We have to run entry after entry, marking entries as deleted
+ * if they are not already deleted. */
+ while(p) {
+ int flags = lpGetInteger(p);
+ int to_skip;
+
+ /* Mark the entry as deleted. */
+ if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
+ flags |= STREAM_ITEM_FLAG_DELETED;
+ lp = lpReplaceInteger(lp,&p,flags);
+ deleted++;
+ s->length--;
+ if (s->length <= maxlen) break; /* Enough entries deleted. */
+ }
+
+ p = lpNext(lp,p); /* Skip ID ms delta. */
+ p = lpNext(lp,p); /* Skip ID seq delta. */
+ p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ to_skip = master_fields_count;
+ } else {
+ to_skip = lpGetInteger(p);
+ to_skip = 1+(to_skip*2);
+ }
+
+ while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
+ p = lpNext(lp,p); /* Skip the final lp-count field. */
+ }
+
+ /* Here we should perform garbage collection in case at this point
+ * there are too many entries deleted inside the listpack. */
+ entries -= to_delete;
+ marked_deleted += to_delete;
+ if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
+ /* TODO: perform a garbage collection. */
+ }
+
+ break; /* If we are here, there was enough to delete in the current
+ node, so no need to go to the next node. */
+ }
+
+ raxStop(&ri);
+ return deleted;
+}
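The whole-node removal logic above is what makes approximate (MAXLEN ~) trimming cheap: nodes are dropped only while the stream stays at or above maxlen, and with 'approx' set the function stops instead of editing entries inside a node. A standalone sketch (not part of the patch) of that decision, with invented node sizes:

    #include <stdio.h>

    int main(void) {
        long long nodes[] = { 100, 100, 100 };   /* Entries per listpack node. */
        long long length = 300, maxlen = 150;
        for (int i = 0; i < 3 && length > maxlen; i++) {
            if (length - nodes[i] >= maxlen) {
                length -= nodes[i];               /* Whole node removed. */
                printf("drop node %d, length now %lld\n", i, length);
            } else {
                /* Approximate trimming stops here, leaving length >= maxlen. */
                printf("approx trim stops, length stays %lld\n", length);
                break;
            }
        }
        return 0;
    }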
+
+/* Initialize the stream iterator, so that we can call iterating functions
+ * to get the next items. This requires a corresponding streamIteratorStop()
+ * at the end. The 'rev' parameter controls the direction. If it's zero the
+ * iteration is from the start to the end element (inclusive), otherwise
+ * if rev is non-zero, the iteration is reversed.
+ *
+ * Once the iterator is initialized, we iterate like this:
+ *
+ * streamIterator myiterator;
+ * streamIteratorStart(&myiterator,...);
+ * streamID id;
+ * int64_t numfields;
+ * while(streamIteratorGetID(&myiterator,&id,&numfields)) {
+ * while(numfields--) {
+ * unsigned char *key, *value;
+ * int64_t key_len, value_len;
+ * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len);
+ *
+ * ... do what you want with key and value ...
+ * }
+ * }
+ * streamIteratorStop(&myiterator); */
+void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
+ /* Initialize the iterator and translate the iteration start/stop
+ * elements into 128 bit big-endian numbers. */
+ if (start) {
+ streamEncodeID(si->start_key,start);
+ } else {
+ si->start_key[0] = 0;
+ si->start_key[1] = 0;
+ }
+
+ if (end) {
+ streamEncodeID(si->end_key,end);
+ } else {
+ si->end_key[0] = UINT64_MAX;
+ si->end_key[1] = UINT64_MAX;
+ }
+
+ /* Seek the correct node in the radix tree. */
+ raxStart(&si->ri,s->rax);
+ if (!rev) {
+ if (start && (start->ms || start->seq)) {
+ raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
+ sizeof(si->start_key));
+ if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL,0);
+ } else {
+ raxSeek(&si->ri,"^",NULL,0);
+ }
+ } else {
+ if (end && (end->ms || end->seq)) {
+ raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
+ sizeof(si->end_key));
+ if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL,0);
+ } else {
+ raxSeek(&si->ri,"$",NULL,0);
+ }
+ }
+ si->lp = NULL; /* There is no current listpack right now. */
+ si->lp_ele = NULL; /* Current listpack cursor. */
+ si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
+}
+
+/* Return 1 and store the current item ID at 'id' if there are still
+ * elements within the iteration range, otherwise return 0 in order to
+ * signal the iteration terminated. */
+int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
+ while(1) { /* Will stop when element > stop_key or end of radix tree. */
+ /* If the current listpack is set to NULL, this is the start of the
+ * iteration or the previous listpack was completely iterated.
+ * Go to the next node. */
+ if (si->lp == NULL || si->lp_ele == NULL) {
+ if (!si->rev && !raxNext(&si->ri)) return 0;
+ else if (si->rev && !raxPrev(&si->ri)) return 0;
+ serverAssert(si->ri.key_len == sizeof(streamID));
+ /* Get the master ID. */
+ streamDecodeID(si->ri.key,&si->master_id);
+ /* Get the master fields count. */
+ si->lp = si->ri.data;
+ si->lp_ele = lpFirst(si->lp); /* Seek items count */
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
+ si->master_fields_count = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
+ si->master_fields_start = si->lp_ele;
+ /* Skip master fields to seek the first entry. */
+ for (uint64_t i = 0; i < si->master_fields_count; i++)
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ /* We are now pointing at the zero term of the master entry. If
+ * we are iterating in reverse order, we need to seek the
+ * end of the listpack. */
+ if (si->rev) si->lp_ele = lpLast(si->lp);
+ } else if (si->rev) {
+ /* If we are iterating in reverse order, and this is not
+ * the first entry emitted for this listpack, then we already
+ * emitted the current entry, and have to go back to the previous
+ * one. */
+ int lp_count = lpGetInteger(si->lp_ele);
+ while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ /* Seek lp-count of prev entry. */
+ si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ }
+
+ /* For every radix tree node, iterate the corresponding listpack,
+ * returning elements when they are within range. */
+ while(1) {
+ if (!si->rev) {
+ /* If we are going forward, skip the previous entry
+ * lp-count field (or in case of the master entry, the zero
+ * term field) */
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ if (si->lp_ele == NULL) break;
+ } else {
+ /* If we are going backward, read the number of elements this
+ * entry is composed of, and jump backward N times to seek
+ * its start. */
+ int lp_count = lpGetInteger(si->lp_ele);
+ if (lp_count == 0) { /* We reached the master entry. */
+ si->lp = NULL;
+ si->lp_ele = NULL;
+ break;
+ }
+ while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ }
+
+ /* Get the flags entry. */
+ int flags = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
+
+ /* Get the ID: it is encoded as difference between the master
+ * ID and this entry ID. */
+ *id = si->master_id;
+ id->ms += lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ id->seq += lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ unsigned char buf[sizeof(streamID)];
+ streamEncodeID(buf,id);
+
+ /* The number of fields is here or not depending on the
+ * flags. */
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ *numfields = si->master_fields_count;
+ } else {
+ *numfields = lpGetInteger(si->lp_ele);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
+
+ /* If current >= start, and the entry is not marked as
+ * deleted, emit it. */
+ if (!si->rev) {
+ if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
+ !(flags & STREAM_ITEM_FLAG_DELETED))
+ {
+ if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
+ return 0; /* We are already out of range. */
+ si->entry_flags = flags;
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
+ si->master_fields_ptr = si->master_fields_start;
+ return 1; /* Valid item returned. */
+ }
+ } else {
+ if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
+ !(flags & STREAM_ITEM_FLAG_DELETED))
+ {
+ if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
+ return 0; /* We are already out of range. */
+ si->entry_flags = flags;
+ if (flags & STREAM_ITEM_FLAG_SAMEFIELDS)
+ si->master_fields_ptr = si->master_fields_start;
+ return 1; /* Valid item returned. */
+ }
+ }
+
+ /* If we do not emit, we have to discard if we are going
+ * forward, or seek the previous entry if we are going
+ * backward. */
+ if (!si->rev) {
+ int to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS) ?
+ *numfields : *numfields*2;
+ for (int64_t i = 0; i < to_discard; i++)
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ } else {
+ int prev_times = 4; /* flag + id ms/seq diff + numfields. */
+ while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
+ }
+ }
+
+ /* End of listpack reached. Try the next/prev radix tree node. */
+ }
+}
+
+/* Get the field and value of the current item we are iterating. This should
+ * be called immediately after streamIteratorGetID(), and for each field
+ * according to the number of fields returned by streamIteratorGetID().
+ * The function populates the field and value pointers and the corresponding
+ * lengths by reference, which are valid until the next iterator call, assuming
+ * no one touches the stream meanwhile. */
+void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
+ if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
+ *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
+ si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
+ } else {
+ *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+ }
+ *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
+ si->lp_ele = lpNext(si->lp,si->lp_ele);
+}
+
+/* Stop the stream iterator. The only cleanup we need is to free the rax
+ * iterator, since the stream iterator itself is supposed to be stack
+ * allocated. */
+void streamIteratorStop(streamIterator *si) {
+ raxStop(&si->ri);
+}
+
+/* Send the specified range to the client 'c'. The range the client will
+ * receive is between start and end inclusive; if 'count' is non zero, no more
+ * than 'count' elements are sent. The 'end' pointer can be NULL to mean that
+ * we want all the elements from 'start' till the end of the stream. If 'rev'
+ * is non zero, elements are produced in reversed order from end to start. */
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev) {
+ void *arraylen_ptr = addDeferredMultiBulkLength(c);
+ size_t arraylen = 0;
+ streamIterator si;
+ int64_t numfields;
+ streamID id;
+
+ streamIteratorStart(&si,s,start,end,rev);
+ while(streamIteratorGetID(&si,&id,&numfields)) {
+ /* Emit a two elements array for each item. The first is
+ * the ID, the second is an array of field-value pairs. */
+ sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq);
+ addReplyMultiBulkLen(c,2);
+ addReplySds(c,replyid);
+ addReplyMultiBulkLen(c,numfields*2);
+
+ /* Emit the field-value pairs. */
+ while(numfields--) {
+ unsigned char *key, *value;
+ int64_t key_len, value_len;
+ streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
+ addReplyBulkCBuffer(c,key,key_len);
+ addReplyBulkCBuffer(c,value,value_len);
+ }
+ arraylen++;
+ if (count && count == arraylen) break;
+ }
+ streamIteratorStop(&si);
+ setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+ return arraylen;
+}
+
+/* -----------------------------------------------------------------------
+ * Stream commands implementation
+ * ----------------------------------------------------------------------- */
+
+/* Look up the stream at 'key' and return the corresponding stream object.
+ * The function creates a key setting it to an empty stream if needed. */
+robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
+ robj *o = lookupKeyWrite(c->db,key);
+ if (o == NULL) {
+ o = createStreamObject();
+ dbAdd(c->db,key,o);
+ } else {
+ if (o->type != OBJ_STREAM) {
+ addReply(c,shared.wrongtypeerr);
+ return NULL;
+ }
+ }
+ return o;
+}
+
+/* Helper function to convert a string to an unsigned long long value.
+ * The function attempts to use the faster string2ll() function inside
+ * Redis: if it fails, strtoull() is used instead. The function returns
+ * 1 if the conversion happened successfully or 0 if the number is
+ * invalid or out of range. */
+int string2ull(const char *s, unsigned long long *value) {
+ long long ll;
+ if (string2ll(s,strlen(s),&ll)) {
+ if (ll < 0) return 0; /* Negative values are out of range. */
+ *value = ll;
+ return 1;
+ }
+ errno = 0;
+ *value = strtoull(s,NULL,10);
+ if (errno == EINVAL || errno == ERANGE) return 0; /* strtoull() failed. */
+ return 1; /* Conversion done! */
+}
+
+/* Parse a stream ID in the format given by clients to Redis, that is
+ * <ms>-<seq>, and convert it into a streamID structure. If
+ * the specified ID is invalid C_ERR is returned and an error is reported
+ * to the client, otherwise C_OK is returned. The ID may be in incomplete
+ * form, just stating the milliseconds time part of the stream. In such a case
+ * the missing part is set according to the value of 'missing_seq' parameter.
+ * The IDs "-" and "+" specify respectively the minimum and maximum IDs
+ * that can be represented.
+ *
+ * If 'c' is set to NULL, no reply is sent to the client. */
+int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
+ char buf[128];
+ if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
+ memcpy(buf,o->ptr,sdslen(o->ptr)+1);
+
+ /* Handle the "-" and "+" special cases. */
+ if (buf[0] == '-' && buf[1] == '\0') {
+ id->ms = 0;
+ id->seq = 0;
+ return C_OK;
+ } else if (buf[0] == '+' && buf[1] == '\0') {
+ id->ms = UINT64_MAX;
+ id->seq = UINT64_MAX;
+ return C_OK;
+ }
+
+ /* Parse <ms>-<seq> form. */
+ char *dot = strchr(buf,'-');
+ if (dot) *dot = '\0';
+ unsigned long long ms, seq;
+ if (string2ull(buf,&ms) == 0) goto invalid;
+ if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
+ if (!dot) seq = missing_seq;
+ id->ms = ms;
+ id->seq = seq;
+ return C_OK;
+
+invalid:
+ if (c) addReplyError(c,"Invalid stream ID specified as stream "
+ "command argument");
+ return C_ERR;
+}
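For reference, a standalone sketch (not part of the patch; the demo type and helper names are hypothetical) of the textual ID forms accepted above and how they map to the ms/seq pair:

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    typedef struct { uint64_t ms, seq; } demo_id;

    /* Returns 0 on success, -1 on error. 'missing_seq' fills the sequence
     * when only the millisecond part is given. */
    static int demo_parse(const char *s, demo_id *id, uint64_t missing_seq) {
        if (!strcmp(s, "-")) { id->ms = 0; id->seq = 0; return 0; }
        if (!strcmp(s, "+")) { id->ms = UINT64_MAX; id->seq = UINT64_MAX; return 0; }
        char *end;
        id->ms = strtoull(s, &end, 10);
        if (end == s) return -1;
        if (*end == '\0') { id->seq = missing_seq; return 0; }
        if (*end != '-') return -1;
        id->seq = strtoull(end + 1, NULL, 10);
        return 0;
    }

    int main(void) {
        demo_id id;
        demo_parse("1512489000000-5", &id, 0);  /* ms=1512489000000, seq=5 */
        demo_parse("1512489000000", &id, 0);    /* seq defaults to missing_seq */
        demo_parse("+", &id, 0);                /* maximum representable ID */
        printf("last parsed: %llu-%llu\n",
               (unsigned long long)id.ms, (unsigned long long)id.seq);
        return 0;
    }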
+
+/* XADD key [MAXLEN <count>] <ID or *> [field value] [field value] ... */
+void xaddCommand(client *c) {
+ streamID id;
+ int id_given = 0; /* Was an ID different than "*" specified? */
+ long long maxlen = 0; /* 0 means no maximum length. */
+ int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
+ the maximum length is not applied verbatim. */
+ int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
+
+ /* Parse options. */
+ int i = 2; /* This is the first argument position where we could
+ find an option, or the ID. */
+ for (; i < c->argc; i++) {
+ int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
+ char *opt = c->argv[i]->ptr;
+ if (opt[0] == '*' && opt[1] == '\0') {
+ /* This is just a fast path for the common case of auto-ID
+ * creation. */
+ break;
+ } else if (!strcasecmp(opt,"maxlen") && moreargs) {
+ char *next = c->argv[i+1]->ptr;
+ /* Check for the form MAXLEN ~ <count>. */
+ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
+ approx_maxlen = 1;
+ i++;
+ }
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
+ != C_OK) return;
+ i++;
+ maxlen_arg_idx = i;
+ } else {
+ /* If we are here, this is either a syntax error or a valid ID. */
+ if (streamParseIDOrReply(NULL,c->argv[i],&id,0) == C_OK) {
+ id_given = 1;
+ break;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+ }
+ int field_pos = i+1;
+
+ /* Check arity. */
+ if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
+ addReplyError(c,"wrong number of arguments for XADD");
+ return;
+ }
+
+ /* Lookup the stream at key. */
+ robj *o;
+ stream *s;
+ if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
+ s = o->ptr;
+
+ /* Append using the low level function and return the ID. */
+ if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
+ &id, id_given ? &id : NULL)
+ == C_ERR)
+ {
+ addReplyError(c,"The ID specified in XADD is smaller than the "
+ "target stream top item");
+ return;
+ }
+ sds reply = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq);
+ addReplySds(c,reply);
+
+ signalModifiedKey(c->db,c->argv[1]);
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
+ server.dirty++;
+
+ /* Remove older elements if MAXLEN was specified. */
+ if (maxlen) {
+ if (!streamTrimByLength(s,maxlen,approx_maxlen)) {
+ /* If no trimming was performed, for instance because approximated
+ * trimming length was specified, rewrite the MAXLEN argument
+ * as zero, so that the command is propagated without trimming. */
+ robj *zeroobj = createStringObjectFromLongLong(0);
+ rewriteClientCommandArgument(c,maxlen_arg_idx,zeroobj);
+ decrRefCount(zeroobj);
+ } else {
+ notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
+ }
+ }
+
+ /* Let's rewrite the ID argument with the one actually generated for
+ * AOF/replication propagation. */
+ robj *idarg = createObject(OBJ_STRING,
+ sdscatfmt(sdsempty(),"%U-%U",id.ms,id.seq));
+ rewriteClientCommandArgument(c,i,idarg);
+ decrRefCount(idarg);
+
+ /* We need to signal to blocked clients that there is new data on this
+ * stream. */
+ if (server.blocked_clients_by_type[BLOCKED_STREAM])
+ signalKeyAsReady(c->db, c->argv[1]);
+}
+
+/* XRANGE/XREVRANGE actual implementation. */
+void xrangeGenericCommand(client *c, int rev) {
+ robj *o;
+ stream *s;
+ streamID startid, endid;
+ long long count = 0;
+ robj *startarg = rev ? c->argv[3] : c->argv[2];
+ robj *endarg = rev ? c->argv[2] : c->argv[3];
+
+ if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;
+ if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;
+
+ /* Parse the COUNT option if any. */
+ if (c->argc > 4) {
+ for (int j = 4; j < c->argc; j++) {
+ int additional = c->argc-j-1;
+ if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
+ if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)
+ != C_OK) return;
+ if (count < 0) count = 0;
+ j++; /* Consume additional arg. */
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+ }
+
+ /* Return the specified range to the user. */
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
+ || checkType(c,o,OBJ_STREAM)) return;
+ s = o->ptr;
+ streamReplyWithRange(c,s,&startid,&endid,count,rev);
+}
+
+/* XRANGE key start end [COUNT <n>] */
+void xrangeCommand(client *c) {
+ xrangeGenericCommand(c,0);
+}
+
+/* XREVRANGE key end start [COUNT <n>] */
+void xrevrangeCommand(client *c) {
+ xrangeGenericCommand(c,1);
+}
+
+/* XLEN */
+void xlenCommand(client *c) {
+ robj *o;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
+ || checkType(c,o,OBJ_STREAM)) return;
+ stream *s = o->ptr;
+ addReplyLongLong(c,s->length);
+}
+
+/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
+ * [RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N
+ * ID_1 ID_2 ... ID_N */
+#define XREAD_BLOCKED_DEFAULT_COUNT 1000
+void xreadCommand(client *c) {
+ long long timeout = -1; /* -1 means, no BLOCK argument given. */
+ long long count = 0;
+ int streams_count = 0;
+ int streams_arg = 0;
+ #define STREAMID_STATIC_VECTOR_LEN 8
+ streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
+ streamID *ids = static_ids;
+
+ /* Parse arguments. */
+ for (int i = 1; i < c->argc; i++) {
+ int moreargs = i != c->argc-1;
+ char *o = c->argv[i]->ptr;
+ if (!strcasecmp(o,"BLOCK") && moreargs) {
+ i++;
+ if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
+ UNIT_MILLISECONDS) != C_OK) return;
+ } else if (!strcasecmp(o,"COUNT") && moreargs) {
+ i++;
+ if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
+ return;
+ if (count < 0) count = 0;
+ } else if (!strcasecmp(o,"STREAMS") && moreargs) {
+ streams_arg = i+1;
+ streams_count = (c->argc-streams_arg);
+ if ((streams_count % 2) != 0) {
+ addReplyError(c,"Unbalanced XREAD list of streams: "
+ "for each stream key an ID or '$' must be "
+ "specified.");
+ return;
+ }
+ streams_count /= 2; /* We have two arguments for each stream. */
+ break;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+
+ /* STREAMS option is mandatory. */
+ if (streams_arg == 0) {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+
+ /* Parse the IDs. */
+ if (streams_count > STREAMID_STATIC_VECTOR_LEN)
+ ids = zmalloc(sizeof(streamID)*streams_count);
+
+ for (int i = streams_arg + streams_count; i < c->argc; i++) {
+ /* Specifying "$" as last-known-id means that the client wants to be
+ * served with just the messages that will arrive into the stream
+ * starting from now. */
+ int id_idx = i - streams_arg - streams_count;
+ if (strcmp(c->argv[i]->ptr,"$") == 0) {
+ robj *o = lookupKeyRead(c->db,c->argv[i-streams_count]);
+ if (o) {
+ stream *s = o->ptr;
+ ids[id_idx] = s->last_id;
+ } else {
+ ids[id_idx].ms = 0;
+ ids[id_idx].seq = 0;
+ }
+ continue;
+ }
+ if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
+ goto cleanup;
+ }
+
+ /* Try to serve the client synchronously. */
+ size_t arraylen = 0;
+ void *arraylen_ptr = NULL;
+ for (int i = 0; i < streams_count; i++) {
+ robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
+ if (o == NULL) continue;
+ stream *s = o->ptr;
+ streamID *gt = ids+i; /* ID must be greater than this. */
+ if (s->last_id.ms > gt->ms ||
+ (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
+ {
+ arraylen++;
+ if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
+ /* streamReplyWithRange() handles the 'start' ID as inclusive,
+ * so start from the next ID, since we want only messages with
+ * IDs greater than start. */
+ streamID start = *gt;
+ start.seq++; /* Can't overflow, it's a uint64_t */
+
+ /* Emit the two elements sub-array consisting of the name
+ * of the stream and the data we extracted from it. */
+ addReplyMultiBulkLen(c,2);
+ addReplyBulk(c,c->argv[i+streams_arg]);
+ streamReplyWithRange(c,s,&start,NULL,count,0);
+ }
+ }
+
+ /* We replied synchronously! Set the top array len and return to caller. */
+ if (arraylen) {
+ setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+ goto cleanup;
+ }
+
+ /* Block if needed. */
+ if (timeout != -1) {
+ /* If we are inside a MULTI/EXEC and no stream has data to serve, the
+ * only thing we can do is to treat it as a timeout (even with timeout 0). */
+ if (c->flags & CLIENT_MULTI) {
+ addReply(c,shared.nullmultibulk);
+ goto cleanup;
+ }
+ blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
+ timeout, NULL, ids);
+ /* If no COUNT is given and we block, set a relatively small count:
+ * in case the ID provided is too low, we do not want the server to
+ * block just to serve this client a huge stream of messages. */
+ c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
+ c->bpop.xread_group = NULL; /* Not used for now. */
+ goto cleanup;
+ }
+
+ /* No BLOCK option, nor any stream we can serve. Reply as if a
+ * timeout happened. */
+ addReply(c,shared.nullmultibulk);
+ /* Continue to cleanup... */
+
+cleanup:
+ /* Cleanup. */
+ if (ids != static_ids) zfree(ids);
+}
+
+
diff --git a/tests/instances.tcl b/tests/instances.tcl
index 2ba67ac19..357b34818 100644
--- a/tests/instances.tcl
+++ b/tests/instances.tcl
@@ -318,7 +318,7 @@ proc end_tests {} {
puts "GOOD! No errors."
exit 0
} else {
- puts "WARNING $::failed tests faield."
+ puts "WARNING $::failed test(s) failed."
exit 1
}
}
diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl
index d91969e3e..3d9e5527a 100644
--- a/tests/integration/psync2.tcl
+++ b/tests/integration/psync2.tcl
@@ -10,7 +10,7 @@ start_server {} {
# Config
set debug_msg 0 ; # Enable additional debug messages
- set no_exit 0; ; # Do not exit at end of the test
+ set no_exit 0 ; # Do not exit at end of the test
set duration 20 ; # Total test seconds
@@ -175,6 +175,69 @@ start_server {} {
assert {$sync_count == $new_sync_count}
}
+ test "PSYNC2: Slave RDB restart with EVALSHA in backlog issue #4483" {
+ # Pick a random slave
+ set slave_id [expr {($master_id+1)%5}]
+ set sync_count [status $R($master_id) sync_full]
+
+        # Make sure to replicate the first EVAL while the slave is online
+ # so that it's part of the scripts the master believes it's safe
+ # to propagate as EVALSHA.
+ $R($master_id) EVAL {return redis.call("incr","__mycounter")} 0
+ $R($master_id) EVALSHA e6e0b547500efcec21eddb619ac3724081afee89 0
+
+ # Wait for the two to sync
+ wait_for_condition 50 1000 {
+ [$R($master_id) debug digest] == [$R($slave_id) debug digest]
+ } else {
+ fail "Slave not reconnecting"
+ }
+
+        # Prevent the slave from receiving master updates, and at
+        # the same time send the same script several times to the
+        # master, so that we'll end up with EVALSHA commands in the backlog.
+ $R($slave_id) slaveof 127.0.0.1 0
+
+ $R($master_id) EVALSHA e6e0b547500efcec21eddb619ac3724081afee89 0
+ $R($master_id) EVALSHA e6e0b547500efcec21eddb619ac3724081afee89 0
+ $R($master_id) EVALSHA e6e0b547500efcec21eddb619ac3724081afee89 0
+
+ catch {
+ $R($slave_id) config rewrite
+ $R($slave_id) debug restart
+ }
+
+ # Reconfigure the slave correctly again, when it's back online.
+ set retry 50
+ while {$retry} {
+ if {[catch {
+ $R($slave_id) slaveof $master_host $master_port
+ }]} {
+ after 1000
+ } else {
+ break
+ }
+ incr retry -1
+ }
+
+ # The master should be back at 4 slaves eventually
+ wait_for_condition 50 1000 {
+ [status $R($master_id) connected_slaves] == 4
+ } else {
+ fail "Slave not reconnecting"
+ }
+ set new_sync_count [status $R($master_id) sync_full]
+ assert {$sync_count == $new_sync_count}
+
+        # However, if the slave restarted with the full state of the
+        # scripting engine, we should now have the same digest.
+ wait_for_condition 50 1000 {
+ [$R($master_id) debug digest] == [$R($slave_id) debug digest]
+ } else {
+ fail "Debug digest mismatch between master and slave in post-restart handshake"
+ }
+ }
+
if {$no_exit} {
while 1 { puts -nonewline .; flush stdout; after 1000}
}
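For context on the EVALSHA test added above: a replica can only apply a verbatim EVALSHA if it already knows the corresponding script, which is why the test issues a plain EVAL first while the slave is online. A minimal illustration of that dependency, assuming a fresh instance, the test suite's client (r), and that __mycounter does not exist yet:

    # An unknown SHA1 is refused outright.
    catch {r EVALSHA ffffffffffffffffffffffffffffffffffffffff 0} err
    assert_match "NOSCRIPT*" $err
    # Once the script body is known, the same call by hash succeeds.
    set sha [r SCRIPT LOAD {return redis.call("incr","__mycounter")}]
    assert_equal 1 [r EVALSHA $sha 0]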
diff --git a/tests/integration/replication-3.tcl b/tests/integration/replication-3.tcl
index 50dcb9a9a..580be7602 100644
--- a/tests/integration/replication-3.tcl
+++ b/tests/integration/replication-3.tcl
@@ -100,7 +100,6 @@ start_server {tags {"repl"}} {
close $fd
puts "Master - Slave inconsistency"
puts "Run diff -u against /tmp/repldump*.txt for more info"
-
}
set old_digest [r debug digest]
@@ -109,5 +108,27 @@ start_server {tags {"repl"}} {
set new_digest [r debug digest]
assert {$old_digest eq $new_digest}
}
+
+ test {SLAVE can reload "lua" AUX RDB fields of duplicated scripts} {
+ # Force a Slave full resynchronization
+ r debug change-repl-id
+ r -1 client kill type master
+
+        # Check that after a full resync the slave can still correctly
+        # load the RDB file: such a file will contain "lua" AUX
+        # sections with scripts already in the memory of the master.
+
+ wait_for_condition 50 100 {
+ [s -1 master_link_status] eq {up}
+ } else {
+ fail "Replication not started."
+ }
+
+ wait_for_condition 50 100 {
+ [r debug digest] eq [r -1 debug digest]
+ } else {
+ fail "DEBUG DIGEST mismatch after full SYNC with many scripts"
+ }
+ }
}
}
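The DEBUG CHANGE-REPL-ID call in the test above forces the full resynchronization by giving the master a brand new replication ID, so the reconnecting slave cannot partially resync and must receive a fresh RDB, including its "lua" AUX sections. A minimal sketch of the observable effect, assuming the test suite's r and s helpers pointed at the master:

    set replid_before [s master_replid]
    r debug change-repl-id
    # The replication ID reported by INFO changes, which invalidates PSYNC.
    assert {[s master_replid] ne $replid_before}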
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 41c867803..7def9a7f6 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -26,6 +26,7 @@ set ::all_tests {
unit/type/set
unit/type/zset
unit/type/hash
+ unit/type/stream
unit/sort
unit/expire
unit/other
diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl
new file mode 100644
index 000000000..d7b5ca2a8
--- /dev/null
+++ b/tests/unit/type/stream.tcl
@@ -0,0 +1,256 @@
+# Compare two stream IDs; the return value is strcmp()-like.
+proc streamCompareID {a b} {
+ if {$a eq $b} {return 0}
+ lassign [split $a -] a_ms a_seq
+ lassign [split $b -] b_ms b_seq
+ if {$a_ms > $b_ms} {return 1}
+ if {$a_ms < $b_ms} {return -1}
+ # Same ms case, compare seq.
+ if {$a_seq > $b_seq} {return 1}
+ if {$a_seq < $b_seq} {return -1}
+}
+
+# Return the ID immediately greater than the specified one.
+# Note that this function does not bother handling 'seq' overflow
+# since it's a 64 bit value.
+proc streamNextID {id} {
+ lassign [split $id -] ms seq
+ incr seq
+ join [list $ms $seq] -
+}
+
+# Generate a random stream entry ID with the ms part between min and max
+# and a low sequence number (0 - 999 range), in order to stress test
+# XRANGE against a Tcl reference implementation of the same concept
+# operating on a linear array.
+proc streamRandomID {min_id max_id} {
+ lassign [split $min_id -] min_ms min_seq
+ lassign [split $max_id -] max_ms max_seq
+ set delta [expr {$max_ms-$min_ms+1}]
+ set ms [expr {$min_ms+[randomInt $delta]}]
+ set seq [randomInt 1000]
+ return $ms-$seq
+}
+
+# Tcl-side implementation of XRANGE used to fuzz test the Redis
+# XRANGE implementation.
+proc streamSimulateXRANGE {items start end} {
+ set res {}
+ foreach i $items {
+ set this_id [lindex $i 0]
+ if {[streamCompareID $this_id $start] >= 0} {
+ if {[streamCompareID $this_id $end] <= 0} {
+ lappend res $i
+ }
+ }
+ }
+ return $res
+}
+
+set content {} ;# Will be populated with a Tcl-side copy of the stream content.
+
+start_server {
+ tags {"stream"}
+} {
+ test {XADD can add entries into a stream that XRANGE can fetch} {
+ r XADD mystream * item 1 value a
+ r XADD mystream * item 2 value b
+ assert_equal 2 [r XLEN mystream]
+ set items [r XRANGE mystream - +]
+ assert_equal [lindex $items 0 1] {item 1 value a}
+ assert_equal [lindex $items 1 1] {item 2 value b}
+ }
+
+ test {XADD IDs are incremental} {
+ set id1 [r XADD mystream * item 1 value a]
+ set id2 [r XADD mystream * item 2 value b]
+ set id3 [r XADD mystream * item 3 value c]
+ assert {[streamCompareID $id1 $id2] == -1}
+ assert {[streamCompareID $id2 $id3] == -1}
+ }
+
+ test {XADD IDs are incremental when ms is the same as well} {
+ r multi
+ r XADD mystream * item 1 value a
+ r XADD mystream * item 2 value b
+ r XADD mystream * item 3 value c
+ lassign [r exec] id1 id2 id3
+ assert {[streamCompareID $id1 $id2] == -1}
+ assert {[streamCompareID $id2 $id3] == -1}
+ }
+
+ test {XADD with MAXLEN option} {
+ r DEL mystream
+ for {set j 0} {$j < 1000} {incr j} {
+ if {rand() < 0.9} {
+ r XADD mystream MAXLEN 5 * xitem $j
+ } else {
+ r XADD mystream MAXLEN 5 * yitem $j
+ }
+ }
+ set res [r xrange mystream - +]
+ set expected 995
+ foreach r $res {
+ assert {[lindex $r 1 1] == $expected}
+ incr expected
+ }
+ }
+
+ test {XADD mass insertion and XLEN} {
+ r DEL mystream
+ r multi
+ for {set j 0} {$j < 10000} {incr j} {
+            # From time to time insert an entry with a different set
+            # of fields in order to stress the stream compression code.
+ if {rand() < 0.9} {
+ r XADD mystream * item $j
+ } else {
+ r XADD mystream * item $j otherfield foo
+ }
+ }
+ r exec
+
+ set items [r XRANGE mystream - +]
+ for {set j 0} {$j < 10000} {incr j} {
+ assert {[lrange [lindex $items $j 1] 0 1] eq [list item $j]}
+ }
+ assert {[r xlen mystream] == $j}
+ }
+
+ test {XRANGE COUNT works as expected} {
+ assert {[llength [r xrange mystream - + COUNT 10]] == 10}
+ }
+
+ test {XREVRANGE COUNT works as expected} {
+ assert {[llength [r xrevrange mystream + - COUNT 10]] == 10}
+ }
+
+ test {XRANGE can be used to iterate the whole stream} {
+ set last_id "-"
+ set j 0
+ while 1 {
+ set elements [r xrange mystream $last_id + COUNT 100]
+ if {[llength $elements] == 0} break
+ foreach e $elements {
+ assert {[lrange [lindex $e 1] 0 1] eq [list item $j]}
+ incr j;
+ }
+ set last_id [streamNextID [lindex $elements end 0]]
+ }
+ assert {$j == 10000}
+ }
+
+ test {XREVRANGE returns the reverse of XRANGE} {
+ assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream + -]]}
+ }
+
+ test {XREAD with non empty stream} {
+ set res [r XREAD COUNT 1 STREAMS mystream 0.0]
+ assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
+ }
+
+ test {Non blocking XREAD with empty streams} {
+ set res [r XREAD STREAMS s1 s2 0.0 0.0]
+ assert {$res eq {}}
+ }
+
+ test {XREAD with non empty second stream} {
+ set res [r XREAD COUNT 1 STREAMS nostream mystream 0.0 0.0]
+ assert {[lindex $res 0 0] eq {mystream}}
+ assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
+ }
+
+ test {Blocking XREAD waiting new data} {
+ r XADD s2 * old abcd1234
+ set rd [redis_deferring_client]
+ $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $
+ r XADD s2 * new abcd1234
+ set res [$rd read]
+ assert {[lindex $res 0 0] eq {s2}}
+ assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
+ }
+
+ test {Blocking XREAD waiting old data} {
+ set rd [redis_deferring_client]
+ $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0.0 $
+ r XADD s2 * foo abcd1234
+ set res [$rd read]
+ assert {[lindex $res 0 0] eq {s2}}
+ assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
+ }
+
+ test "XREAD: XADD + DEL should not awake client" {
+ set rd [redis_deferring_client]
+ r del s1
+ $rd XREAD BLOCK 20000 STREAMS s1 $
+ r multi
+ r XADD s1 * old abcd1234
+ r DEL s1
+ r exec
+ r XADD s1 * new abcd1234
+ set res [$rd read]
+ assert {[lindex $res 0 0] eq {s1}}
+ assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
+ }
+
+ test "XREAD: XADD + DEL + LPUSH should not awake client" {
+ set rd [redis_deferring_client]
+ r del s1
+ $rd XREAD BLOCK 20000 STREAMS s1 $
+ r multi
+ r XADD s1 * old abcd1234
+ r DEL s1
+ r LPUSH s1 foo bar
+ r exec
+ r DEL s1
+ r XADD s1 * new abcd1234
+ set res [$rd read]
+ assert {[lindex $res 0 0] eq {s1}}
+ assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
+ }
+
+ test {XREAD with same stream name multiple times should work} {
+ r XADD s2 * old abcd1234
+ set rd [redis_deferring_client]
+ $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
+ r XADD s2 * new abcd1234
+ set res [$rd read]
+ assert {[lindex $res 0 0] eq {s2}}
+ assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
+ }
+
+ test {XREAD + multiple XADD inside transaction} {
+ r XADD s2 * old abcd1234
+ set rd [redis_deferring_client]
+ $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
+ r MULTI
+ r XADD s2 * field one
+ r XADD s2 * field two
+ r XADD s2 * field three
+ r EXEC
+ set res [$rd read]
+ assert {[lindex $res 0 0] eq {s2}}
+ assert {[lindex $res 0 1 0 1] eq {field one}}
+ assert {[lindex $res 0 1 1 1] eq {field two}}
+ }
+
+ test {XRANGE fuzzing} {
+ set low_id [lindex $items 0 0]
+ set high_id [lindex $items end 0]
+ for {set j 0} {$j < 100} {incr j} {
+ set start [streamRandomID $low_id $high_id]
+ set end [streamRandomID $low_id $high_id]
+ set range [r xrange mystream $start $end]
+ set tcl_range [streamSimulateXRANGE $items $start $end]
+ if {$range ne $tcl_range} {
+ puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end"
+ puts "---"
+ puts "XRANGE: '$range'"
+ puts "---"
+ puts "TCL: '$tcl_range'"
+ puts "---"
+ fail "XRANGE fuzzing failed, check logs for details"
+ }
+ }
+ }
+}
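The helper procs at the top of this test file can also be exercised on their own from tclsh, without a running server, which is a quick way to see how the Tcl-side ID arithmetic works. A minimal sketch with made-up IDs, assuming the procs above have been sourced:

    set a 5-10
    set b [streamNextID $a]
    puts $b                                     ;# 5-11
    puts [streamCompareID $a $b]                ;# -1, $a sorts before $b
    set items {{1-1 {f v1}} {2-1 {f v2}} {3-1 {f v3}}}
    puts [streamSimulateXRANGE $items 1-1 2-1]  ;# {1-1 {f v1}} {2-1 {f v2}}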