diff options
-rw-r--r-- | .github/workflows/daily.yml | 48 | ||||
-rw-r--r-- | src/blocked.c | 7 | ||||
-rw-r--r-- | src/debug.c | 2 | ||||
-rw-r--r-- | src/networking.c | 29 | ||||
-rw-r--r-- | src/rdb.c | 64 | ||||
-rw-r--r-- | src/stream.h | 7 | ||||
-rw-r--r-- | src/t_stream.c | 35 | ||||
-rw-r--r-- | src/ziplist.c | 2 | ||||
-rw-r--r-- | tests/integration/aof.tcl | 12 | ||||
-rw-r--r-- | tests/integration/replication-3.tcl | 2 |
10 files changed, 169 insertions, 39 deletions
diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml new file mode 100644 index 000000000..b6a9abb68 --- /dev/null +++ b/.github/workflows/daily.yml @@ -0,0 +1,48 @@ +name: Daily + +on: + schedule: + - cron: '0 7 * * *' + +jobs: + test-jemalloc: + runs-on: ubuntu-latest + timeout-minutes: 1200 + steps: + - uses: actions/checkout@v1 + - name: make + run: make + - name: test + run: | + sudo apt-get install tcl8.5 + ./runtest --accurate --verbose + - name: module api test + run: ./runtest-moduleapi --verbose + + test-libc-malloc: + runs-on: ubuntu-latest + timeout-minutes: 1200 + steps: + - uses: actions/checkout@v1 + - name: make + run: make MALLOC=libc + - name: test + run: | + sudo apt-get install tcl8.5 + ./runtest --accurate --verbose + - name: module api test + run: ./runtest-moduleapi --verbose + + test-valgrind: + runs-on: ubuntu-latest + timeout-minutes: 14400 + steps: + - uses: actions/checkout@v1 + - name: make + run: make valgrind + - name: test + run: | + sudo apt-get install tcl8.5 valgrind -y + ./runtest --valgrind --verbose --clients 1 + - name: module api test + run: ./runtest-moduleapi --valgrind --verbose --clients 1 diff --git a/src/blocked.c b/src/blocked.c index 045369e93..92f1cee65 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { int noack = 0; if (group) { - consumer = streamLookupConsumer(group, - receiver->bpop.xread_consumer->ptr, - 1); + consumer = + streamLookupConsumer(group, + receiver->bpop.xread_consumer->ptr, + SLC_NONE); noack = receiver->bpop.xread_group_noack; } diff --git a/src/debug.c b/src/debug.c index cbb56cb71..587ff7c5d 100644 --- a/src/debug.c +++ b/src/debug.c @@ -1636,7 +1636,7 @@ void enableWatchdog(int period) { /* Watchdog was actually disabled, so we have to setup the signal * handler. */ sigemptyset(&act.sa_mask); - act.sa_flags = SA_ONSTACK | SA_SIGINFO; + act.sa_flags = SA_SIGINFO; act.sa_sigaction = watchdogSignalHandler; sigaction(SIGALRM, &act, NULL); } diff --git a/src/networking.c b/src/networking.c index 9d10c4bb4..fd1159e06 100644 --- a/src/networking.c +++ b/src/networking.c @@ -436,6 +436,34 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) { sdsfree(s); } +/* Sometimes we are forced to create a new reply node, and we can't append to + * the previous one, when that happens, we wanna try to trim the unused space + * at the end of the last reply node which we won't use anymore. */ +void trimReplyUnusedTailSpace(client *c) { + listNode *ln = listLast(c->reply); + clientReplyBlock *tail = ln? listNodeValue(ln): NULL; + + /* Note that 'tail' may be NULL even if we have a tail node, becuase when + * addDeferredMultiBulkLength() is used */ + if (!tail) return; + + /* We only try to trim the space is relatively high (more than a 1/4 of the + * allocation), otherwise there's a high chance realloc will NOP. + * Also, to avoid large memmove which happens as part of realloc, we only do + * that if the used part is small. */ + if (tail->size - tail->used > tail->size / 4 && + tail->used < PROTO_REPLY_CHUNK_BYTES) + { + size_t old_size = tail->size; + tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock)); + /* take over the allocation's internal fragmentation (at least for + * memory usage tracking) */ + tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); + c->reply_bytes += tail->size - old_size; + listNodeValue(ln) = tail; + } +} + /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addReplyDeferredLen(client *c) { @@ -443,6 +471,7 @@ void *addReplyDeferredLen(client *c) { * ready to be sent, since we are sure that before returning to the * event loop setDeferredAggregateLen() will be called. */ if (prepareClientToWrite(c) != C_OK) return NULL; + trimReplyUnusedTailSpace(c); listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ return listLast(c->reply); } @@ -1442,7 +1442,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { /* Load every single element of the list */ while(len--) { - if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) { + decrRefCount(o); + return NULL; + } dec = getDecodedObject(ele); size_t len = sdslen(dec->ptr); quicklistPushTail(o->ptr, dec->ptr, len); @@ -1469,8 +1472,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { long long llval; sds sdsele; - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } if (o->encoding == OBJ_ENCODING_INTSET) { /* Fetch integer value from element. */ @@ -1509,13 +1514,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { double score; zskiplistNode *znode; - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { + decrRefCount(o); + sdsfree(sdsele); + return NULL; + } } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) { + decrRefCount(o); + sdsfree(sdsele); + return NULL; + } } /* Don't care about integer-encoded strings. */ @@ -1547,10 +1562,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) { len--; /* Load raw strings */ - if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } + if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + sdsfree(field); + decrRefCount(o); + return NULL; + } /* Add pair to ziplist */ o->ptr = ziplistPush(o->ptr, (unsigned char*)field, @@ -1578,10 +1598,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (o->encoding == OBJ_ENCODING_HT && len > 0) { len--; /* Load encoded strings */ - if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } + if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + sdsfree(field); + decrRefCount(o); + return NULL; + } /* Add pair to hash table */ ret = dictAdd((dict*)o->ptr, field, value); @@ -1601,7 +1626,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (len--) { unsigned char *zl = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); - if (zl == NULL) return NULL; + if (zl == NULL) { + decrRefCount(o); + return NULL; + } quicklistAppendZiplist(o->ptr, zl); } } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP || @@ -1824,8 +1852,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { decrRefCount(o); return NULL; } - streamConsumer *consumer = streamLookupConsumer(cgroup,cname, - 1); + streamConsumer *consumer = + streamLookupConsumer(cgroup,cname,SLC_NONE); sdsfree(cname); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); if (rioGetReadError(rdb)) { diff --git a/src/stream.h b/src/stream.h index b69073994..0d3bf63fc 100644 --- a/src/stream.h +++ b/src/stream.h @@ -96,6 +96,11 @@ typedef struct streamPropInfo { /* Prototypes of exported APIs. */ struct client; +/* Flags for streamLookupConsumer */ +#define SLC_NONE 0 +#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */ +#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */ + stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); @@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); diff --git a/src/t_stream.c b/src/t_stream.c index 5c1b9a523..676ddd9bb 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1570,7 +1570,8 @@ void xreadCommand(client *c) { addReplyBulk(c,c->argv[streams_arg+i]); streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], - consumername->ptr,1); + consumername->ptr, + SLC_NONE); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; int flags = 0; if (noack) flags |= STREAM_RWR_NOACK; @@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) { * consumer does not exist it is automatically created as a side effect * of calling this function, otherwise its last seen time is updated and * the existing consumer reference returned. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { + int create = !(flags & SLC_NOCREAT); + int refresh = !(flags & SLC_NOREFRESH); streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) { @@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), consumer,NULL); } - consumer->seen_time = mstime(); + if (refresh) consumer->seen_time = mstime(); return consumer; } @@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { * may have pending messages: they are removed from the PEL, and the number * of pending messages "lost" is returned. */ uint64_t streamDelConsumer(streamCG *cg, sds name) { - streamConsumer *consumer = streamLookupConsumer(cg,name,0); + streamConsumer *consumer = + streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH); if (consumer == NULL) return 0; uint64_t retval = raxSize(consumer->pel); @@ -2068,15 +2072,18 @@ void xpendingCommand(client *c) { } /* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */ else { - streamConsumer *consumer = consumername ? - streamLookupConsumer(group,consumername->ptr,0): - NULL; - - /* If a consumer name was mentioned but it does not exist, we can - * just return an empty array. */ - if (consumername && consumer == NULL) { - addReplyArrayLen(c,0); - return; + streamConsumer *consumer = NULL; + if (consumername) { + consumer = streamLookupConsumer(group, + consumername->ptr, + SLC_NOCREAT|SLC_NOREFRESH); + + /* If a consumer name was mentioned but it does not exist, we can + * just return an empty array. */ + if (consumer == NULL) { + addReplyArrayLen(c,0); + return; + } } rax *pel = consumer ? consumer->pel : group->pel; @@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) { raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer and idle time. */ if (consumer == NULL) - consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); + consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE); nack->consumer = consumer; nack->delivery_time = deliverytime; /* Set the delivery attempts counter if given, otherwise diff --git a/src/ziplist.c b/src/ziplist.c index ef40d6aa2..ddae0d96f 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -440,7 +440,7 @@ unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) { if ((prevlensize) == 1) { \ (prevlen) = (ptr)[0]; \ } else if ((prevlensize) == 5) { \ - assert(sizeof((prevlen)) == 4); \ + assert(sizeof((prevlen)) == 4); \ memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); \ memrev32ifbe(&prevlen); \ } \ diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 2734de7f1..b82c87d71 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -54,6 +54,12 @@ tags {"aof"} { set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + wait_for_condition 50 100 { + [catch {$client ping} e] == 0 + } else { + fail "Loading DB is taking too much time." + } + test "Truncated AOF loaded: we expect foo to be equal to 5" { assert {[$client get foo] eq "5"} } @@ -71,6 +77,12 @@ tags {"aof"} { set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + wait_for_condition 50 100 { + [catch {$client ping} e] == 0 + } else { + fail "Loading DB is taking too much time." + } + test "Truncated AOF loaded: we expect foo to be equal to 6 now" { assert {[$client get foo] eq "6"} } diff --git a/tests/integration/replication-3.tcl b/tests/integration/replication-3.tcl index 198e698f2..43eb53538 100644 --- a/tests/integration/replication-3.tcl +++ b/tests/integration/replication-3.tcl @@ -118,7 +118,7 @@ start_server {tags {"repl"}} { # correctly the RDB file: such file will contain "lua" AUX # sections with scripts already in the memory of the master. - wait_for_condition 50 100 { + wait_for_condition 500 100 { [s -1 master_link_status] eq {up} } else { fail "Replication not started." |