diff options
Diffstat (limited to 'libntp/ntp_worker.c')
-rw-r--r-- | libntp/ntp_worker.c | 340 |
1 files changed, 340 insertions, 0 deletions
diff --git a/libntp/ntp_worker.c b/libntp/ntp_worker.c new file mode 100644 index 0000000..bb1cb87 --- /dev/null +++ b/libntp/ntp_worker.c @@ -0,0 +1,340 @@ +/* + * ntp_worker.c + */ +#include <config.h> +#include "ntp_workimpl.h" + +#ifdef WORKER + +#include <stdio.h> +#include <ctype.h> +#include <signal.h> + +#include "iosignal.h" +#include "ntp_stdlib.h" +#include "ntp_malloc.h" +#include "ntp_syslog.h" +#include "ntpd.h" +#include "ntp_io.h" +#include "ntp_assert.h" +#include "ntp_unixtime.h" +#include "intreswork.h" + + +#define CHILD_MAX_IDLE (3 * 60) /* seconds, idle worker limit */ + +blocking_child ** blocking_children; +size_t blocking_children_alloc; +int worker_per_query; /* boolean */ +int intres_req_pending; + + +#ifndef HAVE_IO_COMPLETION_PORT +/* + * pipe_socketpair() + * + * Provides an AF_UNIX socketpair on systems which have them, otherwise + * pair of unidirectional pipes. + */ +int +pipe_socketpair( + int caller_fds[2], + int * is_pipe + ) +{ + int rc; + int fds[2]; + int called_pipe; + +#ifdef HAVE_SOCKETPAIR + rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]); +#else + rc = -1; +#endif + + if (-1 == rc) { + rc = pipe(&fds[0]); + called_pipe = TRUE; + } else { + called_pipe = FALSE; + } + + if (-1 == rc) + return rc; + + caller_fds[0] = fds[0]; + caller_fds[1] = fds[1]; + if (is_pipe != NULL) + *is_pipe = called_pipe; + + return 0; +} + + +/* + * close_all_except() + * + * Close all file descriptors except the given keep_fd. + */ +void +close_all_except( + int keep_fd + ) +{ + int fd; + + for (fd = 0; fd < keep_fd; fd++) + close(fd); + + close_all_beyond(keep_fd); +} + + +/* + * close_all_beyond() + * + * Close all file descriptors after the given keep_fd, which is the + * highest fd to keep open. + */ +void +close_all_beyond( + int keep_fd + ) +{ +# ifdef HAVE_CLOSEFROM + closefrom(keep_fd + 1); +# elif defined(F_CLOSEM) + /* + * From 'Writing Reliable AIX Daemons,' SG24-4946-00, + * by Eric Agar (saves us from doing 32767 system + * calls) + */ + if (fcntl(keep_fd + 1, F_CLOSEM, 0) == -1) + msyslog(LOG_ERR, "F_CLOSEM(%d): %m", keep_fd + 1); +# else /* !HAVE_CLOSEFROM && !F_CLOSEM follows */ + int fd; + int max_fd; + + max_fd = GETDTABLESIZE(); + for (fd = keep_fd + 1; fd < max_fd; fd++) + close(fd); +# endif /* !HAVE_CLOSEFROM && !F_CLOSEM */ +} +#endif /* HAVE_IO_COMPLETION_PORT */ + + +u_int +available_blocking_child_slot(void) +{ + const size_t each = sizeof(blocking_children[0]); + u_int slot; + size_t prev_alloc; + size_t new_alloc; + size_t prev_octets; + size_t octets; + + for (slot = 0; slot < blocking_children_alloc; slot++) { + if (NULL == blocking_children[slot]) + return slot; + if (blocking_children[slot]->reusable) { + blocking_children[slot]->reusable = FALSE; + return slot; + } + } + + prev_alloc = blocking_children_alloc; + prev_octets = prev_alloc * each; + new_alloc = blocking_children_alloc + 4; + octets = new_alloc * each; + blocking_children = erealloc_zero(blocking_children, octets, + prev_octets); + blocking_children_alloc = new_alloc; + + return prev_alloc; +} + + +int +queue_blocking_request( + blocking_work_req rtype, + void * req, + size_t reqsize, + blocking_work_callback done_func, + void * context + ) +{ + static u_int intres_slot = UINT_MAX; + u_int child_slot; + blocking_child * c; + blocking_pipe_header req_hdr; + + req_hdr.octets = sizeof(req_hdr) + reqsize; + req_hdr.magic_sig = BLOCKING_REQ_MAGIC; + req_hdr.rtype = rtype; + req_hdr.done_func = done_func; + req_hdr.context = context; + + child_slot = UINT_MAX; + if (worker_per_query || UINT_MAX == intres_slot || + blocking_children[intres_slot]->reusable) + child_slot = available_blocking_child_slot(); + if (!worker_per_query) { + if (UINT_MAX == intres_slot) + intres_slot = child_slot; + else + child_slot = intres_slot; + if (0 == intres_req_pending) + intres_timeout_req(0); + } + intres_req_pending++; + INSIST(UINT_MAX != child_slot); + c = blocking_children[child_slot]; + if (NULL == c) { + c = emalloc_zero(sizeof(*c)); +#ifdef WORK_FORK + c->req_read_pipe = -1; + c->req_write_pipe = -1; +#endif +#ifdef WORK_PIPE + c->resp_read_pipe = -1; + c->resp_write_pipe = -1; +#endif + blocking_children[child_slot] = c; + } + req_hdr.child_idx = child_slot; + + return send_blocking_req_internal(c, &req_hdr, req); +} + + +int queue_blocking_response( + blocking_child * c, + blocking_pipe_header * resp, + size_t respsize, + const blocking_pipe_header * req + ) +{ + resp->octets = respsize; + resp->magic_sig = BLOCKING_RESP_MAGIC; + resp->rtype = req->rtype; + resp->context = req->context; + resp->done_func = req->done_func; + + return send_blocking_resp_internal(c, resp); +} + + +void +process_blocking_resp( + blocking_child * c + ) +{ + blocking_pipe_header * resp; + void * data; + + /* + * On Windows send_blocking_resp_internal() may signal the + * blocking_response_ready event multiple times while we're + * processing a response, so always consume all available + * responses before returning to test the event again. + */ +#ifdef WORK_THREAD + do { +#endif + resp = receive_blocking_resp_internal(c); + if (NULL != resp) { + DEBUG_REQUIRE(BLOCKING_RESP_MAGIC == + resp->magic_sig); + data = (char *)resp + sizeof(*resp); + intres_req_pending--; + (*resp->done_func)(resp->rtype, resp->context, + resp->octets - sizeof(*resp), + data); + free(resp); + } +#ifdef WORK_THREAD + } while (NULL != resp); +#endif + if (!worker_per_query && 0 == intres_req_pending) + intres_timeout_req(CHILD_MAX_IDLE); + else if (worker_per_query) + req_child_exit(c); +} + + +/* + * blocking_child_common runs as a forked child or a thread + */ +int +blocking_child_common( + blocking_child *c + ) +{ + int say_bye; + blocking_pipe_header *req; + + say_bye = FALSE; + while (!say_bye) { + req = receive_blocking_req_internal(c); + if (NULL == req) { + say_bye = TRUE; + break; + } + + DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig); + + switch (req->rtype) { + case BLOCKING_GETADDRINFO: + if (blocking_getaddrinfo(c, req)) + say_bye = TRUE; + break; + + case BLOCKING_GETNAMEINFO: + if (blocking_getnameinfo(c, req)) + say_bye = TRUE; + break; + + default: + msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype); + say_bye = TRUE; + } + + free(req); + } + + return 0; +} + + +/* + * worker_idle_timer_fired() + * + * The parent starts this timer when the last pending response has been + * received from the child, making it idle, and clears the timer when a + * request is dispatched to the child. Once the timer expires, the + * child is sent packing. + * + * This is called when worker_idle_timer is nonzero and less than or + * equal to current_time. + */ +void +worker_idle_timer_fired(void) +{ + u_int idx; + blocking_child * c; + + DEBUG_REQUIRE(0 == intres_req_pending); + + intres_timeout_req(0); + for (idx = 0; idx < blocking_children_alloc; idx++) { + c = blocking_children[idx]; + if (NULL == c) + continue; + req_child_exit(c); + } +} + + +#else /* !WORKER follows */ +int ntp_worker_nonempty_compilation_unit; +#endif |