summaryrefslogtreecommitdiff
path: root/extras/dispatch/src/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'extras/dispatch/src/server.c')
-rw-r--r--extras/dispatch/src/server.c903
1 files changed, 903 insertions, 0 deletions
diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c
new file mode 100644
index 0000000000..0099393f60
--- /dev/null
+++ b/extras/dispatch/src/server.c
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/log.h>
+#include "server_private.h"
+#include "timer_private.h"
+#include "alloc_private.h"
+#include "auth.h"
+#include "work_queue.h"
+#include <stdio.h>
+#include <time.h>
+#include <signal.h>
+
+static char *module="SERVER";
+
+typedef struct dx_thread_t {
+ int thread_id;
+ volatile int running;
+ volatile int canceled;
+ int using_thread;
+ sys_thread_t *thread;
+} dx_thread_t;
+
+
+typedef struct dx_server_t {
+ int thread_count;
+ pn_driver_t *driver;
+ dx_thread_start_cb_t start_handler;
+ dx_conn_handler_cb_t conn_handler;
+ dx_signal_handler_cb_t signal_handler;
+ dx_user_fd_handler_cb_t ufd_handler;
+ void *start_context;
+ void *conn_context;
+ void *signal_context;
+ sys_cond_t *cond;
+ sys_mutex_t *lock;
+ dx_thread_t **threads;
+ work_queue_t *work_queue;
+ dx_timer_list_t pending_timers;
+ bool a_thread_is_waiting;
+ int threads_active;
+ int pause_requests;
+ int threads_paused;
+ int pause_next_sequence;
+ int pause_now_serving;
+ int pending_signal;
+} dx_server_t;
+
+
+ALLOC_DEFINE(dx_listener_t);
+ALLOC_DEFINE(dx_connector_t);
+ALLOC_DEFINE(dx_connection_t);
+ALLOC_DEFINE(dx_user_fd_t);
+
+
+/**
+ * Singleton Concurrent Proton Driver object
+ */
+static dx_server_t *dx_server = 0;
+
+
+static void signal_handler(int signum)
+{
+ dx_server->pending_signal = signum;
+ sys_cond_signal_all(dx_server->cond);
+}
+
+
+static dx_thread_t *thread(int id)
+{
+ dx_thread_t *thread = NEW(dx_thread_t);
+ if (!thread)
+ return 0;
+
+ thread->thread_id = id;
+ thread->running = 0;
+ thread->canceled = 0;
+ thread->using_thread = 0;
+
+ return thread;
+}
+
+
+static void thread_process_listeners(pn_driver_t *driver)
+{
+ pn_listener_t *listener = pn_driver_listener(driver);
+ pn_connector_t *cxtr;
+ dx_connection_t *ctx;
+
+ while (listener) {
+ dx_log(module, LOG_TRACE, "Accepting Connection");
+ cxtr = pn_listener_accept(listener);
+ ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_SASL_SERVER;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_cxtr = cxtr;
+ ctx->pn_conn = 0;
+ ctx->listener = (dx_listener_t*) pn_listener_context(listener);
+ ctx->connector = 0;
+ ctx->context = ctx->listener->context;
+ ctx->ufd = 0;
+
+ pn_connector_set_context(cxtr, ctx);
+ listener = pn_driver_listener(driver);
+ }
+}
+
+
+static void handle_signals_LH(void)
+{
+ int signum = dx_server->pending_signal;
+
+ if (signum) {
+ dx_server->pending_signal = 0;
+ if (dx_server->signal_handler) {
+ sys_mutex_unlock(dx_server->lock);
+ dx_server->signal_handler(dx_server->signal_context, signum);
+ sys_mutex_lock(dx_server->lock);
+ }
+ }
+}
+
+
+static void block_if_paused_LH(void)
+{
+ if (dx_server->pause_requests > 0) {
+ dx_server->threads_paused++;
+ sys_cond_signal_all(dx_server->cond);
+ while (dx_server->pause_requests > 0)
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+ dx_server->threads_paused--;
+ }
+}
+
+
+static void process_connector(pn_connector_t *cxtr)
+{
+ dx_connection_t *ctx = pn_connector_context(cxtr);
+ int events = 0;
+ int auth_passes = 0;
+
+ if (ctx->state == CONN_STATE_USER) {
+ dx_server->ufd_handler(ctx->ufd->context, ctx->ufd);
+ return;
+ }
+
+ do {
+ //
+ // Step the engine for pre-handler processing
+ //
+ pn_connector_process(cxtr);
+
+ //
+ // Call the handler that is appropriate for the connector's state.
+ //
+ switch (ctx->state) {
+ case CONN_STATE_CONNECTING:
+ if (!pn_connector_closed(cxtr)) {
+ ctx->state = CONN_STATE_SASL_CLIENT;
+ assert(ctx->connector);
+ ctx->connector->state = CXTR_STATE_OPEN;
+ events = 1;
+ } else {
+ ctx->state = CONN_STATE_FAILED;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_SASL_CLIENT:
+ if (auth_passes == 0) {
+ auth_client_handler(cxtr);
+ events = 1;
+ } else {
+ auth_passes++;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_SASL_SERVER:
+ if (auth_passes == 0) {
+ auth_server_handler(cxtr);
+ events = 1;
+ } else {
+ auth_passes++;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_OPENING:
+ ctx->state = CONN_STATE_OPERATIONAL;
+
+ pn_connection_t *conn = pn_connection();
+ pn_connection_set_container(conn, "dispatch"); // TODO - make unique
+ pn_connector_set_connection(cxtr, conn);
+ pn_connection_set_context(conn, ctx);
+ ctx->pn_conn = conn;
+
+ dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+ if (ctx->listener) {
+ ce = DX_CONN_EVENT_LISTENER_OPEN;
+ } else if (ctx->connector) {
+ ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(0);
+
+ dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ events = 1;
+ break;
+
+ case CONN_STATE_OPERATIONAL:
+ if (pn_connector_closed(cxtr)) {
+ dx_server->conn_handler(ctx->context,
+ DX_CONN_EVENT_CLOSE,
+ (dx_connection_t*) pn_connector_context(cxtr));
+ events = 0;
+ }
+ else
+ events = dx_server->conn_handler(ctx->context,
+ DX_CONN_EVENT_PROCESS,
+ (dx_connection_t*) pn_connector_context(cxtr));
+ break;
+
+ default:
+ break;
+ }
+ } while (events > 0);
+}
+
+
+//
+// TEMPORARY FUNCTION PROTOTYPES
+//
+void pn_driver_wait_1(pn_driver_t *d);
+int pn_driver_wait_2(pn_driver_t *d, int timeout);
+void pn_driver_wait_3(pn_driver_t *d);
+//
+// END TEMPORARY
+//
+
+static void *thread_run(void *arg)
+{
+ dx_thread_t *thread = (dx_thread_t*) arg;
+ pn_connector_t *work;
+ pn_connection_t *conn;
+ dx_connection_t *ctx;
+ int error;
+ int poll_result;
+ int timer_holdoff = 0;
+
+ if (!thread)
+ return 0;
+
+ thread->running = 1;
+
+ if (thread->canceled)
+ return 0;
+
+ //
+ // Invoke the start handler if the application supplied one.
+ // This handler can be used to set NUMA or processor affinnity for the thread.
+ //
+ if (dx_server->start_handler)
+ dx_server->start_handler(dx_server->start_context, thread->thread_id);
+
+ //
+ // Main Loop
+ //
+ while (thread->running) {
+ sys_mutex_lock(dx_server->lock);
+
+ //
+ // Check for pending signals to process
+ //
+ handle_signals_LH();
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Check to see if the server is pausing. If so, block here.
+ //
+ block_if_paused_LH();
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Service pending timers.
+ //
+ dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
+ if (timer) {
+ DEQ_REMOVE_HEAD(dx_server->pending_timers);
+
+ //
+ // Mark the timer as idle in case it reschedules itself.
+ //
+ dx_timer_idle_LH(timer);
+
+ //
+ // Release the lock and invoke the connection handler.
+ //
+ sys_mutex_unlock(dx_server->lock);
+ timer->handler(timer->context);
+ pn_driver_wakeup(dx_server->driver);
+ continue;
+ }
+
+ //
+ // Check the work queue for connectors scheduled for processing.
+ //
+ work = work_queue_get(dx_server->work_queue);
+ if (!work) {
+ //
+ // There is no pending work to do
+ //
+ if (dx_server->a_thread_is_waiting) {
+ //
+ // Another thread is waiting on the proton driver, this thread must
+ // wait on the condition variable until signaled.
+ //
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+ } else {
+ //
+ // This thread elects itself to wait on the proton driver. Set the
+ // thread-is-waiting flag so other idle threads will not interfere.
+ //
+ dx_server->a_thread_is_waiting = true;
+
+ //
+ // Ask the timer module when its next timer is scheduled to fire. We'll
+ // use this value in driver_wait as the timeout. If there are no scheduled
+ // timers, the returned value will be -1.
+ //
+ long duration = dx_timer_next_duration_LH();
+
+ //
+ // Invoke the proton driver's wait sequence. This is a bit of a hack for now
+ // and will be improved in the future. The wait process is divided into three parts,
+ // the first and third of which need to be non-reentrant, and the second of which
+ // must be reentrant (and blocks).
+ //
+ pn_driver_wait_1(dx_server->driver);
+ sys_mutex_unlock(dx_server->lock);
+
+ do {
+ error = 0;
+ poll_result = pn_driver_wait_2(dx_server->driver, duration);
+ if (poll_result == -1)
+ error = pn_driver_errno(dx_server->driver);
+ } while (error == PN_INTR);
+ if (error) {
+ dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver)));
+ exit(-1);
+ }
+
+ sys_mutex_lock(dx_server->lock);
+ pn_driver_wait_3(dx_server->driver);
+
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Visit the timer module.
+ //
+ if (poll_result == 0 || ++timer_holdoff == 100) {
+ struct timespec tv;
+ clock_gettime(CLOCK_REALTIME, &tv);
+ long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
+ dx_timer_visit_LH(milliseconds);
+ timer_holdoff = 0;
+ }
+
+ //
+ // Process listeners (incoming connections).
+ //
+ thread_process_listeners(dx_server->driver);
+
+ //
+ // Traverse the list of connectors-needing-service from the proton driver.
+ // If the connector is not already in the work queue and it is not currently
+ // being processed by another thread, put it in the work queue and signal the
+ // condition variable.
+ //
+ work = pn_driver_connector(dx_server->driver);
+ while (work) {
+ ctx = pn_connector_context(work);
+ if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
+ ctx->enqueued = 1;
+ work_queue_put(dx_server->work_queue, work);
+ sys_cond_signal(dx_server->cond);
+ }
+ work = pn_driver_connector(dx_server->driver);
+ }
+
+ //
+ // Release our exclusive claim on pn_driver_wait.
+ //
+ dx_server->a_thread_is_waiting = false;
+ }
+ }
+
+ //
+ // If we were given a connector to work on from the work queue, mark it as
+ // owned by this thread and as no longer enqueued.
+ //
+ if (work) {
+ ctx = pn_connector_context(work);
+ if (ctx->owner_thread == CONTEXT_NO_OWNER) {
+ ctx->owner_thread = thread->thread_id;
+ ctx->enqueued = 0;
+ dx_server->threads_active++;
+ } else {
+ //
+ // This connector is being processed by another thread, re-queue it.
+ //
+ work_queue_put(dx_server->work_queue, work);
+ work = 0;
+ }
+ }
+ sys_mutex_unlock(dx_server->lock);
+
+ //
+ // Process the connector that we now have exclusive access to.
+ //
+ if (work) {
+ process_connector(work);
+
+ //
+ // Check to see if the connector was closed during processing
+ //
+ if (pn_connector_closed(work)) {
+ //
+ // Connector is closed. Free the context and the connector.
+ //
+ conn = pn_connector_connection(work);
+ if (ctx->connector) {
+ ctx->connector->ctx = 0;
+ ctx->connector->state = CXTR_STATE_CONNECTING;
+ dx_timer_schedule(ctx->connector->timer, ctx->connector->delay);
+ }
+ sys_mutex_lock(dx_server->lock);
+ free_dx_connection_t(ctx);
+ pn_connector_free(work);
+ if (conn)
+ pn_connection_free(conn);
+ dx_server->threads_active--;
+ sys_mutex_unlock(dx_server->lock);
+ } else {
+ //
+ // The connector lives on. Mark it as no longer owned by this thread.
+ //
+ sys_mutex_lock(dx_server->lock);
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ dx_server->threads_active--;
+ sys_mutex_unlock(dx_server->lock);
+ }
+
+ //
+ // Wake up the proton driver to force it to reconsider its set of FDs
+ // in light of the processing that just occurred.
+ //
+ pn_driver_wakeup(dx_server->driver);
+ }
+ }
+
+ return 0;
+}
+
+
+static void thread_start(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ thread->using_thread = 1;
+ thread->thread = sys_thread(thread_run, (void*) thread);
+}
+
+
+static void thread_cancel(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ thread->running = 0;
+ thread->canceled = 1;
+}
+
+
+static void thread_join(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ if (thread->using_thread)
+ sys_thread_join(thread->thread);
+}
+
+
+static void thread_free(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ free(thread);
+}
+
+
+static void cxtr_try_open(void *context)
+{
+ dx_connector_t *ct = (dx_connector_t*) context;
+ if (ct->state != CXTR_STATE_CONNECTING)
+ return;
+
+ dx_connection_t *ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_CONNECTING;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_conn = 0;
+ ctx->listener = 0;
+ ctx->connector = ct;
+ ctx->context = ct->context;
+ ctx->user_context = 0;
+ ctx->ufd = 0;
+
+ //
+ // pn_connector is not thread safe
+ //
+ sys_mutex_lock(dx_server->lock);
+ ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx);
+ sys_mutex_unlock(dx_server->lock);
+
+ ct->ctx = ctx;
+ ct->delay = 5000;
+ dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
+}
+
+
+void dx_server_initialize(int thread_count)
+{
+ int i;
+
+ if (dx_server)
+ return; // TODO - Fail in a more dramatic way
+
+ dx_alloc_initialize();
+ dx_server = NEW(dx_server_t);
+
+ if (!dx_server)
+ return; // TODO - Fail in a more dramatic way
+
+ dx_server->thread_count = thread_count;
+ dx_server->driver = pn_driver();
+ dx_server->start_handler = 0;
+ dx_server->conn_handler = 0;
+ dx_server->signal_handler = 0;
+ dx_server->ufd_handler = 0;
+ dx_server->start_context = 0;
+ dx_server->signal_context = 0;
+ dx_server->lock = sys_mutex();
+ dx_server->cond = sys_cond();
+
+ dx_timer_initialize(dx_server->lock);
+
+ dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count);
+ for (i = 0; i < thread_count; i++)
+ dx_server->threads[i] = thread(i);
+
+ dx_server->work_queue = work_queue();
+ DEQ_INIT(dx_server->pending_timers);
+ dx_server->a_thread_is_waiting = false;
+ dx_server->threads_active = 0;
+ dx_server->pause_requests = 0;
+ dx_server->threads_paused = 0;
+ dx_server->pause_next_sequence = 0;
+ dx_server->pause_now_serving = 0;
+ dx_server->pending_signal = 0;
+}
+
+
+void dx_server_finalize(void)
+{
+ int i;
+ if (!dx_server)
+ return;
+
+ for (i = 0; i < dx_server->thread_count; i++)
+ thread_free(dx_server->threads[i]);
+
+ work_queue_free(dx_server->work_queue);
+
+ pn_driver_free(dx_server->driver);
+ sys_mutex_free(dx_server->lock);
+ sys_cond_free(dx_server->cond);
+ free(dx_server);
+ dx_server = 0;
+}
+
+
+void dx_server_set_conn_handler(dx_conn_handler_cb_t handler)
+{
+ dx_server->conn_handler = handler;
+}
+
+
+void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context)
+{
+ dx_server->signal_handler = handler;
+ dx_server->signal_context = context;
+}
+
+
+void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context)
+{
+ dx_server->start_handler = handler;
+ dx_server->start_context = context;
+}
+
+
+void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler)
+{
+ dx_server->ufd_handler = ufd_handler;
+}
+
+
+void dx_server_run(void)
+{
+ int i;
+ if (!dx_server)
+ return;
+
+ assert(dx_server->conn_handler); // Server can't run without a connection handler.
+
+ for (i = 1; i < dx_server->thread_count; i++)
+ thread_start(dx_server->threads[i]);
+
+ dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count);
+
+ thread_run((void*) dx_server->threads[0]);
+
+ for (i = 1; i < dx_server->thread_count; i++)
+ thread_join(dx_server->threads[i]);
+
+ dx_log(module, LOG_INFO, "Shut Down");
+}
+
+
+void dx_server_stop(void)
+{
+ int idx;
+
+ sys_mutex_lock(dx_server->lock);
+ for (idx = 0; idx < dx_server->thread_count; idx++)
+ thread_cancel(dx_server->threads[idx]);
+ sys_cond_signal_all(dx_server->cond);
+ pn_driver_wakeup(dx_server->driver);
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_signal(int signum)
+{
+ signal(signum, signal_handler);
+}
+
+
+void dx_server_pause(void)
+{
+ sys_mutex_lock(dx_server->lock);
+
+ //
+ // Bump the request count to stop all the threads.
+ //
+ dx_server->pause_requests++;
+ int my_sequence = dx_server->pause_next_sequence++;
+
+ //
+ // Awaken all threads that are currently blocking.
+ //
+ sys_cond_signal_all(dx_server->cond);
+ pn_driver_wakeup(dx_server->driver);
+
+ //
+ // Wait for the paused thread count plus the number of threads requesting a pause to equal
+ // the total thread count. Also, don't exit the blocking loop until now_serving equals our
+ // sequence number. This ensures that concurrent pausers don't run at the same time.
+ //
+ while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) ||
+ (my_sequence != dx_server->pause_now_serving))
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_resume(void)
+{
+ sys_mutex_lock(dx_server->lock);
+ dx_server->pause_requests--;
+ dx_server->pause_now_serving++;
+ sys_cond_signal_all(dx_server->cond);
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_activate(dx_connection_t *ctx)
+{
+ if (!ctx)
+ return;
+
+ pn_connector_t *ctor = ctx->pn_cxtr;
+ if (!ctor)
+ return;
+
+ if (!pn_connector_closed(ctor))
+ pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE);
+}
+
+
+void dx_connection_set_context(dx_connection_t *conn, void *context)
+{
+ conn->user_context = context;
+}
+
+
+void *dx_connection_get_context(dx_connection_t *conn)
+{
+ return conn->user_context;
+}
+
+
+pn_connection_t *dx_connection_pn(dx_connection_t *conn)
+{
+ return conn->pn_conn;
+}
+
+
+dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context)
+{
+ dx_listener_t *li = new_dx_listener_t();
+
+ if (!li)
+ return 0;
+
+ li->config = config;
+ li->context = context;
+ li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li);
+
+ if (!li->pn_listener) {
+ dx_log(module, LOG_ERROR, "Driver Error %d (%s)",
+ pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver));
+ free_dx_listener_t(li);
+ return 0;
+ }
+ dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port);
+
+ return li;
+}
+
+
+void dx_server_listener_free(dx_listener_t* li)
+{
+ pn_listener_free(li->pn_listener);
+ free_dx_listener_t(li);
+}
+
+
+void dx_server_listener_close(dx_listener_t* li)
+{
+ pn_listener_close(li->pn_listener);
+}
+
+
+dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context)
+{
+ dx_connector_t *ct = new_dx_connector_t();
+
+ if (!ct)
+ return 0;
+
+ ct->state = CXTR_STATE_CONNECTING;
+ ct->config = config;
+ ct->context = context;
+ ct->ctx = 0;
+ ct->timer = dx_timer(cxtr_try_open, (void*) ct);
+ ct->delay = 0;
+
+ dx_timer_schedule(ct->timer, ct->delay);
+ return ct;
+}
+
+
+void dx_server_connector_free(dx_connector_t* ct)
+{
+ // Don't free the proton connector. This will be done by the connector
+ // processing/cleanup.
+
+ if (ct->ctx) {
+ pn_connector_close(ct->ctx->pn_cxtr);
+ ct->ctx->connector = 0;
+ }
+
+ dx_timer_free(ct->timer);
+ free_dx_connector_t(ct);
+}
+
+
+dx_user_fd_t *dx_user_fd(int fd, void *context)
+{
+ dx_user_fd_t *ufd = new_dx_user_fd_t();
+
+ if (!ufd)
+ return 0;
+
+ dx_connection_t *ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_USER;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_conn = 0;
+ ctx->listener = 0;
+ ctx->connector = 0;
+ ctx->context = 0;
+ ctx->user_context = 0;
+ ctx->ufd = ufd;
+
+ ufd->context = context;
+ ufd->fd = fd;
+ ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx);
+ pn_driver_wakeup(dx_server->driver);
+
+ return ufd;
+}
+
+
+void dx_user_fd_free(dx_user_fd_t *ufd)
+{
+ pn_connector_close(ufd->pn_conn);
+ free_dx_user_fd_t(ufd);
+}
+
+
+void dx_user_fd_activate_read(dx_user_fd_t *ufd)
+{
+ pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE);
+ pn_driver_wakeup(dx_server->driver);
+}
+
+
+void dx_user_fd_activate_write(dx_user_fd_t *ufd)
+{
+ pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
+ pn_driver_wakeup(dx_server->driver);
+}
+
+
+bool dx_user_fd_is_readable(dx_user_fd_t *ufd)
+{
+ return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE);
+}
+
+
+bool dx_user_fd_is_writeable(dx_user_fd_t *ufd)
+{
+ return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
+}
+
+
+void dx_server_timer_pending_LH(dx_timer_t *timer)
+{
+ DEQ_INSERT_TAIL(dx_server->pending_timers, timer);
+}
+
+
+void dx_server_timer_cancel_LH(dx_timer_t *timer)
+{
+ DEQ_REMOVE(dx_server->pending_timers, timer);
+}
+