summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/extras/dispatch/src/server.c')
-rw-r--r--qpid/extras/dispatch/src/server.c71
1 files changed, 38 insertions, 33 deletions
diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c
index 5420d3b776..65e181bd2c 100644
--- a/qpid/extras/dispatch/src/server.c
+++ b/qpid/extras/dispatch/src/server.c
@@ -109,6 +109,8 @@ static void thread_process_listeners(dx_server_t *dx_server)
ctx->listener = (dx_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
+ ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = 0;
pn_connection_t *conn = pn_connection();
@@ -327,7 +329,6 @@ static void *thread_run(void *arg)
dx_connection_t *ctx;
int error;
int poll_result;
- int timer_holdoff = 0;
if (!thread)
return 0;
@@ -372,35 +373,20 @@ static void *thread_run(void *arg)
//
// Service pending timers.
//
- if (DEQ_SIZE(dx_server->pending_timers) > 0) {
- dx_timer_list_t local_list;
- dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
-
- DEQ_INIT(local_list);
- while (timer) {
- DEQ_REMOVE_HEAD(dx_server->pending_timers);
- DEQ_INSERT_TAIL(local_list, timer);
- timer = DEQ_HEAD(dx_server->pending_timers);
- }
+ dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
+ if (timer) {
+ DEQ_REMOVE_HEAD(dx_server->pending_timers);
//
- // Release the lock and invoke the connection handlers.
+ // Mark the timer as idle in case it reschedules itself.
//
- sys_mutex_unlock(dx_server->lock);
-
- timer = DEQ_HEAD(local_list);
- while (timer) {
- DEQ_REMOVE_HEAD(local_list);
-
- //
- // Mark the timer as idle in case it reschedules itself.
- //
- dx_timer_idle_LH(timer);
-
- timer->handler(timer->context);
- timer = DEQ_HEAD(local_list);
- }
+ dx_timer_idle_LH(timer);
+ //
+ // Release the lock and invoke the connection handler.
+ //
+ sys_mutex_unlock(dx_server->lock);
+ timer->handler(timer->context);
pn_driver_wakeup(dx_server->driver);
continue;
}
@@ -464,13 +450,10 @@ static void *thread_run(void *arg)
//
// Visit the timer module.
//
- if (poll_result == 0 || ++timer_holdoff == 100) {
- struct timespec tv;
- clock_gettime(CLOCK_REALTIME, &tv);
- long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
- dx_timer_visit_LH(milliseconds);
- timer_holdoff = 0;
- }
+ struct timespec tv;
+ clock_gettime(CLOCK_REALTIME, &tv);
+ long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
+ dx_timer_visit_LH(milliseconds);
//
// Process listeners (incoming connections).
@@ -630,6 +613,7 @@ static void cxtr_try_open(void *context)
ctx->connector = ct;
ctx->context = ct->context;
ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = 0;
//
@@ -878,12 +862,32 @@ void *dx_connection_get_context(dx_connection_t *conn)
}
+void dx_connection_set_link_context(dx_connection_t *conn, void *context)
+{
+ conn->link_context = context;
+}
+
+
+void *dx_connection_get_link_context(dx_connection_t *conn)
+{
+ return conn->link_context;
+}
+
+
pn_connection_t *dx_connection_pn(dx_connection_t *conn)
{
return conn->pn_conn;
}
+const dx_server_config_t *dx_connection_config(const dx_connection_t *conn)
+{
+ if (conn->listener)
+ return conn->listener->config;
+ return conn->connector->config;
+}
+
+
dx_listener_t *dx_server_listen(dx_dispatch_t *dx, const dx_server_config_t *config, void *context)
{
dx_server_t *dx_server = dx->server;
@@ -976,6 +980,7 @@ dx_user_fd_t *dx_user_fd(dx_dispatch_t *dx, int fd, void *context)
ctx->connector = 0;
ctx->context = 0;
ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = ufd;
ufd->context = context;