Diffstat (limited to 'libntp/work_thread.c')
-rw-r--r--  libntp/work_thread.c  726
1 files changed, 726 insertions, 0 deletions
diff --git a/libntp/work_thread.c b/libntp/work_thread.c
new file mode 100644
index 0000000..38d8747
--- /dev/null
+++ b/libntp/work_thread.c
@@ -0,0 +1,726 @@
+/*
+ * work_thread.c - threads implementation for blocking worker child.
+ */
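+/*
+ * Overview, as implemented below: the parent queues requests with
+ * send_blocking_req_internal(), which copies each request into the
+ * c->workitems[] ring and posts blocking_req_ready.  The child thread
+ * entry point is blocking_thread(); requests are pulled via
+ * receive_blocking_req_internal() and results handed back through
+ * send_blocking_resp_internal(), which wakes the parent with a byte
+ * on the response pipe (or a semaphore on Windows).
+ */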
+#include <config.h>
+#include "ntp_workimpl.h"
+
+#ifdef WORK_THREAD
+
+#include <stdio.h>
+#include <ctype.h>
+#include <signal.h>
+#ifndef SYS_WINNT
+#include <pthread.h>
+#endif
+
+#include "ntp_stdlib.h"
+#include "ntp_malloc.h"
+#include "ntp_syslog.h"
+#include "ntpd.h"
+#include "ntp_io.h"
+#include "ntp_assert.h"
+#include "ntp_unixtime.h"
+#include "timespecops.h"
+#include "ntp_worker.h"
+
+#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
+#define CHILD_GONE_RESP CHILD_EXIT_REQ
+#define WORKITEMS_ALLOC_INC 16
+#define RESPONSES_ALLOC_INC 4
+
+#ifndef THREAD_MINSTACKSIZE
+#define THREAD_MINSTACKSIZE (64U * 1024)
+#endif
+
+#ifndef DEVOLATILE
+#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var))
+#endif
+
+#ifdef SYS_WINNT
+# define thread_exit(c) _endthreadex(c)
+# define tickle_sem SetEvent
+#else
+# define thread_exit(c) pthread_exit((void*)(size_t)(c))
+# define tickle_sem sem_post
+#endif
+
+#ifdef WORK_PIPE
+addremove_io_fd_func addremove_io_fd;
+#else
+addremove_io_semaphore_func addremove_io_semaphore;
+#endif
+
+static void start_blocking_thread(blocking_child *);
+static void start_blocking_thread_internal(blocking_child *);
+static void prepare_child_sems(blocking_child *);
+static int wait_for_sem(sem_ref, struct timespec *);
+static void ensure_workitems_empty_slot(blocking_child *);
+static void ensure_workresp_empty_slot(blocking_child *);
+static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
+static void cleanup_after_child(blocking_child *);
+#ifdef SYS_WINNT
+u_int WINAPI blocking_thread(void *);
+#else
+void * blocking_thread(void *);
+#endif
+#ifndef SYS_WINNT
+static void block_thread_signals(sigset_t *);
+#endif
+
+
+void
+exit_worker(
+ int exitcode
+ )
+{
+ thread_exit(exitcode); /* see #define thread_exit */
+}
+
+
+int
+worker_sleep(
+ blocking_child * c,
+ time_t seconds
+ )
+{
+ struct timespec until;
+ int rc;
+
+# ifdef HAVE_CLOCK_GETTIME
+ if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
+ msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
+ return -1;
+ }
+# else
+ if (0 != getclock(TIMEOFDAY, &until)) {
+ msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
+ return -1;
+ }
+# endif
+ until.tv_sec += seconds;
+ do {
+ rc = wait_for_sem(c->wake_scheduled_sleep, &until);
+ } while (-1 == rc && EINTR == errno);
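+	/*
+	 * A 0 return means wake_scheduled_sleep was posted (the sleep
+	 * was interrupted), so report -1; ETIMEDOUT means the full
+	 * interval elapsed, which is success for this function.
+	 */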
+ if (0 == rc)
+ return -1;
+ if (-1 == rc && ETIMEDOUT == errno)
+ return 0;
+ msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
+ return -1;
+}
+
+
+void
+interrupt_worker_sleep(void)
+{
+ u_int idx;
+ blocking_child * c;
+
+ for (idx = 0; idx < blocking_children_alloc; idx++) {
+ c = blocking_children[idx];
+ if (NULL == c || NULL == c->wake_scheduled_sleep)
+ continue;
+ tickle_sem(c->wake_scheduled_sleep);
+ }
+}
+
+
+static void
+ensure_workitems_empty_slot(
+ blocking_child *c
+ )
+{
+ const size_t each = sizeof(blocking_children[0]->workitems[0]);
+ size_t new_alloc;
+ size_t old_octets;
+ size_t new_octets;
+ void * nonvol_workitems;
+
+ if (c->workitems != NULL &&
+ NULL == c->workitems[c->next_workitem])
+ return;
+
+ new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
+ old_octets = c->workitems_alloc * each;
+ new_octets = new_alloc * each;
+ nonvol_workitems = DEVOLATILE(void *, c->workitems);
+ c->workitems = erealloc_zero(nonvol_workitems, new_octets,
+ old_octets);
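+	/*
+	 * If the producer index had wrapped to 0, point it at the
+	 * first of the newly zeroed slots, which start at the old
+	 * allocation size.
+	 */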
+ if (0 == c->next_workitem)
+ c->next_workitem = c->workitems_alloc;
+ c->workitems_alloc = new_alloc;
+}
+
+
+static void
+ensure_workresp_empty_slot(
+ blocking_child *c
+ )
+{
+ const size_t each = sizeof(blocking_children[0]->responses[0]);
+ size_t new_alloc;
+ size_t old_octets;
+ size_t new_octets;
+ void * nonvol_responses;
+
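+	/*
+	 * Same growth logic as ensure_workitems_empty_slot(), applied
+	 * to the responses ring.
+	 */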
+ if (c->responses != NULL &&
+ NULL == c->responses[c->next_response])
+ return;
+
+ new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
+ old_octets = c->responses_alloc * each;
+ new_octets = new_alloc * each;
+ nonvol_responses = DEVOLATILE(void *, c->responses);
+ c->responses = erealloc_zero(nonvol_responses, new_octets,
+ old_octets);
+ if (0 == c->next_response)
+ c->next_response = c->responses_alloc;
+ c->responses_alloc = new_alloc;
+}
+
+
+/*
+ * queue_req_pointer() - append a work item or idle exit request to
+ *			 c->workitems[].
+ */
+static int
+queue_req_pointer(
+ blocking_child * c,
+ blocking_pipe_header * hdr
+ )
+{
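+	/*
+	 * c->workitems[c->next_workitem] is expected to be empty here;
+	 * see ensure_workitems_empty_slot().
+	 */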
+ c->workitems[c->next_workitem] = hdr;
+ c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
+
+ /*
+ * We only want to signal the wakeup event if the child is
+ * blocking on it, which is indicated by setting the blocking
+ * event. Wait with zero timeout to test.
+ */
+ /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
+ tickle_sem(c->blocking_req_ready);
+
+ return 0;
+}
+
+
+int
+send_blocking_req_internal(
+ blocking_child * c,
+ blocking_pipe_header * hdr,
+ void * data
+ )
+{
+ blocking_pipe_header * threadcopy;
+ size_t payload_octets;
+
+ REQUIRE(hdr != NULL);
+ REQUIRE(data != NULL);
+ DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
+
+ if (hdr->octets <= sizeof(*hdr))
+ return 1; /* failure */
+ payload_octets = hdr->octets - sizeof(*hdr);
+
+ ensure_workitems_empty_slot(c);
+ if (NULL == c->thread_ref) {
+ ensure_workresp_empty_slot(c);
+ start_blocking_thread(c);
+ }
+
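+	/* Copy the header and its payload into a single heap block. */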
+ threadcopy = emalloc(hdr->octets);
+ memcpy(threadcopy, hdr, sizeof(*hdr));
+ memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
+
+ return queue_req_pointer(c, threadcopy);
+}
+
+
+blocking_pipe_header *
+receive_blocking_req_internal(
+ blocking_child * c
+ )
+{
+ blocking_pipe_header * req;
+ int rc;
+
+	/*
+	 * Child blocks here when idle.  POSIX semaphores maintain a
+	 * count, so every sem_post() is remembered and sem_wait()
+	 * returns once per post.  Windows auto-reset events are
+	 * simpler: multiple SetEvent() calls before any thread waits
+	 * result in a single wakeup.  On Windows the child therefore
+	 * drains all pending workitems on each wakeup, while with
+	 * POSIX semaphores wait_for_sem() is called before each item.
+	 */
+#ifdef SYS_WINNT
+ while (NULL == c->workitems[c->next_workeritem]) {
+ /* !!!! SetEvent(c->child_is_blocking); */
+ rc = wait_for_sem(c->blocking_req_ready, NULL);
+ INSIST(0 == rc);
+ /* !!!! ResetEvent(c->child_is_blocking); */
+ }
+#else
+ do {
+ rc = wait_for_sem(c->blocking_req_ready, NULL);
+ } while (-1 == rc && EINTR == errno);
+ INSIST(0 == rc);
+#endif
+
+ req = c->workitems[c->next_workeritem];
+ INSIST(NULL != req);
+ c->workitems[c->next_workeritem] = NULL;
+ c->next_workeritem = (1 + c->next_workeritem) %
+ c->workitems_alloc;
+
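+	/*
+	 * An exit request carries no payload: acknowledge it with
+	 * CHILD_GONE_RESP and hand back NULL.
+	 */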
+ if (CHILD_EXIT_REQ == req) { /* idled out */
+ send_blocking_resp_internal(c, CHILD_GONE_RESP);
+ req = NULL;
+ }
+
+ return req;
+}
+
+
+int
+send_blocking_resp_internal(
+ blocking_child * c,
+ blocking_pipe_header * resp
+ )
+{
+ ensure_workresp_empty_slot(c);
+
+ c->responses[c->next_response] = resp;
+ c->next_response = (1 + c->next_response) % c->responses_alloc;
+
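+	/*
+	 * Wake the parent: a single byte on the response pipe, or the
+	 * response semaphore on Windows.
+	 */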
+#ifdef WORK_PIPE
+ write(c->resp_write_pipe, "", 1);
+#else
+ tickle_sem(c->blocking_response_ready);
+#endif
+
+ return 0;
+}
+
+
+#ifndef WORK_PIPE
+void
+handle_blocking_resp_sem(
+ void * context
+ )
+{
+ HANDLE ready;
+ blocking_child * c;
+ u_int idx;
+
+ ready = (HANDLE)context;
+ c = NULL;
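+	/* Map the signalled handle back to the child that owns it. */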
+ for (idx = 0; idx < blocking_children_alloc; idx++) {
+ c = blocking_children[idx];
+ if (c != NULL && c->thread_ref != NULL &&
+ ready == c->blocking_response_ready)
+ break;
+ }
+ if (idx < blocking_children_alloc)
+ process_blocking_resp(c);
+}
+#endif /* !WORK_PIPE */
+
+
+blocking_pipe_header *
+receive_blocking_resp_internal(
+ blocking_child * c
+ )
+{
+ blocking_pipe_header * removed;
+#ifdef WORK_PIPE
+ int rc;
+ char scratch[32];
+
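+	/* Drain the wakeup byte(s); the responses themselves are in the ring. */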
+ do {
+ rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
+ } while (-1 == rc && EINTR == errno);
+#endif
+ removed = c->responses[c->next_workresp];
+ if (NULL != removed) {
+ c->responses[c->next_workresp] = NULL;
+ c->next_workresp = (1 + c->next_workresp) %
+ c->responses_alloc;
+ DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
+ BLOCKING_RESP_MAGIC == removed->magic_sig);
+ }
+ if (CHILD_GONE_RESP == removed) {
+ cleanup_after_child(c);
+ removed = NULL;
+ }
+
+ return removed;
+}
+
+
+static void
+start_blocking_thread(
+ blocking_child * c
+ )
+{
+
+ DEBUG_INSIST(!c->reusable);
+
+ prepare_child_sems(c);
+ start_blocking_thread_internal(c);
+}
+
+
+static void
+start_blocking_thread_internal(
+ blocking_child * c
+ )
+#ifdef SYS_WINNT
+{
+ thr_ref blocking_child_thread;
+ u_int blocking_thread_id;
+ BOOL resumed;
+
+ (*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
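+	/* Create suspended so the priority can be lowered before the thread runs. */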
+ blocking_child_thread =
+ (HANDLE)_beginthreadex(
+ NULL,
+ 0,
+ &blocking_thread,
+ c,
+ CREATE_SUSPENDED,
+ &blocking_thread_id);
+
+ if (NULL == blocking_child_thread) {
+ msyslog(LOG_ERR, "start blocking thread failed: %m");
+ exit(-1);
+ }
+ c->thread_id = blocking_thread_id;
+ c->thread_ref = blocking_child_thread;
+ /* remember the thread priority is only within the process class */
+ if (!SetThreadPriority(blocking_child_thread,
+ THREAD_PRIORITY_BELOW_NORMAL))
+ msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
+
+ resumed = ResumeThread(blocking_child_thread);
+ DEBUG_INSIST(resumed);
+}
+#else /* pthreads start_blocking_thread_internal() follows */
+{
+# ifdef NEED_PTHREAD_INIT
+ static int pthread_init_called;
+# endif
+ pthread_attr_t thr_attr;
+ int rc;
+ int saved_errno;
+ int pipe_ends[2]; /* read then write */
+ int is_pipe;
+ int flags;
+ size_t stacksize;
+ sigset_t saved_sig_mask;
+
+# ifdef NEED_PTHREAD_INIT
+ /*
+ * from lib/isc/unix/app.c:
+ * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
+ */
+ if (!pthread_init_called) {
+ pthread_init();
+ pthread_init_called = TRUE;
+ }
+# endif
+
+ rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
+ if (0 != rc) {
+ msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
+ exit(1);
+ }
+ c->resp_read_pipe = move_fd(pipe_ends[0]);
+ c->resp_write_pipe = move_fd(pipe_ends[1]);
+ c->ispipe = is_pipe;
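+	/*
+	 * The parent polls the read end, so make it non-blocking
+	 * before registering it with the I/O loop.
+	 */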
+ flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
+ if (-1 == flags) {
+ msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
+ exit(1);
+ }
+ rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
+ if (-1 == rc) {
+ msyslog(LOG_ERR,
+ "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
+ exit(1);
+ }
+ (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
+ pthread_attr_init(&thr_attr);
+ pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
+#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
+ defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
+ rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
+	if (0 != rc) {
+ msyslog(LOG_ERR,
+ "start_blocking_thread: pthread_attr_getstacksize %m");
+ } else if (stacksize < THREAD_MINSTACKSIZE) {
+ rc = pthread_attr_setstacksize(&thr_attr,
+ THREAD_MINSTACKSIZE);
+		if (0 != rc)
+ msyslog(LOG_ERR,
+ "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
+ (u_long)stacksize,
+ (u_long)THREAD_MINSTACKSIZE);
+ }
+#else
+ UNUSED_ARG(stacksize);
+#endif
+#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
+ pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
+#endif
+ c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
+ block_thread_signals(&saved_sig_mask);
+ rc = pthread_create(c->thread_ref, &thr_attr,
+ &blocking_thread, c);
+ saved_errno = errno;
+ pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
+ pthread_attr_destroy(&thr_attr);
+ if (0 != rc) {
+ errno = saved_errno;
+ msyslog(LOG_ERR, "pthread_create() blocking child: %m");
+ exit(1);
+ }
+}
+#endif
+
+
+/*
+ * block_thread_signals()
+ *
+ * Temporarily block signals used by ntpd main thread, so that signal
+ * mask inherited by child threads leaves them blocked. Returns prior
+ * active signal mask via pmask, to be restored by the main thread
+ * after pthread_create().
+ */
+#ifndef SYS_WINNT
+static void
+block_thread_signals(
+ sigset_t * pmask
+ )
+{
+ sigset_t block;
+
+ sigemptyset(&block);
+# ifdef HAVE_SIGNALED_IO
+# ifdef SIGIO
+ sigaddset(&block, SIGIO);
+# endif
+# ifdef SIGPOLL
+ sigaddset(&block, SIGPOLL);
+# endif
+# endif /* HAVE_SIGNALED_IO */
+ sigaddset(&block, SIGALRM);
+ sigaddset(&block, MOREDEBUGSIG);
+ sigaddset(&block, LESSDEBUGSIG);
+# ifdef SIGDIE1
+ sigaddset(&block, SIGDIE1);
+# endif
+# ifdef SIGDIE2
+ sigaddset(&block, SIGDIE2);
+# endif
+# ifdef SIGDIE3
+ sigaddset(&block, SIGDIE3);
+# endif
+# ifdef SIGDIE4
+ sigaddset(&block, SIGDIE4);
+# endif
+# ifdef SIGBUS
+ sigaddset(&block, SIGBUS);
+# endif
+ sigemptyset(pmask);
+ pthread_sigmask(SIG_BLOCK, &block, pmask);
+}
+#endif /* !SYS_WINNT */
+
+
+/*
+ * prepare_child_sems()
+ *
+ * create sync events (semaphores)
+ * child_is_blocking initially unset
+ * blocking_req_ready initially unset
+ *
+ * The child sets child_is_blocking and then waits for
+ * blocking_req_ready to be set.  blocking_req_ready and
+ * blocking_response_ready are auto-reset, so each wakes one
+ * waiter and becomes unset (unsignalled) in one operation.
+ */
+static void
+prepare_child_sems(
+ blocking_child *c
+ )
+#ifdef SYS_WINNT
+{
+ if (NULL == c->blocking_req_ready) {
+ /* manual reset using ResetEvent() */
+ /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
+ /* auto reset - one thread released from wait each set */
+ c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
+ c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
+ c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
+ } else {
+ /* !!!! ResetEvent(c->child_is_blocking); */
+ /* ResetEvent(c->blocking_req_ready); */
+ /* ResetEvent(c->blocking_response_ready); */
+ /* ResetEvent(c->wake_scheduled_sleep); */
+ }
+}
+#else /* pthreads prepare_child_sems() follows */
+{
+ size_t octets;
+
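+	/*
+	 * Both semaphores share a single allocation;
+	 * wake_scheduled_sleep is the second sem_t in the block.
+	 */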
+ if (NULL == c->blocking_req_ready) {
+ octets = sizeof(*c->blocking_req_ready);
+ octets += sizeof(*c->wake_scheduled_sleep);
+ /* !!!! octets += sizeof(*c->child_is_blocking); */
+		c->blocking_req_ready = emalloc_zero(octets);
+ c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
+ /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
+ } else {
+ sem_destroy(c->blocking_req_ready);
+ sem_destroy(c->wake_scheduled_sleep);
+ /* !!!! sem_destroy(c->child_is_blocking); */
+ }
+ sem_init(c->blocking_req_ready, FALSE, 0);
+ sem_init(c->wake_scheduled_sleep, FALSE, 0);
+ /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
+}
+#endif
+
+
+static int
+wait_for_sem(
+ sem_ref sem,
+ struct timespec * timeout /* wall-clock */
+ )
+#ifdef SYS_WINNT
+{
+ struct timespec now;
+ struct timespec delta;
+ DWORD msec;
+ DWORD rc;
+
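+	/*
+	 * A NULL timeout waits forever; otherwise convert the absolute
+	 * wall-clock deadline to a relative millisecond count.
+	 */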
+ if (NULL == timeout) {
+ msec = INFINITE;
+ } else {
+ getclock(TIMEOFDAY, &now);
+ delta = sub_tspec(*timeout, now);
+ if (delta.tv_sec < 0) {
+ msec = 0;
+ } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
+ msec = INFINITE;
+ } else {
+ msec = 1000 * (DWORD)delta.tv_sec;
+ msec += delta.tv_nsec / (1000 * 1000);
+ }
+ }
+ rc = WaitForSingleObject(sem, msec);
+ if (WAIT_OBJECT_0 == rc)
+ return 0;
+ if (WAIT_TIMEOUT == rc) {
+ errno = ETIMEDOUT;
+ return -1;
+ }
+ msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
+ errno = EFAULT;
+ return -1;
+}
+#else /* pthreads wait_for_sem() follows */
+{
+ int rc;
+
+ if (NULL == timeout)
+ rc = sem_wait(sem);
+ else
+ rc = sem_timedwait(sem, timeout);
+
+ return rc;
+}
+#endif
+
+
+/*
+ * blocking_thread - thread functions have WINAPI calling convention
+ */
+#ifdef SYS_WINNT
+u_int
+WINAPI
+#else
+void *
+#endif
+blocking_thread(
+ void * ThreadArg
+ )
+{
+ blocking_child *c;
+
+ c = ThreadArg;
+ exit_worker(blocking_child_common(c));
+
+ /* NOTREACHED */
+ return 0;
+}
+
+
+/*
+ * req_child_exit() runs in the parent.
+ */
+int
+req_child_exit(
+ blocking_child *c
+ )
+{
+ return queue_req_pointer(c, CHILD_EXIT_REQ);
+}
+
+
+/*
+ * cleanup_after_child() runs in the parent.
+ */
+static void
+cleanup_after_child(
+ blocking_child * c
+ )
+{
+ u_int idx;
+
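+	/*
+	 * Free per-thread resources and clear both rings so this
+	 * child slot can be reused.
+	 */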
+ DEBUG_INSIST(!c->reusable);
+#ifdef SYS_WINNT
+ INSIST(CloseHandle(c->thread_ref));
+#else
+ free(c->thread_ref);
+#endif
+ c->thread_ref = NULL;
+ c->thread_id = 0;
+#ifdef WORK_PIPE
+ DEBUG_INSIST(-1 != c->resp_read_pipe);
+ DEBUG_INSIST(-1 != c->resp_write_pipe);
+ (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
+ close(c->resp_write_pipe);
+ close(c->resp_read_pipe);
+ c->resp_write_pipe = -1;
+ c->resp_read_pipe = -1;
+#else
+ DEBUG_INSIST(NULL != c->blocking_response_ready);
+ (*addremove_io_semaphore)(c->blocking_response_ready, TRUE);
+#endif
+ for (idx = 0; idx < c->workitems_alloc; idx++)
+ c->workitems[idx] = NULL;
+ c->next_workitem = 0;
+ c->next_workeritem = 0;
+ for (idx = 0; idx < c->responses_alloc; idx++)
+ c->responses[idx] = NULL;
+ c->next_response = 0;
+ c->next_workresp = 0;
+ c->reusable = TRUE;
+}
+
+
+#else /* !WORK_THREAD follows */
+char work_thread_nonempty_compilation_unit;
+#endif