summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-03-07 00:32:32 +0000
committerTed Ross <tross@apache.org>2013-03-07 00:32:32 +0000
commitd507fcd7d13652075b812b1761bf795a50c54e18 (patch)
treee7ba848b7011b5a0a9d5a1ac28994827436d390b /qpid/extras/dispatch/src
parentb154cdb711e8f93a2da51a1a10fa2c244b946748 (diff)
downloadqpid-python-d507fcd7d13652075b812b1761bf795a50c54e18.tar.gz
QPID-4612 - Major cleanup in the API.
Removed the singleton patterns. Added a single header file for all of Dispatch. Doxygen comments still need to be updated. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1453628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
-rw-r--r--qpid/extras/dispatch/src/agent.c48
-rw-r--r--qpid/extras/dispatch/src/container.c153
-rw-r--r--qpid/extras/dispatch/src/dispatch.c59
-rw-r--r--qpid/extras/dispatch/src/dispatch_private.h35
-rw-r--r--qpid/extras/dispatch/src/router_node.c23
-rw-r--r--qpid/extras/dispatch/src/server.c162
-rw-r--r--qpid/extras/dispatch/src/server_private.h5
-rw-r--r--qpid/extras/dispatch/src/timer.c4
-rw-r--r--qpid/extras/dispatch/src/timer_private.h2
9 files changed, 316 insertions, 175 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index a885042b45..864d58fadc 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -17,6 +17,7 @@
* under the License.
*/
+#include "dispatch_private.h"
#include <qpid/dispatch/agent.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/hash.h>
@@ -28,6 +29,7 @@
typedef struct dx_agent_t {
+ dx_server_t *server;
hash_t *class_hash;
dx_message_list_t in_fifo;
dx_message_list_t out_fifo;
@@ -35,8 +37,6 @@ typedef struct dx_agent_t {
dx_timer_t *timer;
} dx_agent_t;
-static dx_agent_t *agent = 0;
-
struct dx_agent_class_t {
char *fqname;
@@ -52,33 +52,37 @@ static void dx_agent_timer_handler(void *context)
}
-void dx_agent_initialize()
+dx_agent_t *dx_agent(dx_dispatch_t *dx)
{
- assert(!agent);
- agent = NEW(dx_agent_t);
+ dx_agent_t *agent = NEW(dx_agent_t);
+ agent->server = dx->server;
agent->class_hash = hash(6, 10, 1);
DEQ_INIT(agent->in_fifo);
DEQ_INIT(agent->out_fifo);
agent->lock = sys_mutex();
- agent->timer = dx_timer(dx_agent_timer_handler, agent);
+ agent->timer = dx_timer(dx, dx_agent_timer_handler, agent);
+
+ return agent;
}
-void dx_agent_finalize(void)
+void dx_agent_free(dx_agent_t *agent)
{
sys_mutex_free(agent->lock);
dx_timer_free(agent->timer);
hash_free(agent->class_hash);
free(agent);
- agent = 0;
}
-dx_agent_class_t *dx_agent_register_class(const char *fqname,
+dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx,
+ const char *fqname,
void *context,
dx_agent_schema_cb_t schema_handler,
dx_agent_query_cb_t query_handler)
{
+ dx_agent_t *agent = dx->agent;
+
dx_agent_class_t *cls = NEW(dx_agent_class_t);
assert(cls);
cls->fqname = (char*) malloc(strlen(fqname) + 1);
@@ -90,61 +94,63 @@ dx_agent_class_t *dx_agent_register_class(const char *fqname,
dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
int result = hash_insert_const(agent->class_hash, iter, cls);
dx_field_iterator_free(iter);
- assert(result >= 0);
+ if (result < 0)
+ assert(false);
return cls;
}
-dx_agent_class_t *dx_agent_register_event(const char *fqname,
+dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx,
+ const char *fqname,
void *context,
dx_agent_schema_cb_t schema_handler)
{
- return dx_agent_register_class(fqname, context, schema_handler, 0);
+ return dx_agent_register_class(dx, fqname, context, schema_handler, 0);
}
-void dx_agent_value_string(const void *correlator, const char *key, const char *value)
+void dx_agent_value_string(dx_dispatch_t *dx, const void *correlator, const char *key, const char *value)
{
}
-void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value)
+void dx_agent_value_uint(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value)
{
}
-void dx_agent_value_null(const void *correlator, const char *key)
+void dx_agent_value_null(dx_dispatch_t *dx, const void *correlator, const char *key)
{
}
-void dx_agent_value_boolean(const void *correlator, const char *key, bool value)
+void dx_agent_value_boolean(dx_dispatch_t *dx, const void *correlator, const char *key, bool value)
{
}
-void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len)
+void dx_agent_value_binary(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value, size_t len)
{
}
-void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value)
+void dx_agent_value_uuid(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value)
{
}
-void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value)
+void dx_agent_value_timestamp(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value)
{
}
-void dx_agent_value_complete(const void *correlator, bool more)
+void dx_agent_value_complete(dx_dispatch_t *dx, const void *correlator, bool more)
{
}
-void *dx_agent_raise_event(dx_agent_class_t *event)
+void *dx_agent_raise_event(dx_dispatch_t *dx, dx_agent_class_t *event)
{
return 0;
}
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index 68e2afa3eb..0b31ab8ab5 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -19,7 +19,9 @@
#include <stdio.h>
#include <string.h>
+#include "dispatch_private.h"
#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/server.h>
#include <qpid/dispatch/message.h>
#include <proton/engine.h>
#include <proton/message.h>
@@ -31,7 +33,10 @@
static char *module="CONTAINER";
+typedef struct dx_container_t dx_container_t;
+
struct dx_node_t {
+ dx_container_t *container;
const dx_node_type_t *ntype;
char *name;
void *context;
@@ -52,22 +57,25 @@ struct dx_link_t {
ALLOC_DECLARE(dx_link_t);
ALLOC_DEFINE(dx_link_t);
-typedef struct nxc_node_type_t {
- DEQ_LINKS(struct nxc_node_type_t);
+typedef struct dxc_node_type_t {
+ DEQ_LINKS(struct dxc_node_type_t);
const dx_node_type_t *ntype;
-} nxc_node_type_t;
-DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t);
+} dxc_node_type_t;
+DEQ_DECLARE(dxc_node_type_t, dxc_node_type_list_t);
-static hash_t *node_type_map;
-static hash_t *node_map;
-static sys_mutex_t *lock;
-static dx_node_t *default_node;
-static nxc_node_type_list_t node_type_list;
+struct dx_container_t {
+ dx_server_t *server;
+ hash_t *node_type_map;
+ hash_t *node_map;
+ sys_mutex_t *lock;
+ dx_node_t *default_node;
+ dxc_node_type_list_t node_type_list;
+};
-static void setup_outgoing_link(pn_link_t *pn_link)
+static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link)
{
- sys_mutex_lock(lock);
+ sys_mutex_lock(container->lock);
dx_node_t *node;
int result;
const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link));
@@ -76,15 +84,15 @@ static void setup_outgoing_link(pn_link_t *pn_link)
if (source) {
iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
- result = hash_retrieve(node_map, iter, (void*) &node);
+ result = hash_retrieve(container->node_map, iter, (void*) &node);
dx_field_iterator_free(iter);
} else
result = -1;
- sys_mutex_unlock(lock);
+ sys_mutex_unlock(container->lock);
if (result < 0) {
- if (default_node)
- node = default_node;
+ if (container->default_node)
+ node = container->default_node;
else {
// Reject the link
// TODO - When the API allows, add an error message for "no available node"
@@ -108,9 +116,9 @@ static void setup_outgoing_link(pn_link_t *pn_link)
}
-static void setup_incoming_link(pn_link_t *pn_link)
+static void setup_incoming_link(dx_container_t *container, pn_link_t *pn_link)
{
- sys_mutex_lock(lock);
+ sys_mutex_lock(container->lock);
dx_node_t *node;
int result;
const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
@@ -119,15 +127,15 @@ static void setup_incoming_link(pn_link_t *pn_link)
if (target) {
iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
- result = hash_retrieve(node_map, iter, (void*) &node);
+ result = hash_retrieve(container->node_map, iter, (void*) &node);
dx_field_iterator_free(iter);
} else
result = -1;
- sys_mutex_unlock(lock);
+ sys_mutex_unlock(container->lock);
if (result < 0) {
- if (default_node)
- node = default_node;
+ if (container->default_node)
+ node = container->default_node;
else {
// Reject the link
// TODO - When the API allows, add an error message for "no available node"
@@ -248,7 +256,7 @@ static int close_handler(void* unused, pn_connection_t *conn)
}
-static int process_handler(void* unused, pn_connection_t *conn)
+static int process_handler(dx_container_t *container, void* unused, pn_connection_t *conn)
{
pn_session_t *ssn;
pn_link_t *pn_link;
@@ -276,9 +284,9 @@ static int process_handler(void* unused, pn_connection_t *conn)
pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
while (pn_link) {
if (pn_link_is_sender(pn_link))
- setup_outgoing_link(pn_link);
+ setup_outgoing_link(container, pn_link);
else
- setup_incoming_link(pn_link);
+ setup_incoming_link(container, pn_link);
pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
event_count++;
}
@@ -348,7 +356,7 @@ static int process_handler(void* unused, pn_connection_t *conn)
}
-static void open_handler(dx_connection_t *conn, dx_direction_t dir)
+static void open_handler(dx_container_t *container, dx_connection_t *conn, dx_direction_t dir)
{
const dx_node_type_t *nt;
@@ -357,9 +365,9 @@ static void open_handler(dx_connection_t *conn, dx_direction_t dir)
// this particular list is only ever appended to and never has items inserted or deleted,
// this usage is safe in this case.
//
- sys_mutex_lock(lock);
- nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list);
- sys_mutex_unlock(lock);
+ sys_mutex_lock(container->lock);
+ dxc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list);
+ sys_mutex_unlock(container->lock);
pn_connection_open(dx_connection_pn(conn));
@@ -373,59 +381,70 @@ static void open_handler(dx_connection_t *conn, dx_direction_t dir)
nt->outbound_conn_open_handler(nt->type_context, conn);
}
- sys_mutex_lock(lock);
+ sys_mutex_lock(container->lock);
nt_item = DEQ_NEXT(nt_item);
- sys_mutex_unlock(lock);
+ sys_mutex_unlock(container->lock);
}
}
-static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn)
+static int handler(void *handler_context, void *conn_context, dx_conn_event_t event, dx_connection_t *dx_conn)
{
- pn_connection_t *conn = dx_connection_pn(dx_conn);
+ dx_container_t *container = (dx_container_t*) handler_context;
+ pn_connection_t *conn = dx_connection_pn(dx_conn);
switch (event) {
- case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break;
- case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break;
- case DX_CONN_EVENT_CLOSE: return close_handler(context, conn);
- case DX_CONN_EVENT_PROCESS: return process_handler(context, conn);
+ case DX_CONN_EVENT_LISTENER_OPEN: open_handler(container, dx_conn, DX_INCOMING); break;
+ case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, dx_conn, DX_OUTGOING); break;
+ case DX_CONN_EVENT_CLOSE: return close_handler(conn_context, conn);
+ case DX_CONN_EVENT_PROCESS: return process_handler(container, conn_context, conn);
}
return 0;
}
-void dx_container_initialize(void)
+dx_container_t *dx_container(dx_dispatch_t *dx)
{
- dx_log(module, LOG_TRACE, "Container Initializing");
+ dx_container_t *container = NEW(dx_container_t);
+
+ container->server = dx->server;
+ container->node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
+ container->node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
+ container->lock = sys_mutex();
+ container->default_node = 0;
+ DEQ_INIT(container->node_type_list);
- node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
- node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
- lock = sys_mutex();
- default_node = 0;
- DEQ_INIT(node_type_list);
+ dx_log(module, LOG_TRACE, "Container Initializing");
+ dx_server_set_conn_handler(dx, handler, container);
- dx_server_set_conn_handler(handler);
+ return container;
}
-void dx_container_finalize(void)
+void dx_container_free(dx_container_t *container)
{
+ // TODO - Free the nodes
+ // TODO - Free the node types
+ sys_mutex_free(container->lock);
+ free(container);
}
-int dx_container_register_node_type(const dx_node_type_t *nt)
+int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt)
{
+ dx_container_t *container = dx->container;
+
int result;
dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL);
- nxc_node_type_t *nt_item = NEW(nxc_node_type_t);
+ dxc_node_type_t *nt_item = NEW(dxc_node_type_t);
DEQ_ITEM_INIT(nt_item);
nt_item->ntype = nt;
- sys_mutex_lock(lock);
- result = hash_insert_const(node_type_map, iter, nt);
- DEQ_INSERT_TAIL(node_type_list, nt_item);
- sys_mutex_unlock(lock);
+ sys_mutex_lock(container->lock);
+ result = hash_insert_const(container->node_type_map, iter, nt);
+ DEQ_INSERT_TAIL(container->node_type_list, nt_item);
+ sys_mutex_unlock(container->lock);
dx_field_iterator_free(iter);
if (result < 0)
@@ -436,34 +455,40 @@ int dx_container_register_node_type(const dx_node_type_t *nt)
}
-void dx_container_set_default_node_type(const dx_node_type_t *nt,
+void dx_container_set_default_node_type(dx_dispatch_t *dx,
+ const dx_node_type_t *nt,
void *context,
dx_dist_mode_t supported_dist)
{
- if (default_node)
- dx_container_destroy_node(default_node);
+ dx_container_t *container = dx->container;
+
+ if (container->default_node)
+ dx_container_destroy_node(container->default_node);
if (nt) {
- default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT);
+ container->default_node = dx_container_create_node(dx, nt, 0, context, supported_dist, DX_LIFE_PERMANENT);
dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name);
} else {
- default_node = 0;
+ container->default_node = 0;
dx_log(module, LOG_TRACE, "Default node removed");
}
}
-dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
+dx_node_t *dx_container_create_node(dx_dispatch_t *dx,
+ const dx_node_type_t *nt,
const char *name,
void *context,
dx_dist_mode_t supported_dist,
dx_lifetime_policy_t life_policy)
{
+ dx_container_t *container = dx->container;
int result;
dx_node_t *node = new_dx_node_t();
if (!node)
return 0;
+ node->container = container;
node->ntype = nt;
node->name = 0;
node->context = context;
@@ -472,9 +497,9 @@ dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
if (name) {
dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
- sys_mutex_lock(lock);
- result = hash_insert(node_map, iter, node);
- sys_mutex_unlock(lock);
+ sys_mutex_lock(container->lock);
+ result = hash_insert(container->node_map, iter, node);
+ sys_mutex_unlock(container->lock);
dx_field_iterator_free(iter);
if (result < 0) {
free_dx_node_t(node);
@@ -494,11 +519,13 @@ dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
void dx_container_destroy_node(dx_node_t *node)
{
+ dx_container_t *container = node->container;
+
if (node->name) {
dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL);
- sys_mutex_lock(lock);
- hash_remove(node_map, iter);
- sys_mutex_unlock(lock);
+ sys_mutex_lock(container->lock);
+ hash_remove(container->node_map, iter);
+ sys_mutex_unlock(container->lock);
dx_field_iterator_free(iter);
free(node->name);
}
diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c
new file mode 100644
index 0000000000..87eb535b8a
--- /dev/null
+++ b/qpid/extras/dispatch/src/dispatch.c
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch.h>
+#include "dispatch_private.h"
+#include "alloc_private.h"
+
+/**
+ * Private Function Prototypes
+ */
+dx_server_t *dx_server(int tc);
+void dx_server_free(dx_server_t *server);
+dx_container_t *dx_container(dx_dispatch_t *dx);
+void dx_container_free(dx_container_t *container);
+dx_router_t *dx_router(dx_dispatch_t *dx);
+void dx_router_free(dx_router_t *router);
+dx_agent_t *dx_agent(dx_dispatch_t *dx);
+void dx_agent_free(dx_agent_t *agent);
+
+
+dx_dispatch_t *dx_dispatch(int thread_count)
+{
+ dx_dispatch_t *dx = NEW(dx_dispatch_t);
+
+ dx_alloc_initialize();
+
+ dx->server = dx_server(thread_count);
+ dx->container = dx_container(dx);
+ dx->router = dx_router(dx);
+ dx->agent = dx_agent(dx);
+
+ return dx;
+}
+
+
+void dx_dispatch_free(dx_dispatch_t *dx)
+{
+ dx_agent_free(dx->agent);
+ dx_router_free(dx->router);
+ dx_container_free(dx->container);
+ dx_server_free(dx->server);
+}
+
diff --git a/qpid/extras/dispatch/src/dispatch_private.h b/qpid/extras/dispatch/src/dispatch_private.h
new file mode 100644
index 0000000000..0e8d7aa826
--- /dev/null
+++ b/qpid/extras/dispatch/src/dispatch_private.h
@@ -0,0 +1,35 @@
+#ifndef __dispatch_private_h__
+#define __dispatch_private_h__
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef struct dx_server_t dx_server_t;
+typedef struct dx_container_t dx_container_t;
+typedef struct dx_router_t dx_router_t;
+typedef struct dx_agent_t dx_agent_t;
+
+struct dx_dispatch_t {
+ dx_server_t *server;
+ dx_container_t *container;
+ dx_router_t *router;
+ dx_agent_t *agent;
+};
+
+#endif
+
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 6ddc8f45dd..0513b08a6b 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -18,19 +18,13 @@
*/
#include <stdio.h>
-#include <qpid/dispatch/server.h>
-#include <qpid/dispatch/message.h>
-#include <qpid/dispatch/threading.h>
-#include <qpid/dispatch/timer.h>
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/hash.h>
-#include <qpid/dispatch/iterator.h>
-#include <qpid/dispatch/log.h>
-#include <qpid/dispatch/router.h>
+#include <qpid/dispatch.h>
+#include "dispatch_private.h"
static char *module="ROUTER_NODE";
struct dx_router_t {
+ dx_dispatch_t *dx;
dx_node_t *node;
dx_link_list_t in_links;
dx_link_list_t out_links;
@@ -389,23 +383,24 @@ static dx_node_type_t router_node = {"router", 0, 0,
static int type_registered = 0;
-dx_router_t *dx_router(dx_router_configuration_t *config)
+dx_router_t *dx_router(dx_dispatch_t *dx)
{
if (!type_registered) {
type_registered = 1;
- dx_container_register_node_type(&router_node);
+ dx_container_register_node_type(dx, &router_node);
}
dx_router_t *router = NEW(dx_router_t);
- dx_container_set_default_node_type(&router_node, (void*) router, DX_DIST_BOTH);
+ dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
DEQ_INIT(router->in_links);
DEQ_INIT(router->out_links);
DEQ_INIT(router->in_fifo);
+ router->dx = dx;
router->lock = sys_mutex();
- router->timer = dx_timer(dx_router_timer_handler, (void*) router);
+ router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
dx_timer_schedule(router->timer, 0); // Immediate
router->out_hash = hash(10, 32, 0);
@@ -417,7 +412,7 @@ dx_router_t *dx_router(dx_router_configuration_t *config)
void dx_router_free(dx_router_t *router)
{
- dx_container_set_default_node_type(0, 0, DX_DIST_BOTH);
+ dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH);
sys_mutex_free(router->lock);
free(router);
}
diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c
index e5e521b47e..a2d2d4980a 100644
--- a/qpid/extras/dispatch/src/server.c
+++ b/qpid/extras/dispatch/src/server.c
@@ -23,16 +23,17 @@
#include "server_private.h"
#include "timer_private.h"
#include "alloc_private.h"
+#include "dispatch_private.h"
#include "auth.h"
#include "work_queue.h"
#include <stdio.h>
#include <time.h>
-#include <signal.h>
static char *module="SERVER";
-static __thread int server_thread = 0;
+static __thread dx_server_t *thread_server = 0;
typedef struct dx_thread_t {
+ dx_server_t *dx_server;
int thread_id;
volatile int running;
volatile int canceled;
@@ -41,16 +42,14 @@ typedef struct dx_thread_t {
} dx_thread_t;
-typedef struct dx_server_t {
+struct dx_server_t {
int thread_count;
pn_driver_t *driver;
dx_thread_start_cb_t start_handler;
dx_conn_handler_cb_t conn_handler;
- dx_signal_handler_cb_t signal_handler;
dx_user_fd_handler_cb_t ufd_handler;
void *start_context;
- void *conn_context;
- void *signal_context;
+ void *conn_handler_context;
sys_cond_t *cond;
sys_mutex_t *lock;
dx_thread_t **threads;
@@ -62,8 +61,12 @@ typedef struct dx_server_t {
int threads_paused;
int pause_next_sequence;
int pause_now_serving;
+ dx_signal_handler_cb_t signal_handler;
+ void *signal_context;
int pending_signal;
-} dx_server_t;
+};
+
+
ALLOC_DEFINE(dx_listener_t);
@@ -72,25 +75,13 @@ ALLOC_DEFINE(dx_connection_t);
ALLOC_DEFINE(dx_user_fd_t);
-/**
- * Singleton Concurrent Proton Driver object
- */
-static dx_server_t *dx_server = 0;
-
-
-static void signal_handler(int signum)
-{
- dx_server->pending_signal = signum;
- sys_cond_signal_all(dx_server->cond);
-}
-
-
-static dx_thread_t *thread(int id)
+static dx_thread_t *thread(dx_server_t *dx_server, int id)
{
dx_thread_t *thread = NEW(dx_thread_t);
if (!thread)
return 0;
+ thread->dx_server = dx_server;
thread->thread_id = id;
thread->running = 0;
thread->canceled = 0;
@@ -126,7 +117,7 @@ static void thread_process_listeners(pn_driver_t *driver)
}
-static void handle_signals_LH(void)
+static void handle_signals_LH(dx_server_t *dx_server)
{
int signum = dx_server->pending_signal;
@@ -141,7 +132,7 @@ static void handle_signals_LH(void)
}
-static void block_if_paused_LH(void)
+static void block_if_paused_LH(dx_server_t *dx_server)
{
if (dx_server->pause_requests > 0) {
dx_server->threads_paused++;
@@ -153,7 +144,7 @@ static void block_if_paused_LH(void)
}
-static void process_connector(pn_connector_t *cxtr)
+static void process_connector(dx_server_t *dx_server, pn_connector_t *cxtr)
{
dx_connection_t *ctx = pn_connector_context(cxtr);
int events = 0;
@@ -225,19 +216,20 @@ static void process_connector(pn_connector_t *cxtr)
} else
assert(0);
- dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ dx_server->conn_handler(dx_server->conn_handler_context,
+ ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
events = 1;
break;
case CONN_STATE_OPERATIONAL:
if (pn_connector_closed(cxtr)) {
- dx_server->conn_handler(ctx->context,
+ dx_server->conn_handler(dx_server->conn_handler_context, ctx->context,
DX_CONN_EVENT_CLOSE,
(dx_connection_t*) pn_connector_context(cxtr));
events = 0;
}
else
- events = dx_server->conn_handler(ctx->context,
+ events = dx_server->conn_handler(dx_server->conn_handler_context, ctx->context,
DX_CONN_EVENT_PROCESS,
(dx_connection_t*) pn_connector_context(cxtr));
break;
@@ -261,7 +253,8 @@ void pn_driver_wait_3(pn_driver_t *d);
static void *thread_run(void *arg)
{
- dx_thread_t *thread = (dx_thread_t*) arg;
+ dx_thread_t *thread = (dx_thread_t*) arg;
+ dx_server_t *dx_server = thread->dx_server;
pn_connector_t *work;
pn_connection_t *conn;
dx_connection_t *ctx;
@@ -272,7 +265,7 @@ static void *thread_run(void *arg)
if (!thread)
return 0;
- server_thread = 1;
+ thread_server = dx_server;
thread->running = 1;
if (thread->canceled)
@@ -294,7 +287,7 @@ static void *thread_run(void *arg)
//
// Check for pending signals to process
//
- handle_signals_LH();
+ handle_signals_LH(dx_server);
if (!thread->running) {
sys_mutex_unlock(dx_server->lock);
break;
@@ -303,7 +296,7 @@ static void *thread_run(void *arg)
//
// Check to see if the server is pausing. If so, block here.
//
- block_if_paused_LH();
+ block_if_paused_LH(dx_server);
if (!thread->running) {
sys_mutex_unlock(dx_server->lock);
break;
@@ -450,7 +443,7 @@ static void *thread_run(void *arg)
// Process the connector that we now have exclusive access to.
//
if (work) {
- process_connector(work);
+ process_connector(dx_server, work);
//
// Check to see if the connector was closed during processing
@@ -540,6 +533,7 @@ static void cxtr_try_open(void *context)
return;
dx_connection_t *ctx = new_dx_connection_t();
+ ctx->server = ct->server;
ctx->state = CONN_STATE_CONNECTING;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
@@ -553,9 +547,9 @@ static void cxtr_try_open(void *context)
//
// pn_connector is not thread safe
//
- sys_mutex_lock(dx_server->lock);
- ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx);
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_lock(ct->server->lock);
+ ctx->pn_cxtr = pn_connector(ct->server->driver, ct->config->host, ct->config->port, (void*) ctx);
+ sys_mutex_unlock(ct->server->lock);
ct->ctx = ctx;
ct->delay = 5000;
@@ -563,18 +557,13 @@ static void cxtr_try_open(void *context)
}
-void dx_server_initialize(int thread_count)
+dx_server_t *dx_server(int thread_count)
{
int i;
- if (dx_server)
- return; // TODO - Fail in a more dramatic way
-
- dx_alloc_initialize();
- dx_server = NEW(dx_server_t);
-
- if (!dx_server)
- return; // TODO - Fail in a more dramatic way
+ dx_server_t *dx_server = NEW(dx_server_t);
+ if (dx_server == 0)
+ return 0;
dx_server->thread_count = thread_count;
dx_server->driver = pn_driver();
@@ -591,7 +580,7 @@ void dx_server_initialize(int thread_count)
dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count);
for (i = 0; i < thread_count; i++)
- dx_server->threads[i] = thread(i);
+ dx_server->threads[i] = thread(dx_server, i);
dx_server->work_queue = work_queue();
DEQ_INIT(dx_server->pending_timers);
@@ -602,10 +591,12 @@ void dx_server_initialize(int thread_count)
dx_server->pause_next_sequence = 0;
dx_server->pause_now_serving = 0;
dx_server->pending_signal = 0;
+
+ return dx_server;
}
-void dx_server_finalize(void)
+void dx_server_free(dx_server_t *dx_server)
{
int i;
if (!dx_server)
@@ -620,38 +611,40 @@ void dx_server_finalize(void)
sys_mutex_free(dx_server->lock);
sys_cond_free(dx_server->cond);
free(dx_server);
- dx_server = 0;
}
-void dx_server_set_conn_handler(dx_conn_handler_cb_t handler)
+void dx_server_set_conn_handler(dx_dispatch_t *dx, dx_conn_handler_cb_t handler, void *handler_context)
{
- dx_server->conn_handler = handler;
+ dx->server->conn_handler = handler;
+ dx->server->conn_handler_context = handler_context;
}
-void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context)
+void dx_server_set_signal_handler(dx_dispatch_t *dx, dx_signal_handler_cb_t handler, void *context)
{
- dx_server->signal_handler = handler;
- dx_server->signal_context = context;
+ dx->server->signal_handler = handler;
+ dx->server->signal_context = context;
}
-void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context)
+void dx_server_set_start_handler(dx_dispatch_t *dx, dx_thread_start_cb_t handler, void *context)
{
- dx_server->start_handler = handler;
- dx_server->start_context = context;
+ dx->server->start_handler = handler;
+ dx->server->start_context = context;
}
-void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler)
+void dx_server_set_user_fd_handler(dx_dispatch_t *dx, dx_user_fd_handler_cb_t ufd_handler)
{
- dx_server->ufd_handler = ufd_handler;
+ dx->server->ufd_handler = ufd_handler;
}
-void dx_server_run(void)
+void dx_server_run(dx_dispatch_t *dx)
{
+ dx_server_t *dx_server = dx->server;
+
int i;
if (!dx_server)
return;
@@ -672,9 +665,11 @@ void dx_server_run(void)
}
-void dx_server_start(void)
+void dx_server_start(dx_dispatch_t *dx)
{
+ dx_server_t *dx_server = dx->server;
int i;
+
if (!dx_server)
return;
@@ -687,8 +682,9 @@ void dx_server_start(void)
}
-void dx_server_stop(void)
+void dx_server_stop(dx_dispatch_t *dx)
{
+ dx_server_t *dx_server = dx->server;
int idx;
sys_mutex_lock(dx_server->lock);
@@ -698,7 +694,7 @@ void dx_server_stop(void)
pn_driver_wakeup(dx_server->driver);
sys_mutex_unlock(dx_server->lock);
- if (!server_thread) {
+ if (thread_server != dx_server) {
for (idx = 0; idx < dx_server->thread_count; idx++)
thread_join(dx_server->threads[idx]);
dx_log(module, LOG_INFO, "Shut Down");
@@ -706,14 +702,19 @@ void dx_server_stop(void)
}
-void dx_server_signal(int signum)
+void dx_server_signal(dx_dispatch_t *dx, int signum)
{
- signal(signum, signal_handler);
+ dx_server_t *dx_server = dx->server;
+
+ dx_server->pending_signal = signum;
+ sys_cond_signal_all(dx_server->cond);
}
-void dx_server_pause(void)
+void dx_server_pause(dx_dispatch_t *dx)
{
+ dx_server_t *dx_server = dx->server;
+
sys_mutex_lock(dx_server->lock);
//
@@ -741,8 +742,10 @@ void dx_server_pause(void)
}
-void dx_server_resume(void)
+void dx_server_resume(dx_dispatch_t *dx)
{
+ dx_server_t *dx_server = dx->server;
+
sys_mutex_lock(dx_server->lock);
dx_server->pause_requests--;
dx_server->pause_now_serving++;
@@ -783,13 +786,15 @@ pn_connection_t *dx_connection_pn(dx_connection_t *conn)
}
-dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context)
+dx_listener_t *dx_server_listen(dx_dispatch_t *dx, const dx_server_config_t *config, void *context)
{
- dx_listener_t *li = new_dx_listener_t();
+ dx_server_t *dx_server = dx->server;
+ dx_listener_t *li = new_dx_listener_t();
if (!li)
return 0;
+ li->server = dx_server;
li->config = config;
li->context = context;
li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li);
@@ -819,18 +824,20 @@ void dx_server_listener_close(dx_listener_t* li)
}
-dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context)
+dx_connector_t *dx_server_connect(dx_dispatch_t *dx, const dx_server_config_t *config, void *context)
{
- dx_connector_t *ct = new_dx_connector_t();
+ dx_server_t *dx_server = dx->server;
+ dx_connector_t *ct = new_dx_connector_t();
if (!ct)
return 0;
+ ct->server = dx_server;
ct->state = CXTR_STATE_CONNECTING;
ct->config = config;
ct->context = context;
ct->ctx = 0;
- ct->timer = dx_timer(cxtr_try_open, (void*) ct);
+ ct->timer = dx_timer(dx, cxtr_try_open, (void*) ct);
ct->delay = 0;
dx_timer_schedule(ct->timer, ct->delay);
@@ -853,14 +860,16 @@ void dx_server_connector_free(dx_connector_t* ct)
}
-dx_user_fd_t *dx_user_fd(int fd, void *context)
+dx_user_fd_t *dx_user_fd(dx_dispatch_t *dx, int fd, void *context)
{
- dx_user_fd_t *ufd = new_dx_user_fd_t();
+ dx_server_t *dx_server = dx->server;
+ dx_user_fd_t *ufd = new_dx_user_fd_t();
if (!ufd)
return 0;
dx_connection_t *ctx = new_dx_connection_t();
+ ctx->server = dx_server;
ctx->state = CONN_STATE_USER;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
@@ -872,6 +881,7 @@ dx_user_fd_t *dx_user_fd(int fd, void *context)
ctx->ufd = ufd;
ufd->context = context;
+ ufd->server = dx_server;
ufd->fd = fd;
ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx);
pn_driver_wakeup(dx_server->driver);
@@ -890,14 +900,14 @@ void dx_user_fd_free(dx_user_fd_t *ufd)
void dx_user_fd_activate_read(dx_user_fd_t *ufd)
{
pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE);
- pn_driver_wakeup(dx_server->driver);
+ pn_driver_wakeup(ufd->server->driver);
}
void dx_user_fd_activate_write(dx_user_fd_t *ufd)
{
pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
- pn_driver_wakeup(dx_server->driver);
+ pn_driver_wakeup(ufd->server->driver);
}
@@ -915,12 +925,12 @@ bool dx_user_fd_is_writeable(dx_user_fd_t *ufd)
void dx_server_timer_pending_LH(dx_timer_t *timer)
{
- DEQ_INSERT_TAIL(dx_server->pending_timers, timer);
+ DEQ_INSERT_TAIL(timer->server->pending_timers, timer);
}
void dx_server_timer_cancel_LH(dx_timer_t *timer)
{
- DEQ_REMOVE(dx_server->pending_timers, timer);
+ DEQ_REMOVE(timer->server->pending_timers, timer);
}
diff --git a/qpid/extras/dispatch/src/server_private.h b/qpid/extras/dispatch/src/server_private.h
index 1722175e35..db61090324 100644
--- a/qpid/extras/dispatch/src/server_private.h
+++ b/qpid/extras/dispatch/src/server_private.h
@@ -48,8 +48,10 @@ typedef enum {
CXTR_STATE_FAILED
} cxtr_state_t;
+typedef struct dx_server_t dx_server_t;
struct dx_listener_t {
+ dx_server_t *server;
const dx_server_config_t *config;
void *context;
pn_listener_t *pn_listener;
@@ -57,6 +59,7 @@ struct dx_listener_t {
struct dx_connector_t {
+ dx_server_t *server;
cxtr_state_t state;
const dx_server_config_t *config;
void *context;
@@ -67,6 +70,7 @@ struct dx_connector_t {
struct dx_connection_t {
+ dx_server_t *server;
conn_state_t state;
int owner_thread;
int enqueued;
@@ -81,6 +85,7 @@ struct dx_connection_t {
struct dx_user_fd_t {
+ dx_server_t *server;
void *context;
int fd;
pn_connector_t *pn_conn;
diff --git a/qpid/extras/dispatch/src/timer.c b/qpid/extras/dispatch/src/timer.c
index b6b4864e26..cb957e8400 100644
--- a/qpid/extras/dispatch/src/timer.c
+++ b/qpid/extras/dispatch/src/timer.c
@@ -19,6 +19,7 @@
#include "timer_private.h"
#include "server_private.h"
+#include "dispatch_private.h"
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/alloc.h>
@@ -67,7 +68,7 @@ static void dx_timer_cancel_LH(dx_timer_t *timer)
// Public Functions from timer.h
//=========================================================================
-dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context)
+dx_timer_t *dx_timer(dx_dispatch_t *dx, dx_timer_cb_t cb, void* context)
{
dx_timer_t *timer = new_dx_timer_t();
if (!timer)
@@ -75,6 +76,7 @@ dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context)
DEQ_ITEM_INIT(timer);
+ timer->server = dx ? dx->server : 0;
timer->handler = cb;
timer->context = context;
timer->delta_time = 0;
diff --git a/qpid/extras/dispatch/src/timer_private.h b/qpid/extras/dispatch/src/timer_private.h
index 618297b18e..905a8f5bd1 100644
--- a/qpid/extras/dispatch/src/timer_private.h
+++ b/qpid/extras/dispatch/src/timer_private.h
@@ -22,6 +22,7 @@
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/timer.h>
#include <qpid/dispatch/threading.h>
+#include "server_private.h"
typedef enum {
TIMER_FREE,
@@ -33,6 +34,7 @@ typedef enum {
struct dx_timer_t {
DEQ_LINKS(dx_timer_t);
+ dx_server_t *server;
dx_timer_cb_t handler;
void *context;
long delta_time;