diff options
author | Oran Agra <oran@redislabs.com> | 2021-02-01 20:11:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-02-01 20:11:42 +0200 |
commit | 2dba1e391d3772a8da182d95bde050ffa9d01e4d (patch) | |
tree | 3664bcd3ede605643a18668624f41c846b5e43ab /src/server.c | |
parent | ec2d180739aa3877a45ec54438c68a7659be5159 (diff) | |
parent | 95338f9cc41fdfd050f122789187db75fda1fe3c (diff) | |
download | redis-6.2-rc3.tar.gz |
Merge 6.2 RC36.2-rc3
Diffstat (limited to 'src/server.c')
-rw-r--r-- | src/server.c | 292 |
1 files changed, 227 insertions, 65 deletions
diff --git a/src/server.c b/src/server.c index 0551eb3e4..faaca7215 100644 --- a/src/server.c +++ b/src/server.c @@ -201,6 +201,14 @@ struct redisCommand redisCommandTable[] = { "read-only fast @string", 0,NULL,1,1,1,0,0,0}, + {"getex",getexCommand,-2, + "write fast @string", + 0,NULL,1,1,1,0,0,0}, + + {"getdel",getdelCommand,2, + "write fast @string", + 0,NULL,1,1,1,0,0,0}, + /* Note that we can't flag set as fast, since it may perform an * implicit DEL of a large key. */ {"set",setCommand,-3, @@ -449,15 +457,15 @@ struct redisCommand redisCommandTable[] = { {"zunionstore",zunionstoreCommand,-4, "write use-memory @sortedset", - 0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0}, + 0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0}, {"zinterstore",zinterstoreCommand,-4, "write use-memory @sortedset", - 0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0}, + 0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0}, {"zdiffstore",zdiffstoreCommand,-4, "write use-memory @sortedset", - 0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0}, + 0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0}, {"zunion",zunionCommand,-3, "read-only @sortedset", @@ -547,6 +555,10 @@ struct redisCommand redisCommandTable[] = { "write no-script fast @sortedset @blocking", 0,NULL,1,-2,1,0,0,0}, + {"zrandmember",zrandmemberCommand,-2, + "read-only random @sortedset", + 0,NULL,1,1,1,0,0,0}, + {"hset",hsetCommand,-4, "write use-memory fast @hash", 0,NULL,1,1,1,0,0,0}, @@ -603,6 +615,10 @@ struct redisCommand redisCommandTable[] = { "read-only fast @hash", 0,NULL,1,1,1,0,0,0}, + {"hrandfield",hrandfieldCommand,-2, + "read-only random @hash", + 0,NULL,1,1,1,0,0,0}, + {"hscan",hscanCommand,-3, "read-only random @hash", 0,NULL,1,1,1,0,0,0}, @@ -744,7 +760,7 @@ struct redisCommand redisCommandTable[] = { "admin no-script", 0,NULL,0,0,0,0,0,0}, - {"psync",syncCommand,3, + {"psync",syncCommand,-3, "admin no-script", 0,NULL,0,0,0,0,0,0}, @@ -941,7 +957,7 @@ struct redisCommand redisCommandTable[] = { {"georadius_ro",georadiusroCommand,-6, "read-only @geo", - 0,georadiusGetKeys,1,1,1,0,0,0}, + 0,NULL,1,1,1,0,0,0}, {"georadiusbymember",georadiusbymemberCommand,-5, "write use-memory @geo", @@ -949,7 +965,7 @@ struct redisCommand redisCommandTable[] = { {"georadiusbymember_ro",georadiusbymemberroCommand,-5, "read-only @geo", - 0,georadiusGetKeys,1,1,1,0,0,0}, + 0,NULL,1,1,1,0,0,0}, {"geohash",geohashCommand,-2, "read-only @geo", @@ -1016,11 +1032,11 @@ struct redisCommand redisCommandTable[] = { {"xread",xreadCommand,-4, "read-only @stream @blocking", - 0,xreadGetKeys,1,1,1,0,0,0}, + 0,xreadGetKeys,0,0,0,0,0,0}, {"xreadgroup",xreadCommand,-7, "write @stream @blocking", - 0,xreadGetKeys,1,1,1,0,0,0}, + 0,xreadGetKeys,0,0,0,0,0,0}, {"xgroup",xgroupCommand,-2, "write use-memory @stream", @@ -1084,6 +1100,10 @@ struct redisCommand redisCommandTable[] = { {"reset",resetCommand,1, "no-script ok-stale ok-loading fast @connection", + 0,NULL,0,0,0,0,0,0}, + + {"failover",failoverCommand,-1, + "admin no-script ok-stale", 0,NULL,0,0,0,0,0,0} }; @@ -1444,6 +1464,17 @@ dictType hashDictType = { NULL /* allow to expand */ }; +/* Dict type without destructor */ +dictType sdsReplyDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + /* Keylist hash table type has unencoded redis objects as keys and * lists as values. It's used for blocking operations (BLPOP) and to * map swapped keys to a list of clients waiting for this keys to be loaded. */ @@ -1592,6 +1623,9 @@ void resetChildState() { server.stat_current_cow_bytes = 0; updateDictResizePolicy(); closeChildInfoPipe(); + moduleFireServerEvent(REDISMODULE_EVENT_FORK_CHILD, + REDISMODULE_SUBEVENT_FORK_CHILD_DIED, + NULL); } /* Return if child type is mutual exclusive with other fork children */ @@ -2159,14 +2193,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* AOF postponed flush: Try at every cron cycle if the slow fsync * completed. */ - if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); + if (server.aof_state == AOF_ON && server.aof_flush_postponed_start) + flushAppendOnlyFile(0); /* AOF write errors: in this case we have a buffer to flush as well and * clear the AOF error in case of success to make the DB writable again, * however to try every second is enough in case of 'hz' is set to * a higher frequency. */ run_with_period(1000) { - if (server.aof_last_write_status == C_ERR) + if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0); } @@ -2174,8 +2209,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { checkClientPauseTimeoutAndReturnIfPaused(); /* Replication cron function -- used to reconnect to master, - * detect transfer failures, start background RDB transfers and so forth. */ - run_with_period(1000) replicationCron(); + * detect transfer failures, start background RDB transfers and so forth. + * + * If Redis is trying to failover then run the replication cron faster so + * progress on the handshake happens more quickly. */ + if (server.failover_state != NO_FAILOVER) { + run_with_period(100) replicationCron(); + } else { + run_with_period(1000) replicationCron(); + } /* Run the Redis Cluster cron. */ run_with_period(100) { @@ -2386,12 +2428,18 @@ void beforeSleep(struct aeEventLoop *eventLoop) { server.get_ack_from_slaves = 0; } + /* We may have recieved updates from clients about their current offset. NOTE: + * this can't be done where the ACK is recieved since failover will disconnect + * our clients. */ + updateFailoverStatus(); + /* Send the invalidation messages to clients participating to the * client side caching protocol in broadcasting (BCAST) mode. */ trackingBroadcastInvalidationMessages(); /* Write the AOF buffer on disk */ - flushAppendOnlyFile(0); + if (server.aof_state == AOF_ON) + flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); @@ -2532,6 +2580,12 @@ void createSharedObjects(void) { /* Used in the LMOVE/BLMOVE commands */ shared.left = createStringObject("left",4); shared.right = createStringObject("right",5); + shared.pexpireat = createStringObject("PEXPIREAT",9); + shared.pexpire = createStringObject("PEXPIRE",7); + shared.persist = createStringObject("PERSIST",7); + shared.set = createStringObject("SET",3); + shared.pxat = createStringObject("PXAT", 4); + shared.px = createStringObject("PX",2); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -2634,6 +2688,13 @@ void initServerConfig(void) { server.repl_backlog_off = 0; server.repl_no_slaves_since = time(NULL); + /* Failover related */ + server.failover_end_time = 0; + server.force_failover = 0; + server.target_replica_host = NULL; + server.target_replica_port = 0; + server.failover_state = NO_FAILOVER; + /* Client output buffer limits */ for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++) server.client_obuf_limits[j] = clientBufferLimitsDefaults[j]; @@ -2957,6 +3018,7 @@ int listenToPort(int port, int *fds, int *count) { return C_ERR; } anetNonBlock(NULL,fds[*count]); + anetCloexec(fds[*count]); (*count)++; } return C_OK; @@ -3095,6 +3157,7 @@ void initServer(void) { exit(1); } anetNonBlock(NULL,server.sofd); + anetCloexec(server.sofd); } /* Abort if there are no listening sockets at all. */ @@ -3557,7 +3620,7 @@ void preventCommandReplication(client *c) { */ void call(client *c, int flags) { long long dirty; - ustime_t start, duration; + monotime call_timer; int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; static long long prev_err_count; @@ -3583,9 +3646,10 @@ void call(client *c, int flags) { dirty = server.dirty; prev_err_count = server.stat_total_error_replies; updateCachedTime(0); - start = server.ustime; + elapsedStart(&call_timer); c->cmd->proc(c); - duration = ustime()-start; + const long duration = elapsedUs(call_timer); + c->duration = duration; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; @@ -3629,7 +3693,10 @@ void call(client *c, int flags) { * arguments. */ robj **argv = c->original_argv ? c->original_argv : c->argv; int argc = c->original_argv ? c->original_argc : c->argc; - slowlogPushEntryIfNeeded(c,argv,argc,duration); + /* If the client is blocked we will handle slowlog when it is unblocked . */ + if (!(c->flags & CLIENT_BLOCKED)) { + slowlogPushEntryIfNeeded(c,argv,argc,duration); + } } freeClientOriginalArgv(c); @@ -4682,7 +4749,7 @@ sds genRedisInfoString(const char *section) { "aof_last_cow_size:%zu\r\n" "module_fork_in_progress:%d\r\n" "module_fork_last_cow_size:%zu\r\n", - server.loading, + (int)server.loading, server.stat_current_cow_bytes, server.dirty, server.child_type == CHILD_TYPE_RDB, @@ -4972,6 +5039,7 @@ sds genRedisInfoString(const char *section) { } } info = sdscatprintf(info, + "master_failover_state:%s\r\n" "master_replid:%s\r\n" "master_replid2:%s\r\n" "master_repl_offset:%lld\r\n" @@ -4980,6 +5048,7 @@ sds genRedisInfoString(const char *section) { "repl_backlog_size:%lld\r\n" "repl_backlog_first_byte_offset:%lld\r\n" "repl_backlog_histlen:%lld\r\n", + getFailoverStateString(), server.replid, server.replid2, server.master_repl_offset, @@ -5184,7 +5253,7 @@ static int smapsGetSharedDirty(unsigned long addr) { FILE *f; f = fopen("/proc/self/smaps", "r"); - serverAssert(f); + if (!f) return -1; while (1) { if (!fgets(buf, sizeof(buf), f)) @@ -5195,8 +5264,8 @@ static int smapsGetSharedDirty(unsigned long addr) { in_mapping = from <= addr && addr < to; if (in_mapping && !memcmp(buf, "Shared_Dirty:", 13)) { - ret = sscanf(buf, "%*s %d", &val); - serverAssert(ret == 1); + sscanf(buf, "%*s %d", &val); + /* If parsing fails, we remain with val == -1 */ break; } } @@ -5210,23 +5279,33 @@ static int smapsGetSharedDirty(unsigned long addr) { * kernel is affected. * The bug was fixed in commit ff1712f953e27f0b0718762ec17d0adb15c9fd0b * titled: "arm64: pgtable: Ensure dirty bit is preserved across pte_wrprotect()" - * Return 1 if the kernel seems to be affected, and 0 otherwise. */ + * Return -1 on unexpected test failure, 1 if the kernel seems to be affected, + * and 0 otherwise. */ int linuxMadvFreeForkBugCheck(void) { - int ret, pipefd[2]; + int ret, pipefd[2] = { -1, -1 }; pid_t pid; - char *p, *q, bug_found = 0; - const long map_size = 3 * 4096; + char *p = NULL, *q; + int bug_found = 0; + long page_size = sysconf(_SC_PAGESIZE); + long map_size = 3 * page_size; /* Create a memory map that's in our full control (not one used by the allocator). */ p = mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); - serverAssert(p != MAP_FAILED); + if (p == MAP_FAILED) { + serverLog(LL_WARNING, "Failed to mmap(): %s", strerror(errno)); + return -1; + } - q = p + 4096; + q = p + page_size; /* Split the memory map in 3 pages by setting their protection as RO|RW|RO to prevent * Linux from merging this memory map with adjacent VMAs. */ - ret = mprotect(q, 4096, PROT_READ | PROT_WRITE); - serverAssert(!ret); + ret = mprotect(q, page_size, PROT_READ | PROT_WRITE); + if (ret < 0) { + serverLog(LL_WARNING, "Failed to mprotect(): %s", strerror(errno)); + bug_found = -1; + goto exit; + } /* Write to the page once to make it resident */ *(volatile char*)q = 0; @@ -5235,8 +5314,16 @@ int linuxMadvFreeForkBugCheck(void) { #ifndef MADV_FREE #define MADV_FREE 8 #endif - ret = madvise(q, 4096, MADV_FREE); - serverAssert(!ret); + ret = madvise(q, page_size, MADV_FREE); + if (ret < 0) { + /* MADV_FREE is not available on older kernels that are presumably + * not affected. */ + if (errno == EINVAL) goto exit; + + serverLog(LL_WARNING, "Failed to madvise(): %s", strerror(errno)); + bug_found = -1; + goto exit; + } /* Write to the page after being marked for freeing, this is supposed to take * ownership of that page again. */ @@ -5244,37 +5331,47 @@ int linuxMadvFreeForkBugCheck(void) { /* Create a pipe for the child to return the info to the parent. */ ret = pipe(pipefd); - serverAssert(!ret); + if (ret < 0) { + serverLog(LL_WARNING, "Failed to create pipe: %s", strerror(errno)); + bug_found = -1; + goto exit; + } /* Fork the process. */ pid = fork(); - serverAssert(pid >= 0); - if (!pid) { - /* Child: check if the page is marked as dirty, expecing 4 (kB). + if (pid < 0) { + serverLog(LL_WARNING, "Failed to fork: %s", strerror(errno)); + bug_found = -1; + goto exit; + } else if (!pid) { + /* Child: check if the page is marked as dirty, page_size in kb. * A value of 0 means the kernel is affected by the bug. */ - if (!smapsGetSharedDirty((unsigned long)q)) + ret = smapsGetSharedDirty((unsigned long) q); + if (!ret) bug_found = 1; + else if (ret == -1) /* Failed to read */ + bug_found = -1; - ret = write(pipefd[1], &bug_found, 1); - serverAssert(ret == 1); - + if (write(pipefd[1], &bug_found, sizeof(bug_found)) < 0) + serverLog(LL_WARNING, "Failed to write to parent: %s", strerror(errno)); exit(0); } else { /* Read the result from the child. */ - ret = read(pipefd[0], &bug_found, 1); - serverAssert(ret == 1); + ret = read(pipefd[0], &bug_found, sizeof(bug_found)); + if (ret < 0) { + serverLog(LL_WARNING, "Failed to read from child: %s", strerror(errno)); + bug_found = -1; + } /* Reap the child pid. */ - serverAssert(waitpid(pid, NULL, 0) == pid); + waitpid(pid, NULL, 0); } +exit: /* Cleanup */ - ret = close(pipefd[0]); - serverAssert(!ret); - ret = close(pipefd[1]); - serverAssert(!ret); - ret = munmap(p, map_size); - serverAssert(!ret); + if (pipefd[0] != -1) close(pipefd[0]); + if (pipefd[1] != -1) close(pipefd[1]); + if (p != NULL) munmap(p, map_size); return bug_found; } @@ -5470,7 +5567,7 @@ void setupChildSignalHandlers(void) { * of the parent process, e.g. fd(socket or flock) etc. * should close the resources not used by the child process, so that if the * parent restarts it can bind/lock despite the child possibly still running. */ -void closeClildUnusedResourceAfterFork() { +void closeChildUnusedResourceAfterFork() { closeListeningSockets(0); if (server.cluster_enabled && server.cluster_config_file_lock_fd != -1) close(server.cluster_config_file_lock_fd); /* don't care if this fails */ @@ -5497,7 +5594,7 @@ int redisFork(int purpose) { server.in_fork_child = purpose; setOOMScoreAdj(CONFIG_OOM_BGCHILD); setupChildSignalHandlers(); - closeClildUnusedResourceAfterFork(); + closeChildUnusedResourceAfterFork(); } else { /* Parent */ server.stat_total_forks++; @@ -5523,6 +5620,9 @@ int redisFork(int purpose) { } updateDictResizePolicy(); + moduleFireServerEvent(REDISMODULE_EVENT_FORK_CHILD, + REDISMODULE_SUBEVENT_FORK_CHILD_BORN, + NULL); } return childpid; } @@ -5533,7 +5633,7 @@ void sendChildCOWInfo(int ptype, int on_exit, char *pname) { if (private_dirty) { serverLog(on_exit ? LL_NOTICE : LL_VERBOSE, "%s: %zu MB of memory used by copy-on-write", - pname, private_dirty); + pname, private_dirty/(1024*1024)); } sendChildInfo(ptype, on_exit, private_dirty); @@ -5598,20 +5698,68 @@ void redisOutOfMemoryHandler(size_t allocation_size) { allocation_size); } -void redisSetProcTitle(char *title) { +/* Callback for sdstemplate on proc-title-template. See redis.conf for + * supported variables. + */ +static sds redisProcTitleGetVariable(const sds varname, void *arg) +{ + if (!strcmp(varname, "title")) { + return sdsnew(arg); + } else if (!strcmp(varname, "listen-addr")) { + if (server.port || server.tls_port) + return sdscatprintf(sdsempty(), "%s:%u", + server.bindaddr_count ? server.bindaddr[0] : "*", + server.port ? server.port : server.tls_port); + else + return sdscatprintf(sdsempty(), "unixsocket:%s", server.unixsocket); + } else if (!strcmp(varname, "server-mode")) { + if (server.cluster_enabled) return sdsnew("[cluster]"); + else if (server.sentinel_mode) return sdsnew("[sentinel]"); + else return sdsempty(); + } else if (!strcmp(varname, "config-file")) { + return sdsnew(server.configfile ? server.configfile : "-"); + } else if (!strcmp(varname, "port")) { + return sdscatprintf(sdsempty(), "%u", server.port); + } else if (!strcmp(varname, "tls-port")) { + return sdscatprintf(sdsempty(), "%u", server.tls_port); + } else if (!strcmp(varname, "unixsocket")) { + return sdsnew(server.unixsocket); + } else + return NULL; /* Unknown variable name */ +} + +/* Expand the specified proc-title-template string and return a newly + * allocated sds, or NULL. */ +static sds expandProcTitleTemplate(const char *template, const char *title) { + sds res = sdstemplate(template, redisProcTitleGetVariable, (void *) title); + if (!res) + return NULL; + return sdstrim(res, " "); +} +/* Validate the specified template, returns 1 if valid or 0 otherwise. */ +int validateProcTitleTemplate(const char *template) { + int ok = 1; + sds res = expandProcTitleTemplate(template, ""); + if (!res) + return 0; + if (sdslen(res) == 0) ok = 0; + sdsfree(res); + return ok; +} + +int redisSetProcTitle(char *title) { #ifdef USE_SETPROCTITLE - char *server_mode = ""; - if (server.cluster_enabled) server_mode = " [cluster]"; - else if (server.sentinel_mode) server_mode = " [sentinel]"; - - setproctitle("%s %s:%d%s", - title, - server.bindaddr_count ? server.bindaddr[0] : "*", - server.port ? server.port : server.tls_port, - server_mode); + if (!title) title = server.exec_argv[0]; + sds proc_title = expandProcTitleTemplate(server.proc_title_template, title); + if (!proc_title) return C_ERR; /* Not likely, proc_title_template is validated */ + + setproctitle("%s", proc_title); + sdsfree(proc_title); #else UNUSED(title); #endif + + return C_OK; } void redisSetCpuAffinity(const char *cpulist) { @@ -5751,6 +5899,12 @@ int main(int argc, char **argv) { init_genrand64(((long long) tv.tv_sec * 1000000 + tv.tv_usec) ^ getpid()); crc64_init(); + /* Store umask value. Because umask(2) only offers a set-and-get API we have + * to reset it and restore it back. We do this early to avoid a potential + * race condition with threads that could be creating files or directories. + */ + umask(server.umask = umask(0777)); + uint8_t hashseed[16]; getRandomBytes(hashseed,sizeof(hashseed)); dictSetHashFunctionSeed(hashseed); @@ -5843,6 +5997,7 @@ int main(int argc, char **argv) { exit(1); } loadServerConfig(server.configfile, config_from_stdin, options); + if (server.sentinel_mode) loadSentinelConfigFromQueue(); sdsfree(options); } @@ -5868,7 +6023,7 @@ int main(int argc, char **argv) { readOOMScoreAdj(); initServer(); if (background || server.pidfile) createPidFile(); - redisSetProcTitle(argv[0]); + if (server.set_proc_title) redisSetProcTitle(NULL); redisAsciiArt(); checkTcpBacklogSettings(); @@ -5878,10 +6033,17 @@ int main(int argc, char **argv) { #ifdef __linux__ linuxMemoryWarnings(); #if defined (__arm64__) - if (linuxMadvFreeForkBugCheck()) { - serverLog(LL_WARNING,"WARNING Your kernel has a bug that could lead to data corruption during background save. Please upgrade to the latest stable kernel."); + int ret; + if ((ret = linuxMadvFreeForkBugCheck())) { + if (ret == 1) + serverLog(LL_WARNING,"WARNING Your kernel has a bug that could lead to data corruption during background save. " + "Please upgrade to the latest stable kernel."); + else + serverLog(LL_WARNING, "Failed to test the kernel for a bug that could lead to data corruption during background save. " + "Your system could be affected, please report this error."); if (!checkIgnoreWarning("ARM64-COW-BUG")) { - serverLog(LL_WARNING,"Redis will now exit to prevent data corruption. Note that it is possible to suppress this warning by setting the following config: ignore-warnings ARM64-COW-BUG"); + serverLog(LL_WARNING,"Redis will now exit to prevent data corruption. " + "Note that it is possible to suppress this warning by setting the following config: ignore-warnings ARM64-COW-BUG"); exit(1); } } |