diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile | 2 | ||||
-rw-r--r-- | src/aof.c | 1 | ||||
-rw-r--r-- | src/bio.c | 2 | ||||
-rw-r--r-- | src/blocked.c | 7 | ||||
-rw-r--r-- | src/config.c | 4 | ||||
-rw-r--r-- | src/config.h | 11 | ||||
-rw-r--r-- | src/debug.c | 2 | ||||
-rw-r--r-- | src/dict.c | 4 | ||||
-rw-r--r-- | src/multi.c | 5 | ||||
-rw-r--r-- | src/networking.c | 59 | ||||
-rw-r--r-- | src/rdb.c | 70 | ||||
-rw-r--r-- | src/redis-benchmark.c | 17 | ||||
-rw-r--r-- | src/server.c | 17 | ||||
-rw-r--r-- | src/server.h | 9 | ||||
-rw-r--r-- | src/setcpuaffinity.c | 129 | ||||
-rw-r--r-- | src/stream.h | 7 | ||||
-rw-r--r-- | src/t_stream.c | 35 | ||||
-rw-r--r-- | src/tracking.c | 5 | ||||
-rw-r--r-- | src/ziplist.c | 2 |
19 files changed, 338 insertions, 50 deletions
diff --git a/src/Makefile b/src/Makefile index f9922afce..55f862cfc 100644 --- a/src/Makefile +++ b/src/Makefile @@ -206,7 +206,7 @@ endif REDIS_SERVER_NAME=redis-server REDIS_SENTINEL_NAME=redis-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crcspeed.o crc64.o siphash.o crc16.o REDIS_BENCHMARK_NAME=redis-benchmark @@ -1596,6 +1596,7 @@ int rewriteAppendOnlyFileBackground(void) { /* Child */ redisSetProcTitle("redis-aof-rewrite"); + redisSetCpuAffinity(server.aof_rewrite_cpulist); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite"); @@ -166,6 +166,8 @@ void *bioProcessBackgroundJobs(void *arg) { break; } + redisSetCpuAffinity(server.bio_cpulist); + /* Make the thread killable at any time, so that bioKillThreads() * can work reliably. */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); diff --git a/src/blocked.c b/src/blocked.c index 045369e93..92f1cee65 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { int noack = 0; if (group) { - consumer = streamLookupConsumer(group, - receiver->bpop.xread_consumer->ptr, - 1); + consumer = + streamLookupConsumer(group, + receiver->bpop.xread_consumer->ptr, + SLC_NONE); noack = receiver->bpop.xread_group_noack; } diff --git a/src/config.c b/src/config.c index e0cbcc281..64854592c 100644 --- a/src/config.c +++ b/src/config.c @@ -2133,6 +2133,10 @@ standardConfig configs[] = { createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL), createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL), createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_filename, "appendonly.aof", isValidAOFfilename, NULL), + createStringConfig("server_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.server_cpulist, NULL, NULL, NULL), + createStringConfig("bio_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bio_cpulist, NULL, NULL, NULL), + createStringConfig("aof_rewrite_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.aof_rewrite_cpulist, NULL, NULL, NULL), + createStringConfig("bgsave_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bgsave_cpulist, NULL, NULL, NULL), /* Enum Configs */ createEnumConfig("supervised", NULL, IMMUTABLE_CONFIG, supervised_mode_enum, server.supervised_mode, SUPERVISED_NONE, NULL, NULL), diff --git a/src/config.h b/src/config.h index 40dc683ce..6025d4e96 100644 --- a/src/config.h +++ b/src/config.h @@ -230,9 +230,12 @@ void setproctitle(const char *fmt, ...); #ifdef __linux__ #define redis_set_thread_title(name) pthread_setname_np(pthread_self(), name) #else -#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__) +#if (defined __FreeBSD__ || defined __OpenBSD__) #include <pthread_np.h> #define redis_set_thread_title(name) pthread_set_name_np(pthread_self(), name) +#elif defined __NetBSD__ +#include <pthread.h> +#define redis_set_thread_title(name) pthread_setname_np(pthread_self(), name, NULL) #else #if (defined __APPLE__ && defined(MAC_OS_X_VERSION_10_7)) int pthread_setname_np(const char *name); @@ -244,4 +247,10 @@ int pthread_setname_np(const char *name); #endif #endif +/* Check if we can use setcpuaffinity(). */ +#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__) +#define USE_SETCPUAFFINITY +void setcpuaffinity(const char *cpulist); +#endif + #endif diff --git a/src/debug.c b/src/debug.c index cbb56cb71..587ff7c5d 100644 --- a/src/debug.c +++ b/src/debug.c @@ -1636,7 +1636,7 @@ void enableWatchdog(int period) { /* Watchdog was actually disabled, so we have to setup the signal * handler. */ sigemptyset(&act.sa_mask); - act.sa_flags = SA_ONSTACK | SA_SIGINFO; + act.sa_flags = SA_SIGINFO; act.sa_sigaction = watchdogSignalHandler; sigaction(SIGALRM, &act, NULL); } diff --git a/src/dict.c b/src/dict.c index ac6f8cfde..13c185253 100644 --- a/src/dict.c +++ b/src/dict.c @@ -766,8 +766,8 @@ dictEntry *dictGetFairRandomKey(dict *d) { /* Function to reverse bits. Algorithm from: * http://graphics.stanford.edu/~seander/bithacks.html#ReverseParallel */ static unsigned long rev(unsigned long v) { - unsigned long s = 8 * sizeof(v); // bit size; must be power of 2 - unsigned long mask = ~0; + unsigned long s = CHAR_BIT * sizeof(v); // bit size; must be power of 2 + unsigned long mask = ~0UL; while ((s >>= 1) > 0) { mask ^= (mask << s); v = ((v >> s) & mask) | ((v << s) & ~mask); diff --git a/src/multi.c b/src/multi.c index cbbd2c513..a331a6240 100644 --- a/src/multi.c +++ b/src/multi.c @@ -172,7 +172,10 @@ void execCommand(client *c) { * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { + if (!must_propagate && + !server.loading && + !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) + { execCommandPropagateMulti(c); must_propagate = 1; } diff --git a/src/networking.c b/src/networking.c index c4a277e0a..75c0c16b1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -393,6 +393,35 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " "to its %s: '%s' after processing the command " "'%s'", from, to, s, cmdname); + if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog && + server.repl_backlog_histlen > 0) + { + long long dumplen = 256; + if (server.repl_backlog_histlen < dumplen) + dumplen = server.repl_backlog_histlen; + + /* Identify the first byte to dump. */ + long long idx = + (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) % + server.repl_backlog_size; + + /* Scan the circular buffer to collect 'dumplen' bytes. */ + sds dump = sdsempty(); + while(dumplen) { + long long thislen = + ((server.repl_backlog_size - idx) < dumplen) ? + (server.repl_backlog_size - idx) : dumplen; + + dump = sdscatrepr(dump,server.repl_backlog+idx,thislen); + dumplen -= thislen; + idx = 0; + } + + /* Finally log such bytes: this is vital debugging info to + * understand what happened. */ + serverLog(LL_WARNING,"Latest backlog is: '%s'", dump); + sdsfree(dump); + } server.stat_unexpected_error_replies++; } } @@ -436,6 +465,34 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) { sdsfree(s); } +/* Sometimes we are forced to create a new reply node, and we can't append to + * the previous one, when that happens, we wanna try to trim the unused space + * at the end of the last reply node which we won't use anymore. */ +void trimReplyUnusedTailSpace(client *c) { + listNode *ln = listLast(c->reply); + clientReplyBlock *tail = ln? listNodeValue(ln): NULL; + + /* Note that 'tail' may be NULL even if we have a tail node, becuase when + * addDeferredMultiBulkLength() is used */ + if (!tail) return; + + /* We only try to trim the space is relatively high (more than a 1/4 of the + * allocation), otherwise there's a high chance realloc will NOP. + * Also, to avoid large memmove which happens as part of realloc, we only do + * that if the used part is small. */ + if (tail->size - tail->used > tail->size / 4 && + tail->used < PROTO_REPLY_CHUNK_BYTES) + { + size_t old_size = tail->size; + tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock)); + /* take over the allocation's internal fragmentation (at least for + * memory usage tracking) */ + tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); + c->reply_bytes += tail->size - old_size; + listNodeValue(ln) = tail; + } +} + /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addReplyDeferredLen(client *c) { @@ -443,6 +500,7 @@ void *addReplyDeferredLen(client *c) { * ready to be sent, since we are sure that before returning to the * event loop setDeferredAggregateLen() will be called. */ if (prepareClientToWrite(c) != C_OK) return NULL; + trimReplyUnusedTailSpace(c); listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ return listLast(c->reply); } @@ -2843,6 +2901,7 @@ void *IOThreadMain(void *myid) { snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); redis_set_thread_title(thdname); + redisSetCpuAffinity(server.server_cpulist); while(1) { /* Wait for start */ @@ -1351,6 +1351,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { /* Child */ redisSetProcTitle("redis-rdb-bgsave"); + redisSetCpuAffinity(server.bgsave_cpulist); retval = rdbSave(filename,rsi); if (retval == C_OK) { sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); @@ -1441,7 +1442,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { /* Load every single element of the list */ while(len--) { - if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) { + decrRefCount(o); + return NULL; + } dec = getDecodedObject(ele); size_t len = sdslen(dec->ptr); quicklistPushTail(o->ptr, dec->ptr, len); @@ -1468,8 +1472,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { long long llval; sds sdsele; - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } if (o->encoding == OBJ_ENCODING_INTSET) { /* Fetch integer value from element. */ @@ -1508,13 +1514,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { double score; zskiplistNode *znode; - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { + decrRefCount(o); + sdsfree(sdsele); + return NULL; + } } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) { + decrRefCount(o); + sdsfree(sdsele); + return NULL; + } } /* Don't care about integer-encoded strings. */ @@ -1546,10 +1562,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) { len--; /* Load raw strings */ - if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } + if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + sdsfree(field); + decrRefCount(o); + return NULL; + } /* Add pair to ziplist */ o->ptr = ziplistPush(o->ptr, (unsigned char*)field, @@ -1577,10 +1598,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (o->encoding == OBJ_ENCODING_HT && len > 0) { len--; /* Load encoded strings */ - if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } + if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + sdsfree(field); + decrRefCount(o); + return NULL; + } /* Add pair to hash table */ ret = dictAdd((dict*)o->ptr, field, value); @@ -1600,7 +1626,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (len--) { unsigned char *zl = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); - if (zl == NULL) return NULL; + if (zl == NULL) { + decrRefCount(o); + return NULL; + } quicklistAppendZiplist(o->ptr, zl); } } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP || @@ -1823,8 +1852,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { decrRefCount(o); return NULL; } - streamConsumer *consumer = streamLookupConsumer(cgroup,cname, - 1); + streamConsumer *consumer = + streamLookupConsumer(cgroup,cname,SLC_NONE); sdsfree(cname); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); if (rioGetReadError(rdb)) { @@ -2292,7 +2321,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed."); } else if (cksum != expected) { serverLog(LL_WARNING,"Wrong RDB checksum expected: (%llx) but " - "got (%llx). Aborting now.",expected,cksum); + "got (%llx). Aborting now.", + (unsigned long long)expected, + (unsigned long long)cksum); rdbExitReportCorruptRDB("RDB CRC error"); } } @@ -2458,6 +2489,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { rioInitWithFd(&rdb,server.rdb_pipe_write); redisSetProcTitle("redis-rdb-to-slaves"); + redisSetCpuAffinity(server.bgsave_cpulist); retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi); if (retval == C_OK && rioFlush(&rdb) == 0) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 2df41580b..77daf981c 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -94,6 +94,7 @@ static struct config { sds dbnumstr; char *tests; char *auth; + const char *user; int precision; int num_threads; struct benchmarkThread **threads; @@ -258,7 +259,10 @@ static redisConfig *getRedisConfig(const char *ip, int port, if(config.auth) { void *authReply = NULL; - redisAppendCommand(c, "AUTH %s", config.auth); + if (config.user == NULL) + redisAppendCommand(c, "AUTH %s", config.auth); + else + redisAppendCommand(c, "AUTH %s %s", config.user, config.auth); if (REDIS_OK != redisGetReply(c, &authReply)) goto fail; if (reply) freeReplyObject(reply); reply = ((redisReply *) authReply); @@ -628,7 +632,12 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending = 0; if (config.auth) { char *buf = NULL; - int len = redisFormatCommand(&buf, "AUTH %s", config.auth); + int len; + if (config.user == NULL) + len = redisFormatCommand(&buf, "AUTH %s", config.auth); + else + len = redisFormatCommand(&buf, "AUTH %s %s", + config.user, config.auth); c->obuf = sdscatlen(c->obuf, buf, len); free(buf); c->prefix_pending++; @@ -1299,6 +1308,9 @@ int parseOptions(int argc, const char **argv) { } else if (!strcmp(argv[i],"-a") ) { if (lastarg) goto invalid; config.auth = strdup(argv[++i]); + } else if (!strcmp(argv[i],"--user")) { + if (lastarg) goto invalid; + config.user = argv[++i]; } else if (!strcmp(argv[i],"-d")) { if (lastarg) goto invalid; config.datasize = atoi(argv[++i]); @@ -1385,6 +1397,7 @@ usage: " -p <port> Server port (default 6379)\n" " -s <socket> Server socket (overrides host and port)\n" " -a <password> Password for Redis Auth\n" +" --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n" " -c <clients> Number of parallel connections (default 50)\n" " -n <requests> Total number of requests (default 100000)\n" " -d <size> Data size of SET/GET value in bytes (default 3)\n" diff --git a/src/server.c b/src/server.c index 659604ef3..e2b4b6f3d 100644 --- a/src/server.c +++ b/src/server.c @@ -2899,7 +2899,6 @@ void initServer(void) { scriptingInit(1); slowlogInit(); latencyMonitorInit(); - crc64_init(); } /* Some steps in server initialization need to be done last (after modules @@ -3205,8 +3204,8 @@ void call(client *c, int flags) { server.fixed_time_expire++; - /* Sent the command to clients in MONITOR mode, only if the commands are - * not generated from reading an AOF. */ + /* Send the command to clients in MONITOR mode if applicable. + * Administrative commands are considered too dangerous to be shown. */ if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) @@ -4281,6 +4280,7 @@ sds genRedisInfoString(const char *section) { "active_defrag_key_misses:%lld\r\n" "tracking_total_keys:%lld\r\n" "tracking_total_items:%lld\r\n" + "tracking_total_prefixes:%lld\r\n" "unexpected_error_replies:%lld\r\n", server.stat_numconnections, server.stat_numcommands, @@ -4311,6 +4311,7 @@ sds genRedisInfoString(const char *section) { server.stat_active_defrag_key_misses, (unsigned long long) trackingGetTotalKeys(), (unsigned long long) trackingGetTotalItems(), + (unsigned long long) trackingGetTotalPrefixes(), server.stat_unexpected_error_replies); } @@ -4850,6 +4851,14 @@ void redisSetProcTitle(char *title) { #endif } +void redisSetCpuAffinity(const char *cpulist) { +#ifdef USE_SETCPUAFFINITY + setcpuaffinity(cpulist); +#else + UNUSED(cpulist); +#endif +} + /* * Check whether systemd or upstart have been used to start redis. */ @@ -4953,6 +4962,7 @@ int main(int argc, char **argv) { zmalloc_set_oom_handler(redisOutOfMemoryHandler); srand(time(NULL)^getpid()); gettimeofday(&tv,NULL); + crc64_init(); uint8_t hashseed[16]; getRandomBytes(hashseed,sizeof(hashseed)); @@ -5118,6 +5128,7 @@ int main(int argc, char **argv) { serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); } + redisSetCpuAffinity(server.server_cpulist); aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); diff --git a/src/server.h b/src/server.h index 41d767e13..59cf1370e 100644 --- a/src/server.h +++ b/src/server.h @@ -732,7 +732,7 @@ typedef struct readyList { no AUTH is needed, and every connection is immediately authenticated. */ -typedef struct user { +typedef struct { sds name; /* The username as an SDS string. */ uint64_t flags; /* See USER_FLAG_* */ @@ -1433,6 +1433,11 @@ struct redisServer { int tls_replication; int tls_auth_clients; redisTLSContextConfig tls_ctx_config; + /* cpu affinity */ + char *server_cpulist; /* cpu affinity list of redis server main/io thread. */ + char *bio_cpulist; /* cpu affinity list of bio thread. */ + char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */ + char *bgsave_cpulist; /* cpu affinity list of bgsave process. */ }; typedef struct pubsubPattern { @@ -1585,6 +1590,7 @@ void exitFromChild(int retcode); size_t redisPopcount(void *s, long count); void redisSetProcTitle(char *title); int redisCommunicateSystemd(const char *sd_notify_msg); +void redisSetCpuAffinity(const char *cpulist); /* networking.c -- Networking and Client related operations */ client *createClient(connection *conn); @@ -1689,6 +1695,7 @@ void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalKeys(void); +uint64_t trackingGetTotalPrefixes(void); void trackingBroadcastInvalidationMessages(void); /* List data type */ diff --git a/src/setcpuaffinity.c b/src/setcpuaffinity.c new file mode 100644 index 000000000..dcae81c71 --- /dev/null +++ b/src/setcpuaffinity.c @@ -0,0 +1,129 @@ +/* ========================================================================== + * setcpuaffinity.c - Linux/BSD setcpuaffinity. + * -------------------------------------------------------------------------- + * Copyright (C) 2020 zhenwei pi + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the + * following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE + * USE OR OTHER DEALINGS IN THE SOFTWARE. + * ========================================================================== + */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include <stdlib.h> +#include <string.h> +#include <ctype.h> +#ifdef __linux__ +#include <sched.h> +#endif +#ifdef __FreeBSD__ +#include <sys/param.h> +#include <sys/cpuset.h> +#endif +#include "config.h" + +#ifdef USE_SETCPUAFFINITY +static const char *next_token(const char *q, int sep) { + if (q) + q = strchr(q, sep); + if (q) + q++; + + return q; +} + +static int next_num(const char *str, char **end, int *result) { + if (!str || *str == '\0' || !isdigit(*str)) + return -1; + + *result = strtoul(str, end, 10); + if (str == *end) + return -1; + + return 0; +} + +/* set current thread cpu affinity to cpu list, this function works like + * taskset command (actually cpulist parsing logic reference to util-linux). + * example of this function: "0,2,3", "0,2-3", "0-20:2". */ +void setcpuaffinity(const char *cpulist) { + const char *p, *q; + char *end = NULL; +#ifdef __linux__ + cpu_set_t cpuset; +#endif +#ifdef __FreeBSD__ + cpuset_t cpuset; +#endif + + if (!cpulist) + return; + + CPU_ZERO(&cpuset); + + q = cpulist; + while (p = q, q = next_token(q, ','), p) { + int a, b, s; + const char *c1, *c2; + + if (next_num(p, &end, &a) != 0) + return; + + b = a; + s = 1; + p = end; + + c1 = next_token(p, '-'); + c2 = next_token(p, ','); + + if (c1 != NULL && (c2 == NULL || c1 < c2)) { + if (next_num(c1, &end, &b) != 0) + return; + + c1 = end && *end ? next_token(end, ':') : NULL; + if (c1 != NULL && (c2 == NULL || c1 < c2)) { + if (next_num(c1, &end, &s) != 0) + return; + + if (s == 0) + return; + } + } + + if ((a > b)) + return; + + while (a <= b) { + CPU_SET(a, &cpuset); + a += s; + } + } + + if (end && *end) + return; + +#ifdef __linux__ + sched_setaffinity(0, sizeof(cpuset), &cpuset); +#endif +#ifdef __FreeBSD__ + cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, sizeof(cpuset), &cpuset); +#endif +} + +#endif /* USE_SETCPUAFFINITY */ diff --git a/src/stream.h b/src/stream.h index b69073994..0d3bf63fc 100644 --- a/src/stream.h +++ b/src/stream.h @@ -96,6 +96,11 @@ typedef struct streamPropInfo { /* Prototypes of exported APIs. */ struct client; +/* Flags for streamLookupConsumer */ +#define SLC_NONE 0 +#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */ +#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */ + stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); @@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); diff --git a/src/t_stream.c b/src/t_stream.c index 5c1b9a523..676ddd9bb 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1570,7 +1570,8 @@ void xreadCommand(client *c) { addReplyBulk(c,c->argv[streams_arg+i]); streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], - consumername->ptr,1); + consumername->ptr, + SLC_NONE); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; int flags = 0; if (noack) flags |= STREAM_RWR_NOACK; @@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) { * consumer does not exist it is automatically created as a side effect * of calling this function, otherwise its last seen time is updated and * the existing consumer reference returned. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { + int create = !(flags & SLC_NOCREAT); + int refresh = !(flags & SLC_NOREFRESH); streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) { @@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), consumer,NULL); } - consumer->seen_time = mstime(); + if (refresh) consumer->seen_time = mstime(); return consumer; } @@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { * may have pending messages: they are removed from the PEL, and the number * of pending messages "lost" is returned. */ uint64_t streamDelConsumer(streamCG *cg, sds name) { - streamConsumer *consumer = streamLookupConsumer(cg,name,0); + streamConsumer *consumer = + streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH); if (consumer == NULL) return 0; uint64_t retval = raxSize(consumer->pel); @@ -2068,15 +2072,18 @@ void xpendingCommand(client *c) { } /* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */ else { - streamConsumer *consumer = consumername ? - streamLookupConsumer(group,consumername->ptr,0): - NULL; - - /* If a consumer name was mentioned but it does not exist, we can - * just return an empty array. */ - if (consumername && consumer == NULL) { - addReplyArrayLen(c,0); - return; + streamConsumer *consumer = NULL; + if (consumername) { + consumer = streamLookupConsumer(group, + consumername->ptr, + SLC_NOCREAT|SLC_NOREFRESH); + + /* If a consumer name was mentioned but it does not exist, we can + * just return an empty array. */ + if (consumer == NULL) { + addReplyArrayLen(c,0); + return; + } } rax *pel = consumer ? consumer->pel : group->pel; @@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) { raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer and idle time. */ if (consumer == NULL) - consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); + consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE); nack->consumer = consumer; nack->delivery_time = deliverytime; /* Set the delivery attempts counter if given, otherwise diff --git a/src/tracking.c b/src/tracking.c index 48d231627..a995817e2 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -518,3 +518,8 @@ uint64_t trackingGetTotalKeys(void) { if (TrackingTable == NULL) return 0; return raxSize(TrackingTable); } + +uint64_t trackingGetTotalPrefixes(void) { + if (PrefixTable == NULL) return 0; + return raxSize(PrefixTable); +} diff --git a/src/ziplist.c b/src/ziplist.c index ef40d6aa2..ddae0d96f 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -440,7 +440,7 @@ unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) { if ((prevlensize) == 1) { \ (prevlen) = (ptr)[0]; \ } else if ((prevlensize) == 5) { \ - assert(sizeof((prevlen)) == 4); \ + assert(sizeof((prevlen)) == 4); \ memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); \ memrev32ifbe(&prevlen); \ } \ |