summaryrefslogtreecommitdiff
path: root/src/listpack.c
diff options
context:
space:
mode:
authorsundb <sundbcn@gmail.com>2021-09-09 23:18:53 +0800
committerGitHub <noreply@github.com>2021-09-09 18:18:53 +0300
commit3ca6972ecd3e9963f65b9cb1ff050ad60f03563e (patch)
tree9f82d9d622f5d161bc0d72c83f8f31555f5a652d /src/listpack.c
parent86b0de5c41c96225377b83090fbdbe0209c2d9b9 (diff)
downloadredis-3ca6972ecd3e9963f65b9cb1ff050ad60f03563e.tar.gz
Replace all usage of ziplist with listpack for t_zset (#9366)
Part two of implementing #8702 (zset), after #8887. ## Description of the feature Replaced all uses of ziplist with listpack in t_zset, and optimized some of the code to optimize performance. ## Rdb format changes New `RDB_TYPE_ZSET_LISTPACK` rdb type. ## Rdb loading improvements: 1) Pre-expansion of dict for validation of duplicate data for listpack and ziplist. 2) Simplifying the release of empty key objects when RDB loading. 3) Unify ziplist and listpack data verify methods for zset and hash, and move code to rdb.c. ## Interface changes 1) New `zset-max-listpack-entries` config is an alias for `zset-max-ziplist-entries` (same with `zset-max-listpack-value`). 2) OBJECT ENCODING will return listpack instead of ziplist. ## Listpack improvements: 1) Add `lpDeleteRange` and `lpDeleteRangeWithEntry` functions to delete a range of entries from listpack. 2) Improve the performance of `lpCompare`, converting from string to integer is faster than converting from integer to string. 3) Replace `snprintf` with `ll2string` to improve performance in converting numbers to strings in `lpGet()`. ## Zset improvements: 1) Improve the performance of `zzlFind` method, use `lpFind` instead of `lpCompare` in a loop. 2) Use `lpDeleteRangeWithEntry` instead of `lpDelete` twice to delete a element of zset. ## Tests 1) Add some unittests for `lpDeleteRange` and `lpDeleteRangeWithEntry` function. 2) Add zset RDB loading test. 3) Add benchmark test for `lpCompare` and `ziplsitCompare`. 4) Add empty listpack zset corrupt dump test.
Diffstat (limited to 'src/listpack.c')
-rw-r--r--src/listpack.c251
1 files changed, 243 insertions, 8 deletions
diff --git a/src/listpack.c b/src/listpack.c
index 9bb1c8c95..1849a5479 100644
--- a/src/listpack.c
+++ b/src/listpack.c
@@ -43,6 +43,7 @@
#include "listpack.h"
#include "listpack_malloc.h"
#include "redisassert.h"
+#include "util.h"
#define LP_HDR_SIZE 6 /* 32 bit total len + 16 bit number of elements. */
#define LP_HDR_NUMELE_UNKNOWN UINT16_MAX
@@ -642,7 +643,7 @@ static inline unsigned char *lpGetWithSize(unsigned char *p, int64_t *count, uns
/* Return the string representation of the integer or the value itself
* depending on intbuf being NULL or not. */
if (intbuf) {
- *count = snprintf((char*)intbuf,LP_INTBUF_SIZE,"%lld",(long long)val);
+ *count = ll2string((char*)intbuf,LP_INTBUF_SIZE,(long long)val);
return intbuf;
} else {
*count = val;
@@ -909,6 +910,13 @@ unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char
return lp;
}
+/* This is just a wrapper for lpInsert() to directly use a string. */
+unsigned char *lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen,
+ unsigned char *p, int where, unsigned char **newp)
+{
+ return lpInsert(lp, s, NULL, slen, p, where, newp);
+}
+
/* This is just a wrapper for lpInsert() to directly use a 64 bit integer
* instead of a string. */
unsigned char *lpInsertInteger(unsigned char *lp, long long lval, unsigned char *p, int where, unsigned char **newp) {
@@ -972,6 +980,73 @@ unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **new
return lpInsert(lp,NULL,NULL,0,p,LP_REPLACE,newp);
}
+/* Delete a range of entries from the listpack start with the element pointed by 'p'. */
+unsigned char *lpDeleteRangeWithEntry(unsigned char *lp, unsigned char **p, unsigned long num) {
+ size_t bytes = lpBytes(lp);
+ unsigned long deleted = 0;
+ unsigned char *eofptr = lp + bytes - 1;
+ unsigned char *first, *tail;
+ first = tail = *p;
+
+ if (num == 0) return lp; /* Nothing to delete, return ASAP. */
+
+ /* Find the next entry to the last entry that needs to be deleted.
+ * lpLength may be unreliable due to corrupt data, so we cannot
+ * treat 'num' as the number of elements to be deleted. */
+ while (num--) {
+ deleted++;
+ tail = lpSkip(tail);
+ if (tail[0] == LP_EOF) break;
+ lpAssertValidEntry(lp, bytes, tail);
+ }
+
+ /* Store the offset of the element 'first', so that we can obtain its
+ * address again after a reallocation. */
+ unsigned long poff = first-lp;
+
+ /* Move tail to the front of the ziplist */
+ memmove(first, tail, eofptr - tail + 1);
+ lpSetTotalBytes(lp, bytes - (tail - first));
+ uint32_t numele = lpGetNumElements(lp);
+ if (numele != LP_HDR_NUMELE_UNKNOWN)
+ lpSetNumElements(lp, numele-deleted);
+ lp = lpShrinkToFit(lp);
+
+ /* Store the entry. */
+ *p = lp+poff;
+ if ((*p)[0] == LP_EOF) *p = NULL;
+
+ return lp;
+}
+
+/* Delete a range of entries from the listpack. */
+unsigned char *lpDeleteRange(unsigned char *lp, long index, unsigned long num) {
+ unsigned char *p;
+ uint32_t numele = lpGetNumElements(lp);
+
+ if (num == 0) return lp; /* Nothing to delete, return ASAP. */
+ if ((p = lpSeek(lp, index)) == NULL) return lp;
+
+ /* If we know we're gonna delete beyond the end of the listpack, we can just move
+ * the EOF marker, and there's no need to iterate through the entries,
+ * but if we can't be sure how many entries there are, we rather avoid calling lpLength
+ * since that means an additional iteration on all elements.
+ *
+ * Note that index could overflow, but we use the value after seek, so when we
+ * use it no overflow happens. */
+ if (numele != LP_HDR_NUMELE_UNKNOWN && index < 0) index = (long)numele + index;
+ if (numele != LP_HDR_NUMELE_UNKNOWN && (numele - (unsigned long)index) <= num) {
+ p[0] = LP_EOF;
+ lpSetTotalBytes(lp, p - lp + 1);
+ lpSetNumElements(lp, index);
+ lp = lpShrinkToFit(lp);
+ } else {
+ lp = lpDeleteRangeWithEntry(lp, &p, num);
+ }
+
+ return lp;
+}
+
/* Return the total number of bytes the listpack is composed of. */
size_t lpBytes(unsigned char *lp) {
return lpGetTotalBytes(lp);
@@ -1112,6 +1187,7 @@ int lpValidateIntegrity(unsigned char *lp, size_t size, int deep,
/* Validate the individual entries. */
uint32_t count = 0;
+ uint32_t numele = lpGetNumElements(lp);
unsigned char *p = lp + LP_HDR_SIZE;
while(p && p[0] != LP_EOF) {
unsigned char *prev = p;
@@ -1122,28 +1198,39 @@ int lpValidateIntegrity(unsigned char *lp, size_t size, int deep,
return 0;
/* Optionally let the caller validate the entry too. */
- if (entry_cb && !entry_cb(prev, cb_userdata))
+ if (entry_cb && !entry_cb(prev, numele, cb_userdata))
return 0;
count++;
}
/* Check that the count in the header is correct */
- uint32_t numele = lpGetNumElements(lp);
if (numele != LP_HDR_NUMELE_UNKNOWN && numele != count)
return 0;
return 1;
}
+/* Compare entry pointer to by 'p' with string 's' of length 'slen'.
+ * Return 1 if equal. */
unsigned int lpCompare(unsigned char *p, unsigned char *s, uint32_t slen) {
- unsigned char buf[LP_INTBUF_SIZE];
unsigned char *value;
int64_t sz;
-
if (p[0] == LP_EOF) return 0;
- value = lpGet(p, &sz, buf);
- return (slen == sz) && memcmp(value,s,slen) == 0;
+
+ value = lpGet(p, &sz, NULL);
+ if (value) {
+ return (slen == sz) && memcmp(value,s,slen) == 0;
+ } else {
+ /* We use lpStringToInt64() to get an integer representation of the
+ * string 's' and compare it to 'sval', it's much faster than convert
+ * integer to string and comparing. */
+ int64_t sval;
+ if (lpStringToInt64((const char*)s, slen, &sval))
+ return sz == sval;
+ }
+
+ return 0;
}
/* uint compare for qsort */
@@ -1391,8 +1478,9 @@ static void verifyEntry(unsigned char *p, unsigned char *s, size_t slen) {
assert(lpCompare(p, s, slen));
}
-static int lpValidation(unsigned char *p, void *userdata) {
+static int lpValidation(unsigned char *p, unsigned int head_count, void *userdata) {
UNUSED(p);
+ UNUSED(head_count);
int ret;
long *count = userdata;
@@ -1537,6 +1625,114 @@ int listpackTest(int argc, char *argv[], int accurate) {
lpFree(lp);
}
+ TEST("Delete whole listpack when num == -1");
+ {
+ lp = createList();
+ lp = lpDeleteRange(lp, 0, -1);
+ assert(lpLength(lp) == 0);
+ assert(lp[LP_HDR_SIZE] == LP_EOF);
+ assert(lpBytes(lp) == (LP_HDR_SIZE + 1));
+ zfree(lp);
+
+ lp = createList();
+ unsigned char *ptr = lpFirst(lp);
+ lp = lpDeleteRangeWithEntry(lp, &ptr, -1);
+ assert(lpLength(lp) == 0);
+ assert(lp[LP_HDR_SIZE] == LP_EOF);
+ assert(lpBytes(lp) == (LP_HDR_SIZE + 1));
+ zfree(lp);
+ }
+
+ TEST("Delete whole listpack with negative index");
+ {
+ lp = createList();
+ lp = lpDeleteRange(lp, -4, 4);
+ assert(lpLength(lp) == 0);
+ assert(lp[LP_HDR_SIZE] == LP_EOF);
+ assert(lpBytes(lp) == (LP_HDR_SIZE + 1));
+ zfree(lp);
+
+ lp = createList();
+ unsigned char *ptr = lpSeek(lp, -4);
+ lp = lpDeleteRangeWithEntry(lp, &ptr, 4);
+ assert(lpLength(lp) == 0);
+ assert(lp[LP_HDR_SIZE] == LP_EOF);
+ assert(lpBytes(lp) == (LP_HDR_SIZE + 1));
+ zfree(lp);
+ }
+
+ TEST("Delete inclusive range 0,0");
+ {
+ lp = createList();
+ lp = lpDeleteRange(lp, 0, 1);
+ assert(lpLength(lp) == 3);
+ assert(lpSkip(lpLast(lp))[0] == LP_EOF); /* check set LP_EOF correctly */
+ zfree(lp);
+
+ lp = createList();
+ unsigned char *ptr = lpFirst(lp);
+ lp = lpDeleteRangeWithEntry(lp, &ptr, 1);
+ assert(lpLength(lp) == 3);
+ assert(lpSkip(lpLast(lp))[0] == LP_EOF); /* check set LP_EOF correctly */
+ zfree(lp);
+ }
+
+ TEST("Delete inclusive range 0,1");
+ {
+ lp = createList();
+ lp = lpDeleteRange(lp, 0, 2);
+ assert(lpLength(lp) == 2);
+ verifyEntry(lpFirst(lp), (unsigned char*)mixlist[2], strlen(mixlist[2]));
+ zfree(lp);
+
+ lp = createList();
+ unsigned char *ptr = lpFirst(lp);
+ lp = lpDeleteRangeWithEntry(lp, &ptr, 2);
+ assert(lpLength(lp) == 2);
+ verifyEntry(lpFirst(lp), (unsigned char*)mixlist[2], strlen(mixlist[2]));
+ zfree(lp);
+ }
+
+ TEST("Delete inclusive range 1,2");
+ {
+ lp = createList();
+ lp = lpDeleteRange(lp, 1, 2);
+ assert(lpLength(lp) == 2);
+ verifyEntry(lpFirst(lp), (unsigned char*)mixlist[0], strlen(mixlist[0]));
+ zfree(lp);
+
+ lp = createList();
+ unsigned char *ptr = lpSeek(lp, 1);
+ lp = lpDeleteRangeWithEntry(lp, &ptr, 2);
+ assert(lpLength(lp) == 2);
+ verifyEntry(lpFirst(lp), (unsigned char*)mixlist[0], strlen(mixlist[0]));
+ zfree(lp);
+ }
+
+ TEST("Delete with start index out of range");
+ {
+ lp = createList();
+ lp = lpDeleteRange(lp, 5, 1);
+ assert(lpLength(lp) == 4);
+ zfree(lp);
+ }
+
+ TEST("Delete with num overflow");
+ {
+ lp = createList();
+ lp = lpDeleteRange(lp, 1, 5);
+ assert(lpLength(lp) == 1);
+ verifyEntry(lpFirst(lp), (unsigned char*)mixlist[0], strlen(mixlist[0]));
+ zfree(lp);
+
+ lp = createList();
+ unsigned char *ptr = lpSeek(lp, 1);
+ lp = lpDeleteRangeWithEntry(lp, &ptr, 5);
+ assert(lpLength(lp) == 1);
+ verifyEntry(lpFirst(lp), (unsigned char*)mixlist[0], strlen(mixlist[0]));
+ zfree(lp);
+ }
+
TEST("Delete foo while iterating") {
lp = createList();
p = lpFirst(lp);
@@ -1817,6 +2013,21 @@ int listpackTest(int argc, char *argv[], int accurate) {
lpFree(lp);
}
+ TEST("Test number of elements exceeds LP_HDR_NUMELE_UNKNOWN") {
+ lp = lpNew(0);
+ for (int i = 0; i < LP_HDR_NUMELE_UNKNOWN + 1; i++)
+ lp = lpAppend(lp, (unsigned char*)"1", 1);
+
+ assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN);
+ assert(lpLength(lp) == LP_HDR_NUMELE_UNKNOWN+1);
+
+ lp = lpDeleteRange(lp, -2, 2);
+ assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN);
+ assert(lpLength(lp) == LP_HDR_NUMELE_UNKNOWN-1);
+ assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN-1); /* update length after lpLength */
+ lpFree(lp);
+ }
+
TEST("Stress with random payloads of different encoding") {
unsigned long long start = usec();
int i,j,len,where;
@@ -1951,6 +2162,30 @@ int listpackTest(int argc, char *argv[], int accurate) {
printf("Done. usec=%lld\n", usec()-start);
}
+ TEST("Benchmark lpCompare with string") {
+ unsigned long long start = usec();
+ for (int i = 0; i < 2000; i++) {
+ unsigned char *eptr = lpSeek(lp,0);
+ while (eptr != NULL) {
+ lpCompare(eptr,(unsigned char*)"nothing",7);
+ eptr = lpNext(lp,eptr);
+ }
+ }
+ printf("Done. usec=%lld\n", usec()-start);
+ }
+
+ TEST("Benchmark lpCompare with number") {
+ unsigned long long start = usec();
+ for (int i = 0; i < 2000; i++) {
+ unsigned char *eptr = lpSeek(lp,0);
+ while (eptr != NULL) {
+ lpCompare(lp, (unsigned char*)"99999", 5);
+ eptr = lpNext(lp,eptr);
+ }
+ }
+ printf("Done. usec=%lld\n", usec()-start);
+ }
+
lpFree(lp);
}