summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/daily.yml48
-rw-r--r--redis.conf25
-rw-r--r--src/Makefile2
-rw-r--r--src/aof.c1
-rw-r--r--src/bio.c2
-rw-r--r--src/blocked.c7
-rw-r--r--src/config.c4
-rw-r--r--src/config.h11
-rw-r--r--src/debug.c2
-rw-r--r--src/dict.c4
-rw-r--r--src/multi.c5
-rw-r--r--src/networking.c59
-rw-r--r--src/rdb.c70
-rw-r--r--src/redis-benchmark.c17
-rw-r--r--src/server.c17
-rw-r--r--src/server.h9
-rw-r--r--src/setcpuaffinity.c129
-rw-r--r--src/stream.h7
-rw-r--r--src/t_stream.c35
-rw-r--r--src/tracking.c5
-rw-r--r--src/ziplist.c2
-rw-r--r--tests/integration/aof.tcl12
-rw-r--r--tests/integration/replication-3.tcl2
-rw-r--r--tests/test_helper.tcl7
-rw-r--r--tests/unit/introspection.tcl4
25 files changed, 430 insertions, 56 deletions
diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml
new file mode 100644
index 000000000..b6a9abb68
--- /dev/null
+++ b/.github/workflows/daily.yml
@@ -0,0 +1,48 @@
+name: Daily
+
+on:
+ schedule:
+ - cron: '0 7 * * *'
+
+jobs:
+ test-jemalloc:
+ runs-on: ubuntu-latest
+ timeout-minutes: 1200
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: make
+ - name: test
+ run: |
+ sudo apt-get install tcl8.5
+ ./runtest --accurate --verbose
+ - name: module api test
+ run: ./runtest-moduleapi --verbose
+
+ test-libc-malloc:
+ runs-on: ubuntu-latest
+ timeout-minutes: 1200
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: make MALLOC=libc
+ - name: test
+ run: |
+ sudo apt-get install tcl8.5
+ ./runtest --accurate --verbose
+ - name: module api test
+ run: ./runtest-moduleapi --verbose
+
+ test-valgrind:
+ runs-on: ubuntu-latest
+ timeout-minutes: 14400
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: make valgrind
+ - name: test
+ run: |
+ sudo apt-get install tcl8.5 valgrind -y
+ ./runtest --valgrind --verbose --clients 1
+ - name: module api test
+ run: ./runtest-moduleapi --valgrind --verbose --clients 1
diff --git a/redis.conf b/redis.conf
index d96d26e1c..8841117fc 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1780,3 +1780,28 @@ rdb-save-incremental-fsync yes
# Maximum number of set/hash/zset/list fields that will be processed from
# the main dictionary scan
# active-defrag-max-scan-fields 1000
+
+# It is possible to pin different threads and processes of Redis to specific
+# CPUs in your system, in order to maximize the performances of the server.
+# This is useful both in order to pin different Redis threads in different
+# CPUs, but also in order to make sure that multiple Redis instances running
+# in the same host will be pinned to different CPUs.
+#
+# Normally you can do this using the "taskset" command, however it is also
+# possible to this via Redis configuration directly, both in Linux and FreeBSD.
+#
+# You can pin the server/IO threads, bio threads, aof rewrite child process, and
+# the bgsave child process. The syntax to specify the cpu list is the same as
+# the taskset command:
+#
+# Set redis server/io threads to cpu affinity 0,2,4,6:
+# server_cpulist 0-7:2
+#
+# Set bio threads to cpu affinity 1,3:
+# bio_cpulist 1,3
+#
+# Set aof rewrite child process to cpu affinity 8,9,10,11:
+# aof_rewrite_cpulist 8-11
+#
+# Set bgsave child process to cpu affinity 1,10,11
+# bgsave_cpulist 1,10-11
diff --git a/src/Makefile b/src/Makefile
index f9922afce..55f862cfc 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -206,7 +206,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crcspeed.o crc64.o siphash.o crc16.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/aof.c b/src/aof.c
index 301a40848..02409abe6 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1596,6 +1596,7 @@ int rewriteAppendOnlyFileBackground(void) {
/* Child */
redisSetProcTitle("redis-aof-rewrite");
+ redisSetCpuAffinity(server.aof_rewrite_cpulist);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");
diff --git a/src/bio.c b/src/bio.c
index 85f681185..69c62fc6f 100644
--- a/src/bio.c
+++ b/src/bio.c
@@ -166,6 +166,8 @@ void *bioProcessBackgroundJobs(void *arg) {
break;
}
+ redisSetCpuAffinity(server.bio_cpulist);
+
/* Make the thread killable at any time, so that bioKillThreads()
* can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
diff --git a/src/blocked.c b/src/blocked.c
index 045369e93..92f1cee65 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
int noack = 0;
if (group) {
- consumer = streamLookupConsumer(group,
- receiver->bpop.xread_consumer->ptr,
- 1);
+ consumer =
+ streamLookupConsumer(group,
+ receiver->bpop.xread_consumer->ptr,
+ SLC_NONE);
noack = receiver->bpop.xread_group_noack;
}
diff --git a/src/config.c b/src/config.c
index e0cbcc281..64854592c 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2133,6 +2133,10 @@ standardConfig configs[] = {
createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL),
createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL),
createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_filename, "appendonly.aof", isValidAOFfilename, NULL),
+ createStringConfig("server_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.server_cpulist, NULL, NULL, NULL),
+ createStringConfig("bio_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bio_cpulist, NULL, NULL, NULL),
+ createStringConfig("aof_rewrite_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.aof_rewrite_cpulist, NULL, NULL, NULL),
+ createStringConfig("bgsave_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bgsave_cpulist, NULL, NULL, NULL),
/* Enum Configs */
createEnumConfig("supervised", NULL, IMMUTABLE_CONFIG, supervised_mode_enum, server.supervised_mode, SUPERVISED_NONE, NULL, NULL),
diff --git a/src/config.h b/src/config.h
index 40dc683ce..6025d4e96 100644
--- a/src/config.h
+++ b/src/config.h
@@ -230,9 +230,12 @@ void setproctitle(const char *fmt, ...);
#ifdef __linux__
#define redis_set_thread_title(name) pthread_setname_np(pthread_self(), name)
#else
-#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
+#if (defined __FreeBSD__ || defined __OpenBSD__)
#include <pthread_np.h>
#define redis_set_thread_title(name) pthread_set_name_np(pthread_self(), name)
+#elif defined __NetBSD__
+#include <pthread.h>
+#define redis_set_thread_title(name) pthread_setname_np(pthread_self(), name, NULL)
#else
#if (defined __APPLE__ && defined(MAC_OS_X_VERSION_10_7))
int pthread_setname_np(const char *name);
@@ -244,4 +247,10 @@ int pthread_setname_np(const char *name);
#endif
#endif
+/* Check if we can use setcpuaffinity(). */
+#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
+#define USE_SETCPUAFFINITY
+void setcpuaffinity(const char *cpulist);
+#endif
+
#endif
diff --git a/src/debug.c b/src/debug.c
index cbb56cb71..587ff7c5d 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -1636,7 +1636,7 @@ void enableWatchdog(int period) {
/* Watchdog was actually disabled, so we have to setup the signal
* handler. */
sigemptyset(&act.sa_mask);
- act.sa_flags = SA_ONSTACK | SA_SIGINFO;
+ act.sa_flags = SA_SIGINFO;
act.sa_sigaction = watchdogSignalHandler;
sigaction(SIGALRM, &act, NULL);
}
diff --git a/src/dict.c b/src/dict.c
index ac6f8cfde..13c185253 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -766,8 +766,8 @@ dictEntry *dictGetFairRandomKey(dict *d) {
/* Function to reverse bits. Algorithm from:
* http://graphics.stanford.edu/~seander/bithacks.html#ReverseParallel */
static unsigned long rev(unsigned long v) {
- unsigned long s = 8 * sizeof(v); // bit size; must be power of 2
- unsigned long mask = ~0;
+ unsigned long s = CHAR_BIT * sizeof(v); // bit size; must be power of 2
+ unsigned long mask = ~0UL;
while ((s >>= 1) > 0) {
mask ^= (mask << s);
v = ((v >> s) & mask) | ((v << s) & ~mask);
diff --git a/src/multi.c b/src/multi.c
index cbbd2c513..a331a6240 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -172,7 +172,10 @@ void execCommand(client *c) {
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
* and atomicity guarantees. */
- if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
+ if (!must_propagate &&
+ !server.loading &&
+ !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)))
+ {
execCommandPropagateMulti(c);
must_propagate = 1;
}
diff --git a/src/networking.c b/src/networking.c
index c4a277e0a..75c0c16b1 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -393,6 +393,35 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its %s: '%s' after processing the command "
"'%s'", from, to, s, cmdname);
+ if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
+ server.repl_backlog_histlen > 0)
+ {
+ long long dumplen = 256;
+ if (server.repl_backlog_histlen < dumplen)
+ dumplen = server.repl_backlog_histlen;
+
+ /* Identify the first byte to dump. */
+ long long idx =
+ (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) %
+ server.repl_backlog_size;
+
+ /* Scan the circular buffer to collect 'dumplen' bytes. */
+ sds dump = sdsempty();
+ while(dumplen) {
+ long long thislen =
+ ((server.repl_backlog_size - idx) < dumplen) ?
+ (server.repl_backlog_size - idx) : dumplen;
+
+ dump = sdscatrepr(dump,server.repl_backlog+idx,thislen);
+ dumplen -= thislen;
+ idx = 0;
+ }
+
+ /* Finally log such bytes: this is vital debugging info to
+ * understand what happened. */
+ serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
+ sdsfree(dump);
+ }
server.stat_unexpected_error_replies++;
}
}
@@ -436,6 +465,34 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) {
sdsfree(s);
}
+/* Sometimes we are forced to create a new reply node, and we can't append to
+ * the previous one, when that happens, we wanna try to trim the unused space
+ * at the end of the last reply node which we won't use anymore. */
+void trimReplyUnusedTailSpace(client *c) {
+ listNode *ln = listLast(c->reply);
+ clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
+
+ /* Note that 'tail' may be NULL even if we have a tail node, becuase when
+ * addDeferredMultiBulkLength() is used */
+ if (!tail) return;
+
+ /* We only try to trim the space is relatively high (more than a 1/4 of the
+ * allocation), otherwise there's a high chance realloc will NOP.
+ * Also, to avoid large memmove which happens as part of realloc, we only do
+ * that if the used part is small. */
+ if (tail->size - tail->used > tail->size / 4 &&
+ tail->used < PROTO_REPLY_CHUNK_BYTES)
+ {
+ size_t old_size = tail->size;
+ tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock));
+ /* take over the allocation's internal fragmentation (at least for
+ * memory usage tracking) */
+ tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
+ c->reply_bytes += tail->size - old_size;
+ listNodeValue(ln) = tail;
+ }
+}
+
/* Adds an empty object to the reply list that will contain the multi bulk
* length, which is not known when this function is called. */
void *addReplyDeferredLen(client *c) {
@@ -443,6 +500,7 @@ void *addReplyDeferredLen(client *c) {
* ready to be sent, since we are sure that before returning to the
* event loop setDeferredAggregateLen() will be called. */
if (prepareClientToWrite(c) != C_OK) return NULL;
+ trimReplyUnusedTailSpace(c);
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
return listLast(c->reply);
}
@@ -2843,6 +2901,7 @@ void *IOThreadMain(void *myid) {
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
+ redisSetCpuAffinity(server.server_cpulist);
while(1) {
/* Wait for start */
diff --git a/src/rdb.c b/src/rdb.c
index 9f6bf13f1..5cec208c5 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1351,6 +1351,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
/* Child */
redisSetProcTitle("redis-rdb-bgsave");
+ redisSetCpuAffinity(server.bgsave_cpulist);
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
@@ -1441,7 +1442,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
/* Load every single element of the list */
while(len--) {
- if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
+ if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
dec = getDecodedObject(ele);
size_t len = sdslen(dec->ptr);
quicklistPushTail(o->ptr, dec->ptr, len);
@@ -1468,8 +1472,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
long long llval;
sds sdsele;
- if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
+ if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
if (o->encoding == OBJ_ENCODING_INTSET) {
/* Fetch integer value from element. */
@@ -1508,13 +1514,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
double score;
zskiplistNode *znode;
- if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
+ if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
if (rdbtype == RDB_TYPE_ZSET_2) {
- if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL;
+ if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) {
+ decrRefCount(o);
+ sdsfree(sdsele);
+ return NULL;
+ }
} else {
- if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
+ if (rdbLoadDoubleValue(rdb,&score) == -1) {
+ decrRefCount(o);
+ sdsfree(sdsele);
+ return NULL;
+ }
}
/* Don't care about integer-encoded strings. */
@@ -1546,10 +1562,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {
len--;
/* Load raw strings */
- if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
- if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
+ if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
+ if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ sdsfree(field);
+ decrRefCount(o);
+ return NULL;
+ }
/* Add pair to ziplist */
o->ptr = ziplistPush(o->ptr, (unsigned char*)field,
@@ -1577,10 +1598,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
while (o->encoding == OBJ_ENCODING_HT && len > 0) {
len--;
/* Load encoded strings */
- if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
- if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
- == NULL) return NULL;
+ if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
+ if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
+ sdsfree(field);
+ decrRefCount(o);
+ return NULL;
+ }
/* Add pair to hash table */
ret = dictAdd((dict*)o->ptr, field, value);
@@ -1600,7 +1626,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
while (len--) {
unsigned char *zl =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
- if (zl == NULL) return NULL;
+ if (zl == NULL) {
+ decrRefCount(o);
+ return NULL;
+ }
quicklistAppendZiplist(o->ptr, zl);
}
} else if (rdbtype == RDB_TYPE_HASH_ZIPMAP ||
@@ -1823,8 +1852,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
decrRefCount(o);
return NULL;
}
- streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
- 1);
+ streamConsumer *consumer =
+ streamLookupConsumer(cgroup,cname,SLC_NONE);
sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) {
@@ -2292,7 +2321,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,"Wrong RDB checksum expected: (%llx) but "
- "got (%llx). Aborting now.",expected,cksum);
+ "got (%llx). Aborting now.",
+ (unsigned long long)expected,
+ (unsigned long long)cksum);
rdbExitReportCorruptRDB("RDB CRC error");
}
}
@@ -2458,6 +2489,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
rioInitWithFd(&rdb,server.rdb_pipe_write);
redisSetProcTitle("redis-rdb-to-slaves");
+ redisSetCpuAffinity(server.bgsave_cpulist);
retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
if (retval == C_OK && rioFlush(&rdb) == 0)
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index 2df41580b..77daf981c 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -94,6 +94,7 @@ static struct config {
sds dbnumstr;
char *tests;
char *auth;
+ const char *user;
int precision;
int num_threads;
struct benchmarkThread **threads;
@@ -258,7 +259,10 @@ static redisConfig *getRedisConfig(const char *ip, int port,
if(config.auth) {
void *authReply = NULL;
- redisAppendCommand(c, "AUTH %s", config.auth);
+ if (config.user == NULL)
+ redisAppendCommand(c, "AUTH %s", config.auth);
+ else
+ redisAppendCommand(c, "AUTH %s %s", config.user, config.auth);
if (REDIS_OK != redisGetReply(c, &authReply)) goto fail;
if (reply) freeReplyObject(reply);
reply = ((redisReply *) authReply);
@@ -628,7 +632,12 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
c->prefix_pending = 0;
if (config.auth) {
char *buf = NULL;
- int len = redisFormatCommand(&buf, "AUTH %s", config.auth);
+ int len;
+ if (config.user == NULL)
+ len = redisFormatCommand(&buf, "AUTH %s", config.auth);
+ else
+ len = redisFormatCommand(&buf, "AUTH %s %s",
+ config.user, config.auth);
c->obuf = sdscatlen(c->obuf, buf, len);
free(buf);
c->prefix_pending++;
@@ -1299,6 +1308,9 @@ int parseOptions(int argc, const char **argv) {
} else if (!strcmp(argv[i],"-a") ) {
if (lastarg) goto invalid;
config.auth = strdup(argv[++i]);
+ } else if (!strcmp(argv[i],"--user")) {
+ if (lastarg) goto invalid;
+ config.user = argv[++i];
} else if (!strcmp(argv[i],"-d")) {
if (lastarg) goto invalid;
config.datasize = atoi(argv[++i]);
@@ -1385,6 +1397,7 @@ usage:
" -p <port> Server port (default 6379)\n"
" -s <socket> Server socket (overrides host and port)\n"
" -a <password> Password for Redis Auth\n"
+" --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
" -c <clients> Number of parallel connections (default 50)\n"
" -n <requests> Total number of requests (default 100000)\n"
" -d <size> Data size of SET/GET value in bytes (default 3)\n"
diff --git a/src/server.c b/src/server.c
index 659604ef3..e2b4b6f3d 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2899,7 +2899,6 @@ void initServer(void) {
scriptingInit(1);
slowlogInit();
latencyMonitorInit();
- crc64_init();
}
/* Some steps in server initialization need to be done last (after modules
@@ -3205,8 +3204,8 @@ void call(client *c, int flags) {
server.fixed_time_expire++;
- /* Sent the command to clients in MONITOR mode, only if the commands are
- * not generated from reading an AOF. */
+ /* Send the command to clients in MONITOR mode if applicable.
+ * Administrative commands are considered too dangerous to be shown. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
@@ -4281,6 +4280,7 @@ sds genRedisInfoString(const char *section) {
"active_defrag_key_misses:%lld\r\n"
"tracking_total_keys:%lld\r\n"
"tracking_total_items:%lld\r\n"
+ "tracking_total_prefixes:%lld\r\n"
"unexpected_error_replies:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
@@ -4311,6 +4311,7 @@ sds genRedisInfoString(const char *section) {
server.stat_active_defrag_key_misses,
(unsigned long long) trackingGetTotalKeys(),
(unsigned long long) trackingGetTotalItems(),
+ (unsigned long long) trackingGetTotalPrefixes(),
server.stat_unexpected_error_replies);
}
@@ -4850,6 +4851,14 @@ void redisSetProcTitle(char *title) {
#endif
}
+void redisSetCpuAffinity(const char *cpulist) {
+#ifdef USE_SETCPUAFFINITY
+ setcpuaffinity(cpulist);
+#else
+ UNUSED(cpulist);
+#endif
+}
+
/*
* Check whether systemd or upstart have been used to start redis.
*/
@@ -4953,6 +4962,7 @@ int main(int argc, char **argv) {
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
+ crc64_init();
uint8_t hashseed[16];
getRandomBytes(hashseed,sizeof(hashseed));
@@ -5118,6 +5128,7 @@ int main(int argc, char **argv) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
+ redisSetCpuAffinity(server.server_cpulist);
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
diff --git a/src/server.h b/src/server.h
index 41d767e13..59cf1370e 100644
--- a/src/server.h
+++ b/src/server.h
@@ -732,7 +732,7 @@ typedef struct readyList {
no AUTH is needed, and every
connection is immediately
authenticated. */
-typedef struct user {
+typedef struct {
sds name; /* The username as an SDS string. */
uint64_t flags; /* See USER_FLAG_* */
@@ -1433,6 +1433,11 @@ struct redisServer {
int tls_replication;
int tls_auth_clients;
redisTLSContextConfig tls_ctx_config;
+ /* cpu affinity */
+ char *server_cpulist; /* cpu affinity list of redis server main/io thread. */
+ char *bio_cpulist; /* cpu affinity list of bio thread. */
+ char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
+ char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
};
typedef struct pubsubPattern {
@@ -1585,6 +1590,7 @@ void exitFromChild(int retcode);
size_t redisPopcount(void *s, long count);
void redisSetProcTitle(char *title);
int redisCommunicateSystemd(const char *sd_notify_msg);
+void redisSetCpuAffinity(const char *cpulist);
/* networking.c -- Networking and Client related operations */
client *createClient(connection *conn);
@@ -1689,6 +1695,7 @@ void trackingInvalidateKeysOnFlush(int dbid);
void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void);
uint64_t trackingGetTotalKeys(void);
+uint64_t trackingGetTotalPrefixes(void);
void trackingBroadcastInvalidationMessages(void);
/* List data type */
diff --git a/src/setcpuaffinity.c b/src/setcpuaffinity.c
new file mode 100644
index 000000000..dcae81c71
--- /dev/null
+++ b/src/setcpuaffinity.c
@@ -0,0 +1,129 @@
+/* ==========================================================================
+ * setcpuaffinity.c - Linux/BSD setcpuaffinity.
+ * --------------------------------------------------------------------------
+ * Copyright (C) 2020 zhenwei pi
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to permit
+ * persons to whom the Software is furnished to do so, subject to the
+ * following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included
+ * in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+ * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+ * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+ * USE OR OTHER DEALINGS IN THE SOFTWARE.
+ * ==========================================================================
+ */
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <stdlib.h>
+#include <string.h>
+#include <ctype.h>
+#ifdef __linux__
+#include <sched.h>
+#endif
+#ifdef __FreeBSD__
+#include <sys/param.h>
+#include <sys/cpuset.h>
+#endif
+#include "config.h"
+
+#ifdef USE_SETCPUAFFINITY
+static const char *next_token(const char *q, int sep) {
+ if (q)
+ q = strchr(q, sep);
+ if (q)
+ q++;
+
+ return q;
+}
+
+static int next_num(const char *str, char **end, int *result) {
+ if (!str || *str == '\0' || !isdigit(*str))
+ return -1;
+
+ *result = strtoul(str, end, 10);
+ if (str == *end)
+ return -1;
+
+ return 0;
+}
+
+/* set current thread cpu affinity to cpu list, this function works like
+ * taskset command (actually cpulist parsing logic reference to util-linux).
+ * example of this function: "0,2,3", "0,2-3", "0-20:2". */
+void setcpuaffinity(const char *cpulist) {
+ const char *p, *q;
+ char *end = NULL;
+#ifdef __linux__
+ cpu_set_t cpuset;
+#endif
+#ifdef __FreeBSD__
+ cpuset_t cpuset;
+#endif
+
+ if (!cpulist)
+ return;
+
+ CPU_ZERO(&cpuset);
+
+ q = cpulist;
+ while (p = q, q = next_token(q, ','), p) {
+ int a, b, s;
+ const char *c1, *c2;
+
+ if (next_num(p, &end, &a) != 0)
+ return;
+
+ b = a;
+ s = 1;
+ p = end;
+
+ c1 = next_token(p, '-');
+ c2 = next_token(p, ',');
+
+ if (c1 != NULL && (c2 == NULL || c1 < c2)) {
+ if (next_num(c1, &end, &b) != 0)
+ return;
+
+ c1 = end && *end ? next_token(end, ':') : NULL;
+ if (c1 != NULL && (c2 == NULL || c1 < c2)) {
+ if (next_num(c1, &end, &s) != 0)
+ return;
+
+ if (s == 0)
+ return;
+ }
+ }
+
+ if ((a > b))
+ return;
+
+ while (a <= b) {
+ CPU_SET(a, &cpuset);
+ a += s;
+ }
+ }
+
+ if (end && *end)
+ return;
+
+#ifdef __linux__
+ sched_setaffinity(0, sizeof(cpuset), &cpuset);
+#endif
+#ifdef __FreeBSD__
+ cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, sizeof(cpuset), &cpuset);
+#endif
+}
+
+#endif /* USE_SETCPUAFFINITY */
diff --git a/src/stream.h b/src/stream.h
index b69073994..0d3bf63fc 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -96,6 +96,11 @@ typedef struct streamPropInfo {
/* Prototypes of exported APIs. */
struct client;
+/* Flags for streamLookupConsumer */
+#define SLC_NONE 0
+#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */
+#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */
+
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
@@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
diff --git a/src/t_stream.c b/src/t_stream.c
index 5c1b9a523..676ddd9bb 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1570,7 +1570,8 @@ void xreadCommand(client *c) {
addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i],
- consumername->ptr,1);
+ consumername->ptr,
+ SLC_NONE);
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK;
@@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
* consumer does not exist it is automatically created as a side effect
* of calling this function, otherwise its last seen time is updated and
* the existing consumer reference returned. */
-streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
+streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
+ int create = !(flags & SLC_NOCREAT);
+ int refresh = !(flags & SLC_NOREFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) {
@@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
consumer,NULL);
}
- consumer->seen_time = mstime();
+ if (refresh) consumer->seen_time = mstime();
return consumer;
}
@@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
* may have pending messages: they are removed from the PEL, and the number
* of pending messages "lost" is returned. */
uint64_t streamDelConsumer(streamCG *cg, sds name) {
- streamConsumer *consumer = streamLookupConsumer(cg,name,0);
+ streamConsumer *consumer =
+ streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);
if (consumer == NULL) return 0;
uint64_t retval = raxSize(consumer->pel);
@@ -2068,15 +2072,18 @@ void xpendingCommand(client *c) {
}
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
else {
- streamConsumer *consumer = consumername ?
- streamLookupConsumer(group,consumername->ptr,0):
- NULL;
-
- /* If a consumer name was mentioned but it does not exist, we can
- * just return an empty array. */
- if (consumername && consumer == NULL) {
- addReplyArrayLen(c,0);
- return;
+ streamConsumer *consumer = NULL;
+ if (consumername) {
+ consumer = streamLookupConsumer(group,
+ consumername->ptr,
+ SLC_NOCREAT|SLC_NOREFRESH);
+
+ /* If a consumer name was mentioned but it does not exist, we can
+ * just return an empty array. */
+ if (consumer == NULL) {
+ addReplyArrayLen(c,0);
+ return;
+ }
}
rax *pel = consumer ? consumer->pel : group->pel;
@@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) {
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */
if (consumer == NULL)
- consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
+ consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
nack->consumer = consumer;
nack->delivery_time = deliverytime;
/* Set the delivery attempts counter if given, otherwise
diff --git a/src/tracking.c b/src/tracking.c
index 48d231627..a995817e2 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -518,3 +518,8 @@ uint64_t trackingGetTotalKeys(void) {
if (TrackingTable == NULL) return 0;
return raxSize(TrackingTable);
}
+
+uint64_t trackingGetTotalPrefixes(void) {
+ if (PrefixTable == NULL) return 0;
+ return raxSize(PrefixTable);
+}
diff --git a/src/ziplist.c b/src/ziplist.c
index ef40d6aa2..ddae0d96f 100644
--- a/src/ziplist.c
+++ b/src/ziplist.c
@@ -440,7 +440,7 @@ unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
if ((prevlensize) == 1) { \
(prevlen) = (ptr)[0]; \
} else if ((prevlensize) == 5) { \
- assert(sizeof((prevlen)) == 4); \
+ assert(sizeof((prevlen)) == 4); \
memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); \
memrev32ifbe(&prevlen); \
} \
diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl
index 2734de7f1..b82c87d71 100644
--- a/tests/integration/aof.tcl
+++ b/tests/integration/aof.tcl
@@ -54,6 +54,12 @@ tags {"aof"} {
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
+ wait_for_condition 50 100 {
+ [catch {$client ping} e] == 0
+ } else {
+ fail "Loading DB is taking too much time."
+ }
+
test "Truncated AOF loaded: we expect foo to be equal to 5" {
assert {[$client get foo] eq "5"}
}
@@ -71,6 +77,12 @@ tags {"aof"} {
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
+ wait_for_condition 50 100 {
+ [catch {$client ping} e] == 0
+ } else {
+ fail "Loading DB is taking too much time."
+ }
+
test "Truncated AOF loaded: we expect foo to be equal to 6 now" {
assert {[$client get foo] eq "6"}
}
diff --git a/tests/integration/replication-3.tcl b/tests/integration/replication-3.tcl
index 198e698f2..43eb53538 100644
--- a/tests/integration/replication-3.tcl
+++ b/tests/integration/replication-3.tcl
@@ -118,7 +118,7 @@ start_server {tags {"repl"}} {
# correctly the RDB file: such file will contain "lua" AUX
# sections with scripts already in the memory of the master.
- wait_for_condition 50 100 {
+ wait_for_condition 500 100 {
[s -1 master_link_status] eq {up}
} else {
fail "Replication not started."
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 3eee1aeb7..de0a64728 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -216,9 +216,6 @@ proc run_solo {name code} {
}
proc cleanup {} {
- if {$::dont_clean} {
- return
- }
if {!$::quiet} {puts -nonewline "Cleanup: may take some time... "}
flush stdout
catch {exec rm -rf {*}[glob tests/tmp/redis.conf.*]}
@@ -456,11 +453,11 @@ proc the_end {} {
foreach failed $::failed_tests {
puts "*** $failed"
}
- cleanup
+ if {!$::dont_clean} cleanup
exit 1
} else {
puts "\n[colorstr bold-white {\o/}] [colorstr bold-green {All tests passed without errors!}]\n"
- cleanup
+ if {!$::dont_clean} cleanup
exit 0
}
}
diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl
index cd905084a..b60ca0d48 100644
--- a/tests/unit/introspection.tcl
+++ b/tests/unit/introspection.tcl
@@ -94,6 +94,10 @@ start_server {tags {"introspection"}} {
slaveof
bind
requirepass
+ server_cpulist
+ bio_cpulist
+ aof_rewrite_cpulist
+ bgsave_cpulist
}
set configs {}