From ea930a352ca748c0fa49a1b2ee894ea92bc83b0e Mon Sep 17 00:00:00 2001 From: YaacovHazan Date: Sun, 20 Dec 2020 20:23:20 +0200 Subject: Report child copy-on-write info continuously Add INFO field, rdb_active_cow_size, to report COW of a live fork child while it's active. - once in 1024 keys check the time, and if there's more than one second since the last report send a report to the parent via the pipe. - refactor the child_info_data struct, it's an implementation detail that shouldn't be in the server struct, and not used to communicate data between caller and callee - remove the magic value from that struct (not sure what it was good for), and instead add handling of short reads. - add another value to the structure, cow_type, to indicate if the report is for the new rdb_active_cow_size field, or it's the last report of a successful operation - add new Module API to report the active COW - add more asserts variants to test.tcl --- src/childinfo.c | 100 ++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 80 insertions(+), 20 deletions(-) (limited to 'src/childinfo.c') diff --git a/src/childinfo.c b/src/childinfo.c index d11aa7bcf..85adfe2f6 100644 --- a/src/childinfo.c +++ b/src/childinfo.c @@ -30,6 +30,12 @@ #include "server.h" #include +typedef struct { + int process_type; /* AOF or RDB child? */ + int on_exit; /* COW size of active or exited child */ + size_t cow_size; /* Copy on write size. */ +} child_info_data; + /* Open a child-parent channel used in order to move information about the * RDB / AOF saving process from the child to the parent (for instance * the amount of copy on write memory used) */ @@ -41,7 +47,7 @@ void openChildInfoPipe(void) { } else if (anetNonBlock(NULL,server.child_info_pipe[0]) != ANET_OK) { closeChildInfoPipe(); } else { - memset(&server.child_info_data,0,sizeof(server.child_info_data)); + server.child_info_nread = 0; } } @@ -54,34 +60,88 @@ void closeChildInfoPipe(void) { close(server.child_info_pipe[1]); server.child_info_pipe[0] = -1; server.child_info_pipe[1] = -1; + server.child_info_nread = 0; } } -/* Send COW data to parent. The child should call this function after populating - * the corresponding fields it want to sent (according to the process type). */ -void sendChildInfo(int ptype) { +/* Send COW data to parent. */ +void sendChildInfo(int process_type, int on_exit, size_t cow_size) { if (server.child_info_pipe[1] == -1) return; - server.child_info_data.magic = CHILD_INFO_MAGIC; - server.child_info_data.process_type = ptype; - ssize_t wlen = sizeof(server.child_info_data); - if (write(server.child_info_pipe[1],&server.child_info_data,wlen) != wlen) { + + child_info_data buffer = {.process_type = process_type, .on_exit = on_exit, .cow_size = cow_size}; + ssize_t wlen = sizeof(buffer); + + if (write(server.child_info_pipe[1],&buffer,wlen) != wlen) { /* Nothing to do on error, this will be detected by the other side. */ } } -/* Receive COW data from parent. */ +/* Update COW data. */ +void updateChildInfo(int process_type, int on_exit, size_t cow_size) { + if (!on_exit) { + server.stat_current_cow_bytes = cow_size; + return; + } + + if (process_type == CHILD_TYPE_RDB) { + server.stat_rdb_cow_bytes = cow_size; + } else if (process_type == CHILD_TYPE_AOF) { + server.stat_aof_cow_bytes = cow_size; + } else if (process_type == CHILD_TYPE_MODULE) { + server.stat_module_cow_bytes = cow_size; + } +} + +/* Read COW info data from the pipe. + * if complete data read into the buffer, process type, copy-on-write type and copy-on-write size + * are stored into *process_type, *on_exit and *cow_size respectively and returns 1. + * otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */ +int readChildInfo(int *process_type, int *on_exit, size_t *cow_size) { + /* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */ + static child_info_data buffer; + ssize_t wlen = sizeof(buffer); + + /* Do not overlap */ + if (server.child_info_nread == wlen) server.child_info_nread = 0; + + int nread = read(server.child_info_pipe[0], (char *)&buffer + server.child_info_nread, wlen - server.child_info_nread); + if (nread > 0) { + server.child_info_nread += nread; + } + + /* We have complete child info */ + if (server.child_info_nread == wlen) { + *process_type = buffer.process_type; + *on_exit = buffer.on_exit; + *cow_size = buffer.cow_size; + return 1; + } else { + return 0; + } +} + +/* Receive COW data from child. */ void receiveChildInfo(void) { if (server.child_info_pipe[0] == -1) return; - ssize_t wlen = sizeof(server.child_info_data); - if (read(server.child_info_pipe[0],&server.child_info_data,wlen) == wlen && - server.child_info_data.magic == CHILD_INFO_MAGIC) - { - if (server.child_info_data.process_type == CHILD_TYPE_RDB) { - server.stat_rdb_cow_bytes = server.child_info_data.cow_size; - } else if (server.child_info_data.process_type == CHILD_TYPE_AOF) { - server.stat_aof_cow_bytes = server.child_info_data.cow_size; - } else if (server.child_info_data.process_type == CHILD_TYPE_MODULE) { - server.stat_module_cow_bytes = server.child_info_data.cow_size; - } + + int process_type; + int on_exit; + size_t cow_size; + if (readChildInfo(&process_type, &on_exit, &cow_size)) { + updateChildInfo(process_type, on_exit, cow_size); + } +} + +/* Receive last COW data from child. */ +void receiveLastChildInfo(void) { + if (server.child_info_pipe[0] == -1) return; + + /* Drain the pipe and update child info */ + int process_type; + int on_exit; + size_t cow_size; + + while (readChildInfo(&process_type, &on_exit, &cow_size) > 0) { + updateChildInfo(process_type, on_exit, cow_size); } } -- cgit v1.2.1