diff options
| author | Ted Ross <tross@apache.org> | 2013-03-07 00:32:32 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-03-07 00:32:32 +0000 |
| commit | d507fcd7d13652075b812b1761bf795a50c54e18 (patch) | |
| tree | e7ba848b7011b5a0a9d5a1ac28994827436d390b /qpid/extras/dispatch/src | |
| parent | b154cdb711e8f93a2da51a1a10fa2c244b946748 (diff) | |
| download | qpid-python-d507fcd7d13652075b812b1761bf795a50c54e18.tar.gz | |
QPID-4612 - Major cleanup in the API.
Removed the singleton patterns.
Added a single header file for all of Dispatch.
Doxygen comments still need to be updated.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1453628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
| -rw-r--r-- | qpid/extras/dispatch/src/agent.c | 48 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/container.c | 153 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/dispatch.c | 59 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/dispatch_private.h | 35 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 23 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/server.c | 162 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/server_private.h | 5 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/timer.c | 4 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/timer_private.h | 2 |
9 files changed, 316 insertions, 175 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index a885042b45..864d58fadc 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -17,6 +17,7 @@ * under the License. */ +#include "dispatch_private.h" #include <qpid/dispatch/agent.h> #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/hash.h> @@ -28,6 +29,7 @@ typedef struct dx_agent_t { + dx_server_t *server; hash_t *class_hash; dx_message_list_t in_fifo; dx_message_list_t out_fifo; @@ -35,8 +37,6 @@ typedef struct dx_agent_t { dx_timer_t *timer; } dx_agent_t; -static dx_agent_t *agent = 0; - struct dx_agent_class_t { char *fqname; @@ -52,33 +52,37 @@ static void dx_agent_timer_handler(void *context) } -void dx_agent_initialize() +dx_agent_t *dx_agent(dx_dispatch_t *dx) { - assert(!agent); - agent = NEW(dx_agent_t); + dx_agent_t *agent = NEW(dx_agent_t); + agent->server = dx->server; agent->class_hash = hash(6, 10, 1); DEQ_INIT(agent->in_fifo); DEQ_INIT(agent->out_fifo); agent->lock = sys_mutex(); - agent->timer = dx_timer(dx_agent_timer_handler, agent); + agent->timer = dx_timer(dx, dx_agent_timer_handler, agent); + + return agent; } -void dx_agent_finalize(void) +void dx_agent_free(dx_agent_t *agent) { sys_mutex_free(agent->lock); dx_timer_free(agent->timer); hash_free(agent->class_hash); free(agent); - agent = 0; } -dx_agent_class_t *dx_agent_register_class(const char *fqname, +dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx, + const char *fqname, void *context, dx_agent_schema_cb_t schema_handler, dx_agent_query_cb_t query_handler) { + dx_agent_t *agent = dx->agent; + dx_agent_class_t *cls = NEW(dx_agent_class_t); assert(cls); cls->fqname = (char*) malloc(strlen(fqname) + 1); @@ -90,61 +94,63 @@ dx_agent_class_t *dx_agent_register_class(const char *fqname, dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL); int result = hash_insert_const(agent->class_hash, iter, cls); dx_field_iterator_free(iter); - assert(result >= 0); + if (result < 0) + assert(false); return cls; } -dx_agent_class_t *dx_agent_register_event(const char *fqname, +dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx, + const char *fqname, void *context, dx_agent_schema_cb_t schema_handler) { - return dx_agent_register_class(fqname, context, schema_handler, 0); + return dx_agent_register_class(dx, fqname, context, schema_handler, 0); } -void dx_agent_value_string(const void *correlator, const char *key, const char *value) +void dx_agent_value_string(dx_dispatch_t *dx, const void *correlator, const char *key, const char *value) { } -void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value) +void dx_agent_value_uint(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value) { } -void dx_agent_value_null(const void *correlator, const char *key) +void dx_agent_value_null(dx_dispatch_t *dx, const void *correlator, const char *key) { } -void dx_agent_value_boolean(const void *correlator, const char *key, bool value) +void dx_agent_value_boolean(dx_dispatch_t *dx, const void *correlator, const char *key, bool value) { } -void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len) +void dx_agent_value_binary(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value, size_t len) { } -void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value) +void dx_agent_value_uuid(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value) { } -void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value) +void dx_agent_value_timestamp(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value) { } -void dx_agent_value_complete(const void *correlator, bool more) +void dx_agent_value_complete(dx_dispatch_t *dx, const void *correlator, bool more) { } -void *dx_agent_raise_event(dx_agent_class_t *event) +void *dx_agent_raise_event(dx_dispatch_t *dx, dx_agent_class_t *event) { return 0; } diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c index 68e2afa3eb..0b31ab8ab5 100644 --- a/qpid/extras/dispatch/src/container.c +++ b/qpid/extras/dispatch/src/container.c @@ -19,7 +19,9 @@ #include <stdio.h> #include <string.h> +#include "dispatch_private.h" #include <qpid/dispatch/container.h> +#include <qpid/dispatch/server.h> #include <qpid/dispatch/message.h> #include <proton/engine.h> #include <proton/message.h> @@ -31,7 +33,10 @@ static char *module="CONTAINER"; +typedef struct dx_container_t dx_container_t; + struct dx_node_t { + dx_container_t *container; const dx_node_type_t *ntype; char *name; void *context; @@ -52,22 +57,25 @@ struct dx_link_t { ALLOC_DECLARE(dx_link_t); ALLOC_DEFINE(dx_link_t); -typedef struct nxc_node_type_t { - DEQ_LINKS(struct nxc_node_type_t); +typedef struct dxc_node_type_t { + DEQ_LINKS(struct dxc_node_type_t); const dx_node_type_t *ntype; -} nxc_node_type_t; -DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t); +} dxc_node_type_t; +DEQ_DECLARE(dxc_node_type_t, dxc_node_type_list_t); -static hash_t *node_type_map; -static hash_t *node_map; -static sys_mutex_t *lock; -static dx_node_t *default_node; -static nxc_node_type_list_t node_type_list; +struct dx_container_t { + dx_server_t *server; + hash_t *node_type_map; + hash_t *node_map; + sys_mutex_t *lock; + dx_node_t *default_node; + dxc_node_type_list_t node_type_list; +}; -static void setup_outgoing_link(pn_link_t *pn_link) +static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link) { - sys_mutex_lock(lock); + sys_mutex_lock(container->lock); dx_node_t *node; int result; const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); @@ -76,15 +84,15 @@ static void setup_outgoing_link(pn_link_t *pn_link) if (source) { iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID); - result = hash_retrieve(node_map, iter, (void*) &node); + result = hash_retrieve(container->node_map, iter, (void*) &node); dx_field_iterator_free(iter); } else result = -1; - sys_mutex_unlock(lock); + sys_mutex_unlock(container->lock); if (result < 0) { - if (default_node) - node = default_node; + if (container->default_node) + node = container->default_node; else { // Reject the link // TODO - When the API allows, add an error message for "no available node" @@ -108,9 +116,9 @@ static void setup_outgoing_link(pn_link_t *pn_link) } -static void setup_incoming_link(pn_link_t *pn_link) +static void setup_incoming_link(dx_container_t *container, pn_link_t *pn_link) { - sys_mutex_lock(lock); + sys_mutex_lock(container->lock); dx_node_t *node; int result; const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); @@ -119,15 +127,15 @@ static void setup_incoming_link(pn_link_t *pn_link) if (target) { iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID); - result = hash_retrieve(node_map, iter, (void*) &node); + result = hash_retrieve(container->node_map, iter, (void*) &node); dx_field_iterator_free(iter); } else result = -1; - sys_mutex_unlock(lock); + sys_mutex_unlock(container->lock); if (result < 0) { - if (default_node) - node = default_node; + if (container->default_node) + node = container->default_node; else { // Reject the link // TODO - When the API allows, add an error message for "no available node" @@ -248,7 +256,7 @@ static int close_handler(void* unused, pn_connection_t *conn) } -static int process_handler(void* unused, pn_connection_t *conn) +static int process_handler(dx_container_t *container, void* unused, pn_connection_t *conn) { pn_session_t *ssn; pn_link_t *pn_link; @@ -276,9 +284,9 @@ static int process_handler(void* unused, pn_connection_t *conn) pn_link = pn_link_head(conn, PN_LOCAL_UNINIT); while (pn_link) { if (pn_link_is_sender(pn_link)) - setup_outgoing_link(pn_link); + setup_outgoing_link(container, pn_link); else - setup_incoming_link(pn_link); + setup_incoming_link(container, pn_link); pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT); event_count++; } @@ -348,7 +356,7 @@ static int process_handler(void* unused, pn_connection_t *conn) } -static void open_handler(dx_connection_t *conn, dx_direction_t dir) +static void open_handler(dx_container_t *container, dx_connection_t *conn, dx_direction_t dir) { const dx_node_type_t *nt; @@ -357,9 +365,9 @@ static void open_handler(dx_connection_t *conn, dx_direction_t dir) // this particular list is only ever appended to and never has items inserted or deleted, // this usage is safe in this case. // - sys_mutex_lock(lock); - nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list); - sys_mutex_unlock(lock); + sys_mutex_lock(container->lock); + dxc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list); + sys_mutex_unlock(container->lock); pn_connection_open(dx_connection_pn(conn)); @@ -373,59 +381,70 @@ static void open_handler(dx_connection_t *conn, dx_direction_t dir) nt->outbound_conn_open_handler(nt->type_context, conn); } - sys_mutex_lock(lock); + sys_mutex_lock(container->lock); nt_item = DEQ_NEXT(nt_item); - sys_mutex_unlock(lock); + sys_mutex_unlock(container->lock); } } -static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn) +static int handler(void *handler_context, void *conn_context, dx_conn_event_t event, dx_connection_t *dx_conn) { - pn_connection_t *conn = dx_connection_pn(dx_conn); + dx_container_t *container = (dx_container_t*) handler_context; + pn_connection_t *conn = dx_connection_pn(dx_conn); switch (event) { - case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break; - case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break; - case DX_CONN_EVENT_CLOSE: return close_handler(context, conn); - case DX_CONN_EVENT_PROCESS: return process_handler(context, conn); + case DX_CONN_EVENT_LISTENER_OPEN: open_handler(container, dx_conn, DX_INCOMING); break; + case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, dx_conn, DX_OUTGOING); break; + case DX_CONN_EVENT_CLOSE: return close_handler(conn_context, conn); + case DX_CONN_EVENT_PROCESS: return process_handler(container, conn_context, conn); } return 0; } -void dx_container_initialize(void) +dx_container_t *dx_container(dx_dispatch_t *dx) { - dx_log(module, LOG_TRACE, "Container Initializing"); + dx_container_t *container = NEW(dx_container_t); + + container->server = dx->server; + container->node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 + container->node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 + container->lock = sys_mutex(); + container->default_node = 0; + DEQ_INIT(container->node_type_list); - node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 - node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 - lock = sys_mutex(); - default_node = 0; - DEQ_INIT(node_type_list); + dx_log(module, LOG_TRACE, "Container Initializing"); + dx_server_set_conn_handler(dx, handler, container); - dx_server_set_conn_handler(handler); + return container; } -void dx_container_finalize(void) +void dx_container_free(dx_container_t *container) { + // TODO - Free the nodes + // TODO - Free the node types + sys_mutex_free(container->lock); + free(container); } -int dx_container_register_node_type(const dx_node_type_t *nt) +int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt) { + dx_container_t *container = dx->container; + int result; dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL); - nxc_node_type_t *nt_item = NEW(nxc_node_type_t); + dxc_node_type_t *nt_item = NEW(dxc_node_type_t); DEQ_ITEM_INIT(nt_item); nt_item->ntype = nt; - sys_mutex_lock(lock); - result = hash_insert_const(node_type_map, iter, nt); - DEQ_INSERT_TAIL(node_type_list, nt_item); - sys_mutex_unlock(lock); + sys_mutex_lock(container->lock); + result = hash_insert_const(container->node_type_map, iter, nt); + DEQ_INSERT_TAIL(container->node_type_list, nt_item); + sys_mutex_unlock(container->lock); dx_field_iterator_free(iter); if (result < 0) @@ -436,34 +455,40 @@ int dx_container_register_node_type(const dx_node_type_t *nt) } -void dx_container_set_default_node_type(const dx_node_type_t *nt, +void dx_container_set_default_node_type(dx_dispatch_t *dx, + const dx_node_type_t *nt, void *context, dx_dist_mode_t supported_dist) { - if (default_node) - dx_container_destroy_node(default_node); + dx_container_t *container = dx->container; + + if (container->default_node) + dx_container_destroy_node(container->default_node); if (nt) { - default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT); + container->default_node = dx_container_create_node(dx, nt, 0, context, supported_dist, DX_LIFE_PERMANENT); dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name); } else { - default_node = 0; + container->default_node = 0; dx_log(module, LOG_TRACE, "Default node removed"); } } -dx_node_t *dx_container_create_node(const dx_node_type_t *nt, +dx_node_t *dx_container_create_node(dx_dispatch_t *dx, + const dx_node_type_t *nt, const char *name, void *context, dx_dist_mode_t supported_dist, dx_lifetime_policy_t life_policy) { + dx_container_t *container = dx->container; int result; dx_node_t *node = new_dx_node_t(); if (!node) return 0; + node->container = container; node->ntype = nt; node->name = 0; node->context = context; @@ -472,9 +497,9 @@ dx_node_t *dx_container_create_node(const dx_node_type_t *nt, if (name) { dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL); - sys_mutex_lock(lock); - result = hash_insert(node_map, iter, node); - sys_mutex_unlock(lock); + sys_mutex_lock(container->lock); + result = hash_insert(container->node_map, iter, node); + sys_mutex_unlock(container->lock); dx_field_iterator_free(iter); if (result < 0) { free_dx_node_t(node); @@ -494,11 +519,13 @@ dx_node_t *dx_container_create_node(const dx_node_type_t *nt, void dx_container_destroy_node(dx_node_t *node) { + dx_container_t *container = node->container; + if (node->name) { dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL); - sys_mutex_lock(lock); - hash_remove(node_map, iter); - sys_mutex_unlock(lock); + sys_mutex_lock(container->lock); + hash_remove(container->node_map, iter); + sys_mutex_unlock(container->lock); dx_field_iterator_free(iter); free(node->name); } diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c new file mode 100644 index 0000000000..87eb535b8a --- /dev/null +++ b/qpid/extras/dispatch/src/dispatch.c @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch.h> +#include "dispatch_private.h" +#include "alloc_private.h" + +/** + * Private Function Prototypes + */ +dx_server_t *dx_server(int tc); +void dx_server_free(dx_server_t *server); +dx_container_t *dx_container(dx_dispatch_t *dx); +void dx_container_free(dx_container_t *container); +dx_router_t *dx_router(dx_dispatch_t *dx); +void dx_router_free(dx_router_t *router); +dx_agent_t *dx_agent(dx_dispatch_t *dx); +void dx_agent_free(dx_agent_t *agent); + + +dx_dispatch_t *dx_dispatch(int thread_count) +{ + dx_dispatch_t *dx = NEW(dx_dispatch_t); + + dx_alloc_initialize(); + + dx->server = dx_server(thread_count); + dx->container = dx_container(dx); + dx->router = dx_router(dx); + dx->agent = dx_agent(dx); + + return dx; +} + + +void dx_dispatch_free(dx_dispatch_t *dx) +{ + dx_agent_free(dx->agent); + dx_router_free(dx->router); + dx_container_free(dx->container); + dx_server_free(dx->server); +} + diff --git a/qpid/extras/dispatch/src/dispatch_private.h b/qpid/extras/dispatch/src/dispatch_private.h new file mode 100644 index 0000000000..0e8d7aa826 --- /dev/null +++ b/qpid/extras/dispatch/src/dispatch_private.h @@ -0,0 +1,35 @@ +#ifndef __dispatch_private_h__ +#define __dispatch_private_h__ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef struct dx_server_t dx_server_t; +typedef struct dx_container_t dx_container_t; +typedef struct dx_router_t dx_router_t; +typedef struct dx_agent_t dx_agent_t; + +struct dx_dispatch_t { + dx_server_t *server; + dx_container_t *container; + dx_router_t *router; + dx_agent_t *agent; +}; + +#endif + diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 6ddc8f45dd..0513b08a6b 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -18,19 +18,13 @@ */ #include <stdio.h> -#include <qpid/dispatch/server.h> -#include <qpid/dispatch/message.h> -#include <qpid/dispatch/threading.h> -#include <qpid/dispatch/timer.h> -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/hash.h> -#include <qpid/dispatch/iterator.h> -#include <qpid/dispatch/log.h> -#include <qpid/dispatch/router.h> +#include <qpid/dispatch.h> +#include "dispatch_private.h" static char *module="ROUTER_NODE"; struct dx_router_t { + dx_dispatch_t *dx; dx_node_t *node; dx_link_list_t in_links; dx_link_list_t out_links; @@ -389,23 +383,24 @@ static dx_node_type_t router_node = {"router", 0, 0, static int type_registered = 0; -dx_router_t *dx_router(dx_router_configuration_t *config) +dx_router_t *dx_router(dx_dispatch_t *dx) { if (!type_registered) { type_registered = 1; - dx_container_register_node_type(&router_node); + dx_container_register_node_type(dx, &router_node); } dx_router_t *router = NEW(dx_router_t); - dx_container_set_default_node_type(&router_node, (void*) router, DX_DIST_BOTH); + dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); DEQ_INIT(router->in_links); DEQ_INIT(router->out_links); DEQ_INIT(router->in_fifo); + router->dx = dx; router->lock = sys_mutex(); - router->timer = dx_timer(dx_router_timer_handler, (void*) router); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); dx_timer_schedule(router->timer, 0); // Immediate router->out_hash = hash(10, 32, 0); @@ -417,7 +412,7 @@ dx_router_t *dx_router(dx_router_configuration_t *config) void dx_router_free(dx_router_t *router) { - dx_container_set_default_node_type(0, 0, DX_DIST_BOTH); + dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH); sys_mutex_free(router->lock); free(router); } diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c index e5e521b47e..a2d2d4980a 100644 --- a/qpid/extras/dispatch/src/server.c +++ b/qpid/extras/dispatch/src/server.c @@ -23,16 +23,17 @@ #include "server_private.h" #include "timer_private.h" #include "alloc_private.h" +#include "dispatch_private.h" #include "auth.h" #include "work_queue.h" #include <stdio.h> #include <time.h> -#include <signal.h> static char *module="SERVER"; -static __thread int server_thread = 0; +static __thread dx_server_t *thread_server = 0; typedef struct dx_thread_t { + dx_server_t *dx_server; int thread_id; volatile int running; volatile int canceled; @@ -41,16 +42,14 @@ typedef struct dx_thread_t { } dx_thread_t; -typedef struct dx_server_t { +struct dx_server_t { int thread_count; pn_driver_t *driver; dx_thread_start_cb_t start_handler; dx_conn_handler_cb_t conn_handler; - dx_signal_handler_cb_t signal_handler; dx_user_fd_handler_cb_t ufd_handler; void *start_context; - void *conn_context; - void *signal_context; + void *conn_handler_context; sys_cond_t *cond; sys_mutex_t *lock; dx_thread_t **threads; @@ -62,8 +61,12 @@ typedef struct dx_server_t { int threads_paused; int pause_next_sequence; int pause_now_serving; + dx_signal_handler_cb_t signal_handler; + void *signal_context; int pending_signal; -} dx_server_t; +}; + + ALLOC_DEFINE(dx_listener_t); @@ -72,25 +75,13 @@ ALLOC_DEFINE(dx_connection_t); ALLOC_DEFINE(dx_user_fd_t); -/** - * Singleton Concurrent Proton Driver object - */ -static dx_server_t *dx_server = 0; - - -static void signal_handler(int signum) -{ - dx_server->pending_signal = signum; - sys_cond_signal_all(dx_server->cond); -} - - -static dx_thread_t *thread(int id) +static dx_thread_t *thread(dx_server_t *dx_server, int id) { dx_thread_t *thread = NEW(dx_thread_t); if (!thread) return 0; + thread->dx_server = dx_server; thread->thread_id = id; thread->running = 0; thread->canceled = 0; @@ -126,7 +117,7 @@ static void thread_process_listeners(pn_driver_t *driver) } -static void handle_signals_LH(void) +static void handle_signals_LH(dx_server_t *dx_server) { int signum = dx_server->pending_signal; @@ -141,7 +132,7 @@ static void handle_signals_LH(void) } -static void block_if_paused_LH(void) +static void block_if_paused_LH(dx_server_t *dx_server) { if (dx_server->pause_requests > 0) { dx_server->threads_paused++; @@ -153,7 +144,7 @@ static void block_if_paused_LH(void) } -static void process_connector(pn_connector_t *cxtr) +static void process_connector(dx_server_t *dx_server, pn_connector_t *cxtr) { dx_connection_t *ctx = pn_connector_context(cxtr); int events = 0; @@ -225,19 +216,20 @@ static void process_connector(pn_connector_t *cxtr) } else assert(0); - dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + dx_server->conn_handler(dx_server->conn_handler_context, + ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); events = 1; break; case CONN_STATE_OPERATIONAL: if (pn_connector_closed(cxtr)) { - dx_server->conn_handler(ctx->context, + dx_server->conn_handler(dx_server->conn_handler_context, ctx->context, DX_CONN_EVENT_CLOSE, (dx_connection_t*) pn_connector_context(cxtr)); events = 0; } else - events = dx_server->conn_handler(ctx->context, + events = dx_server->conn_handler(dx_server->conn_handler_context, ctx->context, DX_CONN_EVENT_PROCESS, (dx_connection_t*) pn_connector_context(cxtr)); break; @@ -261,7 +253,8 @@ void pn_driver_wait_3(pn_driver_t *d); static void *thread_run(void *arg) { - dx_thread_t *thread = (dx_thread_t*) arg; + dx_thread_t *thread = (dx_thread_t*) arg; + dx_server_t *dx_server = thread->dx_server; pn_connector_t *work; pn_connection_t *conn; dx_connection_t *ctx; @@ -272,7 +265,7 @@ static void *thread_run(void *arg) if (!thread) return 0; - server_thread = 1; + thread_server = dx_server; thread->running = 1; if (thread->canceled) @@ -294,7 +287,7 @@ static void *thread_run(void *arg) // // Check for pending signals to process // - handle_signals_LH(); + handle_signals_LH(dx_server); if (!thread->running) { sys_mutex_unlock(dx_server->lock); break; @@ -303,7 +296,7 @@ static void *thread_run(void *arg) // // Check to see if the server is pausing. If so, block here. // - block_if_paused_LH(); + block_if_paused_LH(dx_server); if (!thread->running) { sys_mutex_unlock(dx_server->lock); break; @@ -450,7 +443,7 @@ static void *thread_run(void *arg) // Process the connector that we now have exclusive access to. // if (work) { - process_connector(work); + process_connector(dx_server, work); // // Check to see if the connector was closed during processing @@ -540,6 +533,7 @@ static void cxtr_try_open(void *context) return; dx_connection_t *ctx = new_dx_connection_t(); + ctx->server = ct->server; ctx->state = CONN_STATE_CONNECTING; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; @@ -553,9 +547,9 @@ static void cxtr_try_open(void *context) // // pn_connector is not thread safe // - sys_mutex_lock(dx_server->lock); - ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx); - sys_mutex_unlock(dx_server->lock); + sys_mutex_lock(ct->server->lock); + ctx->pn_cxtr = pn_connector(ct->server->driver, ct->config->host, ct->config->port, (void*) ctx); + sys_mutex_unlock(ct->server->lock); ct->ctx = ctx; ct->delay = 5000; @@ -563,18 +557,13 @@ static void cxtr_try_open(void *context) } -void dx_server_initialize(int thread_count) +dx_server_t *dx_server(int thread_count) { int i; - if (dx_server) - return; // TODO - Fail in a more dramatic way - - dx_alloc_initialize(); - dx_server = NEW(dx_server_t); - - if (!dx_server) - return; // TODO - Fail in a more dramatic way + dx_server_t *dx_server = NEW(dx_server_t); + if (dx_server == 0) + return 0; dx_server->thread_count = thread_count; dx_server->driver = pn_driver(); @@ -591,7 +580,7 @@ void dx_server_initialize(int thread_count) dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count); for (i = 0; i < thread_count; i++) - dx_server->threads[i] = thread(i); + dx_server->threads[i] = thread(dx_server, i); dx_server->work_queue = work_queue(); DEQ_INIT(dx_server->pending_timers); @@ -602,10 +591,12 @@ void dx_server_initialize(int thread_count) dx_server->pause_next_sequence = 0; dx_server->pause_now_serving = 0; dx_server->pending_signal = 0; + + return dx_server; } -void dx_server_finalize(void) +void dx_server_free(dx_server_t *dx_server) { int i; if (!dx_server) @@ -620,38 +611,40 @@ void dx_server_finalize(void) sys_mutex_free(dx_server->lock); sys_cond_free(dx_server->cond); free(dx_server); - dx_server = 0; } -void dx_server_set_conn_handler(dx_conn_handler_cb_t handler) +void dx_server_set_conn_handler(dx_dispatch_t *dx, dx_conn_handler_cb_t handler, void *handler_context) { - dx_server->conn_handler = handler; + dx->server->conn_handler = handler; + dx->server->conn_handler_context = handler_context; } -void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context) +void dx_server_set_signal_handler(dx_dispatch_t *dx, dx_signal_handler_cb_t handler, void *context) { - dx_server->signal_handler = handler; - dx_server->signal_context = context; + dx->server->signal_handler = handler; + dx->server->signal_context = context; } -void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context) +void dx_server_set_start_handler(dx_dispatch_t *dx, dx_thread_start_cb_t handler, void *context) { - dx_server->start_handler = handler; - dx_server->start_context = context; + dx->server->start_handler = handler; + dx->server->start_context = context; } -void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler) +void dx_server_set_user_fd_handler(dx_dispatch_t *dx, dx_user_fd_handler_cb_t ufd_handler) { - dx_server->ufd_handler = ufd_handler; + dx->server->ufd_handler = ufd_handler; } -void dx_server_run(void) +void dx_server_run(dx_dispatch_t *dx) { + dx_server_t *dx_server = dx->server; + int i; if (!dx_server) return; @@ -672,9 +665,11 @@ void dx_server_run(void) } -void dx_server_start(void) +void dx_server_start(dx_dispatch_t *dx) { + dx_server_t *dx_server = dx->server; int i; + if (!dx_server) return; @@ -687,8 +682,9 @@ void dx_server_start(void) } -void dx_server_stop(void) +void dx_server_stop(dx_dispatch_t *dx) { + dx_server_t *dx_server = dx->server; int idx; sys_mutex_lock(dx_server->lock); @@ -698,7 +694,7 @@ void dx_server_stop(void) pn_driver_wakeup(dx_server->driver); sys_mutex_unlock(dx_server->lock); - if (!server_thread) { + if (thread_server != dx_server) { for (idx = 0; idx < dx_server->thread_count; idx++) thread_join(dx_server->threads[idx]); dx_log(module, LOG_INFO, "Shut Down"); @@ -706,14 +702,19 @@ void dx_server_stop(void) } -void dx_server_signal(int signum) +void dx_server_signal(dx_dispatch_t *dx, int signum) { - signal(signum, signal_handler); + dx_server_t *dx_server = dx->server; + + dx_server->pending_signal = signum; + sys_cond_signal_all(dx_server->cond); } -void dx_server_pause(void) +void dx_server_pause(dx_dispatch_t *dx) { + dx_server_t *dx_server = dx->server; + sys_mutex_lock(dx_server->lock); // @@ -741,8 +742,10 @@ void dx_server_pause(void) } -void dx_server_resume(void) +void dx_server_resume(dx_dispatch_t *dx) { + dx_server_t *dx_server = dx->server; + sys_mutex_lock(dx_server->lock); dx_server->pause_requests--; dx_server->pause_now_serving++; @@ -783,13 +786,15 @@ pn_connection_t *dx_connection_pn(dx_connection_t *conn) } -dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context) +dx_listener_t *dx_server_listen(dx_dispatch_t *dx, const dx_server_config_t *config, void *context) { - dx_listener_t *li = new_dx_listener_t(); + dx_server_t *dx_server = dx->server; + dx_listener_t *li = new_dx_listener_t(); if (!li) return 0; + li->server = dx_server; li->config = config; li->context = context; li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li); @@ -819,18 +824,20 @@ void dx_server_listener_close(dx_listener_t* li) } -dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context) +dx_connector_t *dx_server_connect(dx_dispatch_t *dx, const dx_server_config_t *config, void *context) { - dx_connector_t *ct = new_dx_connector_t(); + dx_server_t *dx_server = dx->server; + dx_connector_t *ct = new_dx_connector_t(); if (!ct) return 0; + ct->server = dx_server; ct->state = CXTR_STATE_CONNECTING; ct->config = config; ct->context = context; ct->ctx = 0; - ct->timer = dx_timer(cxtr_try_open, (void*) ct); + ct->timer = dx_timer(dx, cxtr_try_open, (void*) ct); ct->delay = 0; dx_timer_schedule(ct->timer, ct->delay); @@ -853,14 +860,16 @@ void dx_server_connector_free(dx_connector_t* ct) } -dx_user_fd_t *dx_user_fd(int fd, void *context) +dx_user_fd_t *dx_user_fd(dx_dispatch_t *dx, int fd, void *context) { - dx_user_fd_t *ufd = new_dx_user_fd_t(); + dx_server_t *dx_server = dx->server; + dx_user_fd_t *ufd = new_dx_user_fd_t(); if (!ufd) return 0; dx_connection_t *ctx = new_dx_connection_t(); + ctx->server = dx_server; ctx->state = CONN_STATE_USER; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; @@ -872,6 +881,7 @@ dx_user_fd_t *dx_user_fd(int fd, void *context) ctx->ufd = ufd; ufd->context = context; + ufd->server = dx_server; ufd->fd = fd; ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx); pn_driver_wakeup(dx_server->driver); @@ -890,14 +900,14 @@ void dx_user_fd_free(dx_user_fd_t *ufd) void dx_user_fd_activate_read(dx_user_fd_t *ufd) { pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE); - pn_driver_wakeup(dx_server->driver); + pn_driver_wakeup(ufd->server->driver); } void dx_user_fd_activate_write(dx_user_fd_t *ufd) { pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE); - pn_driver_wakeup(dx_server->driver); + pn_driver_wakeup(ufd->server->driver); } @@ -915,12 +925,12 @@ bool dx_user_fd_is_writeable(dx_user_fd_t *ufd) void dx_server_timer_pending_LH(dx_timer_t *timer) { - DEQ_INSERT_TAIL(dx_server->pending_timers, timer); + DEQ_INSERT_TAIL(timer->server->pending_timers, timer); } void dx_server_timer_cancel_LH(dx_timer_t *timer) { - DEQ_REMOVE(dx_server->pending_timers, timer); + DEQ_REMOVE(timer->server->pending_timers, timer); } diff --git a/qpid/extras/dispatch/src/server_private.h b/qpid/extras/dispatch/src/server_private.h index 1722175e35..db61090324 100644 --- a/qpid/extras/dispatch/src/server_private.h +++ b/qpid/extras/dispatch/src/server_private.h @@ -48,8 +48,10 @@ typedef enum { CXTR_STATE_FAILED } cxtr_state_t; +typedef struct dx_server_t dx_server_t; struct dx_listener_t { + dx_server_t *server; const dx_server_config_t *config; void *context; pn_listener_t *pn_listener; @@ -57,6 +59,7 @@ struct dx_listener_t { struct dx_connector_t { + dx_server_t *server; cxtr_state_t state; const dx_server_config_t *config; void *context; @@ -67,6 +70,7 @@ struct dx_connector_t { struct dx_connection_t { + dx_server_t *server; conn_state_t state; int owner_thread; int enqueued; @@ -81,6 +85,7 @@ struct dx_connection_t { struct dx_user_fd_t { + dx_server_t *server; void *context; int fd; pn_connector_t *pn_conn; diff --git a/qpid/extras/dispatch/src/timer.c b/qpid/extras/dispatch/src/timer.c index b6b4864e26..cb957e8400 100644 --- a/qpid/extras/dispatch/src/timer.c +++ b/qpid/extras/dispatch/src/timer.c @@ -19,6 +19,7 @@ #include "timer_private.h" #include "server_private.h" +#include "dispatch_private.h" #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/threading.h> #include <qpid/dispatch/alloc.h> @@ -67,7 +68,7 @@ static void dx_timer_cancel_LH(dx_timer_t *timer) // Public Functions from timer.h //========================================================================= -dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context) +dx_timer_t *dx_timer(dx_dispatch_t *dx, dx_timer_cb_t cb, void* context) { dx_timer_t *timer = new_dx_timer_t(); if (!timer) @@ -75,6 +76,7 @@ dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context) DEQ_ITEM_INIT(timer); + timer->server = dx ? dx->server : 0; timer->handler = cb; timer->context = context; timer->delta_time = 0; diff --git a/qpid/extras/dispatch/src/timer_private.h b/qpid/extras/dispatch/src/timer_private.h index 618297b18e..905a8f5bd1 100644 --- a/qpid/extras/dispatch/src/timer_private.h +++ b/qpid/extras/dispatch/src/timer_private.h @@ -22,6 +22,7 @@ #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/timer.h> #include <qpid/dispatch/threading.h> +#include "server_private.h" typedef enum { TIMER_FREE, @@ -33,6 +34,7 @@ typedef enum { struct dx_timer_t { DEQ_LINKS(dx_timer_t); + dx_server_t *server; dx_timer_cb_t handler; void *context; long delta_time; |
