summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorperryitay <85821686+perryitay@users.noreply.github.com>2021-11-03 20:47:18 +0200
committerGitHub <noreply@github.com>2021-11-03 20:47:18 +0200
commitf27083a4a8a6682e391a533724c904c69852c0a0 (patch)
tree74a12c1cbb3f7885ef5275a96227b12925efc114 /src
parentf11a2d4dd764c996b2d0c0cb5abde13f2445b40c (diff)
downloadredis-f27083a4a8a6682e391a533724c904c69852c0a0.tar.gz
Add support for list type to store elements larger than 4GB (#9357)
Redis lists are stored in quicklist, which is currently a linked list of ziplists. Ziplists are limited to storing elements no larger than 4GB, so when bigger items are added they're getting truncated. This PR changes quicklists so that they're capable of storing large items in quicklist nodes that are plain string buffers rather than ziplist. As part of the PR there were few other changes in redis: 1. new DEBUG sub-commands: - QUICKLIST-PACKED-THRESHOLD - set the threshold of for the node type to be plan or ziplist. default (1GB) - QUICKLIST <key> - Shows low level info about the quicklist encoding of <key> 2. rdb format change: - A new type was added - RDB_TYPE_LIST_QUICKLIST_2 . - container type (packed / plain) was added to the beginning of the rdb object (before the actual node list). 3. testing: - Tests that requires over 100MB will be by default skipped. a new flag was added to 'runtest' to run the large memory tests (not used by default) Co-authored-by: sundb <sundbcn@gmail.com> Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src')
-rw-r--r--src/debug.c33
-rw-r--r--src/defrag.c4
-rw-r--r--src/object.c6
-rw-r--r--src/quicklist.c490
-rw-r--r--src/quicklist.h31
-rw-r--r--src/rdb.c47
-rw-r--r--src/rdb.h3
-rw-r--r--src/redis-check-rdb.c3
-rw-r--r--src/t_list.c37
9 files changed, 476 insertions, 178 deletions
diff --git a/src/debug.c b/src/debug.c
index 114b1bd92..77b145b61 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -29,9 +29,11 @@
*/
#include "server.h"
+#include "util.h"
#include "sha1.h" /* SHA1 is used for DEBUG DIGEST */
#include "crc64.h"
#include "bio.h"
+#include "quicklist.h"
#include <arpa/inet.h>
#include <signal.h>
@@ -459,6 +461,9 @@ void debugCommand(client *c) {
" Setting it to 0 disables expiring keys in background when they are not",
" accessed (otherwise the Redis behavior). Setting it to 1 reenables back the",
" default.",
+"QUICKLIST-PACKED-THRESHOLD <size>",
+" Sets the threshold for elements to be inserted as plain vs packed nodes",
+" Default value is 1GB, allows values up to 4GB",
"SET-SKIP-CHECKSUM-VALIDATION <0|1>",
" Enables or disables checksum checks for RDB files and RESTORE's payload.",
"SLEEP <seconds>",
@@ -469,6 +474,9 @@ void debugCommand(client *c) {
" Return the size of different Redis core C structures.",
"ZIPLIST <key>",
" Show low level info about the ziplist encoding of <key>.",
+"QUICKLIST <key> [<0|1>]",
+" Show low level info about the quicklist encoding of <key>."
+" The optional argument (0 by default) sets the level of detail",
"CLIENT-EVICTION",
" Show low level client eviction pools info (maxmemory-clients).",
"PAUSE-CRON <0|1>",
@@ -652,6 +660,21 @@ NULL
ziplistRepr(o->ptr);
addReplyStatus(c,"Ziplist structure printed on stdout");
}
+ } else if (!strcasecmp(c->argv[1]->ptr,"quicklist") && (c->argc == 3 || c->argc == 4)) {
+ robj *o;
+
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr))
+ == NULL) return;
+
+ int full = 0;
+ if (c->argc == 4)
+ full = atoi(c->argv[3]->ptr);
+ if (o->encoding != OBJ_ENCODING_QUICKLIST) {
+ addReplyError(c,"Not a quicklist encoded object.");
+ } else {
+ quicklistRepr(o->ptr, full);
+ addReplyStatus(c,"Quicklist structure printed on stdout");
+ }
} else if (!strcasecmp(c->argv[1]->ptr,"populate") &&
c->argc >= 3 && c->argc <= 5) {
long keys, j;
@@ -782,6 +805,16 @@ NULL
{
server.active_expire_enabled = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"quicklist-packed-threshold") &&
+ c->argc == 3)
+ {
+ int memerr;
+ unsigned long long sz = memtoull((const char *)c->argv[2]->ptr, &memerr);
+ if (memerr || !quicklistisSetPackedThreshold(sz)) {
+ addReplyError(c, "argument must be a memory value bigger then 1 and smaller than 4gb");
+ } else {
+ addReply(c,shared.ok);
+ }
} else if (!strcasecmp(c->argv[1]->ptr,"set-skip-checksum-validation") &&
c->argc == 3)
{
diff --git a/src/defrag.c b/src/defrag.c
index fe2a0585e..ad5e35bb5 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -416,8 +416,8 @@ long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
*node_ref = node = newnode;
defragged++;
}
- if ((newzl = activeDefragAlloc(node->zl)))
- defragged++, node->zl = newzl;
+ if ((newzl = activeDefragAlloc(node->entry)))
+ defragged++, node->entry = newzl;
return defragged;
}
diff --git a/src/object.c b/src/object.c
index c2d917ae4..0ef41f065 100644
--- a/src/object.c
+++ b/src/object.c
@@ -415,9 +415,9 @@ void dismissListObject(robj *o, size_t size_hint) {
quicklistNode *node = ql->head;
while (node) {
if (quicklistNodeIsCompressed(node)) {
- dismissMemory(node->zl, ((quicklistLZF*)node->zl)->sz);
+ dismissMemory(node->entry, ((quicklistLZF*)node->entry)->sz);
} else {
- dismissMemory(node->zl, node->sz);
+ dismissMemory(node->entry, node->sz);
}
node = node->next;
}
@@ -1000,7 +1000,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
quicklistNode *node = ql->head;
asize = sizeof(*o)+sizeof(quicklist);
do {
- elesize += sizeof(quicklistNode)+zmalloc_size(node->zl);
+ elesize += sizeof(quicklistNode)+zmalloc_size(node->entry);
samples++;
} while ((node = node->next) && samples < sample_size);
asize += (double)elesize/samples*ql->len;
diff --git a/src/quicklist.c b/src/quicklist.c
index 93442ecfb..d8a940112 100644
--- a/src/quicklist.c
+++ b/src/quicklist.c
@@ -28,6 +28,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
+#include <stdio.h>
#include <string.h> /* for memcpy */
#include "quicklist.h"
#include "zmalloc.h"
@@ -37,10 +38,6 @@
#include "lzf.h"
#include "redisassert.h"
-#if defined(REDIS_TEST) || defined(REDIS_TEST_VERBOSE)
-#include <stdio.h> /* for printf (debug printing), snprintf (genstr) */
-#endif
-
#ifndef REDIS_STATIC
#define REDIS_STATIC static
#endif
@@ -50,6 +47,21 @@
* just one byte, it still won't overflow the 16 bit count field. */
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
+/* packed_threshold is initialized to 1gb*/
+static size_t packed_threshold = (1 << 30);
+
+/* set threshold for PLAIN nodes, the real limit is 4gb */
+#define isLargeElement(size) ((size) >= packed_threshold)
+
+int quicklistisSetPackedThreshold(size_t sz) {
+ /* Don't allow threshold to be set above or even slightly below 4GB */
+ if (sz > (1ull<<32) - (1<<20)) {
+ return 0;
+ }
+ packed_threshold = sz;
+ return 1;
+}
+
/* Maximum size in bytes of any multi-element ziplist.
* Larger values will live in their own isolated ziplists.
* This is used only if we're limited by record count. when we're limited by
@@ -144,7 +156,7 @@ quicklist *quicklistNew(int fill, int compress) {
REDIS_STATIC quicklistNode *quicklistCreateNode(void) {
quicklistNode *node;
node = zmalloc(sizeof(*node));
- node->zl = NULL;
+ node->entry = NULL;
node->count = 0;
node->sz = 0;
node->next = node->prev = NULL;
@@ -167,7 +179,7 @@ void quicklistRelease(quicklist *quicklist) {
while (len--) {
next = current->next;
- zfree(current->zl);
+ zfree(current->entry);
quicklist->count -= current->count;
zfree(current);
@@ -194,7 +206,7 @@ REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) {
quicklistLZF *lzf = zmalloc(sizeof(*lzf) + node->sz);
/* Cancel if compression fails or doesn't compress small enough */
- if (((lzf->sz = lzf_compress(node->zl, node->sz, lzf->compressed,
+ if (((lzf->sz = lzf_compress(node->entry, node->sz, lzf->compressed,
node->sz)) == 0) ||
lzf->sz + MIN_COMPRESS_IMPROVE >= node->sz) {
/* lzf_compress aborts/rejects compression if value not compressible. */
@@ -202,8 +214,8 @@ REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) {
return 0;
}
lzf = zrealloc(lzf, sizeof(*lzf) + lzf->sz);
- zfree(node->zl);
- node->zl = (unsigned char *)lzf;
+ zfree(node->entry);
+ node->entry = (unsigned char *)lzf;
node->encoding = QUICKLIST_NODE_ENCODING_LZF;
node->recompress = 0;
return 1;
@@ -225,14 +237,14 @@ REDIS_STATIC int __quicklistDecompressNode(quicklistNode *node) {
#endif
void *decompressed = zmalloc(node->sz);
- quicklistLZF *lzf = (quicklistLZF *)node->zl;
+ quicklistLZF *lzf = (quicklistLZF *)node->entry;
if (lzf_decompress(lzf->compressed, lzf->sz, decompressed, node->sz) == 0) {
/* Someone requested decompress, but we can't decompress. Not good. */
zfree(decompressed);
return 0;
}
zfree(lzf);
- node->zl = decompressed;
+ node->entry = decompressed;
node->encoding = QUICKLIST_NODE_ENCODING_RAW;
return 1;
}
@@ -258,11 +270,41 @@ REDIS_STATIC int __quicklistDecompressNode(quicklistNode *node) {
* Pointer to LZF data is assigned to '*data'.
* Return value is the length of compressed LZF data. */
size_t quicklistGetLzf(const quicklistNode *node, void **data) {
- quicklistLZF *lzf = (quicklistLZF *)node->zl;
+ quicklistLZF *lzf = (quicklistLZF *)node->entry;
*data = lzf->compressed;
return lzf->sz;
}
+void quicklistRepr(unsigned char *ql, int full) {
+ int i = 0;
+ quicklist *quicklist = (struct quicklist*) ql;
+ printf("{count : %ld}\n", quicklist->count);
+ printf("{len : %ld}\n", quicklist->len);
+ printf("{fill : %d}\n", quicklist->fill);
+ printf("{compress : %d}\n", quicklist->compress);
+ printf("{bookmark_count : %d}\n", quicklist->bookmark_count);
+ quicklistNode* node = quicklist->head;
+
+ while(node != NULL) {
+ printf("{quicklist node(%d)\n", i++);
+ printf("{container : %s, encoding: %s, size: %zu}\n", QL_NODE_IS_PLAIN(node) ? "PLAIN": "PACKED",
+ (node->encoding == QUICKLIST_NODE_ENCODING_RAW) ? "RAW": "LZF", node->sz);
+
+ if (full) {
+ if (node->container == QUICKLIST_NODE_CONTAINER_ZIPLIST) {
+ printf("{ ziplist:\n");
+ ziplistRepr(node->entry);
+ printf("}\n");
+
+ } else if (QL_NODE_IS_PLAIN(node)) {
+ printf("{ entry : %s }\n", node->entry);
+ }
+ printf("}\n");
+ }
+ node = node->next;
+ }
+}
+
#define quicklistAllowsCompression(_ql) ((_ql)->compress != 0)
/* Force 'quicklist' to meet compression guidelines set by compress depth.
@@ -430,6 +472,9 @@ REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
if (unlikely(!node))
return 0;
+ if (unlikely(QL_NODE_IS_PLAIN(node) || isLargeElement(sz)))
+ return 0;
+
int ziplist_overhead;
/* size of previous offset */
if (sz < 254)
@@ -465,6 +510,9 @@ REDIS_STATIC int _quicklistNodeAllowMerge(const quicklistNode *a,
if (!a || !b)
return 0;
+ if (unlikely(QL_NODE_IS_PLAIN(a) || QL_NODE_IS_PLAIN(b)))
+ return 0;
+
/* approximate merged ziplist size (- 11 to remove one ziplist
* header/trailer) */
unsigned int merge_sz = a->sz + b->sz - 11;
@@ -482,24 +530,45 @@ REDIS_STATIC int _quicklistNodeAllowMerge(const quicklistNode *a,
#define quicklistNodeUpdateSz(node) \
do { \
- (node)->sz = ziplistBlobLen((node)->zl); \
+ (node)->sz = ziplistBlobLen((node)->entry); \
} while (0)
+static quicklistNode* __quicklistCreatePlainNode(void *value, size_t sz) {
+ quicklistNode *new_node = quicklistCreateNode();
+ new_node->entry = zmalloc(sz);
+ new_node->container = QUICKLIST_NODE_CONTAINER_PLAIN;
+ memcpy(new_node->entry, value, sz);
+ new_node->sz = sz;
+ new_node->count++;
+ return new_node;
+}
+
+static void __quicklistInsertPlainNode(quicklist *quicklist, quicklistNode *old_node,
+ void *value, size_t sz, int after) {
+ __quicklistInsertNode(quicklist, old_node, __quicklistCreatePlainNode(value, sz), after);
+ quicklist->count++;
+}
+
/* Add new entry to head node of quicklist.
*
* Returns 0 if used existing head.
* Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
- assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
+
+ if (unlikely(isLargeElement(sz))) {
+ __quicklistInsertPlainNode(quicklist, quicklist->head, value, sz, 0);
+ return 1;
+ }
+
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
- quicklist->head->zl =
- ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
+ quicklist->head->entry =
+ ziplistPush(quicklist->head->entry, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
quicklistNode *node = quicklistCreateNode();
- node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
+ node->entry = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
@@ -515,15 +584,19 @@ int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
* Returns 1 if new tail created. */
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_tail = quicklist->tail;
- assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
+ if (unlikely(isLargeElement(sz))) {
+ __quicklistInsertPlainNode(quicklist, quicklist->tail, value, sz, 1);
+ return 1;
+ }
+
if (likely(
_quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
- quicklist->tail->zl =
- ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
+ quicklist->tail->entry =
+ ziplistPush(quicklist->tail->entry, value, sz, ZIPLIST_TAIL);
quicklistNodeUpdateSz(quicklist->tail);
} else {
quicklistNode *node = quicklistCreateNode();
- node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
+ node->entry = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
@@ -539,14 +612,22 @@ int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
void quicklistAppendZiplist(quicklist *quicklist, unsigned char *zl) {
quicklistNode *node = quicklistCreateNode();
- node->zl = zl;
- node->count = ziplistLen(node->zl);
+ node->entry = zl;
+ node->count = ziplistLen(node->entry);
node->sz = ziplistBlobLen(zl);
_quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
quicklist->count += node->count;
}
+/* Create new node consisting of a pre-formed plain node.
+ * Used for loading RDBs where entire plain node has been stored
+ * to be retrieved later.
+ * data - the data to add (pointer becomes the responsibility of quicklist) */
+void quicklistAppendPlainNode(quicklist *quicklist, unsigned char *data, size_t sz) {
+ __quicklistInsertPlainNode(quicklist, quicklist->tail, data, sz, 1);
+}
+
/* Append all values of ziplist 'zl' individually into 'quicklist'.
*
* This allows us to restore old RDB ziplists into new quicklists
@@ -622,7 +703,7 @@ REDIS_STATIC void __quicklistDelNode(quicklist *quicklist,
* now have compressed nodes needing to be decompressed. */
__quicklistCompress(quicklist, NULL);
- zfree(node->zl);
+ zfree(node->entry);
zfree(node);
}
@@ -638,7 +719,11 @@ REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
unsigned char **p) {
int gone = 0;
- node->zl = ziplistDelete(node->zl, p);
+ if (unlikely(QL_NODE_IS_PLAIN(node))) {
+ __quicklistDelNode(quicklist, node);
+ return 1;
+ }
+ node->entry = ziplistDelete(node->entry, p);
node->count--;
if (node->count == 0) {
gone = 1;
@@ -686,11 +771,33 @@ void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
/* Replace quicklist entry by 'data' with length 'sz'. */
void quicklistReplaceEntry(quicklist *quicklist, quicklistEntry *entry,
- void *data, int sz) {
- /* quicklistNext() and quicklistIndex() provide an uncompressed node */
- entry->node->zl = ziplistReplace(entry->node->zl, entry->zi, data, sz);
- quicklistNodeUpdateSz(entry->node);
- quicklistCompress(quicklist, entry->node);
+ void *data, size_t sz) {
+ if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz))) {
+ entry->node->entry = ziplistReplace(entry->node->entry, entry->zi, data, sz);
+ quicklistNodeUpdateSz(entry->node);
+ /* quicklistNext() and quicklistIndex() provide an uncompressed node */
+ quicklistCompress(quicklist, entry->node);
+ } else if (QL_NODE_IS_PLAIN(entry->node)) {
+ if (isLargeElement(sz)) {
+ zfree(entry->node->entry);
+ entry->node->entry = zmalloc(sz);
+ entry->node->sz = sz;
+ memcpy(entry->node->entry, data, sz);
+ quicklistCompress(quicklist, entry->node);
+ } else {
+ quicklistInsertAfter(quicklist, entry, data, sz);
+ __quicklistDelNode(quicklist, entry->node);
+ }
+ } else {
+ quicklistInsertAfter(quicklist, entry, data, sz);
+ if (entry->node->count == 1)
+ __quicklistDelNode(quicklist, entry->node);
+ else {
+ unsigned char *p = ziplistIndex(entry->node->entry, -1);
+ quicklistDelIndex(quicklist, entry->node, &p);
+ quicklistCompress(quicklist, entry->node->next);
+ }
+ }
}
/* Replace quicklist entry at offset 'index' by 'data' with length 'sz'.
@@ -698,7 +805,7 @@ void quicklistReplaceEntry(quicklist *quicklist, quicklistEntry *entry,
* Returns 1 if replace happened.
* Returns 0 if replace failed and no changes happened. */
int quicklistReplaceAtIndex(quicklist *quicklist, long index, void *data,
- int sz) {
+ size_t sz) {
quicklistEntry entry;
if (likely(quicklistIndex(quicklist, index, &entry))) {
quicklistReplaceEntry(quicklist, &entry, data, sz);
@@ -728,17 +835,17 @@ REDIS_STATIC quicklistNode *_quicklistZiplistMerge(quicklist *quicklist,
quicklistDecompressNode(a);
quicklistDecompressNode(b);
- if ((ziplistMerge(&a->zl, &b->zl))) {
+ if ((ziplistMerge(&a->entry, &b->entry))) {
/* We merged ziplists! Now remove the unused quicklistNode. */
quicklistNode *keep = NULL, *nokeep = NULL;
- if (!a->zl) {
+ if (!a->entry) {
nokeep = a;
keep = b;
- } else if (!b->zl) {
+ } else if (!b->entry) {
nokeep = b;
keep = a;
}
- keep->count = ziplistLen(keep->zl);
+ keep->count = ziplistLen(keep->entry);
quicklistNodeUpdateSz(keep);
nokeep->count = 0;
@@ -828,10 +935,10 @@ REDIS_STATIC quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset,
size_t zl_sz = node->sz;
quicklistNode *new_node = quicklistCreateNode();
- new_node->zl = zmalloc(zl_sz);
+ new_node->entry = zmalloc(zl_sz);
/* Copy original ziplist so we can split it */
- memcpy(new_node->zl, node->zl, zl_sz);
+ memcpy(new_node->entry, node->entry, zl_sz);
/* Ranges to be trimmed: -1 here means "continue deleting until the list ends" */
int orig_start = after ? offset + 1 : 0;
@@ -842,12 +949,12 @@ REDIS_STATIC quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset,
D("After %d (%d); ranges: [%d, %d], [%d, %d]", after, offset, orig_start,
orig_extent, new_start, new_extent);
- node->zl = ziplistDeleteRange(node->zl, orig_start, orig_extent);
- node->count = ziplistLen(node->zl);
+ node->entry = ziplistDeleteRange(node->entry, orig_start, orig_extent);
+ node->count = ziplistLen(node->entry);
quicklistNodeUpdateSz(node);
- new_node->zl = ziplistDeleteRange(new_node->zl, new_start, new_extent);
- new_node->count = ziplistLen(new_node->zl);
+ new_node->entry = ziplistDeleteRange(new_node->entry, new_start, new_extent);
+ new_node->count = ziplistLen(new_node->entry);
quicklistNodeUpdateSz(new_node);
D("After split lengths: orig (%d), new (%d)", node->count, new_node->count);
@@ -864,13 +971,16 @@ REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
int fill = quicklist->fill;
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;
- assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */
if (!node) {
/* we have no reference node, so let's create only node in the list */
D("No node given!");
+ if (unlikely(isLargeElement(sz))) {
+ __quicklistInsertPlainNode(quicklist, quicklist->tail, value, sz, after);
+ return;
+ }
new_node = quicklistCreateNode();
- new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
+ new_node->entry = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
__quicklistInsertNode(quicklist, NULL, new_node, after);
new_node->count++;
quicklist->count++;
@@ -902,15 +1012,29 @@ REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
}
}
+ if (unlikely(isLargeElement(sz))) {
+ if (QL_NODE_IS_PLAIN(node) || (at_tail && after) || (at_head && !after)) {
+ __quicklistInsertPlainNode(quicklist, node, value, sz, after);
+ } else {
+ quicklistDecompressNodeForUse(node);
+ new_node = _quicklistSplitNode(node, entry->offset, after);
+ quicklistNode *entry_node = __quicklistCreatePlainNode(value, sz);
+ __quicklistInsertNode(quicklist, node, entry_node, after);
+ __quicklistInsertNode(quicklist, entry_node, new_node, after);
+ quicklist->count++;
+ }
+ return;
+ }
+
/* Now determine where and how to insert the new element */
if (!full && after) {
D("Not full, inserting after current position.");
quicklistDecompressNodeForUse(node);
- unsigned char *next = ziplistNext(node->zl, entry->zi);
+ unsigned char *next = ziplistNext(node->entry, entry->zi);
if (next == NULL) {
- node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
+ node->entry = ziplistPush(node->entry, value, sz, ZIPLIST_TAIL);
} else {
- node->zl = ziplistInsert(node->zl, next, value, sz);
+ node->entry = ziplistInsert(node->entry, next, value, sz);
}
node->count++;
quicklistNodeUpdateSz(node);
@@ -918,7 +1042,7 @@ REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
} else if (!full && !after) {
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node);
- node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
+ node->entry = ziplistInsert(node->entry, entry->zi, value, sz);
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
@@ -928,7 +1052,7 @@ REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next;
quicklistDecompressNodeForUse(new_node);
- new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
+ new_node->entry = ziplistPush(new_node->entry, value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
@@ -938,7 +1062,7 @@ REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev;
quicklistDecompressNodeForUse(new_node);
- new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
+ new_node->entry = ziplistPush(new_node->entry, value, sz, ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
@@ -948,7 +1072,7 @@ REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
* - create new node and attach to quicklist */
D("\tprovisioning new node...");
new_node = quicklistCreateNode();
- new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
+ new_node->entry = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
@@ -958,7 +1082,7 @@ REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
D("\tsplitting node...");
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
- new_node->zl = ziplistPush(new_node->zl, value, sz,
+ new_node->entry = ziplistPush(new_node->entry, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
@@ -1046,11 +1170,11 @@ int quicklistDelRange(quicklist *quicklist, const long start,
"node count: %u",
extent, del, entry.offset, delete_entire_node, node->count);
- if (delete_entire_node) {
+ if (delete_entire_node || QL_NODE_IS_PLAIN(node)) {
__quicklistDelNode(quicklist, node);
} else {
quicklistDecompressNodeForUse(node);
- node->zl = ziplistDeleteRange(node->zl, entry.offset, del);
+ node->entry = ziplistDeleteRange(node->entry, entry.offset, del);
quicklistNodeUpdateSz(node);
node->count -= del;
quicklist->count -= del;
@@ -1068,9 +1192,12 @@ int quicklistDelRange(quicklist *quicklist, const long start,
return 1;
}
-/* Passthrough to ziplistCompare() */
-int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len) {
- return ziplistCompare(p1, p2, p2_len);
+/* compare between a two entries */
+int quicklistCompare(quicklistEntry* entry, unsigned char *p2, const size_t p2_len) {
+ if (unlikely(QL_NODE_IS_PLAIN(entry->node))) {
+ return ((entry->sz == p2_len) && (memcmp(entry->value, p2, p2_len) == 0));
+ }
+ return ziplistCompare(entry->zi, p2, p2_len);
}
/* Returns a quicklist iterator 'iter'. After the initialization every
@@ -1163,10 +1290,16 @@ int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
unsigned char *(*nextFn)(unsigned char *, unsigned char *) = NULL;
int offset_update = 0;
+ int plain = QL_NODE_IS_PLAIN(iter->current);
if (!iter->zi) {
/* If !zi, use current index. */
quicklistDecompressNodeForUse(iter->current);
- iter->zi = ziplistIndex(iter->current->zl, iter->offset);
+ if (unlikely(plain))
+ iter->zi = iter->current->entry;
+ else
+ iter->zi = ziplistIndex(iter->current->entry, iter->offset);
+ } else if (unlikely(plain)) {
+ iter->zi = NULL;
} else {
/* else, use existing iterator offset and get prev/next as necessary. */
if (iter->direction == AL_START_HEAD) {
@@ -1176,7 +1309,7 @@ int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
nextFn = ziplistPrev;
offset_update = -1;
}
- iter->zi = nextFn(iter->current->zl, iter->zi);
+ iter->zi = nextFn(iter->current->entry, iter->zi);
iter->offset += offset_update;
}
@@ -1184,8 +1317,15 @@ int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
entry->offset = iter->offset;
if (iter->zi) {
+ if (unlikely(plain)) {
+ entry->value = entry->node->entry;
+ entry->sz = entry->node->sz;
+ return 1;
+ }
/* Populate value from existing ziplist position */
- ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
+ unsigned int sz = 0;
+ ziplistGet(entry->zi, &entry->value, &sz, &entry->longval);
+ entry->sz = sz;
return 1;
} else {
/* We ran out of ziplist entries.
@@ -1228,19 +1368,20 @@ quicklist *quicklistDup(quicklist *orig) {
quicklistNode *node = quicklistCreateNode();
if (current->encoding == QUICKLIST_NODE_ENCODING_LZF) {
- quicklistLZF *lzf = (quicklistLZF *)current->zl;
+ quicklistLZF *lzf = (quicklistLZF *)current->entry;
size_t lzf_sz = sizeof(*lzf) + lzf->sz;
- node->zl = zmalloc(lzf_sz);
- memcpy(node->zl, current->zl, lzf_sz);
+ node->entry = zmalloc(lzf_sz);
+ memcpy(node->entry, current->entry, lzf_sz);
} else if (current->encoding == QUICKLIST_NODE_ENCODING_RAW) {
- node->zl = zmalloc(current->sz);
- memcpy(node->zl, current->zl, current->sz);
+ node->entry = zmalloc(current->sz);
+ memcpy(node->entry, current->entry, current->sz);
}
node->count = current->count;
copy->count += node->count;
node->sz = current->sz;
node->encoding = current->encoding;
+ node->container = current->container;
_quicklistInsertNodeAfter(copy, copy->tail, node);
}
@@ -1311,21 +1452,46 @@ int quicklistIndex(const quicklist *quicklist, const long long idx,
}
quicklistDecompressNodeForUse(entry->node);
- entry->zi = ziplistIndex(entry->node->zl, entry->offset);
- if (!ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval))
+
+ if (unlikely(QL_NODE_IS_PLAIN(entry->node))) {
+ entry->value = entry->node->entry;
+ entry->sz = entry->node->sz;
+ return 1;
+ }
+
+ entry->zi = ziplistIndex(entry->node->entry, entry->offset);
+ unsigned int sz = 0;
+ if (!ziplistGet(entry->zi, &entry->value, &sz, &entry->longval))
assert(0); /* This can happen on corrupt ziplist with fake entry count. */
/* The caller will use our result, so we don't re-compress here.
* The caller can recompress or delete the node as needed. */
+ entry->sz = sz;
return 1;
}
+static void quicklistRotatePlain(quicklist *quicklist) {
+ quicklistNode *new_head = quicklist->tail;
+ quicklistNode *new_tail = quicklist->tail->prev;
+ quicklist->head->prev = new_head;
+ new_tail->next = NULL;
+ new_head->next = quicklist->head;
+ new_head->prev = NULL;
+ quicklist->head = new_head;
+ quicklist->tail = new_tail;
+}
+
/* Rotate quicklist by moving the tail element to the head. */
void quicklistRotate(quicklist *quicklist) {
if (quicklist->count <= 1)
return;
+ if (unlikely(QL_NODE_IS_PLAIN(quicklist->tail))) {
+ quicklistRotatePlain(quicklist);
+ return;
+ }
+
/* First, get the tail entry */
- unsigned char *p = ziplistIndex(quicklist->tail->zl, -1);
+ unsigned char *p = ziplistIndex(quicklist->tail->entry, -1);
unsigned char *value, *tmp;
long long longval;
unsigned int sz;
@@ -1353,7 +1519,7 @@ void quicklistRotate(quicklist *quicklist) {
* tail ziplist and PushHead() could have reallocated our single ziplist,
* which would make our pre-existing 'p' unusable. */
if (quicklist->len == 1) {
- p = ziplistIndex(quicklist->tail->zl, -1);
+ p = ziplistIndex(quicklist->tail->entry, -1);
}
/* Remove tail entry. */
@@ -1372,8 +1538,8 @@ void quicklistRotate(quicklist *quicklist) {
* Return value of 1 means check 'data' and 'sval' for values.
* If 'data' is set, use 'data' and 'sz'. Otherwise, use 'sval'. */
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
- unsigned int *sz, long long *sval,
- void *(*saver)(unsigned char *data, unsigned int sz)) {
+ size_t *sz, long long *sval,
+ void *(*saver)(unsigned char *data, size_t sz)) {
unsigned char *p;
unsigned char *vstr;
unsigned int vlen;
@@ -1399,7 +1565,16 @@ int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
return 0;
}
- p = ziplistIndex(node->zl, pos);
+ if (unlikely(QL_NODE_IS_PLAIN(node))) {
+ if (data)
+ *data = saver(node->entry, node->sz);
+ if (sz)
+ *sz = node->sz;
+ quicklistDelIndex(quicklist, node, NULL);
+ return 1;
+ }
+
+ p = ziplistIndex(node->entry, pos);
if (ziplistGet(p, &vstr, &vlen, &vlong)) {
if (vstr) {
if (data)
@@ -1419,7 +1594,7 @@ int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
}
/* Return a malloc'd copy of data passed in */
-REDIS_STATIC void *_quicklistSaver(unsigned char *data, unsigned int sz) {
+REDIS_STATIC void *_quicklistSaver(unsigned char *data, size_t sz) {
unsigned char *vstr;
if (data) {
vstr = zmalloc(sz);
@@ -1433,9 +1608,9 @@ REDIS_STATIC void *_quicklistSaver(unsigned char *data, unsigned int sz) {
*
* Returns malloc'd value from quicklist */
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
- unsigned int *sz, long long *slong) {
+ size_t *sz, long long *slong) {
unsigned char *vstr;
- unsigned int vlen;
+ size_t vlen;
long long vlong;
if (quicklist->count == 0)
return 0;
@@ -1613,7 +1788,8 @@ static int _itrprintr(quicklist *ql, int print, int forward) {
prev = entry.node;
}
if (print) {
- printf("[%3d (%2d)]: [%.*s] (%lld)\n", i, p, entry.sz,
+ int size = (entry.sz > (1<<20)) ? 1<<20 : entry.sz;
+ printf("[%3d (%2d)]: [%.*s] (%lld)\n", i, p, size,
(char *)entry.value, entry.longval);
}
i++;
@@ -1671,18 +1847,18 @@ static int _ql_verify(quicklist *ql, uint32_t len, uint32_t count,
}
if (ql->head && head_count != ql->head->count &&
- head_count != ziplistLen(ql->head->zl)) {
+ head_count != ziplistLen(ql->head->entry)) {
yell("quicklist head count wrong: expected %d, "
"got cached %d vs. actual %d",
- head_count, ql->head->count, ziplistLen(ql->head->zl));
+ head_count, ql->head->count, ziplistLen(ql->head->entry));
errors++;
}
if (ql->tail && tail_count != ql->tail->count &&
- tail_count != ziplistLen(ql->tail->zl)) {
+ tail_count != ziplistLen(ql->tail->entry)) {
yell("quicklist tail count wrong: expected %d, "
"got cached %u vs. actual %d",
- tail_count, ql->tail->count, ziplistLen(ql->tail->zl));
+ tail_count, ql->tail->count, ziplistLen(ql->tail->entry));
errors++;
}
@@ -1696,7 +1872,7 @@ static int _ql_verify(quicklist *ql, uint32_t len, uint32_t count,
if (node->encoding != QUICKLIST_NODE_ENCODING_RAW) {
yell("Incorrect compression: node %d is "
"compressed at depth %d ((%u, %u); total "
- "nodes: %lu; size: %u; recompress: %d)",
+ "nodes: %lu; size: %zu; recompress: %d)",
at, ql->compress, low_raw, high_raw, ql->len, node->sz,
node->recompress);
errors++;
@@ -1706,7 +1882,7 @@ static int _ql_verify(quicklist *ql, uint32_t len, uint32_t count,
!node->attempted_compress) {
yell("Incorrect non-compression: node %d is NOT "
"compressed at depth %d ((%u, %u); total "
- "nodes: %lu; size: %u; recompress: %d; attempted: %d)",
+ "nodes: %lu; size: %zu; recompress: %d; attempted: %d)",
at, ql->compress, low_raw, high_raw, ql->len, node->sz,
node->recompress, node->attempted_compress);
errors++;
@@ -1829,6 +2005,74 @@ int quicklistTest(int argc, char *argv[], int accurate) {
quicklistRelease(ql);
}
+ TEST("Comprassion Plain node") {
+ quicklistisSetPackedThreshold(1);
+ quicklist *ql = quicklistNew(-2, 1);
+ for (int i = 0; i < 500; i++)
+ /* Set to 256 to allow the node to be triggered to compress,
+ * if it is less than 48(nocompress), the test will be successful. */
+ quicklistPushHead(ql, genstr("hello", i), 256);
+
+ quicklistIter *iter = quicklistGetIterator(ql, AL_START_TAIL);
+ quicklistEntry entry;
+ int i = 0;
+ while (quicklistNext(iter, &entry)) {
+ char *h = genstr("hello", i);
+ if (strcmp((char *)entry.value, h))
+ ERR("value [%s] didn't match [%s] at position %d",
+ entry.value, h, i);
+ i++;
+ }
+ quicklistReleaseIterator(iter);
+ quicklistRelease(ql);
+ }
+
+ TEST("NEXT plain node")
+ {
+ packed_threshold = 3;
+ quicklist *ql = quicklistNew(-2, options[_i]);
+ char *strings[] = {"hello1", "hello2", "h3", "h4", "hello5"};
+
+ for (int i = 0; i < 5; ++i)
+ quicklistPushHead(ql, strings[i], strlen(strings[i]));
+
+ quicklistEntry entry;
+ quicklistIter *iter = quicklistGetIterator(ql, AL_START_TAIL);
+ int j = 0;
+
+ while(quicklistNext(iter, &entry) != 0) {
+ assert(strncmp(strings[j], (char *)entry.value, strlen(strings[j])) == 0);
+ j++;
+ }
+ quicklistReleaseIterator(iter);
+ quicklistRelease(ql);
+ }
+
+ TEST("rotate plain node ") {
+ unsigned char *data = NULL;
+ size_t sz;
+ long long lv;
+ int i =0;
+ packed_threshold = 5;
+ quicklist *ql = quicklistNew(-2, options[_i]);
+ quicklistPushHead(ql, "hello1", 6);
+ quicklistPushHead(ql, "hello4", 6);
+ quicklistPushHead(ql, "hello3", 6);
+ quicklistPushHead(ql, "hello2", 6);
+ quicklistRotate(ql);
+
+ for(i = 1 ; i < 5; i++) {
+ quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv);
+ int temp_char = data[5];
+ zfree(data);
+ assert(temp_char == ('0' + i));
+ }
+
+ ql_verify(ql, 0, 0, 0, 0);
+ quicklistRelease(ql);
+ packed_threshold = (1 << 30);
+ }
+
TEST("rotate one val once") {
for (int f = 0; f < fill_count; f++) {
quicklist *ql = quicklistNew(fills[f], options[_i]);
@@ -1877,15 +2121,17 @@ int quicklistTest(int argc, char *argv[], int accurate) {
char *populate = genstr("hello", 331);
quicklistPushHead(ql, populate, 32);
unsigned char *data;
- unsigned int sz;
+ size_t sz;
long long lv;
ql_info(ql);
assert(quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv));
assert(data != NULL);
assert(sz == 32);
- if (strcmp(populate, (char *)data))
- ERR("Pop'd value (%.*s) didn't equal original value (%s)", sz,
+ if (strcmp(populate, (char *)data)) {
+ int size = sz;
+ ERR("Pop'd value (%.*s) didn't equal original value (%s)", size,
data, populate);
+ }
zfree(data);
ql_verify(ql, 0, 0, 0, 0);
quicklistRelease(ql);
@@ -1895,7 +2141,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
quicklist *ql = quicklistNew(-2, options[_i]);
quicklistPushHead(ql, "55513", 5);
unsigned char *data;
- unsigned int sz;
+ size_t sz;
long long lv;
ql_info(ql);
assert(quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv));
@@ -1912,15 +2158,17 @@ int quicklistTest(int argc, char *argv[], int accurate) {
ql_info(ql);
for (int i = 0; i < 500; i++) {
unsigned char *data;
- unsigned int sz;
+ size_t sz;
long long lv;
int ret = quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv);
assert(ret == 1);
assert(data != NULL);
assert(sz == 32);
- if (strcmp(genstr("hello", 499 - i), (char *)data))
+ if (strcmp(genstr("hello", 499 - i), (char *)data)) {
+ int size = sz;
ERR("Pop'd value (%.*s) didn't equal original value (%s)",
- sz, data, genstr("hello", 499 - i));
+ size, data, genstr("hello", 499 - i));
+ }
zfree(data);
}
ql_verify(ql, 0, 0, 0, 0);
@@ -1933,17 +2181,19 @@ int quicklistTest(int argc, char *argv[], int accurate) {
quicklistPushHead(ql, genstr("hello", i), 32);
for (int i = 0; i < 5000; i++) {
unsigned char *data;
- unsigned int sz;
+ size_t sz;
long long lv;
int ret = quicklistPop(ql, QUICKLIST_HEAD, &data, &sz, &lv);
if (i < 500) {
assert(ret == 1);
assert(data != NULL);
assert(sz == 32);
- if (strcmp(genstr("hello", 499 - i), (char *)data))
+ if (strcmp(genstr("hello", 499 - i), (char *)data)) {
+ int size = sz;
ERR("Pop'd value (%.*s) didn't equal original value "
"(%s)",
- sz, data, genstr("hello", 499 - i));
+ size, data, genstr("hello", 499 - i));
+ }
zfree(data);
} else {
assert(ret == 0);
@@ -2008,7 +2258,8 @@ int quicklistTest(int argc, char *argv[], int accurate) {
/* verify results */
quicklistIndex(ql, 0, &entry);
if (strncmp((char *)entry.value, "abc", 3)) {
- ERR("Value 0 didn't match, instead got: %.*s", entry.sz,
+ int sz = entry.sz;
+ ERR("Value 0 didn't match, instead got: %.*s", sz,
entry.value);
}
quicklistRelease(ql);
@@ -2023,8 +2274,9 @@ int quicklistTest(int argc, char *argv[], int accurate) {
/* verify results */
quicklistIndex(ql, 0, &entry);
+ int sz = entry.sz;
if (strncmp((char *)entry.value, "abc", 3)) {
- ERR("Value 0 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 0 didn't match, instead got: %.*s", sz,
entry.value);
}
quicklistRelease(ql);
@@ -2040,13 +2292,15 @@ int quicklistTest(int argc, char *argv[], int accurate) {
/* verify results */
quicklistIndex(ql, 0, &entry);
+ int sz = entry.sz;
if (strncmp((char *)entry.value, "hello", 5)) {
- ERR("Value 0 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 0 didn't match, instead got: %.*s", sz,
entry.value);
}
quicklistIndex(ql, 1, &entry);
+ sz = entry.sz;
if (strncmp((char *)entry.value, "abc", 3)) {
- ERR("Value 1 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 1 didn't match, instead got: %.*s", sz,
entry.value);
}
@@ -2063,13 +2317,15 @@ int quicklistTest(int argc, char *argv[], int accurate) {
/* verify results */
quicklistIndex(ql, 0, &entry);
+ int sz = entry.sz;
if (strncmp((char *)entry.value, "abc", 3)) {
- ERR("Value 0 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 0 didn't match, instead got: %.*s", sz,
entry.value);
}
quicklistIndex(ql, 1, &entry);
+ sz = entry.sz;
if (strncmp((char *)entry.value, "hello", 5)) {
- ERR("Value 1 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 1 didn't match, instead got: %.*s", sz,
entry.value);
}
@@ -2129,28 +2385,30 @@ int quicklistTest(int argc, char *argv[], int accurate) {
/* verify results */
quicklistIndex(ql, 0, &entry);
+ int sz = entry.sz;
+
if (strncmp((char *)entry.value, "abc", 3))
- ERR("Value 0 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 0 didn't match, instead got: %.*s", sz,
entry.value);
quicklistIndex(ql, 1, &entry);
if (strncmp((char *)entry.value, "def", 3))
- ERR("Value 1 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 1 didn't match, instead got: %.*s", sz,
entry.value);
quicklistIndex(ql, 2, &entry);
if (strncmp((char *)entry.value, "bar", 3))
- ERR("Value 2 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 2 didn't match, instead got: %.*s", sz,
entry.value);
quicklistIndex(ql, 3, &entry);
if (strncmp((char *)entry.value, "bob", 3))
- ERR("Value 3 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 3 didn't match, instead got: %.*s", sz,
entry.value);
quicklistIndex(ql, 4, &entry);
if (strncmp((char *)entry.value, "foo", 3))
- ERR("Value 4 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 4 didn't match, instead got: %.*s", sz,
entry.value);
quicklistIndex(ql, 5, &entry);
if (strncmp((char *)entry.value, "zoo", 3))
- ERR("Value 5 didn't match, instead got: %.*s", entry.sz,
+ ERR("Value 5 didn't match, instead got: %.*s", sz,
entry.value);
quicklistReleaseIterator(iter);
quicklistRelease(ql);
@@ -2276,8 +2534,9 @@ int quicklistTest(int argc, char *argv[], int accurate) {
for (int i = 0; i < 50; i++)
quicklistPushTail(ql, genstr("hello", i + 1), 32);
quicklistEntry entry;
+ int sz = entry.sz;
if (quicklistIndex(ql, 50, &entry))
- ERR("Index found at 50 with 50 list: %.*s", entry.sz,
+ ERR("Index found at 50 with 50 list: %.*s", sz,
entry.value);
quicklistRelease(ql);
}
@@ -2469,7 +2728,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
quicklistEntry entry;
int i = 0;
while (quicklistNext(iter, &entry)) {
- if (quicklistCompare(entry.zi, (unsigned char *)"bar", 3)) {
+ if (quicklistCompare(&entry, (unsigned char *)"bar", 3)) {
quicklistDelEntry(iter, &entry);
}
i++;
@@ -2482,9 +2741,10 @@ int quicklistTest(int argc, char *argv[], int accurate) {
while (quicklistNext(iter, &entry)) {
/* Result must be: abc, foo, foobar, foobared, zap, test,
* foo */
+ int sz = entry.sz;
if (strncmp((char *)entry.value, result[i], entry.sz)) {
ERR("No match at position %d, got %.*s instead of %s",
- i, entry.sz, entry.value, result[i]);
+ i, sz, entry.value, result[i]);
}
i++;
}
@@ -2497,7 +2757,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
i = 0;
int del = 2;
while (quicklistNext(iter, &entry)) {
- if (quicklistCompare(entry.zi, (unsigned char *)"foo", 3)) {
+ if (quicklistCompare(&entry, (unsigned char *)"foo", 3)) {
quicklistDelEntry(iter, &entry);
del--;
}
@@ -2517,10 +2777,11 @@ int quicklistTest(int argc, char *argv[], int accurate) {
while (quicklistNext(iter, &entry)) {
/* Result must be: abc, foo, foobar, foobared, zap, test,
* foo */
+ int sz = entry.sz;
if (strncmp((char *)entry.value, resultB[resB - 1 - i],
- entry.sz)) {
+ sz)) {
ERR("No match at position %d, got %.*s instead of %s",
- i, entry.sz, entry.value, resultB[resB - 1 - i]);
+ i, sz, entry.value, resultB[resB - 1 - i]);
}
i++;
}
@@ -2543,7 +2804,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
quicklistIter *iter = quicklistGetIterator(ql, AL_START_TAIL);
int i = 0;
while (quicklistNext(iter, &entry)) {
- if (quicklistCompare(entry.zi, (unsigned char *)"hij", 3)) {
+ if (quicklistCompare(&entry, (unsigned char *)"hij", 3)) {
quicklistDelEntry(iter, &entry);
}
i++;
@@ -2558,7 +2819,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
i = 0;
char *vals[] = {"abc", "def", "jkl", "oop"};
while (quicklistNext(iter, &entry)) {
- if (!quicklistCompare(entry.zi, (unsigned char *)vals[i],
+ if (!quicklistCompare(&entry, (unsigned char *)vals[i],
3)) {
ERR("Value at %d didn't match %s\n", i, vals[i]);
}
@@ -2652,9 +2913,10 @@ int quicklistTest(int argc, char *argv[], int accurate) {
ERR("B! got instead: %lld", entry.longval);
quicklistPushTail(ql, "bobobob", 7);
quicklistIndex(ql, -1, &entry);
+ int sz = entry.sz;
if (strncmp((char *)entry.value, "bobobob", 7))
ERR("Tail doesn't match bobobob, it's %.*s instead",
- entry.sz, entry.value);
+ sz, entry.value);
for (int i = 0; i < 12; i++) {
quicklistIndex(ql, i, &entry);
if (entry.longval != nums[5 + i])
@@ -2781,7 +3043,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
if (node->encoding != QUICKLIST_NODE_ENCODING_RAW) {
ERR("Incorrect compression: node %d is "
"compressed at depth %d ((%u, %u); total "
- "nodes: %lu; size: %u)",
+ "nodes: %lu; size: %zu)",
at, depth, low_raw, high_raw, ql->len,
node->sz);
}
@@ -2789,7 +3051,7 @@ int quicklistTest(int argc, char *argv[], int accurate) {
if (node->encoding != QUICKLIST_NODE_ENCODING_LZF) {
ERR("Incorrect non-compression: node %d is NOT "
"compressed at depth %d ((%u, %u); total "
- "nodes: %lu; size: %u; attempted: %d)",
+ "nodes: %lu; size: %zu; attempted: %d)",
at, depth, low_raw, high_raw, ql->len,
node->sz, node->attempted_compress);
}
diff --git a/src/quicklist.h b/src/quicklist.h
index 173d9a419..e9bf07161 100644
--- a/src/quicklist.h
+++ b/src/quicklist.h
@@ -46,8 +46,8 @@
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
- unsigned char *zl;
- unsigned int sz; /* ziplist size in bytes */
+ unsigned char *entry;
+ size_t sz; /* entry size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
@@ -56,13 +56,13 @@ typedef struct quicklistNode {
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
-/* quicklistLZF is a 4+N byte struct holding 'sz' followed by 'compressed'.
+/* quicklistLZF is a 8+N byte struct holding 'sz' followed by 'compressed'.
* 'sz' is byte length of 'compressed' field.
* 'compressed' is LZF data with total (compressed) length 'sz'
* NOTE: uncompressed length is stored in quicklistNode->sz.
- * When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
+ * When quicklistNode->entry is compressed, node->entry points to a quicklistLZF */
typedef struct quicklistLZF {
- unsigned int sz; /* LZF size in bytes*/
+ size_t sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;
@@ -127,7 +127,7 @@ typedef struct quicklistEntry {
unsigned char *zi;
unsigned char *value;
long long longval;
- unsigned int sz;
+ size_t sz;
int offset;
} quicklistEntry;
@@ -142,9 +142,11 @@ typedef struct quicklistEntry {
#define QUICKLIST_NOCOMPRESS 0
/* quicklist container formats */
-#define QUICKLIST_NODE_CONTAINER_NONE 1
+#define QUICKLIST_NODE_CONTAINER_PLAIN 1
#define QUICKLIST_NODE_CONTAINER_ZIPLIST 2
+#define QL_NODE_IS_PLAIN(node) ((node)->container == QUICKLIST_NODE_CONTAINER_PLAIN)
+
#define quicklistNodeIsCompressed(node) \
((node)->encoding == QUICKLIST_NODE_ENCODING_LZF)
@@ -160,6 +162,7 @@ int quicklistPushTail(quicklist *quicklist, void *value, const size_t sz);
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where);
void quicklistAppendZiplist(quicklist *quicklist, unsigned char *zl);
+void quicklistAppendPlainNode(quicklist *quicklist, unsigned char *data, size_t sz);
quicklist *quicklistAppendValuesFromZiplist(quicklist *quicklist,
unsigned char *zl);
quicklist *quicklistCreateFromZiplist(int fill, int compress,
@@ -170,9 +173,9 @@ void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz);
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry);
void quicklistReplaceEntry(quicklist *quicklist, quicklistEntry *entry,
- void *data, int sz);
+ void *data, size_t sz);
int quicklistReplaceAtIndex(quicklist *quicklist, long index, void *data,
- int sz);
+ const size_t sz);
int quicklistDelRange(quicklist *quicklist, const long start, const long stop);
quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction);
quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
@@ -185,19 +188,21 @@ int quicklistIndex(const quicklist *quicklist, const long long index,
quicklistEntry *entry);
void quicklistRotate(quicklist *quicklist);
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
- unsigned int *sz, long long *sval,
- void *(*saver)(unsigned char *data, unsigned int sz));
+ size_t *sz, long long *sval,
+ void *(*saver)(unsigned char *data, size_t sz));
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
- unsigned int *sz, long long *slong);
+ size_t *sz, long long *slong);
unsigned long quicklistCount(const quicklist *ql);
-int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len);
+int quicklistCompare(quicklistEntry *entry, unsigned char *p2, const size_t p2_len);
size_t quicklistGetLzf(const quicklistNode *node, void **data);
+void quicklistRepr(unsigned char *ql, int full);
/* bookmarks */
int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node);
int quicklistBookmarkDelete(quicklist *ql, const char *name);
quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name);
void quicklistBookmarksClear(quicklist *ql);
+int quicklistisSetPackedThreshold(size_t sz);
#ifdef REDIS_TEST
int quicklistTest(int argc, char *argv[], int accurate);
diff --git a/src/rdb.c b/src/rdb.c
index ad7ede6c4..a56a3133c 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -658,7 +658,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
return rdbSaveType(rdb,RDB_TYPE_STRING);
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
- return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
+ return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2);
else
serverPanic("Unknown list encoding");
case OBJ_SET:
@@ -813,13 +813,16 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
nwritten += n;
while(node) {
+ if ((n = rdbSaveLen(rdb,node->container)) == -1) return -1;
+ nwritten += n;
+
if (quicklistNodeIsCompressed(node)) {
void *data;
size_t compress_len = quicklistGetLzf(node, &data);
if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
nwritten += n;
} else {
- if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
+ if ((n = rdbSaveRawString(rdb,node->entry,node->sz)) == -1) return -1;
nwritten += n;
}
node = node->next;
@@ -1934,36 +1937,58 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
/* All pairs should be read by now */
serverAssert(len == 0);
- } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST) {
+ } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST || rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if (len == 0) goto emptykey;
o = createQuicklistObject();
quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
-
+ uint64_t container = QUICKLIST_NODE_CONTAINER_ZIPLIST;
while (len--) {
size_t encoded_len;
- unsigned char *zl =
+
+ if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
+ if ((container = rdbLoadLen(rdb,NULL)) == RDB_LENERR) {
+ decrRefCount(o);
+ return NULL;
+ }
+
+ if (container != QUICKLIST_NODE_CONTAINER_ZIPLIST && container != QUICKLIST_NODE_CONTAINER_PLAIN) {
+ rdbReportCorruptRDB("Quicklist integrity check failed.");
+ decrRefCount(o);
+ return NULL;
+ }
+ }
+
+ unsigned char *data =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,&encoded_len);
- if (zl == NULL) {
+ if (data == NULL || (encoded_len == 0)) {
+ zfree(data);
decrRefCount(o);
return NULL;
}
+
+ if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
+ quicklistAppendPlainNode(o->ptr, data, encoded_len);
+ zfree(data);
+ continue;
+ }
+
if (deep_integrity_validation) server.stat_dump_payload_sanitizations++;
- if (!ziplistValidateIntegrity(zl, encoded_len, deep_integrity_validation, NULL, NULL)) {
+ if (!ziplistValidateIntegrity(data, encoded_len, deep_integrity_validation, NULL, NULL)) {
rdbReportCorruptRDB("Ziplist integrity check failed.");
decrRefCount(o);
- zfree(zl);
+ zfree(data);
return NULL;
}
/* Silently skip empty ziplists, if we'll end up with empty quicklist we'll fail later. */
- if (ziplistLen(zl) == 0) {
- zfree(zl);
+ if (ziplistLen(data) == 0) {
+ zfree(data);
continue;
} else {
- quicklistAppendZiplist(o->ptr, zl);
+ quicklistAppendZiplist(o->ptr, data);
}
}
diff --git a/src/rdb.h b/src/rdb.h
index 71332c8ba..1188f1032 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -93,10 +93,11 @@
#define RDB_TYPE_STREAM_LISTPACKS 15
#define RDB_TYPE_HASH_LISTPACK 16
#define RDB_TYPE_ZSET_LISTPACK 17
+#define RDB_TYPE_LIST_QUICKLIST_2 18
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Test if a type is an object type. */
-#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 17))
+#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index 34d5fb27f..40de87374 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -92,7 +92,8 @@ char *rdb_type_string[] = {
"quicklist",
"stream",
"hash-listpack",
- "zset-listpack"
+ "zset-listpack",
+ "quicklist-v2"
};
/* Show a few stats collected into 'rdbstate' */
diff --git a/src/t_list.c b/src/t_list.c
index 8a4b5dca8..a0a4ed216 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -29,8 +29,6 @@
#include "server.h"
-#define LIST_MAX_ITEM_SIZE ((1ull<<32)-1024)
-
/*-----------------------------------------------------------------------------
* List API
*----------------------------------------------------------------------------*/
@@ -55,7 +53,7 @@ void listTypePush(robj *subject, robj *value, int where) {
}
}
-void *listPopSaver(unsigned char *data, unsigned int sz) {
+void *listPopSaver(unsigned char *data, size_t sz) {
return createStringObject((char*)data,sz);
}
@@ -186,7 +184,7 @@ void listTypeReplace(listTypeEntry *entry, robj *value) {
int listTypeEqual(listTypeEntry *entry, robj *o) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
- return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
+ return quicklistCompare(&entry->entry,o->ptr,sdslen(o->ptr));
} else {
serverPanic("Unknown list encoding");
}
@@ -210,7 +208,7 @@ void listTypeConvert(robj *subject, int enc) {
size_t zlen = server.list_max_ziplist_size;
int depth = server.list_compress_depth;
subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
- subject->encoding = OBJ_ENCODING_QUICKLIST;
+ subject->encoding = enc;
} else {
serverPanic("Unsupported list conversion");
}
@@ -229,7 +227,7 @@ robj *listTypeDup(robj *o) {
switch (o->encoding) {
case OBJ_ENCODING_QUICKLIST:
lobj = createObject(OBJ_LIST, quicklistDup(o->ptr));
- lobj->encoding = OBJ_ENCODING_QUICKLIST;
+ lobj->encoding = o->encoding;
break;
default:
serverPanic("Unknown list encoding");
@@ -256,13 +254,6 @@ int listTypeDelRange(robj *subject, long start, long count) {
void pushGenericCommand(client *c, int where, int xx) {
int j;
- for (j = 2; j < c->argc; j++) {
- if (sdslen(c->argv[j]->ptr) > LIST_MAX_ITEM_SIZE) {
- addReplyError(c, "Element too large");
- return;
- }
- }
-
robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
if (checkType(c,lobj,OBJ_LIST)) return;
if (!lobj) {
@@ -326,11 +317,6 @@ void linsertCommand(client *c) {
return;
}
- if (sdslen(c->argv[4]->ptr) > LIST_MAX_ITEM_SIZE) {
- addReplyError(c, "Element too large");
- return;
- }
-
if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,OBJ_LIST)) return;
@@ -398,11 +384,6 @@ void lsetCommand(client *c) {
long index;
robj *value = c->argv[3];
- if (sdslen(value->ptr) > LIST_MAX_ITEM_SIZE) {
- addReplyError(c, "Element too large");
- return;
- }
-
if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
return;
@@ -702,11 +683,6 @@ void lposCommand(client *c) {
int direction = LIST_TAIL;
long rank = 1, count = -1, maxlen = 0; /* Count -1: option not given. */
- if (sdslen(ele->ptr) > LIST_MAX_ITEM_SIZE) {
- addReplyError(c, "Element too large");
- return;
- }
-
/* Parse the optional arguments. */
for (int j = 3; j < c->argc; j++) {
char *opt = c->argv[j]->ptr;
@@ -802,11 +778,6 @@ void lremCommand(client *c) {
long toremove;
long removed = 0;
- if (sdslen(obj->ptr) > LIST_MAX_ITEM_SIZE) {
- addReplyError(c, "Element too large");
- return;
- }
-
if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
return;