summaryrefslogtreecommitdiff
path: root/src/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.c')
-rw-r--r--src/server.c292
1 files changed, 227 insertions, 65 deletions
diff --git a/src/server.c b/src/server.c
index 0551eb3e4..faaca7215 100644
--- a/src/server.c
+++ b/src/server.c
@@ -201,6 +201,14 @@ struct redisCommand redisCommandTable[] = {
"read-only fast @string",
0,NULL,1,1,1,0,0,0},
+ {"getex",getexCommand,-2,
+ "write fast @string",
+ 0,NULL,1,1,1,0,0,0},
+
+ {"getdel",getdelCommand,2,
+ "write fast @string",
+ 0,NULL,1,1,1,0,0,0},
+
/* Note that we can't flag set as fast, since it may perform an
* implicit DEL of a large key. */
{"set",setCommand,-3,
@@ -449,15 +457,15 @@ struct redisCommand redisCommandTable[] = {
{"zunionstore",zunionstoreCommand,-4,
"write use-memory @sortedset",
- 0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0},
+ 0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0},
{"zinterstore",zinterstoreCommand,-4,
"write use-memory @sortedset",
- 0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0},
+ 0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0},
{"zdiffstore",zdiffstoreCommand,-4,
"write use-memory @sortedset",
- 0,zunionInterDiffStoreGetKeys,0,0,0,0,0,0},
+ 0,zunionInterDiffStoreGetKeys,1,1,1,0,0,0},
{"zunion",zunionCommand,-3,
"read-only @sortedset",
@@ -547,6 +555,10 @@ struct redisCommand redisCommandTable[] = {
"write no-script fast @sortedset @blocking",
0,NULL,1,-2,1,0,0,0},
+ {"zrandmember",zrandmemberCommand,-2,
+ "read-only random @sortedset",
+ 0,NULL,1,1,1,0,0,0},
+
{"hset",hsetCommand,-4,
"write use-memory fast @hash",
0,NULL,1,1,1,0,0,0},
@@ -603,6 +615,10 @@ struct redisCommand redisCommandTable[] = {
"read-only fast @hash",
0,NULL,1,1,1,0,0,0},
+ {"hrandfield",hrandfieldCommand,-2,
+ "read-only random @hash",
+ 0,NULL,1,1,1,0,0,0},
+
{"hscan",hscanCommand,-3,
"read-only random @hash",
0,NULL,1,1,1,0,0,0},
@@ -744,7 +760,7 @@ struct redisCommand redisCommandTable[] = {
"admin no-script",
0,NULL,0,0,0,0,0,0},
- {"psync",syncCommand,3,
+ {"psync",syncCommand,-3,
"admin no-script",
0,NULL,0,0,0,0,0,0},
@@ -941,7 +957,7 @@ struct redisCommand redisCommandTable[] = {
{"georadius_ro",georadiusroCommand,-6,
"read-only @geo",
- 0,georadiusGetKeys,1,1,1,0,0,0},
+ 0,NULL,1,1,1,0,0,0},
{"georadiusbymember",georadiusbymemberCommand,-5,
"write use-memory @geo",
@@ -949,7 +965,7 @@ struct redisCommand redisCommandTable[] = {
{"georadiusbymember_ro",georadiusbymemberroCommand,-5,
"read-only @geo",
- 0,georadiusGetKeys,1,1,1,0,0,0},
+ 0,NULL,1,1,1,0,0,0},
{"geohash",geohashCommand,-2,
"read-only @geo",
@@ -1016,11 +1032,11 @@ struct redisCommand redisCommandTable[] = {
{"xread",xreadCommand,-4,
"read-only @stream @blocking",
- 0,xreadGetKeys,1,1,1,0,0,0},
+ 0,xreadGetKeys,0,0,0,0,0,0},
{"xreadgroup",xreadCommand,-7,
"write @stream @blocking",
- 0,xreadGetKeys,1,1,1,0,0,0},
+ 0,xreadGetKeys,0,0,0,0,0,0},
{"xgroup",xgroupCommand,-2,
"write use-memory @stream",
@@ -1084,6 +1100,10 @@ struct redisCommand redisCommandTable[] = {
{"reset",resetCommand,1,
"no-script ok-stale ok-loading fast @connection",
+ 0,NULL,0,0,0,0,0,0},
+
+ {"failover",failoverCommand,-1,
+ "admin no-script ok-stale",
0,NULL,0,0,0,0,0,0}
};
@@ -1444,6 +1464,17 @@ dictType hashDictType = {
NULL /* allow to expand */
};
+/* Dict type without destructor */
+dictType sdsReplyDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ NULL, /* val destructor */
+ NULL /* allow to expand */
+};
+
/* Keylist hash table type has unencoded redis objects as keys and
* lists as values. It's used for blocking operations (BLPOP) and to
* map swapped keys to a list of clients waiting for this keys to be loaded. */
@@ -1592,6 +1623,9 @@ void resetChildState() {
server.stat_current_cow_bytes = 0;
updateDictResizePolicy();
closeChildInfoPipe();
+ moduleFireServerEvent(REDISMODULE_EVENT_FORK_CHILD,
+ REDISMODULE_SUBEVENT_FORK_CHILD_DIED,
+ NULL);
}
/* Return if child type is mutual exclusive with other fork children */
@@ -2159,14 +2193,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
- if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
+ if (server.aof_state == AOF_ON && server.aof_flush_postponed_start)
+ flushAppendOnlyFile(0);
/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
* however to try every second is enough in case of 'hz' is set to
* a higher frequency. */
run_with_period(1000) {
- if (server.aof_last_write_status == C_ERR)
+ if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
@@ -2174,8 +2209,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
checkClientPauseTimeoutAndReturnIfPaused();
/* Replication cron function -- used to reconnect to master,
- * detect transfer failures, start background RDB transfers and so forth. */
- run_with_period(1000) replicationCron();
+ * detect transfer failures, start background RDB transfers and so forth.
+ *
+ * If Redis is trying to failover then run the replication cron faster so
+ * progress on the handshake happens more quickly. */
+ if (server.failover_state != NO_FAILOVER) {
+ run_with_period(100) replicationCron();
+ } else {
+ run_with_period(1000) replicationCron();
+ }
/* Run the Redis Cluster cron. */
run_with_period(100) {
@@ -2386,12 +2428,18 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
server.get_ack_from_slaves = 0;
}
+ /* We may have received updates from clients about their current offset. NOTE:
+ * this can't be done where the ACK is received since failover will disconnect
+ * our clients. */
+ updateFailoverStatus();
+
/* Send the invalidation messages to clients participating to the
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
/* Write the AOF buffer on disk */
- flushAppendOnlyFile(0);
+ if (server.aof_state == AOF_ON)
+ flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
@@ -2532,6 +2580,12 @@ void createSharedObjects(void) {
/* Used in the LMOVE/BLMOVE commands */
shared.left = createStringObject("left",4);
shared.right = createStringObject("right",5);
+ shared.pexpireat = createStringObject("PEXPIREAT",9);
+ shared.pexpire = createStringObject("PEXPIRE",7);
+ shared.persist = createStringObject("PERSIST",7);
+ shared.set = createStringObject("SET",3);
+ shared.pxat = createStringObject("PXAT", 4);
+ shared.px = createStringObject("PX",2);
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
@@ -2634,6 +2688,13 @@ void initServerConfig(void) {
server.repl_backlog_off = 0;
server.repl_no_slaves_since = time(NULL);
+ /* Failover related */
+ server.failover_end_time = 0;
+ server.force_failover = 0;
+ server.target_replica_host = NULL;
+ server.target_replica_port = 0;
+ server.failover_state = NO_FAILOVER;
+
/* Client output buffer limits */
for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
@@ -2957,6 +3018,7 @@ int listenToPort(int port, int *fds, int *count) {
return C_ERR;
}
anetNonBlock(NULL,fds[*count]);
+ anetCloexec(fds[*count]);
(*count)++;
}
return C_OK;
@@ -3095,6 +3157,7 @@ void initServer(void) {
exit(1);
}
anetNonBlock(NULL,server.sofd);
+ anetCloexec(server.sofd);
}
/* Abort if there are no listening sockets at all. */
@@ -3557,7 +3620,7 @@ void preventCommandReplication(client *c) {
*/
void call(client *c, int flags) {
long long dirty;
- ustime_t start, duration;
+ monotime call_timer;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
static long long prev_err_count;
@@ -3583,9 +3646,10 @@ void call(client *c, int flags) {
dirty = server.dirty;
prev_err_count = server.stat_total_error_replies;
updateCachedTime(0);
- start = server.ustime;
+ elapsedStart(&call_timer);
c->cmd->proc(c);
- duration = ustime()-start;
+ const long duration = elapsedUs(call_timer);
+ c->duration = duration;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
@@ -3629,7 +3693,10 @@ void call(client *c, int flags) {
* arguments. */
robj **argv = c->original_argv ? c->original_argv : c->argv;
int argc = c->original_argv ? c->original_argc : c->argc;
- slowlogPushEntryIfNeeded(c,argv,argc,duration);
+ /* If the client is blocked we will handle slowlog when it is unblocked. */
+ if (!(c->flags & CLIENT_BLOCKED)) {
+ slowlogPushEntryIfNeeded(c,argv,argc,duration);
+ }
}
freeClientOriginalArgv(c);
@@ -4682,7 +4749,7 @@ sds genRedisInfoString(const char *section) {
"aof_last_cow_size:%zu\r\n"
"module_fork_in_progress:%d\r\n"
"module_fork_last_cow_size:%zu\r\n",
- server.loading,
+ (int)server.loading,
server.stat_current_cow_bytes,
server.dirty,
server.child_type == CHILD_TYPE_RDB,
@@ -4972,6 +5039,7 @@ sds genRedisInfoString(const char *section) {
}
}
info = sdscatprintf(info,
+ "master_failover_state:%s\r\n"
"master_replid:%s\r\n"
"master_replid2:%s\r\n"
"master_repl_offset:%lld\r\n"
@@ -4980,6 +5048,7 @@ sds genRedisInfoString(const char *section) {
"repl_backlog_size:%lld\r\n"
"repl_backlog_first_byte_offset:%lld\r\n"
"repl_backlog_histlen:%lld\r\n",
+ getFailoverStateString(),
server.replid,
server.replid2,
server.master_repl_offset,
@@ -5184,7 +5253,7 @@ static int smapsGetSharedDirty(unsigned long addr) {
FILE *f;
f = fopen("/proc/self/smaps", "r");
- serverAssert(f);
+ if (!f) return -1;
while (1) {
if (!fgets(buf, sizeof(buf), f))
@@ -5195,8 +5264,8 @@ static int smapsGetSharedDirty(unsigned long addr) {
in_mapping = from <= addr && addr < to;
if (in_mapping && !memcmp(buf, "Shared_Dirty:", 13)) {
- ret = sscanf(buf, "%*s %d", &val);
- serverAssert(ret == 1);
+ sscanf(buf, "%*s %d", &val);
+ /* If parsing fails, we remain with val == -1 */
break;
}
}
@@ -5210,23 +5279,33 @@ static int smapsGetSharedDirty(unsigned long addr) {
* kernel is affected.
* The bug was fixed in commit ff1712f953e27f0b0718762ec17d0adb15c9fd0b
* titled: "arm64: pgtable: Ensure dirty bit is preserved across pte_wrprotect()"
- * Return 1 if the kernel seems to be affected, and 0 otherwise. */
+ * Return -1 on unexpected test failure, 1 if the kernel seems to be affected,
+ * and 0 otherwise. */
int linuxMadvFreeForkBugCheck(void) {
- int ret, pipefd[2];
+ int ret, pipefd[2] = { -1, -1 };
pid_t pid;
- char *p, *q, bug_found = 0;
- const long map_size = 3 * 4096;
+ char *p = NULL, *q;
+ int bug_found = 0;
+ long page_size = sysconf(_SC_PAGESIZE);
+ long map_size = 3 * page_size;
/* Create a memory map that's in our full control (not one used by the allocator). */
p = mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
- serverAssert(p != MAP_FAILED);
+ if (p == MAP_FAILED) {
+ serverLog(LL_WARNING, "Failed to mmap(): %s", strerror(errno));
+ return -1;
+ }
- q = p + 4096;
+ q = p + page_size;
/* Split the memory map in 3 pages by setting their protection as RO|RW|RO to prevent
* Linux from merging this memory map with adjacent VMAs. */
- ret = mprotect(q, 4096, PROT_READ | PROT_WRITE);
- serverAssert(!ret);
+ ret = mprotect(q, page_size, PROT_READ | PROT_WRITE);
+ if (ret < 0) {
+ serverLog(LL_WARNING, "Failed to mprotect(): %s", strerror(errno));
+ bug_found = -1;
+ goto exit;
+ }
/* Write to the page once to make it resident */
*(volatile char*)q = 0;
@@ -5235,8 +5314,16 @@ int linuxMadvFreeForkBugCheck(void) {
#ifndef MADV_FREE
#define MADV_FREE 8
#endif
- ret = madvise(q, 4096, MADV_FREE);
- serverAssert(!ret);
+ ret = madvise(q, page_size, MADV_FREE);
+ if (ret < 0) {
+ /* MADV_FREE is not available on older kernels that are presumably
+ * not affected. */
+ if (errno == EINVAL) goto exit;
+
+ serverLog(LL_WARNING, "Failed to madvise(): %s", strerror(errno));
+ bug_found = -1;
+ goto exit;
+ }
/* Write to the page after being marked for freeing, this is supposed to take
* ownership of that page again. */
@@ -5244,37 +5331,47 @@ int linuxMadvFreeForkBugCheck(void) {
/* Create a pipe for the child to return the info to the parent. */
ret = pipe(pipefd);
- serverAssert(!ret);
+ if (ret < 0) {
+ serverLog(LL_WARNING, "Failed to create pipe: %s", strerror(errno));
+ bug_found = -1;
+ goto exit;
+ }
/* Fork the process. */
pid = fork();
- serverAssert(pid >= 0);
- if (!pid) {
- /* Child: check if the page is marked as dirty, expecing 4 (kB).
+ if (pid < 0) {
+ serverLog(LL_WARNING, "Failed to fork: %s", strerror(errno));
+ bug_found = -1;
+ goto exit;
+ } else if (!pid) {
+ /* Child: check if the page is marked as dirty, expecting page_size in kB.
* A value of 0 means the kernel is affected by the bug. */
- if (!smapsGetSharedDirty((unsigned long)q))
+ ret = smapsGetSharedDirty((unsigned long) q);
+ if (!ret)
bug_found = 1;
+ else if (ret == -1) /* Failed to read */
+ bug_found = -1;
- ret = write(pipefd[1], &bug_found, 1);
- serverAssert(ret == 1);
-
+ if (write(pipefd[1], &bug_found, sizeof(bug_found)) < 0)
+ serverLog(LL_WARNING, "Failed to write to parent: %s", strerror(errno));
exit(0);
} else {
/* Read the result from the child. */
- ret = read(pipefd[0], &bug_found, 1);
- serverAssert(ret == 1);
+ ret = read(pipefd[0], &bug_found, sizeof(bug_found));
+ if (ret < 0) {
+ serverLog(LL_WARNING, "Failed to read from child: %s", strerror(errno));
+ bug_found = -1;
+ }
/* Reap the child pid. */
- serverAssert(waitpid(pid, NULL, 0) == pid);
+ waitpid(pid, NULL, 0);
}
+exit:
/* Cleanup */
- ret = close(pipefd[0]);
- serverAssert(!ret);
- ret = close(pipefd[1]);
- serverAssert(!ret);
- ret = munmap(p, map_size);
- serverAssert(!ret);
+ if (pipefd[0] != -1) close(pipefd[0]);
+ if (pipefd[1] != -1) close(pipefd[1]);
+ if (p != NULL) munmap(p, map_size);
return bug_found;
}
@@ -5470,7 +5567,7 @@ void setupChildSignalHandlers(void) {
* of the parent process, e.g. fd(socket or flock) etc.
* should close the resources not used by the child process, so that if the
* parent restarts it can bind/lock despite the child possibly still running. */
-void closeClildUnusedResourceAfterFork() {
+void closeChildUnusedResourceAfterFork() {
closeListeningSockets(0);
if (server.cluster_enabled && server.cluster_config_file_lock_fd != -1)
close(server.cluster_config_file_lock_fd); /* don't care if this fails */
@@ -5497,7 +5594,7 @@ int redisFork(int purpose) {
server.in_fork_child = purpose;
setOOMScoreAdj(CONFIG_OOM_BGCHILD);
setupChildSignalHandlers();
- closeClildUnusedResourceAfterFork();
+ closeChildUnusedResourceAfterFork();
} else {
/* Parent */
server.stat_total_forks++;
@@ -5523,6 +5620,9 @@ int redisFork(int purpose) {
}
updateDictResizePolicy();
+ moduleFireServerEvent(REDISMODULE_EVENT_FORK_CHILD,
+ REDISMODULE_SUBEVENT_FORK_CHILD_BORN,
+ NULL);
}
return childpid;
}
@@ -5533,7 +5633,7 @@ void sendChildCOWInfo(int ptype, int on_exit, char *pname) {
if (private_dirty) {
serverLog(on_exit ? LL_NOTICE : LL_VERBOSE,
"%s: %zu MB of memory used by copy-on-write",
- pname, private_dirty);
+ pname, private_dirty/(1024*1024));
}
sendChildInfo(ptype, on_exit, private_dirty);
@@ -5598,20 +5698,68 @@ void redisOutOfMemoryHandler(size_t allocation_size) {
allocation_size);
}
-void redisSetProcTitle(char *title) {
+/* Callback for sdstemplate on proc-title-template. See redis.conf for
+ * supported variables.
+ */
+static sds redisProcTitleGetVariable(const sds varname, void *arg)
+{
+ if (!strcmp(varname, "title")) {
+ return sdsnew(arg);
+ } else if (!strcmp(varname, "listen-addr")) {
+ if (server.port || server.tls_port)
+ return sdscatprintf(sdsempty(), "%s:%u",
+ server.bindaddr_count ? server.bindaddr[0] : "*",
+ server.port ? server.port : server.tls_port);
+ else
+ return sdscatprintf(sdsempty(), "unixsocket:%s", server.unixsocket);
+ } else if (!strcmp(varname, "server-mode")) {
+ if (server.cluster_enabled) return sdsnew("[cluster]");
+ else if (server.sentinel_mode) return sdsnew("[sentinel]");
+ else return sdsempty();
+ } else if (!strcmp(varname, "config-file")) {
+ return sdsnew(server.configfile ? server.configfile : "-");
+ } else if (!strcmp(varname, "port")) {
+ return sdscatprintf(sdsempty(), "%u", server.port);
+ } else if (!strcmp(varname, "tls-port")) {
+ return sdscatprintf(sdsempty(), "%u", server.tls_port);
+ } else if (!strcmp(varname, "unixsocket")) {
+ return sdsnew(server.unixsocket);
+ } else
+ return NULL; /* Unknown variable name */
+}
+
+/* Expand the specified proc-title-template string and return a newly
+ * allocated sds, or NULL. */
+static sds expandProcTitleTemplate(const char *template, const char *title) {
+ sds res = sdstemplate(template, redisProcTitleGetVariable, (void *) title);
+ if (!res)
+ return NULL;
+ return sdstrim(res, " ");
+}
+/* Validate the specified template, returns 1 if valid or 0 otherwise. */
+int validateProcTitleTemplate(const char *template) {
+ int ok = 1;
+ sds res = expandProcTitleTemplate(template, "");
+ if (!res)
+ return 0;
+ if (sdslen(res) == 0) ok = 0;
+ sdsfree(res);
+ return ok;
+}
+
+int redisSetProcTitle(char *title) {
#ifdef USE_SETPROCTITLE
- char *server_mode = "";
- if (server.cluster_enabled) server_mode = " [cluster]";
- else if (server.sentinel_mode) server_mode = " [sentinel]";
-
- setproctitle("%s %s:%d%s",
- title,
- server.bindaddr_count ? server.bindaddr[0] : "*",
- server.port ? server.port : server.tls_port,
- server_mode);
+ if (!title) title = server.exec_argv[0];
+ sds proc_title = expandProcTitleTemplate(server.proc_title_template, title);
+ if (!proc_title) return C_ERR; /* Not likely, proc_title_template is validated */
+
+ setproctitle("%s", proc_title);
+ sdsfree(proc_title);
#else
UNUSED(title);
#endif
+
+ return C_OK;
}
void redisSetCpuAffinity(const char *cpulist) {
@@ -5751,6 +5899,12 @@ int main(int argc, char **argv) {
init_genrand64(((long long) tv.tv_sec * 1000000 + tv.tv_usec) ^ getpid());
crc64_init();
+ /* Store umask value. Because umask(2) only offers a set-and-get API we have
+ * to reset it and restore it back. We do this early to avoid a potential
+ * race condition with threads that could be creating files or directories.
+ */
+ umask(server.umask = umask(0777));
+
uint8_t hashseed[16];
getRandomBytes(hashseed,sizeof(hashseed));
dictSetHashFunctionSeed(hashseed);
@@ -5843,6 +5997,7 @@ int main(int argc, char **argv) {
exit(1);
}
loadServerConfig(server.configfile, config_from_stdin, options);
+ if (server.sentinel_mode) loadSentinelConfigFromQueue();
sdsfree(options);
}
@@ -5868,7 +6023,7 @@ int main(int argc, char **argv) {
readOOMScoreAdj();
initServer();
if (background || server.pidfile) createPidFile();
- redisSetProcTitle(argv[0]);
+ if (server.set_proc_title) redisSetProcTitle(NULL);
redisAsciiArt();
checkTcpBacklogSettings();
@@ -5878,10 +6033,17 @@ int main(int argc, char **argv) {
#ifdef __linux__
linuxMemoryWarnings();
#if defined (__arm64__)
- if (linuxMadvFreeForkBugCheck()) {
- serverLog(LL_WARNING,"WARNING Your kernel has a bug that could lead to data corruption during background save. Please upgrade to the latest stable kernel.");
+ int ret;
+ if ((ret = linuxMadvFreeForkBugCheck())) {
+ if (ret == 1)
+ serverLog(LL_WARNING,"WARNING Your kernel has a bug that could lead to data corruption during background save. "
+ "Please upgrade to the latest stable kernel.");
+ else
+ serverLog(LL_WARNING, "Failed to test the kernel for a bug that could lead to data corruption during background save. "
+ "Your system could be affected, please report this error.");
if (!checkIgnoreWarning("ARM64-COW-BUG")) {
- serverLog(LL_WARNING,"Redis will now exit to prevent data corruption. Note that it is possible to suppress this warning by setting the following config: ignore-warnings ARM64-COW-BUG");
+ serverLog(LL_WARNING,"Redis will now exit to prevent data corruption. "
+ "Note that it is possible to suppress this warning by setting the following config: ignore-warnings ARM64-COW-BUG");
exit(1);
}
}