summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/daily.yml48
-rw-r--r--src/blocked.c7
-rw-r--r--src/debug.c2
-rw-r--r--src/networking.c29
-rw-r--r--src/rdb.c64
-rw-r--r--src/stream.h7
-rw-r--r--src/t_stream.c35
-rw-r--r--src/ziplist.c2
-rw-r--r--tests/integration/aof.tcl12
-rw-r--r--tests/integration/replication-3.tcl2
10 files changed, 169 insertions, 39 deletions
diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml
new file mode 100644
index 000000000..b6a9abb68
--- /dev/null
+++ b/.github/workflows/daily.yml
@@ -0,0 +1,48 @@
+name: Daily
+
+on:
+ schedule:
+ - cron: '0 7 * * *'
+
+jobs:
+ test-jemalloc:
+ runs-on: ubuntu-latest
+ timeout-minutes: 1200
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: make
+ - name: test
+ run: |
+ sudo apt-get install tcl8.5
+ ./runtest --accurate --verbose
+ - name: module api test
+ run: ./runtest-moduleapi --verbose
+
+ test-libc-malloc:
+ runs-on: ubuntu-latest
+ timeout-minutes: 1200
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: make MALLOC=libc
+ - name: test
+ run: |
+ sudo apt-get install tcl8.5
+ ./runtest --accurate --verbose
+ - name: module api test
+ run: ./runtest-moduleapi --verbose
+
+ test-valgrind:
+ runs-on: ubuntu-latest
+ timeout-minutes: 14400
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: make valgrind
+ - name: test
+ run: |
+ sudo apt-get install tcl8.5 valgrind -y
+ ./runtest --valgrind --verbose --clients 1
+ - name: module api test
+ run: ./runtest-moduleapi --valgrind --verbose --clients 1
diff --git a/src/blocked.c b/src/blocked.c
index 045369e93..92f1cee65 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
int noack = 0;
if (group) {
- consumer = streamLookupConsumer(group,
- receiver->bpop.xread_consumer->ptr,
- 1);
+ consumer =
+ streamLookupConsumer(group,
+ receiver->bpop.xread_consumer->ptr,
+ SLC_NONE);
noack = receiver->bpop.xread_group_noack;
}
diff --git a/src/debug.c b/src/debug.c
index cbb56cb71..587ff7c5d 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -1636,7 +1636,7 @@ void enableWatchdog(int period) {
/* Watchdog was actually disabled, so we have to setup the signal
* handler. */
sigemptyset(&act.sa_mask);
- act.sa_flags = SA_ONSTACK | SA_SIGINFO;
+ act.sa_flags = SA_SIGINFO;
act.sa_sigaction = watchdogSignalHandler;
sigaction(SIGALRM, &act, NULL);
}
diff --git a/src/networking.c b/src/networking.c
index 9d10c4bb4..fd1159e06 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -436,6 +436,34 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) {
sdsfree(s);
}
+/* Sometimes we are forced to create a new reply node, and we can't append to
+ * the previous one, when that happens, we wanna try to trim the unused space
+ * at the end of the last reply node which we won't use anymore. */
+void trimReplyUnusedTailSpace(client *c) {
+ listNode *ln = listLast(c->reply);
+ clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
+
+ /* Note that 'tail' may be NULL even if we have a tail node, becuase when
+ * addDeferredMultiBulkLength() is used */
+ if (!tail) return;
+
+ /* We only try to trim the space is relatively high (more than a 1/4 of the
+ * allocation), otherwise there's a high chance realloc will NOP.
+ * Also, to avoid large memmove which happens as part of realloc, we only do
+ * that if the used part is small. */
+ if (tail->size - tail->used > tail->size / 4 &&
+ tail->used < PROTO_REPLY_CHUNK_BYTES)
+ {
+ size_t old_size = tail->size;
+ tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock));
+ /* take over the allocation's internal fragmentation (at least for
+ * memory usage tracking) */
+ tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
+ c->reply_bytes += tail->size - old_size;
+ listNodeValue(ln) = tail;
+ }
+}
+
/* Adds an empty object to the reply list that will contain the multi bulk
* length, which is not known when this function is called. */
void *addReplyDeferredLen(client *c) {
@@ -443,6 +471,7 @@ void *addReplyDeferredLen(client *c) {
* ready to be sent, since we are sure that before returning to the
* event loop setDeferredAggregateLen() will be called. */
if (prepareClientToWrite(c) != C_OK) return NULL;
+ trimReplyUnusedTailSpace(c);
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
return listLast(c->reply);
}
diff --git a/src/rdb.c b/src/rdb.c
index e2a2fb39f..72ed23137 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1442,7 +1442,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
/* Load every single element of the list */
while(len--) {
- if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
+ if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
dec = getDecodedObject(ele);
size_t len = sdslen(dec->ptr);
quicklistPushTail(o->ptr, dec->ptr, len);
@@ -1469,8 +1472,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
long long llval;
sds sdsele;
- if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
+ if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
if (o->encoding == OBJ_ENCODING_INTSET) {
/* Fetch integer value from element. */
@@ -1509,13 +1514,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
double score;
zskiplistNode *znode;
- if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
+ if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
if (rdbtype == RDB_TYPE_ZSET_2) {
- if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL;
+ if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) {
+ decrRefCount(o);
+ sdsfree(sdsele);
+ return NULL;
+ }
} else {
- if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
+ if (rdbLoadDoubleValue(rdb,&score) == -1) {
+ decrRefCount(o);
+ sdsfree(sdsele);
+ return NULL;
+ }
}
/* Don't care about integer-encoded strings. */
@@ -1547,10 +1562,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {
len--;
/* Load raw strings */
- if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
- if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
+ if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
+ if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ sdsfree(field);
+ decrRefCount(o);
+ return NULL;
+ }
/* Add pair to ziplist */
o->ptr = ziplistPush(o->ptr, (unsigned char*)field,
@@ -1578,10 +1598,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
while (o->encoding == OBJ_ENCODING_HT && len > 0) {
len--;
/* Load encoded strings */
- if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
- if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
+ if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
+ if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ sdsfree(field);
+ decrRefCount(o);
+ return NULL;
+ }
/* Add pair to hash table */
ret = dictAdd((dict*)o->ptr, field, value);
@@ -1601,7 +1626,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
while (len--) {
unsigned char *zl =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
- if (zl == NULL) return NULL;
+ if (zl == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
quicklistAppendZiplist(o->ptr, zl);
}
} else if (rdbtype == RDB_TYPE_HASH_ZIPMAP ||
@@ -1824,8 +1852,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
decrRefCount(o);
return NULL;
}
- streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
- 1);
+ streamConsumer *consumer =
+ streamLookupConsumer(cgroup,cname,SLC_NONE);
sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) {
diff --git a/src/stream.h b/src/stream.h
index b69073994..0d3bf63fc 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -96,6 +96,11 @@ typedef struct streamPropInfo {
/* Prototypes of exported APIs. */
struct client;
+/* Flags for streamLookupConsumer */
+#define SLC_NONE 0
+#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */
+#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */
+
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
@@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
diff --git a/src/t_stream.c b/src/t_stream.c
index 5c1b9a523..676ddd9bb 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1570,7 +1570,8 @@ void xreadCommand(client *c) {
addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i],
- consumername->ptr,1);
+ consumername->ptr,
+ SLC_NONE);
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK;
@@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
* consumer does not exist it is automatically created as a side effect
* of calling this function, otherwise its last seen time is updated and
* the existing consumer reference returned. */
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
+ int create = !(flags & SLC_NOCREAT);
+ int refresh = !(flags & SLC_NOREFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) {
@@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
consumer,NULL);
}
- consumer->seen_time = mstime();
+ if (refresh) consumer->seen_time = mstime();
return consumer;
}
@@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
* may have pending messages: they are removed from the PEL, and the number
* of pending messages "lost" is returned. */
uint64_t streamDelConsumer(streamCG *cg, sds name) {
- streamConsumer *consumer = streamLookupConsumer(cg,name,0);
+ streamConsumer *consumer =
+ streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);
if (consumer == NULL) return 0;
uint64_t retval = raxSize(consumer->pel);
@@ -2068,15 +2072,18 @@ void xpendingCommand(client *c) {
}
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
else {
- streamConsumer *consumer = consumername ?
- streamLookupConsumer(group,consumername->ptr,0):
- NULL;
-
- /* If a consumer name was mentioned but it does not exist, we can
- * just return an empty array. */
- if (consumername && consumer == NULL) {
- addReplyArrayLen(c,0);
- return;
+ streamConsumer *consumer = NULL;
+ if (consumername) {
+ consumer = streamLookupConsumer(group,
+ consumername->ptr,
+ SLC_NOCREAT|SLC_NOREFRESH);
+
+ /* If a consumer name was mentioned but it does not exist, we can
+ * just return an empty array. */
+ if (consumer == NULL) {
+ addReplyArrayLen(c,0);
+ return;
+ }
}
rax *pel = consumer ? consumer->pel : group->pel;
@@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) {
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */
if (consumer == NULL)
- consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
+ consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
nack->consumer = consumer;
nack->delivery_time = deliverytime;
/* Set the delivery attempts counter if given, otherwise
diff --git a/src/ziplist.c b/src/ziplist.c
index ef40d6aa2..ddae0d96f 100644
--- a/src/ziplist.c
+++ b/src/ziplist.c
@@ -440,7 +440,7 @@ unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
if ((prevlensize) == 1) { \
(prevlen) = (ptr)[0]; \
} else if ((prevlensize) == 5) { \
- assert(sizeof((prevlen)) == 4); \
+ assert(sizeof((prevlen)) == 4); \
memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); \
memrev32ifbe(&prevlen); \
} \
diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl
index 2734de7f1..b82c87d71 100644
--- a/tests/integration/aof.tcl
+++ b/tests/integration/aof.tcl
@@ -54,6 +54,12 @@ tags {"aof"} {
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
+ wait_for_condition 50 100 {
+ [catch {$client ping} e] == 0
+ } else {
+ fail "Loading DB is taking too much time."
+ }
+
test "Truncated AOF loaded: we expect foo to be equal to 5" {
assert {[$client get foo] eq "5"}
}
@@ -71,6 +77,12 @@ tags {"aof"} {
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
+ wait_for_condition 50 100 {
+ [catch {$client ping} e] == 0
+ } else {
+ fail "Loading DB is taking too much time."
+ }
+
test "Truncated AOF loaded: we expect foo to be equal to 6 now" {
assert {[$client get foo] eq "6"}
}
diff --git a/tests/integration/replication-3.tcl b/tests/integration/replication-3.tcl
index 198e698f2..43eb53538 100644
--- a/tests/integration/replication-3.tcl
+++ b/tests/integration/replication-3.tcl
@@ -118,7 +118,7 @@ start_server {tags {"repl"}} {
# correctly the RDB file: such file will contain "lua" AUX
# sections with scripts already in the memory of the master.
- wait_for_condition 50 100 {
+ wait_for_condition 500 100 {
[s -1 master_link_status] eq {up}
} else {
fail "Replication not started."