summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/aof.c35
-rw-r--r--src/childinfo.c2
-rw-r--r--src/db.c5
-rw-r--r--src/defrag.c2
-rw-r--r--src/module.c104
-rw-r--r--src/rdb.c49
-rw-r--r--src/redismodule.h7
-rw-r--r--src/replication.c6
-rw-r--r--src/scripting.c3
-rw-r--r--src/server.c103
-rw-r--r--src/server.h10
11 files changed, 237 insertions, 89 deletions
diff --git a/src/aof.c b/src/aof.c
index 91d0d1a76..8bc6c543d 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -264,9 +264,9 @@ int startAppendOnly(void) {
strerror(errno));
return C_ERR;
}
- if (server.rdb_child_pid != -1) {
+ if (hasForkChild() && server.aof_child_pid == -1) {
server.aof_rewrite_scheduled = 1;
- serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
+ serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
} else {
/* If there is a pending AOF rewrite, we need to switch it off and
* start a new one: the old one cannot be reused because it is not
@@ -395,7 +395,7 @@ void flushAppendOnlyFile(int force) {
* useful for graphing / monitoring purposes. */
if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
- } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
+ } else if (hasForkChild()) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
@@ -491,9 +491,8 @@ void flushAppendOnlyFile(int force) {
try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
- if (server.aof_no_fsync_on_rewrite &&
- (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
- return;
+ if (server.aof_no_fsync_on_rewrite && hasForkChild())
+ return;
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
@@ -1563,39 +1562,24 @@ void aofClosePipes(void) {
*/
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
- long long start;
- if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
+ if (hasForkChild()) return C_ERR;
if (aofCreatePipes() != C_OK) return C_ERR;
openChildInfoPipe();
- start = ustime();
- if ((childpid = fork()) == 0) {
+ if ((childpid = redisFork()) == 0) {
char tmpfile[256];
/* Child */
- closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
- size_t private_dirty = zmalloc_get_private_dirty(-1);
-
- if (private_dirty) {
- serverLog(LL_NOTICE,
- "AOF rewrite: %zu MB of memory used by copy-on-write",
- private_dirty/(1024*1024));
- }
-
- server.child_info_data.cow_size = private_dirty;
- sendChildInfo(CHILD_INFO_TYPE_AOF);
+ sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
/* Parent */
- server.stat_fork_time = ustime()-start;
- server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
- latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
serverLog(LL_WARNING,
@@ -1609,7 +1593,6 @@ int rewriteAppendOnlyFileBackground(void) {
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
- updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
@@ -1624,7 +1607,7 @@ int rewriteAppendOnlyFileBackground(void) {
void bgrewriteaofCommand(client *c) {
if (server.aof_child_pid != -1) {
addReplyError(c,"Background append only file rewriting already in progress");
- } else if (server.rdb_child_pid != -1) {
+ } else if (hasForkChild()) {
server.aof_rewrite_scheduled = 1;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
diff --git a/src/childinfo.c b/src/childinfo.c
index 719025e8c..fa0600552 100644
--- a/src/childinfo.c
+++ b/src/childinfo.c
@@ -80,6 +80,8 @@ void receiveChildInfo(void) {
server.stat_rdb_cow_bytes = server.child_info_data.cow_size;
} else if (server.child_info_data.process_type == CHILD_INFO_TYPE_AOF) {
server.stat_aof_cow_bytes = server.child_info_data.cow_size;
+ } else if (server.child_info_data.process_type == CHILD_INFO_TYPE_MODULE) {
+ server.stat_module_cow_bytes = server.child_info_data.cow_size;
}
}
}
diff --git a/src/db.c b/src/db.c
index 95eaf04e9..a46e0251f 100644
--- a/src/db.c
+++ b/src/db.c
@@ -60,10 +60,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
- if (server.rdb_child_pid == -1 &&
- server.aof_child_pid == -1 &&
- !(flags & LOOKUP_NOTOUCH))
- {
+ if (!hasForkChild() && !(flags & LOOKUP_NOTOUCH)){
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
diff --git a/src/defrag.c b/src/defrag.c
index ecf0255dc..93c6a4619 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -1039,7 +1039,7 @@ void activeDefragCycle(void) {
mstime_t latency;
int quit = 0;
- if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1)
+ if (hasForkChild())
return; /* Defragging memory while there's a fork will just do damage. */
/* Once a second, check if we the fragmentation justfies starting a scan
diff --git a/src/module.c b/src/module.c
index b2bddd316..854989e73 100644
--- a/src/module.c
+++ b/src/module.c
@@ -31,6 +31,7 @@
#include "cluster.h"
#include "rdb.h"
#include <dlfcn.h>
+#include <wait.h>
#define REDISMODULE_CORE 1
#include "redismodule.h"
@@ -293,6 +294,14 @@ typedef struct RedisModuleCommandFilter {
/* Registered filters */
static list *moduleCommandFilters;
+/* Signature of the completion callback a module passes to RM_Fork(): it is
+ * given the child's exit code, the signal that terminated it (0 if none) and
+ * the module's own opaque pointer. */
+typedef void (*RedisModuleForkDoneHandler)(int exitcode, int bysignal, void *user_data);
+
+/* Completion callback registered for the current module fork child, together
+ * with the opaque pointer handed back to it. Both fields are NULL while no
+ * callback is registered. */
+static struct RedisModuleForkInfo {
+    RedisModuleForkDoneHandler done_handler;
+    void *done_handler_user_data;
+} moduleForkInfo = { NULL, NULL };
+
+
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
@@ -5131,6 +5140,98 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
}
/* --------------------------------------------------------------------------
+ * Module fork API
+ * -------------------------------------------------------------------------- */
+
+/* Create a background child process with the current frozen snapshot of the
+ * main process where you can do some processing in the background without
+ * affecting / freezing the traffic and no need for threads and GIL locking.
+ * Note that Redis allows for only one concurrent fork.
+ * When the child wants to exit, it should call RedisModule_ExitFromChild.
+ * If the parent wants to kill the child it should call RedisModule_KillForkChild
+ * The done handler callback will be executed on the parent process when the
+ * child exited (but not when killed)
+ * Return: -1 on failure, on success the parent process will get a positive PID
+ * of the child, and the child process will get 0.
+ */
+int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data)
+{
+    pid_t childpid;
+
+    /* Refuse to fork while an RDB, AOF or module child is already running. */
+    if (hasForkChild()) return -1;
+
+    openChildInfoPipe();
+    childpid = redisFork();
+    if (childpid == 0) {
+        /* Child */
+        redisSetProcTitle("redis-module-fork");
+    } else if (childpid == -1) {
+        /* Capture errno before closing the pipe: the cleanup may overwrite
+         * it and we want to report why the fork itself failed. */
+        int fork_errno = errno;
+        closeChildInfoPipe();
+        serverLog(LL_WARNING,"Can't fork for module: %s", strerror(fork_errno));
+    } else {
+        /* Parent: remember the child and the callback to run once it exits. */
+        server.module_child_pid = childpid;
+        moduleForkInfo.done_handler = cb;
+        moduleForkInfo.done_handler_user_data = user_data;
+        serverLog(LL_NOTICE, "Module fork started pid: %d ", (int) childpid);
+    }
+    return childpid;
+}
+
+/* Terminate the module fork child. Must be called from the child process.
+ * Before exiting, the child's copy-on-write usage is reported to the parent
+ * through sendChildCOWInfo(). retcode becomes the exit code that the parent
+ * passes to the done handler.
+ */
+int RM_ExitFromChild(int retcode) {
+    sendChildCOWInfo(CHILD_INFO_TYPE_MODULE, "Module fork");
+    exitFromChild(retcode);
+    /* NOTE(review): exitFromChild() presumably never returns; the value below
+     * only satisfies the API signature. */
+    return REDISMODULE_OK;
+}
+
+/* Kill the running module fork child with SIGUSR1 and forget about it.
+ *
+ * wait: when non-zero, block until the child has actually been reaped.
+ *
+ * The registered done handler is cleared without being invoked, the child
+ * info pipe is closed and the dict resize policy is re-evaluated. Calling
+ * this function while no module child is recorded is a no-op. */
+void TerminateModuleForkChild(int wait) {
+    pid_t child = server.module_child_pid;
+    int statloc;
+
+    /* Never hand -1 to kill(): that would signal every process we are
+     * allowed to signal instead of a single child. */
+    if (child == -1) return;
+
+    serverLog(LL_NOTICE,"Killing running module fork child: %ld",
+        (long) child);
+    if (kill(child,SIGUSR1) != -1 && wait) {
+        /* Reap exactly this child, so the exit status of any other child is
+         * left for its own handler. Retry only when interrupted by a signal;
+         * on any other error (e.g. ECHILD) stop instead of spinning. */
+        while (waitpid(child,&statloc,0) == -1) {
+            if (errno != EINTR) break;
+        }
+    }
+    /* Drop every trace of the child so a new fork can be started. */
+    server.module_child_pid = -1;
+    moduleForkInfo.done_handler = NULL;
+    moduleForkInfo.done_handler_user_data = NULL;
+    closeChildInfoPipe();
+    updateDictResizePolicy();
+}
+
+/* Kill the module fork child from the parent process.
+ * child_pid should be the value previously returned by RedisModule_Fork.
+ * Returns REDISMODULE_ERR when no module child is running, or when child_pid
+ * is not the pid of the running module child (so a module cannot kill a fork
+ * it does not know the pid of). Otherwise the child is killed and reaped, and
+ * REDISMODULE_OK is returned. */
+int RM_KillForkChild(int child_pid) {
+    pid_t running = server.module_child_pid;
+
+    if (running == -1 || running != child_pid) return REDISMODULE_ERR;
+
+    /* Kill the child and wait for it to exit. */
+    TerminateModuleForkChild(1);
+    return REDISMODULE_OK;
+}
+
+/* Called in the parent once the module fork child has terminated.
+ * Logs the outcome, runs the module's done handler (if one is registered)
+ * with the exit code and terminating signal, then clears the recorded child
+ * pid and handler. */
+void ModuleForkDoneHandler(int exitcode, int bysignal) {
+    RedisModuleForkDoneHandler notify;
+
+    serverLog(LL_NOTICE,
+        "Module fork exited pid: %d, retcode: %d, bysignal: %d",
+        server.module_child_pid, exitcode, bysignal);
+
+    notify = moduleForkInfo.done_handler;
+    if (notify != NULL)
+        notify(exitcode, bysignal, moduleForkInfo.done_handler_user_data);
+
+    /* The state is reset only after the callback has run. */
+    moduleForkInfo.done_handler_user_data = NULL;
+    moduleForkInfo.done_handler = NULL;
+    server.module_child_pid = -1;
+}
+
+/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@@ -5652,4 +5753,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CommandFilterArgInsert);
REGISTER_API(CommandFilterArgReplace);
REGISTER_API(CommandFilterArgDelete);
+ REGISTER_API(Fork);
+ REGISTER_API(ExitFromChild);
+ REGISTER_API(KillForkChild);
}
diff --git a/src/rdb.c b/src/rdb.c
index d11fb93f7..d9164b21c 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1335,40 +1335,25 @@ werr:
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
- long long start;
- if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
+ if (hasForkChild()) return C_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();
- start = ustime();
- if ((childpid = fork()) == 0) {
+ if ((childpid = redisFork()) == 0) {
int retval;
/* Child */
- closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
- size_t private_dirty = zmalloc_get_private_dirty(-1);
-
- if (private_dirty) {
- serverLog(LL_NOTICE,
- "RDB: %zu MB of memory used by copy-on-write",
- private_dirty/(1024*1024));
- }
-
- server.child_info_data.cow_size = private_dirty;
- sendChildInfo(CHILD_INFO_TYPE_RDB);
+ sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
- server.stat_fork_time = ustime()-start;
- server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
- latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
server.lastbgsave_status = C_ERR;
@@ -1380,7 +1365,6 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
- updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
@@ -2431,10 +2415,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
- long long start;
int pipefds[2];
- if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
+ if (hasForkChild()) return C_ERR;
/* Before to fork, create a pipe that will be used in order to
* send back to the parent the IDs of the slaves that successfully
@@ -2470,8 +2453,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
/* Create the child process. */
openChildInfoPipe();
- start = ustime();
- if ((childpid = fork()) == 0) {
+ if ((childpid = redisFork()) == 0) {
/* Child */
int retval;
rio slave_sockets;
@@ -2479,7 +2461,6 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
rioInitWithFdset(&slave_sockets,fds,numfds);
zfree(fds);
- closeListeningSockets(0);
redisSetProcTitle("redis-rdb-to-slaves");
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
@@ -2487,16 +2468,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
retval = C_ERR;
if (retval == C_OK) {
- size_t private_dirty = zmalloc_get_private_dirty(-1);
-
- if (private_dirty) {
- serverLog(LL_NOTICE,
- "RDB: %zu MB of memory used by copy-on-write",
- private_dirty/(1024*1024));
- }
-
- server.child_info_data.cow_size = private_dirty;
- sendChildInfo(CHILD_INFO_TYPE_RDB);
+ sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
/* If we are returning OK, at least one slave was served
* with the RDB file as expected, so we need to send a report
@@ -2565,16 +2537,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
close(pipefds[1]);
closeChildInfoPipe();
} else {
- server.stat_fork_time = ustime()-start;
- server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
- latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
-
serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
- updateDictResizePolicy();
}
zfree(clientids);
zfree(fds);
@@ -2617,13 +2584,13 @@ void bgsaveCommand(client *c) {
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
- } else if (server.aof_child_pid != -1) {
+ } else if (hasForkChild()) {
if (schedule) {
server.rdb_bgsave_scheduled = 1;
addReplyStatus(c,"Background saving scheduled");
} else {
addReplyError(c,
- "An AOF log rewriting in progress: can't BGSAVE right now. "
+ "Another BG operation is in progress: can't BGSAVE right now. "
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
diff --git a/src/redismodule.h b/src/redismodule.h
index ad1c6e652..6a3a164b5 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -182,6 +182,7 @@ typedef void (*RedisModuleTypeFreeFunc)(void *value);
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
+typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
#define REDISMODULE_TYPE_METHOD_VERSION 2
typedef struct RedisModuleTypeMethods {
@@ -372,6 +373,9 @@ const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(R
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg);
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg);
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos);
+int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data);
+int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode);
+int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid);
#endif
/* This is included inline inside each Redis module. */
@@ -546,6 +550,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(CommandFilterArgInsert);
REDISMODULE_GET_API(CommandFilterArgReplace);
REDISMODULE_GET_API(CommandFilterArgDelete);
+ REDISMODULE_GET_API(Fork);
+ REDISMODULE_GET_API(ExitFromChild);
+ REDISMODULE_GET_API(KillForkChild);
#endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
diff --git a/src/replication.c b/src/replication.c
index bb4287b62..8039e06ae 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -751,11 +751,11 @@ void syncCommand(client *c) {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
- if (server.aof_child_pid == -1) {
+ if (!hasForkChild()) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
- "No BGSAVE in progress, but an AOF rewrite is active. "
+ "No BGSAVE in progress, but another BG operation is active. "
"BGSAVE for replication delayed");
}
}
@@ -2930,7 +2930,7 @@ void replicationCron(void) {
* In case of diskless replication, we make sure to wait the specified
* number of seconds (according to configuration) so that other slaves
* have the time to arrive before we start streaming. */
- if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
+ if (!hasForkChild()) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
int mincapa = -1;
diff --git a/src/scripting.c b/src/scripting.c
index 9ac8af2e7..3129e4f47 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -1827,7 +1827,7 @@ void ldbSendLogs(void) {
int ldbStartSession(client *c) {
ldb.forked = (c->flags & CLIENT_LUA_DEBUG_SYNC) == 0;
if (ldb.forked) {
- pid_t cp = fork();
+ pid_t cp = redisFork();
if (cp == -1) {
addReplyError(c,"Fork() failed: can't run EVAL in debugging mode.");
return 0;
@@ -1844,7 +1844,6 @@ int ldbStartSession(client *c) {
* socket to make sure if the parent crashes a reset is sent
* to the clients. */
serverLog(LL_WARNING,"Redis forked for debugging eval");
- closeListeningSockets(0);
} else {
/* Parent */
listAddNodeTail(ldb.children,(void*)(unsigned long)cp);
diff --git a/src/server.c b/src/server.c
index 9d70e5c1c..f38ed7897 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1449,12 +1449,18 @@ int incrementallyRehash(int dbid) {
* for dict.c to resize the hash tables accordingly to the fact we have o not
* running childs. */
void updateDictResizePolicy(void) {
- if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
+ if (!hasForkChild())
dictEnableResize();
else
dictDisableResize();
}
+/* Return non-zero if an RDB, AOF or module fork child is currently recorded
+ * in the server state, zero otherwise. */
+int hasForkChild(void) {
+    return server.rdb_child_pid != -1 ||
+           server.aof_child_pid != -1 ||
+           server.module_child_pid != -1;
+}
+
/* ======================= Cron: called every 100 ms ======================== */
/* Add a sample to the operations per second array of samples. */
@@ -1691,7 +1697,7 @@ void databasesCron(void) {
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
* as will cause a lot of copy-on-write of memory pages. */
- if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
+ if (!hasForkChild()) {
/* We use global counters so if we stop the computation at a given
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
@@ -1888,15 +1894,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
- if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
+ if (!hasForkChild() &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
- if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
- ldbPendingChildren())
+ if (hasForkChild() || ldbPendingChildren())
{
int statloc;
pid_t pid;
@@ -1907,18 +1912,29 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
+ /* sigKillChildHandler catches the signal and calls exit(), but we
+ * must make sure not to flag lastbgsave_status, etc incorrectly. */
+ if (exitcode == SIGUSR1) {
+ bysignal = SIGUSR1;
+ exitcode = 1;
+ }
+
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
- "rdb_child_pid = %d, aof_child_pid = %d",
+ "rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
- (int) server.aof_child_pid);
+ (int) server.aof_child_pid,
+ (int) server.module_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
+ } else if (pid == server.module_child_pid) {
+ ModuleForkDoneHandler(exitcode,bysignal);
+ if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
@@ -1956,8 +1972,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
- server.rdb_child_pid == -1 &&
- server.aof_child_pid == -1 &&
+ !hasForkChild() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
@@ -2015,7 +2030,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* Note: this code must be after the replicationCron() call above so
* make sure when refactoring this file to keep this order. This is useful
* because we want to give priority to RDB savings for replication. */
- if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
+ if (!hasForkChild() &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
@@ -2799,6 +2814,7 @@ void initServer(void) {
server.cronloops = 0;
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
+ server.module_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1;
@@ -2817,6 +2833,7 @@ void initServer(void) {
server.stat_peak_memory = 0;
server.stat_rdb_cow_bytes = 0;
server.stat_aof_cow_bytes = 0;
+ server.stat_module_cow_bytes = 0;
server.cron_malloc_stats.zmalloc_used = 0;
server.cron_malloc_stats.process_rss = 0;
server.cron_malloc_stats.allocator_allocated = 0;
@@ -3566,6 +3583,12 @@ int prepareForShutdown(int flags) {
killRDBChild();
}
+ /* Kill module child if there is one. */
+ if (server.module_child_pid != -1) {
+ serverLog(LL_WARNING,"There is a module fork child. Killing it!");
+ TerminateModuleForkChild(0);
+ }
+
if (server.aof_state != AOF_OFF) {
/* Kill the AOF saving child as the AOF we already have may be longer
* but contains the full dataset anyway. */
@@ -4066,7 +4089,9 @@ sds genRedisInfoString(char *section) {
"aof_current_rewrite_time_sec:%jd\r\n"
"aof_last_bgrewrite_status:%s\r\n"
"aof_last_write_status:%s\r\n"
- "aof_last_cow_size:%zu\r\n",
+ "aof_last_cow_size:%zu\r\n"
+ "module_fork_in_progress:%d\r\n"
+ "module_fork_last_cow_size:%zu\r\n",
server.loading,
server.dirty,
server.rdb_child_pid != -1,
@@ -4084,7 +4109,9 @@ sds genRedisInfoString(char *section) {
-1 : time(NULL)-server.aof_rewrite_time_start),
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
(server.aof_last_write_status == C_OK) ? "ok" : "err",
- server.stat_aof_cow_bytes);
+ server.stat_aof_cow_bytes,
+ server.module_child_pid != -1,
+ server.stat_module_cow_bytes);
if (server.aof_enabled) {
info = sdscatprintf(info,
@@ -4591,6 +4618,58 @@ void setupSignalHandlers(void) {
return;
}
+/* SIGUSR1 handler installed in fork children by setupChildSignalHandlers().
+ * It logs a warning and terminates the child through exitFromChild(), using
+ * SIGUSR1 as the exit status. Per the original author, the handler exists to
+ * resolve a valgrind warning. */
+static void sigKillChildHandler(int sig) {
+    UNUSED(sig);
+    serverLogFromHandler(LL_WARNING, "Received SIGUSR1 in child, exiting now.");
+    exitFromChild(SIGUSR1);
+}
+
+/* Install sigKillChildHandler as the SIGUSR1 handler of the calling process.
+ * sa_flags is left at 0, i.e. SA_SIGINFO is not set, so the plain sa_handler
+ * field (rather than sa_sigaction) is the one consulted. No extra signals are
+ * blocked while the handler runs. */
+void setupChildSignalHandlers(void) {
+    struct sigaction usr1_action;
+
+    usr1_action.sa_flags = 0;
+    usr1_action.sa_handler = sigKillChildHandler;
+    sigemptyset(&usr1_action.sa_mask);
+    sigaction(SIGUSR1, &usr1_action, NULL);
+}
+
+/* fork() wrapper shared by every code path that spawns a child.
+ *
+ * In the child: closes the listening sockets and installs the SIGUSR1
+ * handler, then returns 0.
+ * In the parent: records fork time / rate statistics and a "fork" latency
+ * sample, then returns the child pid, or -1 if fork() failed (errno is the
+ * one set by fork()). */
+int redisFork(void) {
+    pid_t childpid;
+    int fork_errno;
+    long long start = ustime();
+
+    childpid = fork();
+    if (childpid == 0) {
+        /* Child */
+        closeListeningSockets(0);
+        setupChildSignalHandlers();
+        return 0;
+    }
+
+    /* Parent. Remember errno right away: the bookkeeping below may overwrite
+     * it, and callers log strerror(errno) when we return -1. */
+    fork_errno = errno;
+    server.stat_fork_time = ustime()-start;
+    server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
+    latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
+    if (childpid == -1) {
+        errno = fork_errno;
+        return -1;
+    }
+
+    /* A child now exists, so hash table resizing must stop to limit
+     * copy-on-write. Callers store the child pid only after we return, which
+     * means updateDictResizePolicy() would still see "no child" at this point
+     * and enable resizing; disable it explicitly instead.
+     * NOTE(review): this also applies to the Lua debugger fork, which is not
+     * tracked by hasForkChild(); resizing comes back on the next
+     * updateDictResizePolicy() call -- confirm that is acceptable. */
+    dictDisableResize();
+    return childpid;
+}
+
+/* Called in a fork child to report its copy-on-write usage to the parent.
+ * ptype is the CHILD_INFO_TYPE_* value passed on to sendChildInfo(); pname is
+ * the label used as prefix of the log line. The log line is emitted only when
+ * the measured amount is non-zero. */
+void sendChildCOWInfo(int ptype, char *pname) {
+    size_t cow_bytes = zmalloc_get_private_dirty(-1);
+
+    server.child_info_data.cow_size = cow_bytes;
+    if (cow_bytes != 0) {
+        serverLog(LL_NOTICE,
+            "%s: %zu MB of memory used by copy-on-write",
+            pname, cow_bytes/(1024*1024));
+    }
+    sendChildInfo(ptype);
+}
+
void memtest(size_t megabytes, int passes);
/* Returns 1 if there is --sentinel among the arguments or if
diff --git a/src/server.h b/src/server.h
index 7759081b0..d132cf09c 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1041,6 +1041,7 @@ struct clusterState;
#define CHILD_INFO_MAGIC 0xC17DDA7A12345678LL
#define CHILD_INFO_TYPE_RDB 0
#define CHILD_INFO_TYPE_AOF 1
+#define CHILD_INFO_TYPE_MODULE 3
struct redisServer {
/* General */
@@ -1076,6 +1077,7 @@ struct redisServer {
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
+ pid_t module_child_pid; /* PID of module child */
/* Networking */
int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */
@@ -1149,6 +1151,7 @@ struct redisServer {
_Atomic long long stat_net_output_bytes; /* Bytes written to network. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
+ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
@@ -1540,6 +1543,8 @@ void moduleAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void moduleCallCommandFilters(client *c);
+void ModuleForkDoneHandler(int exitcode, int bysignal);
+void TerminateModuleForkChild(int wait);
ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
@@ -1803,6 +1808,11 @@ void closeChildInfoPipe(void);
void sendChildInfo(int process_type);
void receiveChildInfo(void);
+/* Fork helpers */
+int redisFork();
+int hasForkChild();
+void sendChildCOWInfo(int ptype, char *pname);
+
/* acl.c -- Authentication related prototypes. */
extern rax *Users;
extern user *DefaultUser;