diff options
Diffstat (limited to 'libntp/work_thread.c')
-rw-r--r-- | libntp/work_thread.c | 726 |
1 files changed, 726 insertions, 0 deletions
diff --git a/libntp/work_thread.c b/libntp/work_thread.c new file mode 100644 index 0000000..38d8747 --- /dev/null +++ b/libntp/work_thread.c @@ -0,0 +1,726 @@ +/* + * work_thread.c - threads implementation for blocking worker child. + */ +#include <config.h> +#include "ntp_workimpl.h" + +#ifdef WORK_THREAD + +#include <stdio.h> +#include <ctype.h> +#include <signal.h> +#ifndef SYS_WINNT +#include <pthread.h> +#endif + +#include "ntp_stdlib.h" +#include "ntp_malloc.h" +#include "ntp_syslog.h" +#include "ntpd.h" +#include "ntp_io.h" +#include "ntp_assert.h" +#include "ntp_unixtime.h" +#include "timespecops.h" +#include "ntp_worker.h" + +#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) +#define CHILD_GONE_RESP CHILD_EXIT_REQ +#define WORKITEMS_ALLOC_INC 16 +#define RESPONSES_ALLOC_INC 4 + +#ifndef THREAD_MINSTACKSIZE +#define THREAD_MINSTACKSIZE (64U * 1024) +#endif + +#ifndef DEVOLATILE +#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var)) +#endif + +#ifdef SYS_WINNT +# define thread_exit(c) _endthreadex(c) +# define tickle_sem SetEvent +#else +# define thread_exit(c) pthread_exit((void*)(size_t)(c)) +# define tickle_sem sem_post +#endif + +#ifdef WORK_PIPE +addremove_io_fd_func addremove_io_fd; +#else +addremove_io_semaphore_func addremove_io_semaphore; +#endif + +static void start_blocking_thread(blocking_child *); +static void start_blocking_thread_internal(blocking_child *); +static void prepare_child_sems(blocking_child *); +static int wait_for_sem(sem_ref, struct timespec *); +static void ensure_workitems_empty_slot(blocking_child *); +static void ensure_workresp_empty_slot(blocking_child *); +static int queue_req_pointer(blocking_child *, blocking_pipe_header *); +static void cleanup_after_child(blocking_child *); +#ifdef SYS_WINNT +u_int WINAPI blocking_thread(void *); +#else +void * blocking_thread(void *); +#endif +#ifndef SYS_WINNT +static void block_thread_signals(sigset_t *); +#endif + + +void +exit_worker( + int exitcode + ) +{ + thread_exit(exitcode); /* see #define thread_exit */ +} + + +int +worker_sleep( + blocking_child * c, + time_t seconds + ) +{ + struct timespec until; + int rc; + +# ifdef HAVE_CLOCK_GETTIME + if (0 != clock_gettime(CLOCK_REALTIME, &until)) { + msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); + return -1; + } +# else + if (0 != getclock(TIMEOFDAY, &until)) { + msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); + return -1; + } +# endif + until.tv_sec += seconds; + do { + rc = wait_for_sem(c->wake_scheduled_sleep, &until); + } while (-1 == rc && EINTR == errno); + if (0 == rc) + return -1; + if (-1 == rc && ETIMEDOUT == errno) + return 0; + msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); + return -1; +} + + +void +interrupt_worker_sleep(void) +{ + u_int idx; + blocking_child * c; + + for (idx = 0; idx < blocking_children_alloc; idx++) { + c = blocking_children[idx]; + if (NULL == c || NULL == c->wake_scheduled_sleep) + continue; + tickle_sem(c->wake_scheduled_sleep); + } +} + + +static void +ensure_workitems_empty_slot( + blocking_child *c + ) +{ + const size_t each = sizeof(blocking_children[0]->workitems[0]); + size_t new_alloc; + size_t old_octets; + size_t new_octets; + void * nonvol_workitems; + + + if (c->workitems != NULL && + NULL == c->workitems[c->next_workitem]) + return; + + new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; + old_octets = c->workitems_alloc * each; + new_octets = new_alloc * each; + nonvol_workitems = DEVOLATILE(void *, c->workitems); + c->workitems = erealloc_zero(nonvol_workitems, new_octets, + old_octets); + if (0 == c->next_workitem) + c->next_workitem = c->workitems_alloc; + c->workitems_alloc = new_alloc; +} + + +static void +ensure_workresp_empty_slot( + blocking_child *c + ) +{ + const size_t each = sizeof(blocking_children[0]->responses[0]); + size_t new_alloc; + size_t old_octets; + size_t new_octets; + void * nonvol_responses; + + if (c->responses != NULL && + NULL == c->responses[c->next_response]) + return; + + new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; + old_octets = c->responses_alloc * each; + new_octets = new_alloc * each; + nonvol_responses = DEVOLATILE(void *, c->responses); + c->responses = erealloc_zero(nonvol_responses, new_octets, + old_octets); + if (0 == c->next_response) + c->next_response = c->responses_alloc; + c->responses_alloc = new_alloc; +} + + +/* + * queue_req_pointer() - append a work item or idle exit request to + * blocking_workitems[]. + */ +static int +queue_req_pointer( + blocking_child * c, + blocking_pipe_header * hdr + ) +{ + c->workitems[c->next_workitem] = hdr; + c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc; + + /* + * We only want to signal the wakeup event if the child is + * blocking on it, which is indicated by setting the blocking + * event. Wait with zero timeout to test. + */ + /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */ + tickle_sem(c->blocking_req_ready); + + return 0; +} + + +int +send_blocking_req_internal( + blocking_child * c, + blocking_pipe_header * hdr, + void * data + ) +{ + blocking_pipe_header * threadcopy; + size_t payload_octets; + + REQUIRE(hdr != NULL); + REQUIRE(data != NULL); + DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); + + if (hdr->octets <= sizeof(*hdr)) + return 1; /* failure */ + payload_octets = hdr->octets - sizeof(*hdr); + + ensure_workitems_empty_slot(c); + if (NULL == c->thread_ref) { + ensure_workresp_empty_slot(c); + start_blocking_thread(c); + } + + threadcopy = emalloc(hdr->octets); + memcpy(threadcopy, hdr, sizeof(*hdr)); + memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); + + return queue_req_pointer(c, threadcopy); +} + + +blocking_pipe_header * +receive_blocking_req_internal( + blocking_child * c + ) +{ + blocking_pipe_header * req; + int rc; + + /* + * Child blocks here when idle. SysV semaphores maintain a + * count and release from sem_wait() only when it reaches 0. + * Windows auto-reset events are simpler, and multiple SetEvent + * calls before any thread waits result in a single wakeup. + * On Windows, the child drains all workitems each wakeup, while + * with SysV semaphores wait_sem() is used before each item. + */ +#ifdef SYS_WINNT + while (NULL == c->workitems[c->next_workeritem]) { + /* !!!! SetEvent(c->child_is_blocking); */ + rc = wait_for_sem(c->blocking_req_ready, NULL); + INSIST(0 == rc); + /* !!!! ResetEvent(c->child_is_blocking); */ + } +#else + do { + rc = wait_for_sem(c->blocking_req_ready, NULL); + } while (-1 == rc && EINTR == errno); + INSIST(0 == rc); +#endif + + req = c->workitems[c->next_workeritem]; + INSIST(NULL != req); + c->workitems[c->next_workeritem] = NULL; + c->next_workeritem = (1 + c->next_workeritem) % + c->workitems_alloc; + + if (CHILD_EXIT_REQ == req) { /* idled out */ + send_blocking_resp_internal(c, CHILD_GONE_RESP); + req = NULL; + } + + return req; +} + + +int +send_blocking_resp_internal( + blocking_child * c, + blocking_pipe_header * resp + ) +{ + ensure_workresp_empty_slot(c); + + c->responses[c->next_response] = resp; + c->next_response = (1 + c->next_response) % c->responses_alloc; + +#ifdef WORK_PIPE + write(c->resp_write_pipe, "", 1); +#else + tickle_sem(c->blocking_response_ready); +#endif + + return 0; +} + + +#ifndef WORK_PIPE +void +handle_blocking_resp_sem( + void * context + ) +{ + HANDLE ready; + blocking_child * c; + u_int idx; + + ready = (HANDLE)context; + c = NULL; + for (idx = 0; idx < blocking_children_alloc; idx++) { + c = blocking_children[idx]; + if (c != NULL && c->thread_ref != NULL && + ready == c->blocking_response_ready) + break; + } + if (idx < blocking_children_alloc) + process_blocking_resp(c); +} +#endif /* !WORK_PIPE */ + + +blocking_pipe_header * +receive_blocking_resp_internal( + blocking_child * c + ) +{ + blocking_pipe_header * removed; +#ifdef WORK_PIPE + int rc; + char scratch[32]; + + do { + rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); + } while (-1 == rc && EINTR == errno); +#endif + removed = c->responses[c->next_workresp]; + if (NULL != removed) { + c->responses[c->next_workresp] = NULL; + c->next_workresp = (1 + c->next_workresp) % + c->responses_alloc; + DEBUG_ENSURE(CHILD_GONE_RESP == removed || + BLOCKING_RESP_MAGIC == removed->magic_sig); + } + if (CHILD_GONE_RESP == removed) { + cleanup_after_child(c); + removed = NULL; + } + + return removed; +} + + +static void +start_blocking_thread( + blocking_child * c + ) +{ + + DEBUG_INSIST(!c->reusable); + + prepare_child_sems(c); + start_blocking_thread_internal(c); +} + + +static void +start_blocking_thread_internal( + blocking_child * c + ) +#ifdef SYS_WINNT +{ + thr_ref blocking_child_thread; + u_int blocking_thread_id; + BOOL resumed; + + (*addremove_io_semaphore)(c->blocking_response_ready, FALSE); + blocking_child_thread = + (HANDLE)_beginthreadex( + NULL, + 0, + &blocking_thread, + c, + CREATE_SUSPENDED, + &blocking_thread_id); + + if (NULL == blocking_child_thread) { + msyslog(LOG_ERR, "start blocking thread failed: %m"); + exit(-1); + } + c->thread_id = blocking_thread_id; + c->thread_ref = blocking_child_thread; + /* remember the thread priority is only within the process class */ + if (!SetThreadPriority(blocking_child_thread, + THREAD_PRIORITY_BELOW_NORMAL)) + msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); + + resumed = ResumeThread(blocking_child_thread); + DEBUG_INSIST(resumed); +} +#else /* pthreads start_blocking_thread_internal() follows */ +{ +# ifdef NEED_PTHREAD_INIT + static int pthread_init_called; +# endif + pthread_attr_t thr_attr; + int rc; + int saved_errno; + int pipe_ends[2]; /* read then write */ + int is_pipe; + int flags; + size_t stacksize; + sigset_t saved_sig_mask; + +# ifdef NEED_PTHREAD_INIT + /* + * from lib/isc/unix/app.c: + * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. + */ + if (!pthread_init_called) { + pthread_init(); + pthread_init_called = TRUE; + } +# endif + + rc = pipe_socketpair(&pipe_ends[0], &is_pipe); + if (0 != rc) { + msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); + exit(1); + } + c->resp_read_pipe = move_fd(pipe_ends[0]); + c->resp_write_pipe = move_fd(pipe_ends[1]); + c->ispipe = is_pipe; + flags = fcntl(c->resp_read_pipe, F_GETFL, 0); + if (-1 == flags) { + msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); + exit(1); + } + rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); + if (-1 == rc) { + msyslog(LOG_ERR, + "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); + exit(1); + } + (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); + pthread_attr_init(&thr_attr); + pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); +#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ + defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) + rc = pthread_attr_getstacksize(&thr_attr, &stacksize); + if (-1 == rc) { + msyslog(LOG_ERR, + "start_blocking_thread: pthread_attr_getstacksize %m"); + } else if (stacksize < THREAD_MINSTACKSIZE) { + rc = pthread_attr_setstacksize(&thr_attr, + THREAD_MINSTACKSIZE); + if (-1 == rc) + msyslog(LOG_ERR, + "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m", + (u_long)stacksize, + (u_long)THREAD_MINSTACKSIZE); + } +#else + UNUSED_ARG(stacksize); +#endif +#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) + pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); +#endif + c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); + block_thread_signals(&saved_sig_mask); + rc = pthread_create(c->thread_ref, &thr_attr, + &blocking_thread, c); + saved_errno = errno; + pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); + pthread_attr_destroy(&thr_attr); + if (0 != rc) { + errno = saved_errno; + msyslog(LOG_ERR, "pthread_create() blocking child: %m"); + exit(1); + } +} +#endif + + +/* + * block_thread_signals() + * + * Temporarily block signals used by ntpd main thread, so that signal + * mask inherited by child threads leaves them blocked. Returns prior + * active signal mask via pmask, to be restored by the main thread + * after pthread_create(). + */ +#ifndef SYS_WINNT +void +block_thread_signals( + sigset_t * pmask + ) +{ + sigset_t block; + + sigemptyset(&block); +# ifdef HAVE_SIGNALED_IO +# ifdef SIGIO + sigaddset(&block, SIGIO); +# endif +# ifdef SIGPOLL + sigaddset(&block, SIGPOLL); +# endif +# endif /* HAVE_SIGNALED_IO */ + sigaddset(&block, SIGALRM); + sigaddset(&block, MOREDEBUGSIG); + sigaddset(&block, LESSDEBUGSIG); +# ifdef SIGDIE1 + sigaddset(&block, SIGDIE1); +# endif +# ifdef SIGDIE2 + sigaddset(&block, SIGDIE2); +# endif +# ifdef SIGDIE3 + sigaddset(&block, SIGDIE3); +# endif +# ifdef SIGDIE4 + sigaddset(&block, SIGDIE4); +# endif +# ifdef SIGBUS + sigaddset(&block, SIGBUS); +# endif + sigemptyset(pmask); + pthread_sigmask(SIG_BLOCK, &block, pmask); +} +#endif /* !SYS_WINNT */ + + +/* + * prepare_child_sems() + * + * create sync events (semaphores) + * child_is_blocking initially unset + * blocking_req_ready initially unset + * + * Child waits for blocking_req_ready to be set after + * setting child_is_blocking. blocking_req_ready and + * blocking_response_ready are auto-reset, so wake one + * waiter and become unset (unsignalled) in one operation. + */ +static void +prepare_child_sems( + blocking_child *c + ) +#ifdef SYS_WINNT +{ + if (NULL == c->blocking_req_ready) { + /* manual reset using ResetEvent() */ + /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */ + /* auto reset - one thread released from wait each set */ + c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL); + c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL); + c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL); + } else { + /* !!!! ResetEvent(c->child_is_blocking); */ + /* ResetEvent(c->blocking_req_ready); */ + /* ResetEvent(c->blocking_response_ready); */ + /* ResetEvent(c->wake_scheduled_sleep); */ + } +} +#else /* pthreads prepare_child_sems() follows */ +{ + size_t octets; + + if (NULL == c->blocking_req_ready) { + octets = sizeof(*c->blocking_req_ready); + octets += sizeof(*c->wake_scheduled_sleep); + /* !!!! octets += sizeof(*c->child_is_blocking); */ + c->blocking_req_ready = emalloc_zero(octets);; + c->wake_scheduled_sleep = 1 + c->blocking_req_ready; + /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */ + } else { + sem_destroy(c->blocking_req_ready); + sem_destroy(c->wake_scheduled_sleep); + /* !!!! sem_destroy(c->child_is_blocking); */ + } + sem_init(c->blocking_req_ready, FALSE, 0); + sem_init(c->wake_scheduled_sleep, FALSE, 0); + /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */ +} +#endif + + +static int +wait_for_sem( + sem_ref sem, + struct timespec * timeout /* wall-clock */ + ) +#ifdef SYS_WINNT +{ + struct timespec now; + struct timespec delta; + DWORD msec; + DWORD rc; + + if (NULL == timeout) { + msec = INFINITE; + } else { + getclock(TIMEOFDAY, &now); + delta = sub_tspec(*timeout, now); + if (delta.tv_sec < 0) { + msec = 0; + } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { + msec = INFINITE; + } else { + msec = 1000 * (DWORD)delta.tv_sec; + msec += delta.tv_nsec / (1000 * 1000); + } + } + rc = WaitForSingleObject(sem, msec); + if (WAIT_OBJECT_0 == rc) + return 0; + if (WAIT_TIMEOUT == rc) { + errno = ETIMEDOUT; + return -1; + } + msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); + errno = EFAULT; + return -1; +} +#else /* pthreads wait_for_sem() follows */ +{ + int rc; + + if (NULL == timeout) + rc = sem_wait(sem); + else + rc = sem_timedwait(sem, timeout); + + return rc; +} +#endif + + +/* + * blocking_thread - thread functions have WINAPI calling convention + */ +#ifdef SYS_WINNT +u_int +WINAPI +#else +void * +#endif +blocking_thread( + void * ThreadArg + ) +{ + blocking_child *c; + + c = ThreadArg; + exit_worker(blocking_child_common(c)); + + /* NOTREACHED */ + return 0; +} + + +/* + * req_child_exit() runs in the parent. + */ +int +req_child_exit( + blocking_child *c + ) +{ + return queue_req_pointer(c, CHILD_EXIT_REQ); +} + + +/* + * cleanup_after_child() runs in parent. + */ +static void +cleanup_after_child( + blocking_child * c + ) +{ + u_int idx; + + DEBUG_INSIST(!c->reusable); +#ifdef SYS_WINNT + INSIST(CloseHandle(c->thread_ref)); +#else + free(c->thread_ref); +#endif + c->thread_ref = NULL; + c->thread_id = 0; +#ifdef WORK_PIPE + DEBUG_INSIST(-1 != c->resp_read_pipe); + DEBUG_INSIST(-1 != c->resp_write_pipe); + (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); + close(c->resp_write_pipe); + close(c->resp_read_pipe); + c->resp_write_pipe = -1; + c->resp_read_pipe = -1; +#else + DEBUG_INSIST(NULL != c->blocking_response_ready); + (*addremove_io_semaphore)(c->blocking_response_ready, TRUE); +#endif + for (idx = 0; idx < c->workitems_alloc; idx++) + c->workitems[idx] = NULL; + c->next_workitem = 0; + c->next_workeritem = 0; + for (idx = 0; idx < c->responses_alloc; idx++) + c->responses[idx] = NULL; + c->next_response = 0; + c->next_workresp = 0; + c->reusable = TRUE; +} + + +#else /* !WORK_THREAD follows */ +char work_thread_nonempty_compilation_unit; +#endif |