diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/aof.c | 35 | ||||
-rw-r--r-- | src/childinfo.c | 2 | ||||
-rw-r--r-- | src/db.c | 5 | ||||
-rw-r--r-- | src/defrag.c | 2 | ||||
-rw-r--r-- | src/module.c | 104 | ||||
-rw-r--r-- | src/rdb.c | 49 | ||||
-rw-r--r-- | src/redismodule.h | 7 | ||||
-rw-r--r-- | src/replication.c | 6 | ||||
-rw-r--r-- | src/scripting.c | 3 | ||||
-rw-r--r-- | src/server.c | 103 | ||||
-rw-r--r-- | src/server.h | 10 |
11 files changed, 237 insertions, 89 deletions
@@ -264,9 +264,9 @@ int startAppendOnly(void) { strerror(errno)); return C_ERR; } - if (server.rdb_child_pid != -1) { + if (hasForkChild() && server.aof_child_pid == -1) { server.aof_rewrite_scheduled = 1; - serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible."); + serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible."); } else { /* If there is a pending AOF rewrite, we need to switch it off and * start a new one: the old one cannot be reused because it is not @@ -395,7 +395,7 @@ void flushAppendOnlyFile(int force) { * useful for graphing / monitoring purposes. */ if (sync_in_progress) { latencyAddSampleIfNeeded("aof-write-pending-fsync",latency); - } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) { + } else if (hasForkChild()) { latencyAddSampleIfNeeded("aof-write-active-child",latency); } else { latencyAddSampleIfNeeded("aof-write-alone",latency); @@ -491,9 +491,8 @@ void flushAppendOnlyFile(int force) { try_fsync: /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ - if (server.aof_no_fsync_on_rewrite && - (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) - return; + if (server.aof_no_fsync_on_rewrite && hasForkChild()) + return; /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { @@ -1563,39 +1562,24 @@ void aofClosePipes(void) { */ int rewriteAppendOnlyFileBackground(void) { pid_t childpid; - long long start; - if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; + if (hasForkChild()) return C_ERR; if (aofCreatePipes() != C_OK) return C_ERR; openChildInfoPipe(); - start = ustime(); - if ((childpid = fork()) == 0) { + if ((childpid = redisFork()) == 0) { char tmpfile[256]; /* Child */ - closeListeningSockets(0); redisSetProcTitle("redis-aof-rewrite"); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { - size_t private_dirty = zmalloc_get_private_dirty(-1); - - if (private_dirty) { - serverLog(LL_NOTICE, - "AOF rewrite: %zu MB of memory used by copy-on-write", - private_dirty/(1024*1024)); - } - - server.child_info_data.cow_size = private_dirty; - sendChildInfo(CHILD_INFO_TYPE_AOF); + sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite"); exitFromChild(0); } else { exitFromChild(1); } } else { /* Parent */ - server.stat_fork_time = ustime()-start; - server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ - latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); if (childpid == -1) { closeChildInfoPipe(); serverLog(LL_WARNING, @@ -1609,7 +1593,6 @@ int rewriteAppendOnlyFileBackground(void) { server.aof_rewrite_scheduled = 0; server.aof_rewrite_time_start = time(NULL); server.aof_child_pid = childpid; - updateDictResizePolicy(); /* We set appendseldb to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command, so the differences * accumulated by the parent into server.aof_rewrite_buf will start @@ -1624,7 +1607,7 @@ int rewriteAppendOnlyFileBackground(void) { void bgrewriteaofCommand(client *c) { if (server.aof_child_pid != -1) { addReplyError(c,"Background append only file rewriting already in progress"); - } else if (server.rdb_child_pid != -1) { + } else if (hasForkChild()) { server.aof_rewrite_scheduled = 1; addReplyStatus(c,"Background append only file rewriting scheduled"); } else if (rewriteAppendOnlyFileBackground() == C_OK) { diff --git a/src/childinfo.c b/src/childinfo.c index 719025e8c..fa0600552 100644 --- a/src/childinfo.c +++ b/src/childinfo.c @@ -80,6 +80,8 @@ void receiveChildInfo(void) { server.stat_rdb_cow_bytes = server.child_info_data.cow_size; } else if (server.child_info_data.process_type == CHILD_INFO_TYPE_AOF) { server.stat_aof_cow_bytes = server.child_info_data.cow_size; + } else if (server.child_info_data.process_type == CHILD_INFO_TYPE_MODULE) { + server.stat_module_cow_bytes = server.child_info_data.cow_size; } } } @@ -60,10 +60,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ - if (server.rdb_child_pid == -1 && - server.aof_child_pid == -1 && - !(flags & LOOKUP_NOTOUCH)) - { + if (!hasForkChild() && !(flags & LOOKUP_NOTOUCH)){ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); } else { diff --git a/src/defrag.c b/src/defrag.c index ecf0255dc..93c6a4619 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -1039,7 +1039,7 @@ void activeDefragCycle(void) { mstime_t latency; int quit = 0; - if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1) + if (hasForkChild()) return; /* Defragging memory while there's a fork will just do damage. */ /* Once a second, check if we the fragmentation justfies starting a scan diff --git a/src/module.c b/src/module.c index b2bddd316..854989e73 100644 --- a/src/module.c +++ b/src/module.c @@ -31,6 +31,7 @@ #include "cluster.h" #include "rdb.h" #include <dlfcn.h> +#include <wait.h> #define REDISMODULE_CORE 1 #include "redismodule.h" @@ -293,6 +294,14 @@ typedef struct RedisModuleCommandFilter { /* Registered filters */ static list *moduleCommandFilters; +typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); + +static struct RedisModuleForkInfo { + RedisModuleForkDoneHandler done_handler; + void* done_handler_user_data; +} moduleForkInfo = {0}; + + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -5131,6 +5140,98 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) } /* -------------------------------------------------------------------------- + * Module fork API + * -------------------------------------------------------------------------- */ + +/* Create a background child process with the current frozen snaphost of the + * main process where you can do some processing in the background without + * affecting / freezing the traffic and no need for threads and GIL locking. + * Note that Redis allows for only one concurrent fork. + * When the child wants to exit, it should call RedisModule_ExitFromChild. + * If the parent wants to kill the child it should call RedisModule_KillForkChild + * The done handler callback will be executed on the parent process when the + * child existed (but not when killed) + * Return: -1 on failure, on success the parent process will get a positive PID + * of the child, and the child process will get 0. + */ +int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) +{ + pid_t childpid; + if (hasForkChild()) { + return -1; + } + + openChildInfoPipe(); + if ((childpid = redisFork()) == 0) { + /* Child */ + redisSetProcTitle("redis-module-fork"); + } else if (childpid == -1) { + closeChildInfoPipe(); + serverLog(LL_WARNING,"Can't fork for module: %s", strerror(errno)); + } else { + /* Parent */ + server.module_child_pid = childpid; + moduleForkInfo.done_handler = cb; + moduleForkInfo.done_handler_user_data = user_data; + serverLog(LL_NOTICE, "Module fork started pid: %d ", childpid); + } + return childpid; +} + +/* Call from the child process when you want to terminate it. + * retcode will be provided to the done handler executed on the parent process. + */ +int RM_ExitFromChild(int retcode) +{ + sendChildCOWInfo(CHILD_INFO_TYPE_MODULE, "Module fork"); + exitFromChild(retcode); + return REDISMODULE_OK; +} + +void TerminateModuleForkChild(int wait) { + int statloc; + serverLog(LL_NOTICE,"Killing running module fork child: %ld", + (long) server.module_child_pid); + if (kill(server.module_child_pid,SIGUSR1) != -1 && wait) { + while(wait3(&statloc,0,NULL) != server.module_child_pid); + } + /* Reset the buffer accumulating changes while the child saves. */ + server.module_child_pid = -1; + moduleForkInfo.done_handler = NULL; + moduleForkInfo.done_handler_user_data = NULL; + closeChildInfoPipe(); + updateDictResizePolicy(); +} + +/* Can be used to kill the forked child process from the parent process. + * child_pid whould be the return value of RedisModule_Fork. */ +int RM_KillForkChild(int child_pid) +{ + /* No module child? return. */ + if (server.module_child_pid == -1) return REDISMODULE_ERR; + /* Make sure the module knows the pid it wants to kill (not trying to + * randomly kill other module's forks) */ + if (server.module_child_pid != child_pid) return REDISMODULE_ERR; + /* Kill module child, wait for child exit. */ + TerminateModuleForkChild(1); + return REDISMODULE_OK; +} + +void ModuleForkDoneHandler(int exitcode, int bysignal) +{ + serverLog(LL_NOTICE, + "Module fork exited pid: %d, retcode: %d, bysignal: %d", + server.module_child_pid, exitcode, bysignal); + if (moduleForkInfo.done_handler) { + moduleForkInfo.done_handler(exitcode, bysignal, + moduleForkInfo.done_handler_user_data); + } + server.module_child_pid = -1; + moduleForkInfo.done_handler = NULL; + moduleForkInfo.done_handler_user_data = NULL; +} + +/* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -5652,4 +5753,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(CommandFilterArgInsert); REGISTER_API(CommandFilterArgReplace); REGISTER_API(CommandFilterArgDelete); + REGISTER_API(Fork); + REGISTER_API(ExitFromChild); + REGISTER_API(KillForkChild); } @@ -1335,40 +1335,25 @@ werr: int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { pid_t childpid; - long long start; - if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; + if (hasForkChild()) return C_ERR; server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(NULL); openChildInfoPipe(); - start = ustime(); - if ((childpid = fork()) == 0) { + if ((childpid = redisFork()) == 0) { int retval; /* Child */ - closeListeningSockets(0); redisSetProcTitle("redis-rdb-bgsave"); retval = rdbSave(filename,rsi); if (retval == C_OK) { - size_t private_dirty = zmalloc_get_private_dirty(-1); - - if (private_dirty) { - serverLog(LL_NOTICE, - "RDB: %zu MB of memory used by copy-on-write", - private_dirty/(1024*1024)); - } - - server.child_info_data.cow_size = private_dirty; - sendChildInfo(CHILD_INFO_TYPE_RDB); + sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); } exitFromChild((retval == C_OK) ? 0 : 1); } else { /* Parent */ - server.stat_fork_time = ustime()-start; - server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ - latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); if (childpid == -1) { closeChildInfoPipe(); server.lastbgsave_status = C_ERR; @@ -1380,7 +1365,6 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_DISK; - updateDictResizePolicy(); return C_OK; } return C_OK; /* unreached */ @@ -2431,10 +2415,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { listNode *ln; listIter li; pid_t childpid; - long long start; int pipefds[2]; - if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; + if (hasForkChild()) return C_ERR; /* Before to fork, create a pipe that will be used in order to * send back to the parent the IDs of the slaves that successfully @@ -2470,8 +2453,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { /* Create the child process. */ openChildInfoPipe(); - start = ustime(); - if ((childpid = fork()) == 0) { + if ((childpid = redisFork()) == 0) { /* Child */ int retval; rio slave_sockets; @@ -2479,7 +2461,6 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { rioInitWithFdset(&slave_sockets,fds,numfds); zfree(fds); - closeListeningSockets(0); redisSetProcTitle("redis-rdb-to-slaves"); retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi); @@ -2487,16 +2468,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { retval = C_ERR; if (retval == C_OK) { - size_t private_dirty = zmalloc_get_private_dirty(-1); - - if (private_dirty) { - serverLog(LL_NOTICE, - "RDB: %zu MB of memory used by copy-on-write", - private_dirty/(1024*1024)); - } - - server.child_info_data.cow_size = private_dirty; - sendChildInfo(CHILD_INFO_TYPE_RDB); + sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); /* If we are returning OK, at least one slave was served * with the RDB file as expected, so we need to send a report @@ -2565,16 +2537,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { close(pipefds[1]); closeChildInfoPipe(); } else { - server.stat_fork_time = ustime()-start; - server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ - latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); - serverLog(LL_NOTICE,"Background RDB transfer started by pid %d", childpid); server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; - updateDictResizePolicy(); } zfree(clientids); zfree(fds); @@ -2617,13 +2584,13 @@ void bgsaveCommand(client *c) { if (server.rdb_child_pid != -1) { addReplyError(c,"Background save already in progress"); - } else if (server.aof_child_pid != -1) { + } else if (hasForkChild()) { if (schedule) { server.rdb_bgsave_scheduled = 1; addReplyStatus(c,"Background saving scheduled"); } else { addReplyError(c, - "An AOF log rewriting in progress: can't BGSAVE right now. " + "Another BG operation is in progress: can't BGSAVE right now. " "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever " "possible."); } diff --git a/src/redismodule.h b/src/redismodule.h index ad1c6e652..6a3a164b5 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -182,6 +182,7 @@ typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); +typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); #define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { @@ -372,6 +373,9 @@ const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(R int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg); int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg); int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos); +int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); +int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); +int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); #endif /* This is included inline inside each Redis module. */ @@ -546,6 +550,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(CommandFilterArgInsert); REDISMODULE_GET_API(CommandFilterArgReplace); REDISMODULE_GET_API(CommandFilterArgDelete); + REDISMODULE_GET_API(Fork); + REDISMODULE_GET_API(ExitFromChild); + REDISMODULE_GET_API(KillForkChild); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/replication.c b/src/replication.c index bb4287b62..8039e06ae 100644 --- a/src/replication.c +++ b/src/replication.c @@ -751,11 +751,11 @@ void syncCommand(client *c) { /* Target is disk (or the slave is not capable of supporting * diskless replication) and we don't have a BGSAVE in progress, * let's start one. */ - if (server.aof_child_pid == -1) { + if (!hasForkChild()) { startBgsaveForReplication(c->slave_capa); } else { serverLog(LL_NOTICE, - "No BGSAVE in progress, but an AOF rewrite is active. " + "No BGSAVE in progress, but another BG operation is active. " "BGSAVE for replication delayed"); } } @@ -2930,7 +2930,7 @@ void replicationCron(void) { * In case of diskless replication, we make sure to wait the specified * number of seconds (according to configuration) so that other slaves * have the time to arrive before we start streaming. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { + if (!hasForkChild()) { time_t idle, max_idle = 0; int slaves_waiting = 0; int mincapa = -1; diff --git a/src/scripting.c b/src/scripting.c index 9ac8af2e7..3129e4f47 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1827,7 +1827,7 @@ void ldbSendLogs(void) { int ldbStartSession(client *c) { ldb.forked = (c->flags & CLIENT_LUA_DEBUG_SYNC) == 0; if (ldb.forked) { - pid_t cp = fork(); + pid_t cp = redisFork(); if (cp == -1) { addReplyError(c,"Fork() failed: can't run EVAL in debugging mode."); return 0; @@ -1844,7 +1844,6 @@ int ldbStartSession(client *c) { * socket to make sure if the parent crashes a reset is sent * to the clients. */ serverLog(LL_WARNING,"Redis forked for debugging eval"); - closeListeningSockets(0); } else { /* Parent */ listAddNodeTail(ldb.children,(void*)(unsigned long)cp); diff --git a/src/server.c b/src/server.c index 9d70e5c1c..f38ed7897 100644 --- a/src/server.c +++ b/src/server.c @@ -1449,12 +1449,18 @@ int incrementallyRehash(int dbid) { * for dict.c to resize the hash tables accordingly to the fact we have o not * running childs. */ void updateDictResizePolicy(void) { - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) + if (!hasForkChild()) dictEnableResize(); else dictDisableResize(); } +int hasForkChild() { + return server.rdb_child_pid != -1 || + server.aof_child_pid != -1 || + server.module_child_pid != -1; +} + /* ======================= Cron: called every 100 ms ======================== */ /* Add a sample to the operations per second array of samples. */ @@ -1691,7 +1697,7 @@ void databasesCron(void) { /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { + if (!hasForkChild()) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ @@ -1888,15 +1894,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && + if (!hasForkChild() && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } /* Check if a background saving or AOF rewrite in progress terminated. */ - if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || - ldbPendingChildren()) + if (hasForkChild() || ldbPendingChildren()) { int statloc; pid_t pid; @@ -1907,18 +1912,29 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); + /* sigKillChildHandler catches the signal and calls exit(), but we + * must make sure not to flag lastbgsave_status, etc incorrectly. */ + if (exitcode == SIGUSR1) { + bysignal = SIGUSR1; + exitcode = 1; + } + if (pid == -1) { serverLog(LL_WARNING,"wait3() returned an error: %s. " - "rdb_child_pid = %d, aof_child_pid = %d", + "rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d", strerror(errno), (int) server.rdb_child_pid, - (int) server.aof_child_pid); + (int) server.aof_child_pid, + (int) server.module_child_pid); } else if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); + } else if (pid == server.module_child_pid) { + ModuleForkDoneHandler(exitcode,bysignal); + if (!bysignal && exitcode == 0) receiveChildInfo(); } else { if (!ldbRemoveChild(pid)) { serverLog(LL_WARNING, @@ -1956,8 +1972,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Trigger an AOF rewrite if needed. */ if (server.aof_state == AOF_ON && - server.rdb_child_pid == -1 && - server.aof_child_pid == -1 && + !hasForkChild() && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { @@ -2015,7 +2030,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * Note: this code must be after the replicationCron() call above so * make sure when refactoring this file to keep this order. This is useful * because we want to give priority to RDB savings for replication. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && + if (!hasForkChild() && server.rdb_bgsave_scheduled && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) @@ -2799,6 +2814,7 @@ void initServer(void) { server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; + server.module_child_pid = -1; server.rdb_child_type = RDB_CHILD_TYPE_NONE; server.rdb_bgsave_scheduled = 0; server.child_info_pipe[0] = -1; @@ -2817,6 +2833,7 @@ void initServer(void) { server.stat_peak_memory = 0; server.stat_rdb_cow_bytes = 0; server.stat_aof_cow_bytes = 0; + server.stat_module_cow_bytes = 0; server.cron_malloc_stats.zmalloc_used = 0; server.cron_malloc_stats.process_rss = 0; server.cron_malloc_stats.allocator_allocated = 0; @@ -3566,6 +3583,12 @@ int prepareForShutdown(int flags) { killRDBChild(); } + /* Kill module child if there is one. */ + if (server.module_child_pid != -1) { + serverLog(LL_WARNING,"There is a module fork child. Killing it!"); + TerminateModuleForkChild(0); + } + if (server.aof_state != AOF_OFF) { /* Kill the AOF saving child as the AOF we already have may be longer * but contains the full dataset anyway. */ @@ -4066,7 +4089,9 @@ sds genRedisInfoString(char *section) { "aof_current_rewrite_time_sec:%jd\r\n" "aof_last_bgrewrite_status:%s\r\n" "aof_last_write_status:%s\r\n" - "aof_last_cow_size:%zu\r\n", + "aof_last_cow_size:%zu\r\n" + "module_fork_in_progress:%d\r\n" + "module_fork_last_cow_size:%zu\r\n", server.loading, server.dirty, server.rdb_child_pid != -1, @@ -4084,7 +4109,9 @@ sds genRedisInfoString(char *section) { -1 : time(NULL)-server.aof_rewrite_time_start), (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err", (server.aof_last_write_status == C_OK) ? "ok" : "err", - server.stat_aof_cow_bytes); + server.stat_aof_cow_bytes, + server.module_child_pid != -1, + server.stat_module_cow_bytes); if (server.aof_enabled) { info = sdscatprintf(info, @@ -4591,6 +4618,58 @@ void setupSignalHandlers(void) { return; } +static void sigKillChildHandler(int sig) { + UNUSED(sig); + /* this handler is needed to resolve a valgrind warning */ + serverLogFromHandler(LL_WARNING, "Received SIGUSR1 in child, exiting now."); + exitFromChild(SIGUSR1); +} + +void setupChildSignalHandlers(void) { + struct sigaction act; + + /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. + * Otherwise, sa_handler is used. */ + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + act.sa_handler = sigKillChildHandler; + sigaction(SIGUSR1, &act, NULL); + return; +} + +int redisFork() { + int childpid; + long long start = ustime(); + if ((childpid = fork()) == 0) { + /* Child */ + closeListeningSockets(0); + setupChildSignalHandlers(); + } else { + /* Parent */ + server.stat_fork_time = ustime()-start; + server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ + latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); + if (childpid == -1) { + return -1; + } + updateDictResizePolicy(); + } + return childpid; +} + +void sendChildCOWInfo(int ptype, char *pname) { + size_t private_dirty = zmalloc_get_private_dirty(-1); + + if (private_dirty) { + serverLog(LL_NOTICE, + "%s: %zu MB of memory used by copy-on-write", + pname, private_dirty/(1024*1024)); + } + + server.child_info_data.cow_size = private_dirty; + sendChildInfo(ptype); +} + void memtest(size_t megabytes, int passes); /* Returns 1 if there is --sentinel among the arguments or if diff --git a/src/server.h b/src/server.h index 7759081b0..d132cf09c 100644 --- a/src/server.h +++ b/src/server.h @@ -1041,6 +1041,7 @@ struct clusterState; #define CHILD_INFO_MAGIC 0xC17DDA7A12345678LL #define CHILD_INFO_TYPE_RDB 0 #define CHILD_INFO_TYPE_AOF 1 +#define CHILD_INFO_TYPE_MODULE 3 struct redisServer { /* General */ @@ -1076,6 +1077,7 @@ struct redisServer { int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a client blocked on a module command needs to be processed. */ + pid_t module_child_pid; /* PID of module child */ /* Networking */ int port; /* TCP listening port */ int tcp_backlog; /* TCP listen() backlog */ @@ -1149,6 +1151,7 @@ struct redisServer { _Atomic long long stat_net_output_bytes; /* Bytes written to network. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ + size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct { @@ -1540,6 +1543,8 @@ void moduleAcquireGIL(void); void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); +void ModuleForkDoneHandler(int exitcode, int bysignal); +void TerminateModuleForkChild(int wait); ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); @@ -1803,6 +1808,11 @@ void closeChildInfoPipe(void); void sendChildInfo(int process_type); void receiveChildInfo(void); +/* Fork helpers */ +int redisFork(); +int hasForkChild(); +void sendChildCOWInfo(int ptype, char *pname); + /* acl.c -- Authentication related prototypes. */ extern rax *Users; extern user *DefaultUser; |