summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile2
-rw-r--r--src/listpack.c783
-rw-r--r--src/listpack.h61
-rw-r--r--src/listpack_malloc.h44
-rw-r--r--src/object.c7
-rw-r--r--src/rax.c24
-rw-r--r--src/rax.h1
-rw-r--r--src/rdb.h3
-rw-r--r--src/server.c2
-rw-r--r--src/server.h9
-rw-r--r--src/stream.h21
-rw-r--r--src/t_stream.c376
12 files changed, 1323 insertions, 10 deletions
diff --git a/src/Makefile b/src/Makefile
index 86e0b3fe0..b896b1263 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -144,7 +144,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/listpack.c b/src/listpack.c
new file mode 100644
index 000000000..e2702b65c
--- /dev/null
+++ b/src/listpack.c
@@ -0,0 +1,783 @@
+/* Listpack -- A lists of strings serialization format
+ *
+ * This file implements the specification you can find at:
+ *
+ * https://github.com/antirez/listpack
+ *
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdint.h>
+#include <limits.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "listpack.h"
+#include "listpack_malloc.h"
+
+#define LP_HDR_SIZE 6 /* 32 bit total len + 16 bit number of elements. */
+#define LP_HDR_NUMELE_UNKNOWN UINT16_MAX /* Count not cached: a scan is needed. */
+#define LP_MAX_INT_ENCODING_LEN 9 /* 1 type byte + 8 bytes of 64 bit payload. */
+#define LP_MAX_BACKLEN_SIZE 5 /* Max bytes written by lpEncodeBacklen(). */
+#define LP_MAX_ENTRY_BACKLEN 34359738367ULL /* 2^35-1: 5 bytes of 7 bits each. */
+#define LP_ENCODING_INT 0 /* lpEncodeGetType(): integer encoding. */
+#define LP_ENCODING_STRING 1 /* lpEncodeGetType(): string encoding. */
+
+#define LP_ENCODING_7BIT_UINT 0 /* First byte: 0xxxxxxx */
+#define LP_ENCODING_7BIT_UINT_MASK 0x80
+#define LP_ENCODING_IS_7BIT_UINT(byte) (((byte)&LP_ENCODING_7BIT_UINT_MASK)==LP_ENCODING_7BIT_UINT)
+
+#define LP_ENCODING_6BIT_STR 0x80 /* First byte: 10xxxxxx */
+#define LP_ENCODING_6BIT_STR_MASK 0xC0
+#define LP_ENCODING_IS_6BIT_STR(byte) (((byte)&LP_ENCODING_6BIT_STR_MASK)==LP_ENCODING_6BIT_STR)
+
+#define LP_ENCODING_13BIT_INT 0xC0 /* First byte: 110xxxxx */
+#define LP_ENCODING_13BIT_INT_MASK 0xE0
+#define LP_ENCODING_IS_13BIT_INT(byte) (((byte)&LP_ENCODING_13BIT_INT_MASK)==LP_ENCODING_13BIT_INT)
+
+#define LP_ENCODING_12BIT_STR 0xE0 /* First byte: 1110xxxx */
+#define LP_ENCODING_12BIT_STR_MASK 0xF0
+#define LP_ENCODING_IS_12BIT_STR(byte) (((byte)&LP_ENCODING_12BIT_STR_MASK)==LP_ENCODING_12BIT_STR)
+
+#define LP_ENCODING_16BIT_INT 0xF1 /* Whole first byte is the type tag. */
+#define LP_ENCODING_16BIT_INT_MASK 0xFF
+#define LP_ENCODING_IS_16BIT_INT(byte) (((byte)&LP_ENCODING_16BIT_INT_MASK)==LP_ENCODING_16BIT_INT)
+
+#define LP_ENCODING_24BIT_INT 0xF2 /* Whole first byte is the type tag. */
+#define LP_ENCODING_24BIT_INT_MASK 0xFF
+#define LP_ENCODING_IS_24BIT_INT(byte) (((byte)&LP_ENCODING_24BIT_INT_MASK)==LP_ENCODING_24BIT_INT)
+
+#define LP_ENCODING_32BIT_INT 0xF3 /* Whole first byte is the type tag. */
+#define LP_ENCODING_32BIT_INT_MASK 0xFF
+#define LP_ENCODING_IS_32BIT_INT(byte) (((byte)&LP_ENCODING_32BIT_INT_MASK)==LP_ENCODING_32BIT_INT)
+
+#define LP_ENCODING_64BIT_INT 0xF4 /* Whole first byte is the type tag. */
+#define LP_ENCODING_64BIT_INT_MASK 0xFF
+#define LP_ENCODING_IS_64BIT_INT(byte) (((byte)&LP_ENCODING_64BIT_INT_MASK)==LP_ENCODING_64BIT_INT)
+
+#define LP_ENCODING_32BIT_STR 0xF0 /* Tag byte followed by a 4 byte length. */
+#define LP_ENCODING_32BIT_STR_MASK 0xFF
+#define LP_ENCODING_IS_32BIT_STR(byte) (((byte)&LP_ENCODING_32BIT_STR_MASK)==LP_ENCODING_32BIT_STR)
+
+#define LP_EOF 0xFF /* Terminator byte stored after the last element. */
+
+#define LP_ENCODING_6BIT_STR_LEN(p) ((p)[0] & 0x3F)
+#define LP_ENCODING_12BIT_STR_LEN(p) ((((p)[0] & 0xF) << 8) | (p)[1])
+#define LP_ENCODING_32BIT_STR_LEN(p) (((uint32_t)(p)[1]<<0) | \
+ ((uint32_t)(p)[2]<<8) | \
+ ((uint32_t)(p)[3]<<16) | \
+ ((uint32_t)(p)[4]<<24))
+
+#define lpGetTotalBytes(p) (((uint32_t)(p)[0]<<0) | \
+ ((uint32_t)(p)[1]<<8) | \
+ ((uint32_t)(p)[2]<<16) | \
+ ((uint32_t)(p)[3]<<24))
+
+#define lpGetNumElements(p) (((uint32_t)(p)[4]<<0) | \
+ ((uint32_t)(p)[5]<<8))
+#define lpSetTotalBytes(p,v) do { \
+ (p)[0] = (v)&0xff; \
+ (p)[1] = ((v)>>8)&0xff; \
+ (p)[2] = ((v)>>16)&0xff; \
+ (p)[3] = ((v)>>24)&0xff; \
+} while(0)
+
+#define lpSetNumElements(p,v) do { \
+ (p)[4] = (v)&0xff; \
+ (p)[5] = ((v)>>8)&0xff; \
+} while(0)
+
+/* Convert a string into a signed 64 bit integer.
+ * The function returns 1 if the string could be parsed into a (non-overflowing)
+ * signed 64 bit int, 0 otherwise. On success the parsed value is stored in
+ * '*value', unless 'value' is NULL (in that case the string is only validated).
+ *
+ * Note that this function demands that the string strictly represents
+ * a int64 value: no spaces or other characters before or after the string
+ * representing the number are accepted, nor zeroes at the start if not
+ * for the string "0" representing the zero number.
+ *
+ * Because of its strictness, it is safe to use this function to check if
+ * you can convert a string into a long long, and obtain back the string
+ * from the number without any loss in the string representation.
+ *
+ * -----------------------------------------------------------------------------
+ *
+ * Credits: this function was adapted from the Redis source code, file
+ * "util.c", function string2ll(), and is copyright:
+ *
+ * Copyright(C) 2011, Pieter Noordhuis
+ * Copyright(C) 2011, Salvatore Sanfilippo
+ *
+ * The function is released under the BSD 3-clause license.
+ */
+int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) {
+ const char *p = s;
+ unsigned long plen = 0;
+ int negative = 0;
+ uint64_t v;
+
+ if (plen == slen)
+ return 0;
+
+ /* Special case: first and only digit is 0. */
+ if (slen == 1 && p[0] == '0') {
+ if (value != NULL) *value = 0;
+ return 1;
+ }
+
+ if (p[0] == '-') {
+ negative = 1;
+ p++; plen++;
+
+ /* Abort on only a negative sign. */
+ if (plen == slen)
+ return 0;
+ }
+
+ /* First digit should be 1-9, otherwise the string should just be 0. */
+ if (p[0] >= '1' && p[0] <= '9') {
+ v = p[0]-'0';
+ p++; plen++;
+ } else if (p[0] == '0' && slen == 1) {
+ if (value != NULL) *value = 0;
+ return 1;
+ } else {
+ return 0;
+ }
+
+ while (plen < slen && p[0] >= '0' && p[0] <= '9') {
+ if (v > (UINT64_MAX / 10)) /* Overflow. */
+ return 0;
+ v *= 10;
+
+ if (v > (UINT64_MAX - (p[0]-'0'))) /* Overflow. */
+ return 0;
+ v += p[0]-'0';
+
+ p++; plen++;
+ }
+
+ /* Return if not all bytes were used. */
+ if (plen < slen)
+ return 0;
+
+ if (negative) {
+ if (v > ((uint64_t)(-(INT64_MIN+1))+1)) /* Overflow. */
+ return 0;
+ if (value != NULL) *value = -v;
+ } else {
+ if (v > INT64_MAX) /* Overflow. */
+ return 0;
+ if (value != NULL) *value = v;
+ }
+ return 1;
+}
+
+/* Create a new, empty listpack.
+ * On success the new listpack is returned, otherwise NULL is returned. */
+unsigned char *lpNew(void) {
+ unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
+ if (lp == NULL) return NULL;
+ lpSetTotalBytes(lp,LP_HDR_SIZE+1); /* Header plus the EOF byte. */
+ lpSetNumElements(lp,0);
+ lp[LP_HDR_SIZE] = LP_EOF; /* The terminator directly follows the header. */
+ return lp;
+}
+
+/* Free the specified listpack, releasing its memory with lp_free(). */
+void lpFree(unsigned char *lp) {
+ lp_free(lp);
+}
+
+/* Given an element 'ele' of size 'size', determine if the element can be
+ * represented inside the listpack encoded as integer, and returns
+ * LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STRING if no integer
+ * encoding is possible.
+ *
+ * If the LP_ENCODING_INT is returned, the function stores the integer encoded
+ * representation of the element in the 'intenc' buffer (up to 9 bytes).
+ *
+ * Regardless of the returned encoding, 'enclen' is populated by reference to
+ * the number of bytes that the string or integer encoded element will require
+ * in order to be represented (string header included, backlen excluded). */
+int lpEncodeGetType(unsigned char *ele, uint32_t size, unsigned char *intenc, uint64_t *enclen) {
+ int64_t v;
+ if (lpStringToInt64((const char*)ele, size, &v)) {
+ if (v >= 0 && v <= 127) {
+ /* Single byte 0-127 integer. */
+ intenc[0] = v;
+ *enclen = 1;
+ } else if (v >= -4096 && v <= 4095) {
+ /* 13 bit integer. */
+ if (v < 0) v = ((int64_t)1<<13)+v;
+ intenc[0] = (v>>8)|LP_ENCODING_13BIT_INT;
+ intenc[1] = v&0xff;
+ *enclen = 2;
+ } else if (v >= -32768 && v <= 32767) {
+ /* 16 bit integer. */
+ if (v < 0) v = ((int64_t)1<<16)+v;
+ intenc[0] = LP_ENCODING_16BIT_INT;
+ intenc[1] = v&0xff;
+ intenc[2] = v>>8;
+ *enclen = 3;
+ } else if (v >= -8388608 && v <= 8388607) {
+ /* 24 bit integer. */
+ if (v < 0) v = ((int64_t)1<<24)+v;
+ intenc[0] = LP_ENCODING_24BIT_INT;
+ intenc[1] = v&0xff;
+ intenc[2] = (v>>8)&0xff;
+ intenc[3] = v>>16;
+ *enclen = 4;
+ } else if (v >= -2147483648 && v <= 2147483647) {
+ /* 32 bit integer. */
+ if (v < 0) v = ((int64_t)1<<32)+v;
+ intenc[0] = LP_ENCODING_32BIT_INT;
+ intenc[1] = v&0xff;
+ intenc[2] = (v>>8)&0xff;
+ intenc[3] = (v>>16)&0xff;
+ intenc[4] = v>>24;
+ *enclen = 5;
+ } else {
+ /* 64 bit integer. */
+ uint64_t uv = v;
+ intenc[0] = LP_ENCODING_64BIT_INT;
+ intenc[1] = uv&0xff;
+ intenc[2] = (uv>>8)&0xff;
+ intenc[3] = (uv>>16)&0xff;
+ intenc[4] = (uv>>24)&0xff;
+ intenc[5] = (uv>>32)&0xff;
+ intenc[6] = (uv>>40)&0xff;
+ intenc[7] = (uv>>48)&0xff;
+ intenc[8] = uv>>56;
+ *enclen = 9;
+ }
+ return LP_ENCODING_INT;
+ } else {
+ if (size < 64) *enclen = 1+size; /* 1 header byte (6 bit length). */
+ else if (size < 4096) *enclen = 2+size; /* 2 header bytes (12 bit length). */
+ else *enclen = 5+(uint64_t)size; /* Type byte + 4 length bytes, no wrap. */
+ return LP_ENCODING_STRING;
+ }
+}
+
+/* Store a reverse-encoded variable length field, representing the length
+ * of the previous element of size 'l', in the target buffer 'buf'.
+ * The function returns the number of bytes used to encode it, from
+ * 1 to 5. If 'buf' is NULL the function just returns the number of bytes
+ * needed in order to encode the backlen. */
+unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) {
+ if (l <= 127) {
+ if (buf) buf[0] = l;
+ return 1;
+ } else if (l < 16383) { /* Note: 16383 itself takes the 3 byte form. */
+ if (buf) {
+ buf[0] = l>>7; /* High bits; bit 7 clear stops backward parsing. */
+ buf[1] = (l&127)|128; /* Low 7 bits; bit 7 set: more bytes precede. */
+ }
+ return 2;
+ } else if (l < 2097151) {
+ if (buf) {
+ buf[0] = l>>14;
+ buf[1] = ((l>>7)&127)|128;
+ buf[2] = (l&127)|128;
+ }
+ return 3;
+ } else if (l < 268435455) {
+ if (buf) {
+ buf[0] = l>>21;
+ buf[1] = ((l>>14)&127)|128;
+ buf[2] = ((l>>7)&127)|128;
+ buf[3] = (l&127)|128;
+ }
+ return 4;
+ } else {
+ if (buf) {
+ buf[0] = l>>28;
+ buf[1] = ((l>>21)&127)|128;
+ buf[2] = ((l>>14)&127)|128;
+ buf[3] = ((l>>7)&127)|128;
+ buf[4] = (l&127)|128;
+ }
+ return 5;
+ }
+}
+
+/* Decode the backlen ending at 'p' (its last byte), reading backward. If more
+ * than 5 bytes are used, UINT64_MAX is returned to report the problem. */
+uint64_t lpDecodeBacklen(unsigned char *p) {
+ uint64_t val = 0;
+ uint64_t shift = 0;
+ do {
+ val |= (uint64_t)(p[0] & 127) << shift; /* Each byte carries 7 bits. */
+ if (!(p[0] & 128)) break; /* High bit clear: this was the last byte. */
+ shift += 7;
+ p--; /* Move to the preceding (more significant) byte. */
+ if (shift > 28) return UINT64_MAX;
+ } while(1);
+ return val;
+}
+
+/* Encode the string element pointed by 's' of size 'len' in the target
+ * buffer 'buf'. The function should be called with 'buf' having always enough
+ * space for encoding the string. This is done by calling lpEncodeGetType()
+ * before calling this function. */
+void lpEncodeString(unsigned char *buf, unsigned char *s, uint32_t len) {
+ if (len < 64) {
+ buf[0] = len | LP_ENCODING_6BIT_STR;
+ memcpy(buf+1,s,len);
+ } else if (len < 4096) {
+ buf[0] = (len >> 8) | LP_ENCODING_12BIT_STR;
+ buf[1] = len & 0xff;
+ memcpy(buf+2,s,len);
+ } else {
+ buf[0] = LP_ENCODING_32BIT_STR;
+ buf[1] = len & 0xff;
+ buf[2] = (len >> 8) & 0xff;
+ buf[3] = (len >> 16) & 0xff;
+ buf[4] = (len >> 24) & 0xff;
+ memcpy(buf+5,s,len); /* Payload follows the 5 byte header. */
+ }
+}
+
+/* Return the encoded length (header plus payload, backlen excluded) of the
+ * listpack element pointed by 'p'. Returns 0 if the encoding is not recognized. */
+uint32_t lpCurrentEncodedSize(unsigned char *p) {
+ if (LP_ENCODING_IS_7BIT_UINT(p[0])) return 1;
+ if (LP_ENCODING_IS_6BIT_STR(p[0])) return 1+LP_ENCODING_6BIT_STR_LEN(p);
+ if (LP_ENCODING_IS_13BIT_INT(p[0])) return 2;
+ if (LP_ENCODING_IS_16BIT_INT(p[0])) return 3;
+ if (LP_ENCODING_IS_24BIT_INT(p[0])) return 4;
+ if (LP_ENCODING_IS_32BIT_INT(p[0])) return 5;
+ if (LP_ENCODING_IS_64BIT_INT(p[0])) return 9;
+ if (LP_ENCODING_IS_12BIT_STR(p[0])) return 2+LP_ENCODING_12BIT_STR_LEN(p);
+ if (LP_ENCODING_IS_32BIT_STR(p[0])) return 5+LP_ENCODING_32BIT_STR_LEN(p);
+ if (p[0] == LP_EOF) return 1; /* The terminator is a single byte. */
+ return 0;
+}
+
+/* Skip the current entry returning the next. It is invalid to call this
+ * function if the current element is the EOF element at the end of the
+ * listpack. This function is used to implement lpNext(), but unlike it,
+ * it does not return NULL when the next element is the EOF element. */
+unsigned char *lpSkip(unsigned char *p) {
+ unsigned long entrylen = lpCurrentEncodedSize(p);
+ entrylen += lpEncodeBacklen(NULL,entrylen); /* Add the trailing backlen. */
+ p += entrylen;
+ return p;
+}
+
+/* If 'p' points to an element of the listpack, calling lpNext() will return
+ * the pointer to the next element (the one on the right), or NULL if 'p'
+ * already pointed to the last element of the listpack. */
+unsigned char *lpNext(unsigned char *lp, unsigned char *p) {
+ ((void) lp); /* lp is not used for now. However lpPrev() uses it. */
+ p = lpSkip(p);
+ if (p[0] == LP_EOF) return NULL; /* Reached the terminator: no next. */
+ return p;
+}
+
+/* If 'p' points to an element of the listpack, calling lpPrev() will return
+ * the pointer to the previous element (the one on the left), or NULL if 'p'
+ * already pointed to the first element of the listpack. */
+unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
+ if (p-lp == LP_HDR_SIZE) return NULL; /* 'p' is right after the header. */
+ p--; /* Seek the first backlen byte of the last element. */
+ uint64_t prevlen = lpDecodeBacklen(p); /* NOTE: UINT64_MAX is not checked. */
+ prevlen += lpEncodeBacklen(NULL,prevlen);
+ return p-prevlen+1; /* Seek the first byte of the previous entry. */
+}
+
+/* Return a pointer to the first element of the listpack, or NULL if the
+ * listpack has no elements (the EOF byte directly follows the header). */
+unsigned char *lpFirst(unsigned char *lp) {
+ lp += LP_HDR_SIZE; /* Skip the header. */
+ if (lp[0] == LP_EOF) return NULL;
+ return lp;
+}
+
+/* Return a pointer to the last element of the listpack, or NULL if the
+ * listpack has no elements. The EOF position comes from the header length. */
+unsigned char *lpLast(unsigned char *lp) {
+ unsigned char *p = lp+lpGetTotalBytes(lp)-1; /* Seek EOF element. */
+ return lpPrev(lp,p); /* Will return NULL if EOF is the only element. */
+}
+
+/* Return the number of elements inside the listpack. This function attempts
+ * to use the cached value when within range, otherwise a full scan is
+ * needed. As a side effect of calling this function, the listpack header
+ * could be modified, because if the count is found to be again within
+ * the 'numele' header field range, the new value is stored in the header. */
+uint32_t lpLength(unsigned char *lp) {
+ uint32_t numele = lpGetNumElements(lp);
+ if (numele != LP_HDR_NUMELE_UNKNOWN) return numele;
+
+ /* The header does not hold a usable count. We need to scan in order
+ * to get the total number. */
+ uint32_t count = 0;
+ unsigned char *p = lpFirst(lp);
+ while(p) {
+ count++;
+ p = lpNext(lp,p);
+ }
+
+ /* If the count is again within range of the header numele field,
+ * set it. */
+ if (count < LP_HDR_NUMELE_UNKNOWN) lpSetNumElements(lp,count);
+ return count;
+}
+
+/* Return the listpack element pointed by 'p'.
+ *
+ * The function changes behavior depending on the passed 'intbuf' value.
+ * Specifically, if 'intbuf' is NULL:
+ *
+ * If the element is internally encoded as an integer, the function returns
+ * NULL and populates the integer value by reference in 'count'. Otherwise if
+ * the element is encoded as a string a pointer to the string (pointing inside
+ * the listpack itself) is returned, and 'count' is set to the length of the
+ * string.
+ *
+ * If instead 'intbuf' points to a buffer passed by the caller, that must be
+ * at least LP_INTBUF_SIZE bytes, the function always returns the element as
+ * it was a string (returning the pointer to the string and setting the
+ * 'count' argument to the string length by reference). However if the element
+ * is encoded as an integer, the 'intbuf' buffer is used in order to store
+ * the string representation.
+ *
+ * The user should use one or the other form depending on what the value will
+ * be used for. If there is immediate usage for an integer value returned
+ * by the function, then passing a buffer (and converting it back to a number)
+ * is of course useless.
+ *
+ * If the function is called against a badly encoded listpack, so that there
+ * is no valid way to parse it, the function returns like if there was an
+ * integer encoded with value 12345678900000000 + <unrecognized byte>, this may
+ * be an hint to understand that something is wrong. To crash in this case is
+ * not sensible because of the different requirements of the application using
+ * this lib.
+ *
+ * Similarly, there is no error returned since the listpack normally can be
+ * assumed to be valid, so that would be a very high API cost. However a function
+ * in order to check the integrity of the listpack at load time is provided,
+ * check lpIsValid(). */
+unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) {
+ int64_t val;
+ uint64_t uval, negstart, negmax;
+
+ if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
+ negstart = UINT64_MAX; /* 7 bit ints are always positive. */
+ negmax = 0;
+ uval = p[0] & 0x7f;
+ } else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
+ *count = LP_ENCODING_6BIT_STR_LEN(p);
+ return p+1;
+ } else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
+ uval = ((p[0]&0x1f)<<8) | p[1];
+ negstart = (uint64_t)1<<12;
+ negmax = 8191;
+ } else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
+ uval = (uint64_t)p[1] |
+ (uint64_t)p[2]<<8;
+ negstart = (uint64_t)1<<15;
+ negmax = UINT16_MAX;
+ } else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
+ uval = (uint64_t)p[1] |
+ (uint64_t)p[2]<<8 |
+ (uint64_t)p[3]<<16;
+ negstart = (uint64_t)1<<23;
+ negmax = UINT32_MAX>>8;
+ } else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
+ uval = (uint64_t)p[1] |
+ (uint64_t)p[2]<<8 |
+ (uint64_t)p[3]<<16 |
+ (uint64_t)p[4]<<24;
+ negstart = (uint64_t)1<<31;
+ negmax = UINT32_MAX;
+ } else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
+ uval = (uint64_t)p[1] |
+ (uint64_t)p[2]<<8 |
+ (uint64_t)p[3]<<16 |
+ (uint64_t)p[4]<<24 |
+ (uint64_t)p[5]<<32 |
+ (uint64_t)p[6]<<40 |
+ (uint64_t)p[7]<<48 |
+ (uint64_t)p[8]<<56;
+ negstart = (uint64_t)1<<63;
+ negmax = UINT64_MAX;
+ } else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
+ *count = LP_ENCODING_12BIT_STR_LEN(p);
+ return p+2;
+ } else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
+ *count = LP_ENCODING_32BIT_STR_LEN(p);
+ return p+5;
+ } else {
+ uval = 12345678900000000ULL + p[0];
+ negstart = UINT64_MAX;
+ negmax = 0;
+ }
+
+ /* We reach this code path only for integer encodings.
+ * Convert the unsigned value to the signed one using two's complement
+ * rule. */
+ if (uval >= negstart) {
+ /* This three steps conversion should avoid undefined behaviors
+ * in the unsigned -> signed conversion. */
+ uval = negmax-uval;
+ val = uval;
+ val = -val-1;
+ } else {
+ val = uval;
+ }
+
+ /* Return the string representation of the integer or the value itself
+ * depending on intbuf being NULL or not. The cast matches the "%lld"
+ * conversion, since int64_t is not guaranteed to be a long long. */
+ if (intbuf) {
+ *count = snprintf((char*)intbuf,LP_INTBUF_SIZE,"%lld",(long long)val);
+ return intbuf;
+ } else {
+ *count = val;
+ return NULL;
+ }
+}
+
+/* Insert, delete or replace the specified element 'ele' of length 'size' at
+ * the specified position 'p', with 'p' being a listpack element pointer
+ * obtained with lpFirst(), lpLast(), lpIndex(), lpNext(), lpPrev() or
+ * lpSeek().
+ *
+ * The element is inserted before, after, or replaces the element pointed
+ * by 'p' depending on the 'where' argument, that can be LP_BEFORE, LP_AFTER
+ * or LP_REPLACE.
+ *
+ * If 'ele' is set to NULL, the function removes the element pointed by 'p'
+ * instead of inserting one.
+ *
+ * Returns NULL on out of memory or when the listpack total length would exceed
+ * the max allowed size of 2^32-1, otherwise the new pointer to the listpack
+ * holding the new element is returned (and the old pointer passed is no longer
+ * considered valid).
+ *
+ * If 'newp' is not NULL, at the end of a successful call '*newp' will be set
+ * to the address of the element just added, so that it will be possible to
+ * continue an iteration with lpNext() and lpPrev().
+ *
+ * For deletion operations ('ele' set to NULL) 'newp' is set to the next
+ * element, on the right of the deleted one, or to NULL if the deleted element
+ * was the last one. */
+unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
+ unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
+ unsigned char backlen[LP_MAX_BACKLEN_SIZE];
+
+ uint64_t enclen; /* The length of the encoded element. */
+
+ /* An element pointer set to NULL means deletion, which is conceptually
+ * replacing the element with a zero-length element. So whatever we
+ * get passed as 'where', set it to LP_REPLACE. */
+ if (ele == NULL) where = LP_REPLACE;
+
+ /* If we need to insert after the current element, we just jump to the
+ * next element (that could be the EOF one) and handle the case of
+ * inserting before. So the function will actually deal with just two
+ * cases: LP_BEFORE and LP_REPLACE. */
+ if (where == LP_AFTER) {
+ p = lpSkip(p);
+ where = LP_BEFORE;
+ }
+
+ /* Store the offset of the element 'p', so that we can obtain its
+ * address again after a reallocation. */
+ unsigned long poff = p-lp;
+
+ /* Calling lpEncodeGetType() results into the encoded version of the
+ * element to be stored into 'intenc' in case it is representable as
+ * an integer: in that case, the function returns LP_ENCODING_INT.
+ * Otherwise if LP_ENCODING_STRING is returned, we'll have to call
+ * lpEncodeString() to actually write the encoded string in place later.
+ *
+ * Whatever the returned encoding is, 'enclen' is populated with the
+ * length of the encoded element. */
+ int enctype;
+ if (ele) {
+ enctype = lpEncodeGetType(ele,size,intenc,&enclen);
+ } else {
+ enctype = -1;
+ enclen = 0;
+ }
+
+ /* We need to also encode the backward-parsable length of the element
+ * and append it to the end: this allows to traverse the listpack from
+ * the end to the start. */
+ unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
+ uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
+ uint32_t replaced_len = 0;
+ if (where == LP_REPLACE) {
+ replaced_len = lpCurrentEncodedSize(p);
+ replaced_len += lpEncodeBacklen(NULL,replaced_len);
+ }
+
+ uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
+ - replaced_len;
+ if (new_listpack_bytes > UINT32_MAX) return NULL;
+
+ /* We now need to reallocate in order to make space or shrink the
+ * allocation (in case 'where' value is LP_REPLACE and the new element is
+ * smaller). However we do that before memmoving the memory to
+ * make room for the new element if the final allocation will get
+ * larger, or we do it after if the final allocation will get smaller. */
+
+ unsigned char *dst = lp + poff; /* May be updated after reallocation. */
+
+ /* Realloc before: we need more room. */
+ if (new_listpack_bytes > old_listpack_bytes) {
+ if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
+ dst = lp + poff;
+ }
+
+ /* Setup the listpack relocating the elements to make the exact room
+ * we need to store the new one. */
+ if (where == LP_BEFORE) {
+ memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
+ } else { /* LP_REPLACE. */
+ long lendiff = (enclen+backlen_size)-replaced_len;
+ memmove(dst+replaced_len+lendiff,
+ dst+replaced_len,
+ old_listpack_bytes-poff-replaced_len);
+ }
+
+ /* Realloc after: we need to free space. */
+ if (new_listpack_bytes < old_listpack_bytes) {
+ if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
+ dst = lp + poff;
+ }
+
+ /* Store the entry. */
+ if (newp) {
+ *newp = dst;
+ /* In case of deletion, set 'newp' to NULL if the next element is
+ * the EOF element. */
+ if (!ele && dst[0] == LP_EOF) *newp = NULL;
+ }
+ if (ele) {
+ if (enctype == LP_ENCODING_INT) {
+ memcpy(dst,intenc,enclen);
+ } else {
+ lpEncodeString(dst,ele,size);
+ }
+ dst += enclen;
+ memcpy(dst,backlen,backlen_size);
+ dst += backlen_size;
+ }
+
+ /* Update header. A replacement with a new element leaves the count as is. */
+ if (where != LP_REPLACE || ele == NULL) {
+ uint32_t num_elements = lpGetNumElements(lp);
+ if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
+ if (ele)
+ lpSetNumElements(lp,num_elements+1);
+ else
+ lpSetNumElements(lp,num_elements-1);
+ }
+ }
+ lpSetTotalBytes(lp,new_listpack_bytes);
+ return lp;
+}
+
+/* Append the specified element 'ele' of length 'size' at the end of the
+ * listpack. It is implemented in terms of lpInsert(), so the return value is
+ * the same as lpInsert(). */
+unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) {
+ uint64_t listpack_bytes = lpGetTotalBytes(lp);
+ unsigned char *eofptr = lp + listpack_bytes - 1; /* Last byte is the EOF. */
+ return lpInsert(lp,ele,size,eofptr,LP_BEFORE,NULL);
+}
+
+/* Remove the element pointed by 'p', and return the resulting listpack.
+ * If 'newp' is not NULL, the next element pointer (to the right of the
+ * deleted one) is returned by reference. If the deleted element was the
+ * last one, '*newp' is set to NULL. Implemented as lpInsert() of a NULL 'ele'. */
+unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) {
+ return lpInsert(lp,NULL,0,p,LP_REPLACE,newp);
+}
+
+/* Return the total number of bytes the listpack is composed of (from header). */
+uint32_t lpBytes(unsigned char *lp) {
+ return lpGetTotalBytes(lp);
+}
+
+/* Seek the specified element and returns the pointer to the seeked element.
+ * Positive indexes specify the zero-based element to seek from the head to
+ * the tail, negative indexes specify elements starting from the tail, where
+ * -1 means the last element, -2 the penultimate and so forth. If the index
+ * is out of range, NULL is returned. */
+unsigned char *lpSeek(unsigned char *lp, long index) {
+ int forward = 1; /* Seek forward by default. */
+
+ /* We want to seek from left to right or the other way around
+ * depending on the listpack length and the element position.
+ * However if the listpack length cannot be obtained in constant time,
+ * the direction only depends on the sign of the index. */
+ uint32_t numele = lpGetNumElements(lp);
+ if (numele != LP_HDR_NUMELE_UNKNOWN) {
+ if (index < 0) index = (long)numele+index;
+ if (index < 0) return NULL; /* Index still < 0 means out of range. */
+ if (index >= numele) return NULL; /* Out of range the other side. */
+ /* We want to scan right-to-left if the element we are looking for
+ * is past the half of the listpack. */
+ if (index > numele/2) {
+ forward = 0;
+ /* Right to left scanning always expects a negative index. Convert
+ * our index to negative form. */
+ index -= numele;
+ }
+ } else {
+ /* If the listpack length is unspecified, for negative indexes we
+ * want to always scan right-to-left. */
+ if (index < 0) forward = 0;
+ }
+
+ /* Forward and backward scanning is trivially based on lpNext()/lpPrev(). */
+ if (forward) {
+ unsigned char *ele = lpFirst(lp);
+ while (index > 0 && ele) {
+ ele = lpNext(lp,ele);
+ index--;
+ }
+ return ele;
+ } else {
+ unsigned char *ele = lpLast(lp);
+ while (index < -1 && ele) {
+ ele = lpPrev(lp,ele);
+ index++;
+ }
+ return ele;
+ }
+}
+
diff --git a/src/listpack.h b/src/listpack.h
new file mode 100644
index 000000000..af67b4b41
--- /dev/null
+++ b/src/listpack.h
@@ -0,0 +1,61 @@
+/* Listpack -- A lists of strings serialization format
+ *
+ * This file implements the specification you can find at:
+ *
+ * https://github.com/antirez/listpack
+ *
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __LISTPACK_H
+#define __LISTPACK_H
+
+#include <stdint.h>
+
+#define LP_INTBUF_SIZE 21 /* 20 chars of -2^63 (sign included) + 1 null term. */
+
+/* lpInsert() where argument possible values: */
+#define LP_BEFORE 0
+#define LP_AFTER 1
+#define LP_REPLACE 2
+
+unsigned char *lpNew(void);
+void lpFree(unsigned char *lp);
+unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);
+unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size);
+unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);
+uint32_t lpLength(unsigned char *lp);
+unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf);
+unsigned char *lpFirst(unsigned char *lp);
+unsigned char *lpLast(unsigned char *lp);
+unsigned char *lpNext(unsigned char *lp, unsigned char *p);
+unsigned char *lpPrev(unsigned char *lp, unsigned char *p);
+uint32_t lpBytes(unsigned char *lp);
+unsigned char *lpSeek(unsigned char *lp, long index);
+
+#endif
diff --git a/src/listpack_malloc.h b/src/listpack_malloc.h
new file mode 100644
index 000000000..a3a077fcd
--- /dev/null
+++ b/src/listpack_malloc.h
@@ -0,0 +1,44 @@
+/* Listpack -- A lists of strings serialization format
+ * https://github.com/antirez/listpack
+ *
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/* Allocator selection.
+ *
+ * This file is used in order to change the listpack allocator at compile time.
+ * Just define the following defines to what you want to use. Also add
+ * the include of your alternate allocator if needed (not needed in order
+ * to use the default libc allocator). */
+
+#ifndef LISTPACK_ALLOC_H
+#define LISTPACK_ALLOC_H
+#define lp_malloc malloc /* The defaults map straight to the libc names. */
+#define lp_realloc realloc
+#define lp_free free
+#endif /* LISTPACK_ALLOC_H */
diff --git a/src/object.c b/src/object.c
index d2f8d53c5..8eeb5c6c1 100644
--- a/src/object.c
+++ b/src/object.c
@@ -232,6 +232,13 @@ robj *createZsetZiplistObject(void) {
return o;
}
+/* Create a new stream object: the value returned by streamNew() wrapped
+ * into an robj of type OBJ_STREAM, with the encoding set to
+ * OBJ_ENCODING_STREAM. */
+robj *createStreamObject(void) {
+    robj *obj = createObject(OBJ_STREAM,streamNew());
+    obj->encoding = OBJ_ENCODING_STREAM;
+    return obj;
+}
+
robj *createModuleObject(moduleType *mt, void *value) {
moduleValue *mv = zmalloc(sizeof(*mv));
mv->type = mt;
diff --git a/src/rax.c b/src/rax.c
index dda008dff..b4f5ae05d 100644
--- a/src/rax.c
+++ b/src/rax.c
@@ -131,7 +131,7 @@ static inline void raxStackFree(raxStack *ts) {
}
/* ----------------------------------------------------------------------------
- * Radis tree implementation
+ * Radix tree implementation
* --------------------------------------------------------------------------*/
/* Allocate a new non compressed node with the specified number of children.
@@ -873,7 +873,8 @@ raxNode *raxRemoveChild(raxNode *parent, raxNode *child) {
memmove(((char*)cp)-1,cp,(parent->size-taillen-1)*sizeof(raxNode**));
/* Move the remaining "tail" pointer at the right position as well. */
- memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+parent->iskey*sizeof(void*));
+ size_t valuelen = (parent->iskey && !parent->isnull) ? sizeof(void*) : 0;
+ memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+valuelen);
/* 4. Update size. */
parent->size--;
@@ -1175,7 +1176,7 @@ void raxIteratorDelChars(raxIterator *it, size_t count) {
* The function returns 1 on success or 0 on out of memory. */
int raxIteratorNextStep(raxIterator *it, int noup) {
if (it->flags & RAX_ITER_EOF) {
- return 0;
+ return 1;
} else if (it->flags & RAX_ITER_JUST_SEEKED) {
it->flags &= ~RAX_ITER_JUST_SEEKED;
return 1;
@@ -1187,10 +1188,6 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
size_t orig_stack_items = it->stack.items;
raxNode *orig_node = it->node;
- /* Clear the EOF flag: it will be set again if the EOF condition
- * is still valid. */
- it->flags &= ~RAX_ITER_EOF;
-
while(1) {
int children = it->node->iscompr ? 1 : it->node->size;
if (!noup && children) {
@@ -1291,7 +1288,7 @@ int raxSeekGreatest(raxIterator *it) {
* effect to the one of raxIteratorPrevSte(). */
int raxIteratorPrevStep(raxIterator *it, int noup) {
if (it->flags & RAX_ITER_EOF) {
- return 0;
+ return 1;
} else if (it->flags & RAX_ITER_JUST_SEEKED) {
it->flags &= ~RAX_ITER_JUST_SEEKED;
return 1;
@@ -1412,6 +1409,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
it->node = it->rt->head;
if (!raxSeekGreatest(it)) return 0;
assert(it->node->iskey);
+ it->data = raxGetData(it->node);
return 1;
}
@@ -1430,6 +1428,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
/* We found our node, since the key matches and we have an
* "equal" condition. */
if (!raxIteratorAddChars(it,ele,len)) return 0; /* OOM. */
+ it->data = raxGetData(it->node);
} else if (lt || gt) {
/* Exact key not found or eq flag not set. We have to set as current
* key the one represented by the node we stopped at, and perform
@@ -1502,6 +1501,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
* the previous sub-tree. */
if (nodechar < keychar) {
if (!raxSeekGreatest(it)) return 0;
+ it->data = raxGetData(it->node);
} else {
if (!raxIteratorAddChars(it,it->node->data,it->node->size))
return 0;
@@ -1647,6 +1647,14 @@ void raxStop(raxIterator *it) {
raxStackFree(&it->stack);
}
+/* Tell whether the iterator is currently in the EOF state. This happens
+ * when raxSeek() failed to seek an appropriate element, so that raxNext()
+ * or raxPrev() will return zero, or when an EOF condition was reached while
+ * iterating with raxNext() and raxPrev().
+ *
+ * The return value is zero when the iterator is not at EOF, otherwise it is
+ * the raw RAX_ITER_EOF flag bit: non-zero, but not normalized to 1. */
+int raxEOF(raxIterator *it) {
+    int eof_bit = it->flags & RAX_ITER_EOF;
+    return eof_bit;
+}
+
/* ----------------------------- Introspection ------------------------------ */
/* This function is mostly used for debugging and learning purposes.
diff --git a/src/rax.h b/src/rax.h
index 6f91f4c1b..f6985c373 100644
--- a/src/rax.h
+++ b/src/rax.h
@@ -155,6 +155,7 @@ int raxPrev(raxIterator *it);
int raxRandomWalk(raxIterator *it, size_t steps);
int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key_len);
void raxStop(raxIterator *it);
+int raxEOF(raxIterator *it);
void raxShow(rax *rax);
#endif
diff --git a/src/rdb.h b/src/rdb.h
index 62a13f444..bf1150455 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -89,10 +89,11 @@
#define RDB_TYPE_ZSET_ZIPLIST 12
#define RDB_TYPE_HASH_ZIPLIST 13
#define RDB_TYPE_LIST_QUICKLIST 14
+#define RDB_TYPE_STREAM_LISTPACKS 15
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Test if a type is an object type. */
-#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 14))
+#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15))
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_AUX 250
diff --git a/src/server.c b/src/server.c
index 7498a25fd..2c3647db6 100644
--- a/src/server.c
+++ b/src/server.c
@@ -302,6 +302,8 @@ struct redisCommand redisCommandTable[] = {
{"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
+ {"xadd",xaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
+ {"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
diff --git a/src/server.h b/src/server.h
index 11eb36f3d..38a76d008 100644
--- a/src/server.h
+++ b/src/server.h
@@ -59,6 +59,7 @@ typedef long long mstime_t; /* millisecond time type. */
#include "anet.h" /* Networking the easy way */
#include "ziplist.h" /* Compact list data structure */
#include "intset.h" /* Compact integer set structure */
+#include "stream.h" /* Stream data type header file. */
#include "version.h" /* Version macro */
#include "util.h" /* Misc functions useful in many places */
#include "latency.h" /* Latency monitor API */
@@ -451,6 +452,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define OBJ_SET 2
#define OBJ_ZSET 3
#define OBJ_HASH 4
+#define OBJ_STREAM 5
/* The "module" object type is a special one that signals that the object
* is one directly managed by a Redis module. In this case the value points
@@ -575,6 +577,7 @@ typedef struct RedisModuleDigest {
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
+#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
@@ -1414,6 +1417,9 @@ void handleClientsBlockedOnLists(void);
void popGenericCommand(client *c, int where);
void signalListAsReady(redisDb *db, robj *key);
+/* Stream data type. */
+stream *streamNew(void);
+
/* MULTI/EXEC/WATCH... */
void unwatchAllKeys(client *c);
void initClientMultiState(client *c);
@@ -1455,6 +1461,7 @@ robj *createIntsetObject(void);
robj *createHashObject(void);
robj *createZsetObject(void);
robj *createZsetZiplistObject(void);
+robj *createStreamObject(void);
robj *createModuleObject(moduleType *mt, void *value);
int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg);
int checkType(client *c, robj *o, int type);
@@ -1992,6 +1999,8 @@ void pfdebugCommand(client *c);
void latencyCommand(client *c);
void moduleCommand(client *c);
void securityWarningCommand(client *c);
+void xaddCommand(client *c);
+void xrangeCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
diff --git a/src/stream.h b/src/stream.h
new file mode 100644
index 000000000..065c328eb
--- /dev/null
+++ b/src/stream.h
@@ -0,0 +1,21 @@
+#ifndef STREAM_H
+#define STREAM_H
+
+#include <stdint.h> /* uint64_t: do not rely on rax.h pulling it in. */
+#include "rax.h"
+
+/* Stream item ID: a 128 bit number composed of a milliseconds time and
+ * a sequence counter. IDs generated in the same millisecond (or in a past
+ * millisecond if the clock jumped backward) will use the millisecond time
+ * of the latest generated ID and an incremented sequence. */
+typedef struct streamID {
+    uint64_t ms;        /* Unix time in milliseconds. */
+    uint64_t seq;       /* Sequence number. */
+} streamID;
+
+/* A stream: the radix tree that stores the items, plus the bookkeeping
+ * needed to generate the ID of the next item. */
+typedef struct stream {
+    rax *rax;               /* The radix tree holding the stream. */
+    uint64_t length;        /* Number of elements inside this stream. */
+    streamID last_id;       /* Zero if there are yet no items. */
+} stream;
+
+#endif /* STREAM_H */
diff --git a/src/t_stream.c b/src/t_stream.c
new file mode 100644
index 000000000..c64f5059f
--- /dev/null
+++ b/src/t_stream.c
@@ -0,0 +1,376 @@
+/*
+ * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/* TODO:
+ * - After loading a stream, populate the last ID.
+ */
+
+#include "server.h"
+#include "listpack.h"
+#include "endianconv.h"
+#include "stream.h"
+
+#define STREAM_BYTES_PER_LISTPACK 4096
+
+/* -----------------------------------------------------------------------
+ * Low level stream encoding: a radix tree of listpacks.
+ * ----------------------------------------------------------------------- */
+
+/* Allocate and return an empty stream: a newly created radix tree, a length
+ * of zero and a last ID of 0.0. */
+stream *streamNew(void) {
+    stream *st = zmalloc(sizeof(stream));
+    st->length = 0;
+    st->last_id.ms = st->last_id.seq = 0;
+    st->rax = raxNew();
+    return st;
+}
+
+/* Compute the ID to assign to the next stream item given the ID of the
+ * previous one, storing it into 'new_id'. When the current Unix time in
+ * milliseconds is strictly greater than the time part of 'last_id', the new
+ * ID is <now>.0. Otherwise (same millisecond, or the clock went backward)
+ * the time part of 'last_id' is retained and its sequence is incremented,
+ * so that the time part of generated IDs never goes backward. */
+void streamNextID(streamID *last_id, streamID *new_id) {
+    uint64_t now = mstime();
+    if (now <= last_id->ms) {
+        new_id->ms = last_id->ms;
+        new_id->seq = last_id->seq+1;
+        return;
+    }
+    new_id->ms = now;
+    new_id->seq = 0;
+}
+
+/* Append a 64 bit signed integer to the listpack 'lp'. The value is
+ * converted to its decimal string representation with ll2string() and then
+ * handed to lpAppend(), whose return value is returned as it is. */
+unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
+    char digits[LONG_STR_SIZE];
+    int len = ll2string(digits,sizeof(digits),value);
+    unsigned char *ele = (unsigned char*)digits;
+    return lpAppend(lp,ele,len);
+}
+
+/* Read the listpack element 'ele' as a 64 bit signed integer. lpGet() is
+ * called without an integer buffer: when it returns NULL the value it stored
+ * by reference is returned directly, otherwise the returned string (whose
+ * length was stored by reference) is converted with string2ll(). The
+ * function asserts that such a conversion succeeds. */
+int64_t lpGetInteger(unsigned char *ele) {
+    int64_t val;
+    unsigned char *str = lpGet(ele,&val,NULL);
+    if (str != NULL) {
+        /* This path should never be taken given how listpacks work: they
+         * should always be able to store an int64_t value in integer
+         * encoded form. However the implementation may change. */
+        int ok = string2ll((char*)str,val,&val);
+        serverAssert(ok != 0);
+    }
+    return val;
+}
+
+/* Serialize the stream ID 'id' into 'buf' as a 128 bit big endian number:
+ * the milliseconds part first, followed by the sequence part, so that
+ * encoded IDs can be sorted lexicographically. 'buf' must have room for
+ * 16 bytes. */
+void streamEncodeID(void *buf, streamID *id) {
+    uint64_t be_ms = htonu64(id->ms);
+    uint64_t be_seq = htonu64(id->seq);
+    unsigned char *out = buf;
+    memcpy(out,&be_ms,sizeof(be_ms));
+    memcpy(out+sizeof(be_ms),&be_seq,sizeof(be_seq));
+}
+
+/* Inverse of streamEncodeID(): read the 128 bit big endian encoded ID
+ * stored at 'buf' (milliseconds part first, then sequence part) and store
+ * the decoded ID into the 'id' structure passed by reference. */
+void streamDecodeID(void *buf, streamID *id) {
+    const unsigned char *in = buf;
+    uint64_t be_ms, be_seq;
+    memcpy(&be_ms,in,sizeof(be_ms));
+    memcpy(&be_seq,in+sizeof(be_ms),sizeof(be_seq));
+    id->ms = ntohu64(be_ms);
+    id->seq = ntohu64(be_seq);
+}
+
+/* Adds a new item into the stream 's' having the specified number of
+ * field-value pairs as specified in 'numfields' and stored into 'argv'
+ * (so 'argv' must hold numfields*2 objects: field, value, field, value...).
+ * Returns the new entry ID populating the 'added_id' structure, unless
+ * 'added_id' is NULL.
+ *
+ * The item is appended to the listpack stored in the last node of the radix
+ * tree. A new node, keyed by the ID of the new item, is created when the
+ * tree is empty or when the tail listpack is already larger than
+ * STREAM_BYTES_PER_LISTPACK bytes. */
+void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id) {
+    raxIterator ri;
+    raxStart(&ri,s->rax);
+    raxSeek(&ri,"$",NULL,0);
+
+    size_t lp_bytes = 0;        /* Total bytes in the tail listpack. */
+    unsigned char *lp = NULL;   /* Tail listpack pointer. */
+    uint64_t rax_key[2];        /* Key in the radix tree containing the listpack.*/
+
+    /* Get a reference to the tail node listpack. The key of the tail node
+     * is copied here, while the iterator is still valid, instead of reading
+     * the iterator fields after raxStop(). */
+    if (raxNext(&ri)) {
+        lp = ri.data;
+        lp_bytes = lpBytes(lp);
+        serverAssert(ri.key_len == sizeof(rax_key));
+        memcpy(rax_key,ri.key,sizeof(rax_key));
+    }
+    raxStop(&ri);
+
+    /* Generate the new entry ID. */
+    streamID id;
+    streamNextID(&s->last_id,&id);
+
+    /* We have to add the key into the radix tree in lexicographic order,
+     * to do so we consider the ID as a single 128 bit number written in
+     * big endian, so that the most significant bytes are the first ones. */
+    uint64_t entry_id[2];       /* Entry ID of the new item as 128 bit string. */
+    streamEncodeID(entry_id,&id);
+
+    /* Create a new listpack and radix tree node if needed. */
+    if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
+        lp = lpNew();
+        rax_key[0] = entry_id[0];
+        rax_key[1] = entry_id[1];
+        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
+    }
+
+    /* Populate the listpack with the new entry: the ID, the number of
+     * fields, then every field followed by its value. */
+    lp = lpAppend(lp,(unsigned char*)entry_id,sizeof(entry_id));
+    lp = lpAppendInteger(lp,numfields);
+    for (int i = 0; i < numfields; i++) {
+        sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
+        lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
+        lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
+    }
+
+    /* Insert back into the tree in order to update the listpack pointer,
+     * since the value returned by lpAppend() is what we have to store. */
+    raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
+    s->length++;
+    s->last_id = id;
+    if (added_id) *added_id = id;
+}
+
+/* Send the specified range to the client 'c'. The range the client will
+ * receive is between start and end inclusive, if 'count' is non zero, no more
+ * than 'count' elements are sent. The 'end' pointer can be NULL to mean that
+ * we want all the elements from 'start' till the end of the stream.
+ *
+ * The reply is an array with one entry per item, each entry being a two
+ * elements array: the item ID and the array of its fields and values.
+ * The function returns the number of items emitted. */
+size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) {
+    void *arraylen_ptr = addDeferredMultiBulkLength(c);
+    size_t arraylen = 0;
+    int done = 0; /* Set when no further item can be part of the reply. */
+
+    /* Encode the range boundaries so that they can be compared with the
+     * encoded IDs stored in the listpacks using memcmp(). */
+    uint64_t key[2];
+    uint64_t end_key[2];
+    streamEncodeID(key,start);
+    if (end) streamEncodeID(end_key,end);
+    raxIterator ri;
+    raxStart(&ri,s->rax);
+
+    /* Seek the radix tree node that contains our start item: the last node
+     * with a key <= start, or the first node after it if there is none. */
+    if (start->ms || start->seq) {
+        raxSeek(&ri,"<=",(unsigned char*)key,sizeof(key));
+        if (raxEOF(&ri)) raxSeek(&ri,">",(unsigned char*)key,sizeof(key));
+    } else {
+        raxSeek(&ri,"^",NULL,0);
+    }
+
+    /* For every radix tree node, iterate the corresponding listpack,
+     * returning elements when they are within range. */
+    while (!done && raxNext(&ri)) {
+        serverAssert(ri.key_len == sizeof(key));
+        unsigned char *lp = ri.data;
+        unsigned char *lp_ele = lpFirst(lp);
+        while(lp_ele) {
+            int64_t e_len;
+            unsigned char buf[LP_INTBUF_SIZE];
+            unsigned char *e = lpGet(lp_ele,&e_len,buf);
+            serverAssert(e_len == sizeof(streamID));
+
+            /* Seek next field: number of elements. */
+            lp_ele = lpNext(lp,lp_ele);
+            if (memcmp(e,key,sizeof(key)) >= 0) { /* If current >= start */
+                if (end && memcmp(e,end_key,sizeof(end_key)) > 0) {
+                    /* We are already out of range. Items are stored in
+                     * increasing ID order, so every following item, in this
+                     * node and in the next ones, is out of range as well:
+                     * stop the whole iteration, not just this listpack. */
+                    done = 1;
+                    break;
+                }
+                streamID thisid;
+                streamDecodeID(e,&thisid);
+                sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n",
+                                        thisid.ms,thisid.seq);
+
+                /* Emit this stream entry in the client output. */
+                addReplyMultiBulkLen(c,2);
+                addReplySds(c,replyid);
+                int64_t numfields = lpGetInteger(lp_ele);
+                lp_ele = lpNext(lp,lp_ele);
+                addReplyMultiBulkLen(c,numfields*2);
+                for (int64_t i = 0; i < numfields; i++) {
+                    /* Emit two items (key-value) per iteration. */
+                    for (int k = 0; k < 2; k++) {
+                        e = lpGet(lp_ele,&e_len,buf);
+                        addReplyBulkCBuffer(c,e,e_len);
+                        lp_ele = lpNext(lp,lp_ele);
+                    }
+                }
+
+                arraylen++;
+                if (count && count == arraylen) {
+                    done = 1; /* Requested number of items reached. */
+                    break;
+                }
+            } else {
+                /* If we do not emit, we have to discard. */
+                int64_t numfields = lpGetInteger(lp_ele);
+                lp_ele = lpNext(lp,lp_ele);
+                for (int64_t i = 0; i < numfields*2; i++)
+                    lp_ele = lpNext(lp,lp_ele);
+            }
+        }
+    }
+    raxStop(&ri);
+    setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
+    return arraylen;
+}
+
+/* -----------------------------------------------------------------------
+ * Stream commands implementation
+ * ----------------------------------------------------------------------- */
+
+/* Lookup the key 'key' for writing in the database selected by the client
+ * 'c' and return the stream object stored there. If the key does not exist
+ * a new stream object is created, added to the database and returned. If
+ * the key exists but does not hold a stream, the wrong type error is sent
+ * to the client and NULL is returned. */
+robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
+    robj *obj = lookupKeyWrite(c->db,key);
+    if (obj != NULL) {
+        if (obj->type == OBJ_STREAM) return obj;
+        addReply(c,shared.wrongtypeerr);
+        return NULL;
+    }
+    obj = createStreamObject();
+    dbAdd(c->db,key,obj);
+    return obj;
+}
+
+/* Helper function to convert a string to an unsigned long long value.
+ * The function attempts to use the faster string2ll() function inside
+ * Redis: if it fails, strtoull() is used instead. The function returns
+ * 1 if the conversion happened successfully or 0 if the number is
+ * invalid or out of range. On failure '*value' is left untouched.
+ *
+ * Only a plain sequence of decimal digits is accepted: the empty string,
+ * leading spaces, an explicit sign and trailing characters are all
+ * reported as errors. */
+int string2ull(const char *s, unsigned long long *value) {
+    long long ll;
+    if (string2ll(s,strlen(s),&ll)) {
+        if (ll < 0) return 0; /* Negative values are out of range. */
+        *value = ll;
+        return 1;
+    }
+
+    /* strtoull() skips leading whitespace and accepts a sign (negating
+     * the result for '-'), so make sure we start with a digit. This also
+     * rejects the empty string. */
+    if (s[0] < '0' || s[0] > '9') return 0;
+
+    errno = 0;
+    char *eptr = NULL;
+    unsigned long long v = strtoull(s,&eptr,10);
+    if (errno == EINVAL || errno == ERANGE) return 0; /* strtoull() failed. */
+    if (eptr == s || *eptr != '\0') return 0; /* Not fully consumed. */
+    *value = v;
+    return 1; /* Conversion done! */
+}
+
+/* Parse a stream ID in the format given by clients to Redis, that is
+ * <ms>.<seq>, and converts it into a streamID structure. If
+ * the specified ID is invalid C_ERR is returned and an error is reported
+ * to the client, otherwise C_OK is returned. The ID may be in incomplete
+ * form, just stating the milliseconds time part of the stream. In such a case
+ * the missing part is set according to the value of 'missing_seq' parameter.
+ * The IDs "-" and "+" specify respectively the minimum and maximum IDs
+ * that can be represented. */
+int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
+    char buf[128];
+    if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
+    memcpy(buf,o->ptr,sdslen(o->ptr)+1); /* Copy the null term as well. */
+
+    /* Handle the "-" and "+" special cases. */
+    if (buf[0] == '-' && buf[1] == '\0') {
+        id->ms = 0;
+        id->seq = 0;
+        return C_OK;
+    } else if (buf[0] == '+' && buf[1] == '\0') {
+        id->ms = UINT64_MAX;
+        id->seq = UINT64_MAX;
+        return C_OK;
+    }
+
+    /* Parse <ms>.<seq> form. The string is split in place at the dot.
+     * Note that string2ull() takes an 'unsigned long long' pointer, which
+     * is not guaranteed to be the same type as uint64_t, so the parsed
+     * values are stored in variables of the exact type it expects. */
+    char *dot = strchr(buf,'.');
+    if (dot) *dot = '\0';
+    unsigned long long ms, seq;
+    if (string2ull(buf,&ms) == 0) goto invalid;
+    if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
+    if (!dot) seq = missing_seq;
+    id->ms = ms;
+    id->seq = seq;
+    return C_OK;
+
+invalid:
+    addReplyError(c,"Invalid stream ID specified as stream command argument");
+    return C_ERR;
+}
+
+/* XADD key [field value] [field value] ...
+ *
+ * Append a new item, made of the specified field-value pairs, to the stream
+ * stored at key, creating the stream if the key does not exist. The ID
+ * assigned to the new item is returned to the client. */
+void xaddCommand(client *c) {
+    /* After the command name and the key, fields and values must be
+     * paired, so the total number of arguments has to be even. */
+    int odd_argc = (c->argc % 2) == 1;
+    if (odd_argc) {
+        addReplyError(c,"wrong number of arguments for XADD");
+        return;
+    }
+
+    /* Lookup the stream at key. */
+    robj *key = c->argv[1];
+    robj *o = streamTypeLookupWriteOrCreate(c,key);
+    if (o == NULL) return;
+    stream *s = o->ptr;
+
+    /* Append using the low level function and return the ID. */
+    streamID id;
+    int numfields = (c->argc-2)/2;
+    streamAppendItem(s,c->argv+2,numfields,&id);
+    addReplySds(c,sdscatfmt(sdsempty(),"+%U.%U\r\n",id.ms,id.seq));
+
+    /* NOTE(review): the event is notified using the NOTIFY_HASH class even
+     * if the key holds a stream. Check if this is intended. */
+    signalModifiedKey(c->db,key);
+    notifyKeyspaceEvent(NOTIFY_HASH,"xadd",key,c->db->id);
+    server.dirty++;
+}
+
+/* XRANGE key start end [COUNT <n>]
+ *
+ * Reply with the items of the stream stored at key having an ID between
+ * start and end, inclusive. When only the milliseconds part of an ID is
+ * given, the sequence part defaults to 0 for start and to UINT64_MAX for
+ * end. If COUNT is given and is greater than zero, no more than <n> items
+ * are returned. A COUNT of zero or less means no limit. */
+void xrangeCommand(client *c) {
+    robj *o;
+    stream *s;
+    streamID startid, endid;
+    long long count = 0;
+
+    if (streamParseIDOrReply(c,c->argv[2],&startid,0) == C_ERR) return;
+    if (streamParseIDOrReply(c,c->argv[3],&endid,UINT64_MAX) == C_ERR) return;
+
+    /* Parse the COUNT option if any. The option needs its argument, so
+     * make sure it is there before accessing argv[5]. */
+    if (c->argc > 4) {
+        if (c->argc < 6 || strcasecmp(c->argv[4]->ptr,"COUNT") != 0) {
+            addReply(c,shared.syntaxerr);
+            return;
+        }
+        if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK)
+            return;
+        /* The count is passed as a size_t below: do not let a negative
+         * value be silently converted to a huge limit. */
+        if (count < 0) count = 0;
+    }
+
+    /* Return the specified range to the user. */
+    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
+        || checkType(c,o,OBJ_STREAM)) return;
+    s = o->ptr;
+    streamReplyWithRange(c,s,&startid,&endid,count);
+}