summaryrefslogtreecommitdiff
path: root/extras/dispatch/src
diff options
context:
space:
mode:
Diffstat (limited to 'extras/dispatch/src')
-rw-r--r--extras/dispatch/src/agent.c151
-rw-r--r--extras/dispatch/src/alloc.c210
-rw-r--r--extras/dispatch/src/alloc_private.h26
-rw-r--r--extras/dispatch/src/auth.c75
-rw-r--r--extras/dispatch/src/auth.h27
-rw-r--r--extras/dispatch/src/buffer.c83
-rw-r--r--extras/dispatch/src/container.c616
-rw-r--r--extras/dispatch/src/hash.c223
-rw-r--r--extras/dispatch/src/iovec.c81
-rw-r--r--extras/dispatch/src/iterator.c268
-rw-r--r--extras/dispatch/src/log.c56
-rw-r--r--extras/dispatch/src/message.c1120
-rw-r--r--extras/dispatch/src/message_private.h94
-rw-r--r--extras/dispatch/src/posix/threading.c126
-rw-r--r--extras/dispatch/src/router_node.c424
-rw-r--r--extras/dispatch/src/server.c903
-rw-r--r--extras/dispatch/src/server_private.h96
-rw-r--r--extras/dispatch/src/timer.c236
-rw-r--r--extras/dispatch/src/timer_private.h51
-rw-r--r--extras/dispatch/src/work_queue.c132
-rw-r--r--extras/dispatch/src/work_queue.h33
21 files changed, 5031 insertions, 0 deletions
diff --git a/extras/dispatch/src/agent.c b/extras/dispatch/src/agent.c
new file mode 100644
index 0000000000..a885042b45
--- /dev/null
+++ b/extras/dispatch/src/agent.c
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/agent.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/message.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/timer.h>
+#include <string.h>
+
+
+typedef struct dx_agent_t {
+ hash_t *class_hash;
+ dx_message_list_t in_fifo;
+ dx_message_list_t out_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+} dx_agent_t;
+
+static dx_agent_t *agent = 0;
+
+
+struct dx_agent_class_t {
+ char *fqname;
+ void *context;
+ dx_agent_schema_cb_t schema_handler;
+ dx_agent_query_cb_t query_handler; // 0 iff class is an event.
+};
+
+
+static void dx_agent_timer_handler(void *context)
+{
+ // TODO - Process the in_fifo here
+}
+
+
+void dx_agent_initialize()
+{
+ assert(!agent);
+ agent = NEW(dx_agent_t);
+ agent->class_hash = hash(6, 10, 1);
+ DEQ_INIT(agent->in_fifo);
+ DEQ_INIT(agent->out_fifo);
+ agent->lock = sys_mutex();
+ agent->timer = dx_timer(dx_agent_timer_handler, agent);
+}
+
+
+void dx_agent_finalize(void)
+{
+ sys_mutex_free(agent->lock);
+ dx_timer_free(agent->timer);
+ hash_free(agent->class_hash);
+ free(agent);
+ agent = 0;
+}
+
+
+dx_agent_class_t *dx_agent_register_class(const char *fqname,
+ void *context,
+ dx_agent_schema_cb_t schema_handler,
+ dx_agent_query_cb_t query_handler)
+{
+ dx_agent_class_t *cls = NEW(dx_agent_class_t);
+ assert(cls);
+ cls->fqname = (char*) malloc(strlen(fqname) + 1);
+ strcpy(cls->fqname, fqname);
+ cls->context = context;
+ cls->schema_handler = schema_handler;
+ cls->query_handler = query_handler;
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
+ int result = hash_insert_const(agent->class_hash, iter, cls);
+ dx_field_iterator_free(iter);
+ assert(result >= 0);
+
+ return cls;
+}
+
+
+dx_agent_class_t *dx_agent_register_event(const char *fqname,
+ void *context,
+ dx_agent_schema_cb_t schema_handler)
+{
+ return dx_agent_register_class(fqname, context, schema_handler, 0);
+}
+
+
+void dx_agent_value_string(const void *correlator, const char *key, const char *value)
+{
+}
+
+
+void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value)
+{
+}
+
+
+void dx_agent_value_null(const void *correlator, const char *key)
+{
+}
+
+
+void dx_agent_value_boolean(const void *correlator, const char *key, bool value)
+{
+}
+
+
+void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len)
+{
+}
+
+
+void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value)
+{
+}
+
+
+void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value)
+{
+}
+
+
+void dx_agent_value_complete(const void *correlator, bool more)
+{
+}
+
+
+void *dx_agent_raise_event(dx_agent_class_t *event)
+{
+ return 0;
+}
+
diff --git a/extras/dispatch/src/alloc.c b/extras/dispatch/src/alloc.c
new file mode 100644
index 0000000000..2b3b953aad
--- /dev/null
+++ b/extras/dispatch/src/alloc.c
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/log.h>
+#include <memory.h>
+#include <stdio.h>
+
+typedef struct item_t item_t;
+
+struct item_t {
+ DEQ_LINKS(item_t);
+ dx_alloc_type_desc_t *desc;
+};
+
+DEQ_DECLARE(item_t, item_list_t);
+
+struct dx_alloc_pool_t {
+ item_list_t free_list;
+};
+
+dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0};
+dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0};
+
+sys_mutex_t *init_lock;
+item_list_t type_list;
+
+static void dx_alloc_init(dx_alloc_type_desc_t *desc)
+{
+ sys_mutex_lock(init_lock);
+
+ desc->total_size = desc->type_size;
+ if (desc->additional_size)
+ desc->total_size += *desc->additional_size;
+
+ dx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d",
+ desc->type_name, desc->type_size, desc->total_size);
+
+ if (!desc->global_pool) {
+ if (desc->config == 0)
+ desc->config = desc->total_size > 256 ?
+ &dx_alloc_default_config_big : &dx_alloc_default_config_small;
+
+ assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size);
+
+ desc->global_pool = NEW(dx_alloc_pool_t);
+ DEQ_INIT(desc->global_pool->free_list);
+ desc->lock = sys_mutex();
+ desc->stats = NEW(dx_alloc_stats_t);
+ memset(desc->stats, 0, sizeof(dx_alloc_stats_t));
+ }
+
+ item_t *type_item = NEW(item_t);
+ DEQ_ITEM_INIT(type_item);
+ type_item->desc = desc;
+ DEQ_INSERT_TAIL(type_list, type_item);
+
+ sys_mutex_unlock(init_lock);
+}
+
+
+void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool)
+{
+ int idx;
+
+ //
+ // If the descriptor is not initialized, set it up now.
+ //
+ if (!desc->global_pool)
+ dx_alloc_init(desc);
+
+ //
+ // If this is the thread's first pass through here, allocate the
+ // thread-local pool for this type.
+ //
+ if (*tpool == 0) {
+ *tpool = NEW(dx_alloc_pool_t);
+ DEQ_INIT((*tpool)->free_list);
+ }
+
+ dx_alloc_pool_t *pool = *tpool;
+
+ //
+ // Fast case: If there's an item on the local free list, take it off the
+ // list and return it. Since everything we've touched is thread-local,
+ // there is no need to acquire a lock.
+ //
+ item_t *item = DEQ_HEAD(pool->free_list);
+ if (item) {
+ DEQ_REMOVE_HEAD(pool->free_list);
+ return &item[1];
+ }
+
+ //
+ // The local free list is empty, we need to either rebalance a batch
+ // of items from the global list or go to the heap to get new memory.
+ //
+ sys_mutex_lock(desc->lock);
+ if (DEQ_SIZE(desc->global_pool->free_list) >= desc->config->transfer_batch_size) {
+ //
+ // Rebalance a full batch from the global free list to the thread list.
+ //
+ desc->stats->batches_rebalanced_to_threads++;
+ desc->stats->held_by_threads += desc->config->transfer_batch_size;
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = DEQ_HEAD(desc->global_pool->free_list);
+ DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+ DEQ_INSERT_TAIL(pool->free_list, item);
+ }
+ } else {
+ //
+ // Allocate a full batch from the heap and put it on the thread list.
+ //
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = (item_t*) malloc(sizeof(item_t) + desc->total_size);
+ if (item == 0)
+ break;
+ DEQ_ITEM_INIT(item);
+ item->desc = desc;
+ DEQ_INSERT_TAIL(pool->free_list, item);
+ desc->stats->held_by_threads++;
+ desc->stats->total_alloc_from_heap++;
+ }
+ }
+ sys_mutex_unlock(desc->lock);
+
+ item = DEQ_HEAD(pool->free_list);
+ if (item) {
+ DEQ_REMOVE_HEAD(pool->free_list);
+ return &item[1];
+ }
+
+ return 0;
+}
+
+
+void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p)
+{
+ item_t *item = ((item_t*) p) - 1;
+ int idx;
+
+ //
+ // If this is the thread's first pass through here, allocate the
+ // thread-local pool for this type.
+ //
+ if (*tpool == 0) {
+ *tpool = NEW(dx_alloc_pool_t);
+ DEQ_INIT((*tpool)->free_list);
+ }
+
+ dx_alloc_pool_t *pool = *tpool;
+
+ DEQ_INSERT_TAIL(pool->free_list, item);
+
+ if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max)
+ return;
+
+ //
+ // We've exceeded the maximum size of the local free list. A batch must be
+ // rebalanced back to the global list.
+ //
+ sys_mutex_lock(desc->lock);
+ desc->stats->batches_rebalanced_to_global++;
+ desc->stats->held_by_threads -= desc->config->transfer_batch_size;
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = DEQ_HEAD(pool->free_list);
+ DEQ_REMOVE_HEAD(pool->free_list);
+ DEQ_INSERT_TAIL(desc->global_pool->free_list, item);
+ }
+
+ //
+ // If there's a global_free_list size limit, remove items until the limit is
+ // not exceeded.
+ //
+ if (desc->config->global_free_list_max != 0) {
+ while (DEQ_SIZE(desc->global_pool->free_list) > desc->config->global_free_list_max) {
+ item = DEQ_HEAD(desc->global_pool->free_list);
+ DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+ free(item);
+ desc->stats->total_free_to_heap++;
+ }
+ }
+
+ sys_mutex_unlock(desc->lock);
+}
+
+
+void dx_alloc_initialize(void)
+{
+ init_lock = sys_mutex();
+ DEQ_INIT(type_list);
+}
+
diff --git a/extras/dispatch/src/alloc_private.h b/extras/dispatch/src/alloc_private.h
new file mode 100644
index 0000000000..fbb18ccd48
--- /dev/null
+++ b/extras/dispatch/src/alloc_private.h
@@ -0,0 +1,26 @@
+#ifndef __dispatch_alloc_private_h__
+#define __dispatch_alloc_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/alloc.h>
+
+void dx_alloc_initialize(void);
+
+#endif
diff --git a/extras/dispatch/src/auth.c b/extras/dispatch/src/auth.c
new file mode 100644
index 0000000000..f0df58f6c2
--- /dev/null
+++ b/extras/dispatch/src/auth.c
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include "auth.h"
+#include "server_private.h"
+#include <proton/sasl.h>
+
+
+void auth_client_handler(pn_connector_t *cxtr)
+{
+ pn_sasl_t *sasl = pn_connector_sasl(cxtr);
+ pn_sasl_state_t state = pn_sasl_state(sasl);
+ dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr);
+
+ if (state == PN_SASL_CONF) {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_client(sasl);
+ }
+
+ state = pn_sasl_state(sasl);
+
+ if (state == PN_SASL_PASS) {
+ ctx->state = CONN_STATE_OPENING;
+ } else if (state == PN_SASL_FAIL) {
+ ctx->state = CONN_STATE_FAILED;
+ }
+}
+
+
+void auth_server_handler(pn_connector_t *cxtr)
+{
+ pn_sasl_t *sasl = pn_connector_sasl(cxtr);
+ pn_sasl_state_t state = pn_sasl_state(sasl);
+ dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr);
+
+ while (state == PN_SASL_CONF || state == PN_SASL_STEP) {
+ if (state == PN_SASL_CONF) {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_server(sasl);
+ } else if (state == PN_SASL_STEP) {
+ const char* mechanisms = pn_sasl_remote_mechanisms(sasl);
+ if (strcmp(mechanisms, "ANONYMOUS") == 0)
+ pn_sasl_done(sasl, PN_SASL_OK);
+ else
+ pn_sasl_done(sasl, PN_SASL_AUTH);
+ }
+ state = pn_sasl_state(sasl);
+ }
+
+ if (state == PN_SASL_PASS) {
+ ctx->state = CONN_STATE_OPENING;
+ } else if (state == PN_SASL_FAIL) {
+ ctx->state = CONN_STATE_FAILED;
+ }
+}
+
+
diff --git a/extras/dispatch/src/auth.h b/extras/dispatch/src/auth.h
new file mode 100644
index 0000000000..c551c8ff76
--- /dev/null
+++ b/extras/dispatch/src/auth.h
@@ -0,0 +1,27 @@
+#ifndef __auth_h__
+#define __auth_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/driver.h>
+
+void auth_client_handler(pn_connector_t *conn);
+void auth_server_handler(pn_connector_t *conn);
+
+#endif
diff --git a/extras/dispatch/src/buffer.c b/extras/dispatch/src/buffer.c
new file mode 100644
index 0000000000..015711afd9
--- /dev/null
+++ b/extras/dispatch/src/buffer.c
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/alloc.h>
+
+static size_t buffer_size = 512;
+static int size_locked = 0;
+
+ALLOC_DECLARE(dx_buffer_t);
+ALLOC_DEFINE_CONFIG(dx_buffer_t, sizeof(dx_buffer_t), &buffer_size, 0);
+
+
+void dx_buffer_set_size(size_t size)
+{
+ assert(!size_locked);
+ buffer_size = size;
+}
+
+
+dx_buffer_t *dx_allocate_buffer(void)
+{
+ size_locked = 1;
+ dx_buffer_t *buf = new_dx_buffer_t();
+
+ DEQ_ITEM_INIT(buf);
+ buf->size = 0;
+ return buf;
+}
+
+
+void dx_free_buffer(dx_buffer_t *buf)
+{
+ free_dx_buffer_t(buf);
+}
+
+
+unsigned char *dx_buffer_base(dx_buffer_t *buf)
+{
+ return (unsigned char*) &buf[1];
+}
+
+
+unsigned char *dx_buffer_cursor(dx_buffer_t *buf)
+{
+ return ((unsigned char*) &buf[1]) + buf->size;
+}
+
+
+size_t dx_buffer_capacity(dx_buffer_t *buf)
+{
+ return buffer_size - buf->size;
+}
+
+
+size_t dx_buffer_size(dx_buffer_t *buf)
+{
+ return buf->size;
+}
+
+
+void dx_buffer_insert(dx_buffer_t *buf, size_t len)
+{
+ buf->size += len;
+ assert(buf->size <= buffer_size);
+}
+
diff --git a/extras/dispatch/src/container.c b/extras/dispatch/src/container.c
new file mode 100644
index 0000000000..68e2afa3eb
--- /dev/null
+++ b/extras/dispatch/src/container.c
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/message.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
+
+static char *module="CONTAINER";
+
+struct dx_node_t {
+ const dx_node_type_t *ntype;
+ char *name;
+ void *context;
+ dx_dist_mode_t supported_dist;
+ dx_lifetime_policy_t life_policy;
+};
+
+ALLOC_DECLARE(dx_node_t);
+ALLOC_DEFINE(dx_node_t);
+ALLOC_DEFINE(dx_link_item_t);
+
+struct dx_link_t {
+ pn_link_t *pn_link;
+ void *context;
+ dx_node_t *node;
+};
+
+ALLOC_DECLARE(dx_link_t);
+ALLOC_DEFINE(dx_link_t);
+
+typedef struct nxc_node_type_t {
+ DEQ_LINKS(struct nxc_node_type_t);
+ const dx_node_type_t *ntype;
+} nxc_node_type_t;
+DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t);
+
+
+static hash_t *node_type_map;
+static hash_t *node_map;
+static sys_mutex_t *lock;
+static dx_node_t *default_node;
+static nxc_node_type_list_t node_type_list;
+
+static void setup_outgoing_link(pn_link_t *pn_link)
+{
+ sys_mutex_lock(lock);
+ dx_node_t *node;
+ int result;
+ const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link));
+ dx_field_iterator_t *iter;
+ // TODO - Extract the name from the structured source
+
+ if (source) {
+ iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
+ result = hash_retrieve(node_map, iter, (void*) &node);
+ dx_field_iterator_free(iter);
+ } else
+ result = -1;
+ sys_mutex_unlock(lock);
+
+ if (result < 0) {
+ if (default_node)
+ node = default_node;
+ else {
+ // Reject the link
+ // TODO - When the API allows, add an error message for "no available node"
+ pn_link_close(pn_link);
+ return;
+ }
+ }
+
+ dx_link_t *link = new_dx_link_t();
+ if (!link) {
+ pn_link_close(pn_link);
+ return;
+ }
+
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+
+ pn_link_set_context(pn_link, link);
+ node->ntype->outgoing_handler(node->context, link);
+}
+
+
+static void setup_incoming_link(pn_link_t *pn_link)
+{
+ sys_mutex_lock(lock);
+ dx_node_t *node;
+ int result;
+ const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ dx_field_iterator_t *iter;
+ // TODO - Extract the name from the structured target
+
+ if (target) {
+ iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
+ result = hash_retrieve(node_map, iter, (void*) &node);
+ dx_field_iterator_free(iter);
+ } else
+ result = -1;
+ sys_mutex_unlock(lock);
+
+ if (result < 0) {
+ if (default_node)
+ node = default_node;
+ else {
+ // Reject the link
+ // TODO - When the API allows, add an error message for "no available node"
+ pn_link_close(pn_link);
+ return;
+ }
+ }
+
+ dx_link_t *link = new_dx_link_t();
+ if (!link) {
+ pn_link_close(pn_link);
+ return;
+ }
+
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+
+ pn_link_set_context(pn_link, link);
+ node->ntype->incoming_handler(node->context, link);
+}
+
+
+static int do_writable(pn_link_t *pn_link)
+{
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ if (!link)
+ return 0;
+
+ dx_node_t *node = link->node;
+ if (!node)
+ return 0;
+
+ return node->ntype->writable_handler(node->context, link);
+}
+
+
+static void process_receive(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node) {
+ node->ntype->rx_handler(node->context, link, delivery);
+ return;
+ }
+ }
+
+ //
+ // Reject the delivery if we couldn't find a node to handle it
+ //
+ pn_link_advance(pn_link);
+ pn_link_flow(pn_link, 1);
+ pn_delivery_update(delivery, PN_REJECTED);
+ pn_delivery_settle(delivery);
+}
+
+
+static void do_send(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node) {
+ node->ntype->tx_handler(node->context, link, delivery);
+ return;
+ }
+ }
+
+ // TODO - Cancel the delivery
+}
+
+
+static void do_updated(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->disp_handler(node->context, link, delivery);
+ }
+}
+
+
+static int close_handler(void* unused, pn_connection_t *conn)
+{
+ //
+ // Close all links, passing False as the 'closed' argument. These links are not
+ // being properly 'detached'. They are being orphaned.
+ //
+ pn_link_t *pn_link = pn_link_head(conn, 0);
+ while (pn_link) {
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 0);
+ pn_link_close(pn_link);
+ free_dx_link_t(link);
+ pn_link = pn_link_next(pn_link, 0);
+ }
+
+ // teardown all sessions
+ pn_session_t *ssn = pn_session_head(conn, 0);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, 0);
+ }
+
+ // teardown the connection
+ pn_connection_close(conn);
+ return 0;
+}
+
+
+static int process_handler(void* unused, pn_connection_t *conn)
+{
+ pn_session_t *ssn;
+ pn_link_t *pn_link;
+ pn_delivery_t *delivery;
+ int event_count = 0;
+
+ // Step 1: setup the engine's connection, and any sessions and links
+ // that may be pending.
+
+ // initialize the connection if it's new
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+ pn_connection_open(conn);
+ event_count++;
+ }
+
+ // open all pending sessions
+ ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
+ while (ssn) {
+ pn_session_open(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
+ event_count++;
+ }
+
+ // configure and open any pending links
+ pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
+ while (pn_link) {
+ if (pn_link_is_sender(pn_link))
+ setup_outgoing_link(pn_link);
+ else
+ setup_incoming_link(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
+ event_count++;
+ }
+
+
+ // Step 2: Now drain all the pending deliveries from the connection's
+ // work queue and process them
+
+ delivery = pn_work_head(conn);
+ while (delivery) {
+ if (pn_delivery_readable(delivery))
+ process_receive(delivery);
+ else if (pn_delivery_writable(delivery))
+ do_send(delivery);
+
+ if (pn_delivery_updated(delivery))
+ do_updated(delivery);
+
+ delivery = pn_work_next(delivery);
+ event_count++;
+ }
+
+ //
+ // Step 2.5: Traverse all of the links on the connection looking for
+ // outgoing links with non-zero credit. Call the attached node's
+ // writable handler for such links.
+ //
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ while (pn_link) {
+ assert(pn_session_connection(pn_link_session(pn_link)) == conn);
+ if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
+ event_count += do_writable(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ }
+
+ // Step 3: Clean up any links or sessions that have been closed by the
+ // remote. If the connection has been closed remotely, clean that up
+ // also.
+
+ // teardown any terminating links
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (pn_link) {
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
+ pn_link_close(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ event_count++;
+ }
+
+ // teardown any terminating sessions
+ ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ event_count++;
+ }
+
+ // teardown the connection if it's terminating
+ if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ pn_connection_close(conn);
+ event_count++;
+ }
+
+ return event_count;
+}
+
+
+static void open_handler(dx_connection_t *conn, dx_direction_t dir)
+{
+ const dx_node_type_t *nt;
+
+ //
+ // Note the locking structure in this function. Generally this would be unsafe, but since
+ // this particular list is only ever appended to and never has items inserted or deleted,
+ // this usage is safe in this case.
+ //
+ sys_mutex_lock(lock);
+ nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list);
+ sys_mutex_unlock(lock);
+
+ pn_connection_open(dx_connection_pn(conn));
+
+ while (nt_item) {
+ nt = nt_item->ntype;
+ if (dir == DX_INCOMING) {
+ if (nt->inbound_conn_open_handler)
+ nt->inbound_conn_open_handler(nt->type_context, conn);
+ } else {
+ if (nt->outbound_conn_open_handler)
+ nt->outbound_conn_open_handler(nt->type_context, conn);
+ }
+
+ sys_mutex_lock(lock);
+ nt_item = DEQ_NEXT(nt_item);
+ sys_mutex_unlock(lock);
+ }
+}
+
+
+static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn)
+{
+ pn_connection_t *conn = dx_connection_pn(dx_conn);
+
+ switch (event) {
+ case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break;
+ case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break;
+ case DX_CONN_EVENT_CLOSE: return close_handler(context, conn);
+ case DX_CONN_EVENT_PROCESS: return process_handler(context, conn);
+ }
+
+ return 0;
+}
+
+
+void dx_container_initialize(void)
+{
+ dx_log(module, LOG_TRACE, "Container Initializing");
+
+ node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
+ node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
+ lock = sys_mutex();
+ default_node = 0;
+ DEQ_INIT(node_type_list);
+
+ dx_server_set_conn_handler(handler);
+}
+
+
+void dx_container_finalize(void)
+{
+}
+
+
+int dx_container_register_node_type(const dx_node_type_t *nt)
+{
+ int result;
+ dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL);
+ nxc_node_type_t *nt_item = NEW(nxc_node_type_t);
+ DEQ_ITEM_INIT(nt_item);
+ nt_item->ntype = nt;
+
+ sys_mutex_lock(lock);
+ result = hash_insert_const(node_type_map, iter, nt);
+ DEQ_INSERT_TAIL(node_type_list, nt_item);
+ sys_mutex_unlock(lock);
+
+ dx_field_iterator_free(iter);
+ if (result < 0)
+ return result;
+ dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name);
+
+ return 0;
+}
+
+
+void dx_container_set_default_node_type(const dx_node_type_t *nt,
+ void *context,
+ dx_dist_mode_t supported_dist)
+{
+ if (default_node)
+ dx_container_destroy_node(default_node);
+
+ if (nt) {
+ default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT);
+ dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name);
+ } else {
+ default_node = 0;
+ dx_log(module, LOG_TRACE, "Default node removed");
+ }
+}
+
+
+dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
+ const char *name,
+ void *context,
+ dx_dist_mode_t supported_dist,
+ dx_lifetime_policy_t life_policy)
+{
+ int result;
+ dx_node_t *node = new_dx_node_t();
+ if (!node)
+ return 0;
+
+ node->ntype = nt;
+ node->name = 0;
+ node->context = context;
+ node->supported_dist = supported_dist;
+ node->life_policy = life_policy;
+
+ if (name) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
+ sys_mutex_lock(lock);
+ result = hash_insert(node_map, iter, node);
+ sys_mutex_unlock(lock);
+ dx_field_iterator_free(iter);
+ if (result < 0) {
+ free_dx_node_t(node);
+ return 0;
+ }
+
+ node->name = (char*) malloc(strlen(name) + 1);
+ strcpy(node->name, name);
+ }
+
+ if (name)
+ dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name);
+
+ return node;
+}
+
+
+void dx_container_destroy_node(dx_node_t *node)
+{
+ if (node->name) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL);
+ sys_mutex_lock(lock);
+ hash_remove(node_map, iter);
+ sys_mutex_unlock(lock);
+ dx_field_iterator_free(iter);
+ free(node->name);
+ }
+
+ free_dx_node_t(node);
+}
+
+
+void dx_container_node_set_context(dx_node_t *node, void *node_context)
+{
+ node->context = node_context;
+}
+
+
+dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node)
+{
+ return node->supported_dist;
+}
+
+
+dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node)
+{
+ return node->life_policy;
+}
+
+
+dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name)
+{
+ pn_session_t *sess = pn_session(dx_connection_pn(conn));
+ dx_link_t *link = new_dx_link_t();
+
+ if (dir == DX_OUTGOING)
+ link->pn_link = pn_sender(sess, name);
+ else
+ link->pn_link = pn_receiver(sess, name);
+ link->context = node->context;
+ link->node = node;
+
+ pn_link_set_context(link->pn_link, link);
+
+ pn_session_open(sess);
+
+ return link;
+}
+
+
+void dx_link_set_context(dx_link_t *link, void *context)
+{
+ link->context = context;
+}
+
+
+void *dx_link_get_context(dx_link_t *link)
+{
+ return link->context;
+}
+
+
+pn_link_t *dx_link_pn(dx_link_t *link)
+{
+ return link->pn_link;
+}
+
+
+pn_terminus_t *dx_link_source(dx_link_t *link)
+{
+ return pn_link_source(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_target(dx_link_t *link)
+{
+ return pn_link_target(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_remote_source(dx_link_t *link)
+{
+ return pn_link_remote_source(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_remote_target(dx_link_t *link)
+{
+ return pn_link_remote_target(link->pn_link);
+}
+
+
+void dx_link_activate(dx_link_t *link)
+{
+ if (!link || !link->pn_link)
+ return;
+
+ pn_session_t *sess = pn_link_session(link->pn_link);
+ if (!sess)
+ return;
+
+ pn_connection_t *conn = pn_session_connection(sess);
+ if (!conn)
+ return;
+
+ dx_connection_t *ctx = pn_connection_get_context(conn);
+ if (!ctx)
+ return;
+
+ dx_server_activate(ctx);
+}
+
+
+void dx_link_close(dx_link_t *link)
+{
+ pn_link_close(link->pn_link);
+}
+
+
diff --git a/extras/dispatch/src/hash.c b/extras/dispatch/src/hash.c
new file mode 100644
index 0000000000..c54d5d6fcf
--- /dev/null
+++ b/extras/dispatch/src/hash.c
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
+#include <stdio.h>
+#include <string.h>
+
+typedef struct hash_item_t {
+ DEQ_LINKS(struct hash_item_t);
+ unsigned char *key;
+ union {
+ void *val;
+ const void *val_const;
+ } v;
+} hash_item_t;
+
+ALLOC_DECLARE(hash_item_t);
+ALLOC_DEFINE(hash_item_t);
+DEQ_DECLARE(hash_item_t, items_t);
+
+
+typedef struct bucket_t {
+ items_t items;
+} bucket_t;
+
+
+struct hash_t {
+ bucket_t *buckets;
+ unsigned int bucket_count;
+ unsigned int bucket_mask;
+ int batch_size;
+ size_t size;
+ int is_const;
+};
+
+
+// djb2 hash algorithm
+static unsigned long hash_function(dx_field_iterator_t *iter)
+{
+ unsigned long hash = 5381;
+ int c;
+
+ while (!dx_field_iterator_end(iter)) {
+ c = (int) dx_field_iterator_octet(iter);
+ hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
+ }
+
+ return hash;
+}
+
+
+hash_t *hash(int bucket_exponent, int batch_size, int value_is_const)
+{
+ int i;
+ hash_t *h = NEW(hash_t);
+
+ if (!h)
+ return 0;
+
+ h->bucket_count = 1 << bucket_exponent;
+ h->bucket_mask = h->bucket_count - 1;
+ h->batch_size = batch_size;
+ h->size = 0;
+ h->is_const = value_is_const;
+ h->buckets = NEW_ARRAY(bucket_t, h->bucket_count);
+ for (i = 0; i < h->bucket_count; i++) {
+ DEQ_INIT(h->buckets[i].items);
+ }
+
+ return h;
+}
+
+
+void hash_free(hash_t *h)
+{
+ // TODO - Implement this
+}
+
+
+size_t hash_size(hash_t *h)
+{
+ return h ? h->size : 0;
+}
+
+
+static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ *error = 0;
+
+ while (item) {
+ if (dx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ if (item) {
+ *error = -1;
+ return 0;
+ }
+
+ item = new_hash_item_t();
+ if (!item) {
+ *error = -2;
+ return 0;
+ }
+
+ DEQ_ITEM_INIT(item);
+ item->key = dx_field_iterator_copy(key);
+
+ DEQ_INSERT_TAIL(h->buckets[idx].items, item);
+ h->size++;
+ return item;
+}
+
+
+int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+{
+ int error = 0;
+ hash_item_t *item = hash_internal_insert(h, key, &error);
+
+ if (item)
+ item->v.val = val;
+ return error;
+}
+
+
+int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+{
+ if (!h->is_const)
+ return -3;
+
+ int error = 0;
+ hash_item_t *item = hash_internal_insert(h, key, &error);
+
+ if (item)
+ item->v.val_const = val;
+ return error;
+}
+
+
+static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ while (item) {
+ if (dx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ return item;
+}
+
+
+int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val)
+{
+ hash_item_t *item = hash_internal_retrieve(h, key);
+ if (item) {
+ *val = item->v.val;
+ return 0;
+ }
+ return -1;
+}
+
+
+int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val)
+{
+ if (!h->is_const)
+ return -3;
+
+ hash_item_t *item = hash_internal_retrieve(h, key);
+ if (item) {
+ *val = item->v.val_const;
+ return 0;
+ }
+ return -1;
+}
+
+
+int hash_remove(hash_t *h, dx_field_iterator_t *key)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ while (item) {
+ if (dx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ if (item) {
+ free(item->key);
+ DEQ_REMOVE(h->buckets[idx].items, item);
+ free_hash_item_t(item);
+ h->size--;
+ return 0;
+ }
+
+ return -1;
+}
+
diff --git a/extras/dispatch/src/iovec.c b/extras/dispatch/src/iovec.c
new file mode 100644
index 0000000000..6ff6874440
--- /dev/null
+++ b/extras/dispatch/src/iovec.c
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/iovec.h>
+#include <qpid/dispatch/alloc.h>
+#include <string.h>
+
+#define DX_IOVEC_MAX 64
+
+struct dx_iovec_t {
+ struct iovec iov_array[DX_IOVEC_MAX];
+ struct iovec *iov;
+ int iov_count;
+};
+
+
+ALLOC_DECLARE(dx_iovec_t);
+ALLOC_DEFINE(dx_iovec_t);
+
+
+dx_iovec_t *dx_iovec(int vector_count)
+{
+ dx_iovec_t *iov = new_dx_iovec_t();
+ if (!iov)
+ return 0;
+
+ memset(iov, 0, sizeof(dx_iovec_t));
+
+ iov->iov_count = vector_count;
+ if (vector_count > DX_IOVEC_MAX)
+ iov->iov = (struct iovec*) malloc(sizeof(struct iovec) * vector_count);
+ else
+ iov->iov = &iov->iov_array[0];
+
+ return iov;
+}
+
+
+void dx_iovec_free(dx_iovec_t *iov)
+{
+ if (!iov)
+ return;
+
+ if (iov->iov && iov->iov != &iov->iov_array[0])
+ free(iov->iov);
+
+ free_dx_iovec_t(iov);
+}
+
+
+struct iovec *dx_iovec_array(dx_iovec_t *iov)
+{
+ if (!iov)
+ return 0;
+ return iov->iov;
+}
+
+
+int dx_iovec_count(dx_iovec_t *iov)
+{
+ if (!iov)
+ return 0;
+ return iov->iov_count;
+}
+
diff --git a/extras/dispatch/src/iterator.c b/extras/dispatch/src/iterator.c
new file mode 100644
index 0000000000..6ab67f948d
--- /dev/null
+++ b/extras/dispatch/src/iterator.c
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
+#include "message_private.h"
+#include <stdio.h>
+#include <string.h>
+
+typedef enum {
+MODE_TO_END,
+MODE_TO_SLASH
+} parse_mode_t;
+
+struct dx_field_iterator_t {
+ dx_buffer_t *start_buffer;
+ unsigned char *start_cursor;
+ int start_length;
+ dx_buffer_t *buffer;
+ unsigned char *cursor;
+ int length;
+ dx_iterator_view_t view;
+ parse_mode_t mode;
+};
+
+
+ALLOC_DECLARE(dx_field_iterator_t);
+ALLOC_DEFINE(dx_field_iterator_t);
+
+
+typedef enum {
+STATE_START,
+STATE_SLASH_LEFT,
+STATE_SKIPPING_TO_NEXT_SLASH,
+STATE_SCANNING,
+STATE_COLON,
+STATE_COLON_SLASH,
+STATE_AT_NODE_ID
+} state_t;
+
+
+static void view_initialize(dx_field_iterator_t *iter)
+{
+ if (iter->view == ITER_VIEW_ALL) {
+ iter->mode = MODE_TO_END;
+ return;
+ }
+
+ //
+ // Advance to the node-id.
+ //
+ state_t state = STATE_START;
+ unsigned int octet;
+ while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) {
+ octet = dx_field_iterator_octet(iter);
+ switch (state) {
+ case STATE_START :
+ if (octet == '/')
+ state = STATE_SLASH_LEFT;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_SLASH_LEFT :
+ if (octet == '/')
+ state = STATE_SKIPPING_TO_NEXT_SLASH;
+ else
+ state = STATE_AT_NODE_ID;
+ break;
+
+ case STATE_SKIPPING_TO_NEXT_SLASH :
+ if (octet == '/')
+ state = STATE_AT_NODE_ID;
+ break;
+
+ case STATE_SCANNING :
+ if (octet == ':')
+ state = STATE_COLON;
+ break;
+
+ case STATE_COLON :
+ if (octet == '/')
+ state = STATE_COLON_SLASH;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_COLON_SLASH :
+ if (octet == '/')
+ state = STATE_SKIPPING_TO_NEXT_SLASH;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_AT_NODE_ID :
+ break;
+ }
+ }
+
+ if (state != STATE_AT_NODE_ID) {
+ //
+ // The address string was relative, not absolute. The node-id
+ // is at the beginning of the string.
+ //
+ iter->buffer = iter->start_buffer;
+ iter->cursor = iter->start_cursor;
+ iter->length = iter->start_length;
+ }
+
+ //
+ // Cursor is now on the first octet of the node-id
+ //
+ if (iter->view == ITER_VIEW_NODE_ID) {
+ iter->mode = MODE_TO_SLASH;
+ return;
+ }
+
+ if (iter->view == ITER_VIEW_NO_HOST) {
+ iter->mode = MODE_TO_END;
+ return;
+ }
+
+ if (iter->view == ITER_VIEW_NODE_SPECIFIC) {
+ iter->mode = MODE_TO_END;
+ while (!dx_field_iterator_end(iter)) {
+ octet = dx_field_iterator_octet(iter);
+ if (octet == '/')
+ break;
+ }
+ return;
+ }
+}
+
+
+dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view)
+{
+ dx_field_iterator_t *iter = new_dx_field_iterator_t();
+ if (!iter)
+ return 0;
+
+ iter->start_buffer = 0;
+ iter->start_cursor = (unsigned char*) text;
+ iter->start_length = strlen(text);
+
+ dx_field_iterator_reset(iter, view);
+
+ return iter;
+}
+
+
+dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, int length, dx_iterator_view_t view)
+{
+ dx_field_iterator_t *iter = new_dx_field_iterator_t();
+ if (!iter)
+ return 0;
+
+ iter->start_buffer = buffer;
+ iter->start_cursor = dx_buffer_base(buffer) + offset;
+ iter->start_length = length;
+
+ dx_field_iterator_reset(iter, view);
+
+ return iter;
+}
+
+
+void dx_field_iterator_free(dx_field_iterator_t *iter)
+{
+ free_dx_field_iterator_t(iter);
+}
+
+
+void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view)
+{
+ iter->buffer = iter->start_buffer;
+ iter->cursor = iter->start_cursor;
+ iter->length = iter->start_length;
+ iter->view = view;
+
+ view_initialize(iter);
+}
+
+
+unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter)
+{
+ if (iter->length == 0)
+ return (unsigned char) 0;
+
+ unsigned char result = *(iter->cursor);
+
+ iter->cursor++;
+ iter->length--;
+
+ if (iter->length > 0) {
+ if (iter->buffer) {
+ if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) {
+ iter->buffer = iter->buffer->next;
+ if (iter->buffer == 0)
+ iter->length = 0;
+ iter->cursor = dx_buffer_base(iter->buffer);
+ }
+ }
+ }
+
+ if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/')
+ iter->length = 0;
+
+ return result;
+}
+
+
+int dx_field_iterator_end(dx_field_iterator_t *iter)
+{
+ return iter->length == 0;
+}
+
+
+int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string)
+{
+ dx_field_iterator_reset(iter, iter->view);
+ while (!dx_field_iterator_end(iter) && *string) {
+ if (*string != dx_field_iterator_octet(iter))
+ return 0;
+ string++;
+ }
+
+ return (dx_field_iterator_end(iter) && (*string == 0));
+}
+
+
+unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter)
+{
+ int length = 0;
+ int idx = 0;
+ unsigned char *copy;
+
+ dx_field_iterator_reset(iter, iter->view);
+ while (!dx_field_iterator_end(iter)) {
+ dx_field_iterator_octet(iter);
+ length++;
+ }
+
+ dx_field_iterator_reset(iter, iter->view);
+ copy = (unsigned char*) malloc(length + 1);
+ while (!dx_field_iterator_end(iter))
+ copy[idx++] = dx_field_iterator_octet(iter);
+ copy[idx] = '\0';
+
+ return copy;
+}
+
diff --git a/extras/dispatch/src/log.c b/extras/dispatch/src/log.c
new file mode 100644
index 0000000000..d4ec534915
--- /dev/null
+++ b/extras/dispatch/src/log.c
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/log.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+static int mask=LOG_INFO;
+
+static char *cls_prefix(int cls)
+{
+ switch (cls) {
+ case LOG_TRACE : return "TRACE";
+ case LOG_ERROR : return "ERROR";
+ case LOG_INFO : return "INFO";
+ }
+
+ return "";
+}
+
+void dx_log(const char *module, int cls, const char *fmt, ...)
+{
+ if (!(cls & mask))
+ return;
+
+ va_list ap;
+ char line[128];
+
+ va_start(ap, fmt);
+ vsnprintf(line, 127, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line);
+}
+
+void dx_log_set_mask(int _mask)
+{
+ mask = _mask;
+}
+
diff --git a/extras/dispatch/src/message.c b/extras/dispatch/src/message.c
new file mode 100644
index 0000000000..f66e79010c
--- /dev/null
+++ b/extras/dispatch/src/message.c
@@ -0,0 +1,1120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/threading.h>
+#include "message_private.h"
+#include <string.h>
+#include <stdio.h>
+
+ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0);
+ALLOC_DEFINE(dx_message_content_t);
+
+
+static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
+{
+ unsigned char *local_cursor = *cursor;
+ dx_buffer_t *local_buffer = *buffer;
+
+ int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer));
+ while (consume > 0) {
+ if (consume < remaining) {
+ local_cursor += consume;
+ consume = 0;
+ } else {
+ consume -= remaining;
+ local_buffer = local_buffer->next;
+ if (local_buffer == 0){
+ local_cursor = 0;
+ break;
+ }
+ local_cursor = dx_buffer_base(local_buffer);
+ remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer));
+ }
+ }
+
+ *cursor = local_cursor;
+ *buffer = local_buffer;
+}
+
+
+static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer)
+{
+ unsigned char result = **cursor;
+ advance(cursor, buffer, 1);
+ return result;
+}
+
+
+static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field_location_t *field)
+{
+ unsigned char tag = next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ int consume = 0;
+ switch (tag & 0xF0) {
+ case 0x40 : consume = 0; break;
+ case 0x50 : consume = 1; break;
+ case 0x60 : consume = 2; break;
+ case 0x70 : consume = 4; break;
+ case 0x80 : consume = 8; break;
+ case 0x90 : consume = 16; break;
+
+ case 0xB0 :
+ case 0xD0 :
+ case 0xF0 :
+ consume |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ consume |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ consume |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ // Fall through to the next case...
+
+ case 0xA0 :
+ case 0xC0 :
+ case 0xE0 :
+ consume |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ break;
+ }
+
+ if (field) {
+ field->buffer = *buffer;
+ field->offset = *cursor - dx_buffer_base(*buffer);
+ field->length = consume;
+ field->parsed = 1;
+ }
+
+ advance(cursor, buffer, consume);
+ return 1;
+}
+
+
+static int start_list(unsigned char **cursor, dx_buffer_t **buffer)
+{
+ unsigned char tag = next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ int length = 0;
+ int count = 0;
+
+ switch (tag) {
+ case 0x45 : // list0
+ break;
+ case 0xd0 : // list32
+ length |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ length |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ length |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ length |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ count |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ count |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ count |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ count |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ break;
+
+ case 0xc0 : // list8
+ length |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ count |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ break;
+ }
+
+ return count;
+}
+
+
+//
+// Check the buffer chain, starting at cursor to see if it matches the pattern.
+// If the pattern matches, check the next tag to see if it's in the set of expected
+// tags. If not, return zero. If so, set the location descriptor to the good
+// tag and advance the cursor (and buffer, if needed) to the end of the matched section.
+//
+// If there is no match, don't advance the cursor.
+//
+// Return 0 if the pattern matches but the following tag is unexpected
+// Return 0 if the pattern matches and the location already has a pointer (duplicate section)
+// Return 1 if the pattern matches and we've advanced the cursor/buffer
+// Return 1 if the pattern does not match
+//
+static int dx_check_and_advance(dx_buffer_t **buffer,
+ unsigned char **cursor,
+ unsigned char *pattern,
+ int pattern_length,
+ unsigned char *expected_tags,
+ dx_field_location_t *location)
+{
+ dx_buffer_t *test_buffer = *buffer;
+ unsigned char *test_cursor = *cursor;
+
+ if (!test_cursor)
+ return 1; // no match
+
+ unsigned char *end_of_buffer = dx_buffer_base(test_buffer) + dx_buffer_size(test_buffer);
+ int idx = 0;
+
+ while (idx < pattern_length && *test_cursor == pattern[idx]) {
+ idx++;
+ test_cursor++;
+ if (test_cursor == end_of_buffer) {
+ test_buffer = test_buffer->next;
+ if (test_buffer == 0)
+ return 1; // Pattern didn't match
+ test_cursor = dx_buffer_base(test_buffer);
+ end_of_buffer = test_cursor + dx_buffer_size(test_buffer);
+ }
+ }
+
+ if (idx < pattern_length)
+ return 1; // Pattern didn't match
+
+ //
+ // Pattern matched, check the tag
+ //
+ while (*expected_tags && *test_cursor != *expected_tags)
+ expected_tags++;
+ if (*expected_tags == 0)
+ return 0; // Unexpected tag
+
+ if (location->parsed)
+ return 0; // Duplicate section
+
+ //
+ // Pattern matched and tag is expected. Mark the beginning of the section.
+ //
+ location->parsed = 1;
+ location->buffer = test_buffer;
+ location->offset = test_cursor - dx_buffer_base(test_buffer);
+ location->length = 0;
+
+ //
+ // Advance the pointers to consume the whole section.
+ //
+ int consume = 0;
+ unsigned char tag = next_octet(&test_cursor, &test_buffer);
+ if (!test_cursor) return 0;
+ switch (tag) {
+ case 0x45 : // list0
+ break;
+
+ case 0xd0 : // list32
+ case 0xd1 : // map32
+ case 0xb0 : // vbin32
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24;
+ if (!test_cursor) return 0;
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16;
+ if (!test_cursor) return 0;
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8;
+ if (!test_cursor) return 0;
+ // Fall through to the next case...
+
+ case 0xc0 : // list8
+ case 0xc1 : // map8
+ case 0xa0 : // vbin8
+ consume |= (int) next_octet(&test_cursor, &test_buffer);
+ if (!test_cursor) return 0;
+ break;
+ }
+
+ if (consume)
+ advance(&test_cursor, &test_buffer, consume);
+
+ *cursor = test_cursor;
+ *buffer = test_buffer;
+ return 1;
+}
+
+
+static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len)
+{
+ dx_buffer_t *buf = DEQ_TAIL(msg->buffers);
+
+ while (len > 0) {
+ if (buf == 0 || dx_buffer_capacity(buf) == 0) {
+ buf = dx_allocate_buffer();
+ if (buf == 0)
+ return;
+ DEQ_INSERT_TAIL(msg->buffers, buf);
+ }
+
+ size_t to_copy = dx_buffer_capacity(buf);
+ if (to_copy > len)
+ to_copy = len;
+ memcpy(dx_buffer_cursor(buf), seq, to_copy);
+ dx_buffer_insert(buf, to_copy);
+ len -= to_copy;
+ seq += to_copy;
+ msg->length += to_copy;
+ }
+}
+
+
+static void dx_insert_8(dx_message_content_t *msg, uint8_t value)
+{
+ dx_insert(msg, &value, 1);
+}
+
+
+static void dx_insert_32(dx_message_content_t *msg, uint32_t value)
+{
+ uint8_t buf[4];
+ buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
+ buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16);
+ buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8);
+ buf[3] = (uint8_t) (value & 0x000000FF);
+ dx_insert(msg, buf, 4);
+}
+
+
+static void dx_insert_64(dx_message_content_t *msg, uint64_t value)
+{
+ uint8_t buf[8];
+ buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
+ buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48);
+ buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40);
+ buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32);
+ buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24);
+ buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16);
+ buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8);
+ buf[7] = (uint8_t) (value & 0x00000000000000FFL);
+ dx_insert(msg, buf, 8);
+}
+
+
+static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value)
+{
+ while (*buf) {
+ if (*cursor >= dx_buffer_size(*buf)) {
+ *buf = (*buf)->next;
+ *cursor = 0;
+ } else {
+ dx_buffer_base(*buf)[*cursor] = value;
+ (*cursor)++;
+ return;
+ }
+ }
+}
+
+
+static void dx_overwrite_32(dx_field_location_t *field, uint32_t value)
+{
+ dx_buffer_t *buf = field->buffer;
+ size_t cursor = field->offset;
+
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF));
+}
+
+
+static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code)
+{
+ //
+ // Insert the short-form performative tag
+ //
+ dx_insert(msg, (const uint8_t*) "\x00\x53", 2);
+ dx_insert_8(msg, code);
+
+ //
+ // Open the list with a list32 tag
+ //
+ dx_insert_8(msg, 0xd0);
+
+ //
+ // Mark the current location to later overwrite the length
+ //
+ msg->compose_length.buffer = DEQ_TAIL(msg->buffers);
+ msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer);
+ msg->compose_length.length = 4;
+ msg->compose_length.parsed = 1;
+
+ dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
+
+ //
+ // Mark the current location to later overwrite the count
+ //
+ msg->compose_count.buffer = DEQ_TAIL(msg->buffers);
+ msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer);
+ msg->compose_count.length = 4;
+ msg->compose_count.parsed = 1;
+
+ dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
+
+ msg->length = 4; // Include the length of the count field
+ msg->count = 0;
+}
+
+
+static void dx_end_list(dx_message_content_t *msg)
+{
+ dx_overwrite_32(&msg->compose_length, msg->length);
+ dx_overwrite_32(&msg->compose_count, msg->count);
+}
+
+
+static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+
+ switch (field) {
+ case DX_FIELD_TO:
+ while (1) {
+ if (content->field_to.parsed)
+ return &content->field_to;
+
+ if (content->section_message_properties.parsed == 0)
+ break;
+
+ dx_buffer_t *buffer = content->section_message_properties.buffer;
+ unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset;
+
+ int count = start_list(&cursor, &buffer);
+ int result;
+
+ if (count < 3)
+ break;
+
+ result = traverse_field(&cursor, &buffer, 0); // message_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // user_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, &content->field_to); // to
+ if (!result) return 0;
+ }
+ break;
+
+ case DX_FIELD_BODY:
+ while (1) {
+ if (content->body.parsed)
+ return &content->body;
+
+ if (content->section_body.parsed == 0)
+ break;
+
+ dx_buffer_t *buffer = content->section_body.buffer;
+ unsigned char *cursor = dx_buffer_base(buffer) + content->section_body.offset;
+ int result;
+
+ result = traverse_field(&cursor, &buffer, &content->body);
+ if (!result) return 0;
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+
+dx_message_t *dx_allocate_message()
+{
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t();
+ if (!msg)
+ return 0;
+
+ DEQ_ITEM_INIT(msg);
+ msg->content = new_dx_message_content_t();
+ msg->out_delivery = 0;
+
+ if (msg->content == 0) {
+ free_dx_message_t((dx_message_t*) msg);
+ return 0;
+ }
+
+ memset(msg->content, 0, sizeof(dx_message_content_t));
+ msg->content->lock = sys_mutex();
+ msg->content->ref_count = 1;
+
+ return (dx_message_t*) msg;
+}
+
+
+void dx_free_message(dx_message_t *in_msg)
+{
+ uint32_t rc;
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+
+ sys_mutex_lock(content->lock);
+ rc = --content->ref_count;
+ sys_mutex_unlock(content->lock);
+
+ if (rc == 0) {
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+
+ while (buf) {
+ DEQ_REMOVE_HEAD(content->buffers);
+ dx_free_buffer(buf);
+ buf = DEQ_HEAD(content->buffers);
+ }
+
+ sys_mutex_free(content->lock);
+ free_dx_message_content_t(content);
+ }
+
+ free_dx_message_t((dx_message_t*) msg);
+}
+
+
+dx_message_t *dx_message_copy(dx_message_t *in_msg)
+{
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ dx_message_pvt_t *copy = (dx_message_pvt_t*) new_dx_message_t();
+
+ if (!copy)
+ return 0;
+
+ DEQ_ITEM_INIT(copy);
+ copy->content = content;
+ copy->out_delivery = 0;
+
+ sys_mutex_lock(content->lock);
+ content->ref_count++;
+ sys_mutex_unlock(content->lock);
+
+ return (dx_message_t*) copy;
+}
+
+
+void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+{
+ ((dx_message_pvt_t*) msg)->out_delivery = delivery;
+}
+
+
+pn_delivery_t *dx_message_out_delivery(dx_message_t *msg)
+{
+ return ((dx_message_pvt_t*) msg)->out_delivery;
+}
+
+
+void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ content->in_delivery = delivery;
+}
+
+
+pn_delivery_t *dx_message_in_delivery(dx_message_t *msg)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ return content->in_delivery;
+}
+
+
+dx_message_t *dx_message_receive(pn_delivery_t *delivery)
+{
+ pn_link_t *link = pn_delivery_link(delivery);
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery);
+ ssize_t rc;
+ dx_buffer_t *buf;
+
+ //
+ // If there is no message associated with the delivery, this is the first time
+ // we've received anything on this delivery. Allocate a message descriptor and
+ // link it and the delivery together.
+ //
+ if (!msg) {
+ msg = (dx_message_pvt_t*) dx_allocate_message();
+ pn_delivery_set_context(delivery, (void*) msg);
+
+ //
+ // Record the incoming delivery only if it is not settled. If it is
+ // settled, it should not be recorded as no future operations on it are
+ // permitted.
+ //
+ if (!pn_delivery_settled(delivery))
+ msg->content->in_delivery = delivery;
+ }
+
+ //
+ // Get a reference to the tail buffer on the message. This is the buffer into which
+ // we will store incoming message data. If there is no buffer in the message, allocate
+ // an empty one and add it to the message.
+ //
+ buf = DEQ_TAIL(msg->content->buffers);
+ if (!buf) {
+ buf = dx_allocate_buffer();
+ DEQ_INSERT_TAIL(msg->content->buffers, buf);
+ }
+
+ while (1) {
+ //
+ // Try to receive enough data to fill the remaining space in the tail buffer.
+ //
+ rc = pn_link_recv(link, (char*) dx_buffer_cursor(buf), dx_buffer_capacity(buf));
+
+ //
+ // If we receive PN_EOS, we have come to the end of the message.
+ //
+ if (rc == PN_EOS) {
+ //
+ // If the last buffer in the list is empty, remove it and free it. This
+ // will only happen if the size of the message content is an exact multiple
+ // of the buffer size.
+ //
+ if (dx_buffer_size(buf) == 0) {
+ DEQ_REMOVE_TAIL(msg->content->buffers);
+ dx_free_buffer(buf);
+ }
+ return (dx_message_t*) msg;
+ }
+
+ if (rc > 0) {
+ //
+ // We have received a positive number of bytes for the message. Advance
+ // the cursor in the buffer.
+ //
+ dx_buffer_insert(buf, rc);
+
+ //
+ // If the buffer is full, allocate a new empty buffer and append it to the
+ // tail of the message's list.
+ //
+ if (dx_buffer_capacity(buf) == 0) {
+ buf = dx_allocate_buffer();
+ DEQ_INSERT_TAIL(msg->content->buffers, buf);
+ }
+ } else
+ //
+ // We received zero bytes, and no PN_EOS. This means that we've received
+ // all of the data available up to this point, but it does not constitute
+ // the entire message. We'll be back later to finish it up.
+ //
+ break;
+ }
+
+ return 0;
+}
+
+
+void dx_message_send(dx_message_t *in_msg, pn_link_t *link)
+{
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers);
+
+ // TODO - Handle cases where annotations have been added or modified
+ while (buf) {
+ pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
+ buf = DEQ_NEXT(buf);
+ }
+}
+
+
+int dx_message_check(dx_message_t *in_msg, dx_message_depth_t depth)
+{
+
+#define LONG 10
+#define SHORT 3
+#define MSG_HDR_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70"
+#define MSG_HDR_SHORT (unsigned char*) "\x00\x53\x70"
+#define DELIVERY_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71"
+#define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71"
+#define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"
+#define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72"
+#define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
+#define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73"
+#define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"
+#define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74"
+#define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"
+#define BODY_DATA_SHORT (unsigned char*) "\x00\x53\x75"
+#define BODY_SEQUENCE_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"
+#define BODY_SEQUENCE_SHORT (unsigned char*) "\x00\x53\x76"
+#define FOOTER_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"
+#define FOOTER_SHORT (unsigned char*) "\x00\x53\x78"
+#define TAGS_LIST (unsigned char*) "\x45\xc0\xd0"
+#define TAGS_MAP (unsigned char*) "\xc1\xd1"
+#define TAGS_BINARY (unsigned char*) "\xa0\xb0"
+
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ dx_buffer_t *buffer = DEQ_HEAD(content->buffers);
+ unsigned char *cursor;
+
+ if (!buffer)
+ return 0; // Invalid - No data in the message
+
+ if (depth == DX_DEPTH_NONE)
+ return 1;
+
+ cursor = dx_buffer_base(buffer);
+
+ //
+ // MESSAGE HEADER
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header))
+ return 0;
+
+ if (depth == DX_DEPTH_HEADER)
+ return 1;
+
+ //
+ // DELIVERY ANNOTATION
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation))
+ return 0;
+
+ if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS)
+ return 1;
+
+ //
+ // MESSAGE ANNOTATION
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation))
+ return 0;
+
+ if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS)
+ return 1;
+
+ //
+ // PROPERTIES
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties))
+ return 0;
+
+ if (depth == DX_DEPTH_PROPERTIES)
+ return 1;
+
+ //
+ // APPLICATION PROPERTIES
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties))
+ return 0;
+
+ if (depth == DX_DEPTH_APPLICATION_PROPERTIES)
+ return 1;
+
+ //
+ // BODY (Note that this function expects a single data section or a single AMQP sequence)
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body))
+ return 0;
+
+ if (depth == DX_DEPTH_BODY)
+ return 1;
+
+ //
+ // FOOTER
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer))
+ return 0;
+
+ return 1;
+}
+
+
+dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field)
+{
+ dx_field_location_t *loc = dx_message_field_location(msg, field);
+ if (!loc)
+ return 0;
+
+ return dx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL);
+}
+
+
+dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field)
+{
+ dx_field_location_t *loc = dx_message_field_location(msg, field);
+ if (!loc)
+ return 0;
+
+ //
+ // Count the number of buffers this field straddles
+ //
+ int bufcnt = 1;
+ dx_buffer_t *buf = loc->buffer;
+ size_t bufsize = dx_buffer_size(buf) - loc->offset;
+ ssize_t remaining = loc->length - bufsize;
+
+ while (remaining > 0) {
+ bufcnt++;
+ buf = buf->next;
+ if (!buf)
+ return 0;
+ remaining -= dx_buffer_size(buf);
+ }
+
+ //
+ // Allocate an iovec object big enough to hold the number of buffers
+ //
+ dx_iovec_t *iov = dx_iovec(bufcnt);
+ if (!iov)
+ return 0;
+
+ //
+ // Build out the io vectors with pointers to the segments of the field in buffers
+ //
+ bufcnt = 0;
+ buf = loc->buffer;
+ bufsize = dx_buffer_size(buf) - loc->offset;
+ void *base = dx_buffer_base(buf) + loc->offset;
+ remaining = loc->length;
+
+ while (remaining > 0) {
+ dx_iovec_array(iov)[bufcnt].iov_base = base;
+ dx_iovec_array(iov)[bufcnt].iov_len = bufsize;
+ bufcnt++;
+ remaining -= bufsize;
+ if (remaining > 0) {
+ buf = buf->next;
+ base = dx_buffer_base(buf);
+ bufsize = dx_buffer_size(buf);
+ if (bufsize > remaining)
+ bufsize = remaining;
+ }
+ }
+
+ return iov;
+}
+
+
+void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers)
+{
+ dx_message_begin_header(msg);
+ dx_message_insert_boolean(msg, 0); // durable
+ //dx_message_insert_null(msg); // priority
+ //dx_message_insert_null(msg); // ttl
+ //dx_message_insert_boolean(msg, 0); // first-acquirer
+ //dx_message_insert_uint(msg, 0); // delivery-count
+ dx_message_end_header(msg);
+
+ dx_message_begin_message_properties(msg);
+ dx_message_insert_null(msg); // message-id
+ dx_message_insert_null(msg); // user-id
+ dx_message_insert_string(msg, to); // to
+ //dx_message_insert_null(msg); // subject
+ //dx_message_insert_null(msg); // reply-to
+ //dx_message_insert_null(msg); // correlation-id
+ //dx_message_insert_null(msg); // content-type
+ //dx_message_insert_null(msg); // content-encoding
+ //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time
+ //dx_message_insert_timestamp(msg, 0); // creation-time
+ //dx_message_insert_null(msg); // group-id
+ //dx_message_insert_uint(msg, 0); // group-sequence
+ //dx_message_insert_null(msg); // reply-to-group-id
+ dx_message_end_message_properties(msg);
+
+ if (buffers)
+ dx_message_append_body_data(msg, buffers);
+}
+
+
+void dx_message_begin_header(dx_message_t *msg)
+{
+ dx_start_list_performative(MSG_CONTENT(msg), 0x70);
+}
+
+
+void dx_message_end_header(dx_message_t *msg)
+{
+ dx_end_list(MSG_CONTENT(msg));
+}
+
+
+void dx_message_begin_delivery_annotations(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_delivery_annotations(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_begin_message_annotations(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_message_annotations(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_begin_message_properties(dx_message_t *msg)
+{
+ dx_start_list_performative(MSG_CONTENT(msg), 0x73);
+}
+
+
+void dx_message_end_message_properties(dx_message_t *msg)
+{
+ dx_end_list(MSG_CONTENT(msg));
+}
+
+
+void dx_message_begin_application_properties(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_application_properties(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_buffer_t *buf = DEQ_HEAD(*buffers);
+ uint32_t len = 0;
+
+ //
+ // Calculate the size of the body to be appended.
+ //
+ while (buf) {
+ len += dx_buffer_size(buf);
+ buf = DEQ_NEXT(buf);
+ }
+
+ //
+ // Insert a DATA section performative header.
+ //
+ dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3);
+ if (len < 256) {
+ dx_insert_8(content, 0xa0); // vbin8
+ dx_insert_8(content, (uint8_t) len);
+ } else {
+ dx_insert_8(content, 0xb0); // vbin32
+ dx_insert_32(content, len);
+ }
+
+ //
+ // Move the supplied buffers to the tail of the message's buffer list.
+ //
+ buf = DEQ_HEAD(*buffers);
+ while (buf) {
+ DEQ_REMOVE_HEAD(*buffers);
+ DEQ_INSERT_TAIL(content->buffers, buf);
+ buf = DEQ_HEAD(*buffers);
+ }
+}
+
+
+void dx_message_begin_body_sequence(dx_message_t *msg)
+{
+}
+
+
+void dx_message_end_body_sequence(dx_message_t *msg)
+{
+}
+
+
+void dx_message_begin_footer(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_footer(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_insert_null(dx_message_t *msg)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_insert_8(content, 0x40);
+ content->count++;
+}
+
+
+void dx_message_insert_boolean(dx_message_t *msg, int value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (value)
+ dx_insert(content, (const uint8_t*) "\x56\x01", 2);
+ else
+ dx_insert(content, (const uint8_t*) "\x56\x00", 2);
+ content->count++;
+}
+
+
+void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_insert_8(content, 0x50);
+ dx_insert_8(content, value);
+ content->count++;
+}
+
+
+void dx_message_insert_uint(dx_message_t *msg, uint32_t value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (value == 0) {
+ dx_insert_8(content, 0x43); // uint0
+ } else if (value < 256) {
+ dx_insert_8(content, 0x52); // smalluint
+ dx_insert_8(content, (uint8_t) value);
+ } else {
+ dx_insert_8(content, 0x70); // uint
+ dx_insert_32(content, value);
+ }
+ content->count++;
+}
+
+
+void dx_message_insert_ulong(dx_message_t *msg, uint64_t value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (value == 0) {
+ dx_insert_8(content, 0x44); // ulong0
+ } else if (value < 256) {
+ dx_insert_8(content, 0x53); // smallulong
+ dx_insert_8(content, (uint8_t) value);
+ } else {
+ dx_insert_8(content, 0x80); // ulong
+ dx_insert_64(content, value);
+ }
+ content->count++;
+}
+
+
+void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (len < 256) {
+ dx_insert_8(content, 0xa0); // vbin8
+ dx_insert_8(content, (uint8_t) len);
+ } else {
+ dx_insert_8(content, 0xb0); // vbin32
+ dx_insert_32(content, len);
+ }
+ dx_insert(content, start, len);
+ content->count++;
+}
+
+
+void dx_message_insert_string(dx_message_t *msg, const char *start)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ uint32_t len = strlen(start);
+
+ if (len < 256) {
+ dx_insert_8(content, 0xa1); // str8-utf8
+ dx_insert_8(content, (uint8_t) len);
+ dx_insert(content, (const uint8_t*) start, len);
+ } else {
+ dx_insert_8(content, 0xb1); // str32-utf8
+ dx_insert_32(content, len);
+ dx_insert(content, (const uint8_t*) start, len);
+ }
+ content->count++;
+}
+
+
+void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_insert_8(content, 0x98); // uuid
+ dx_insert(content, value, 16);
+ content->count++;
+}
+
+
+void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (len < 256) {
+ dx_insert_8(content, 0xa3); // sym8
+ dx_insert_8(content, (uint8_t) len);
+ dx_insert(content, (const uint8_t*) start, len);
+ } else {
+ dx_insert_8(content, 0xb3); // sym32
+ dx_insert_32(content, len);
+ dx_insert(content, (const uint8_t*) start, len);
+ }
+ content->count++;
+}
+
+
+void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_insert_8(content, 0x83); // timestamp
+ dx_insert_64(content, value);
+ content->count++;
+}
+
+
+void dx_message_begin_list(dx_message_t* msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_list(dx_message_t* msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_begin_map(dx_message_t* msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_map(dx_message_t* msg)
+{
+ assert(0); // Not Implemented
+}
+
diff --git a/extras/dispatch/src/message_private.h b/extras/dispatch/src/message_private.h
new file mode 100644
index 0000000000..5fb18078f5
--- /dev/null
+++ b/extras/dispatch/src/message_private.h
@@ -0,0 +1,94 @@
+#ifndef __message_private_h__
+#define __message_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/message.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/threading.h>
+
+/**
+ * Architecture of the message module:
+ *
+ * +--------------+ +----------------------+
+ * | | | |
+ * | dx_message_t |----------->| dx_message_content_t |
+ * | | +----->| |
+ * +--------------+ | +----------------------+
+ * | |
+ * +--------------+ | | +-------------+ +-------------+ +-------------+
+ * | | | +--->| dx_buffer_t |-->| dx_buffer_t |-->| dx_buffer_t |--/
+ * | dx_message_t |-----+ +-------------+ +-------------+ +-------------+
+ * | |
+ * +--------------+
+ *
+ * The message module provides chained-fixed-sized-buffer storage of message content with multiple
+ * references. If a message is received and is to be queued for multiple destinations, there is only
+ * one copy of the message content in memory but multiple lightweight references to the content.
+ *
+ */
+
+typedef struct {
+ dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
+ size_t offset; // Offset in the buffer to the first octet
+ size_t length; // Length of the field or zero if unneeded
+ int parsed; // non-zero iff the buffer chain has been parsed to find this field
+} dx_field_location_t;
+
+
+// TODO - consider using pointers to dx_field_location_t below to save memory
+// TODO - we need a second buffer list for modified annotations and header
+// There are three message scenarios:
+// 1) Received message is held and forwarded unmodified - single buffer list
+// 2) Received message is held and modified before forwarding - two buffer lists
+// 3) Message is composed internally - single buffer list
+
+typedef struct {
+ sys_mutex_t *lock;
+ uint32_t ref_count; // The number of qmessages referencing this
+ dx_buffer_list_t buffers; // The buffer chain containing the message
+ pn_delivery_t *in_delivery; // The delivery on which the message arrived
+ dx_field_location_t section_message_header; // The message header list
+ dx_field_location_t section_delivery_annotation; // The delivery annotation map
+ dx_field_location_t section_message_annotation; // The message annotation map
+ dx_field_location_t section_message_properties; // The message properties list
+ dx_field_location_t section_application_properties; // The application properties list
+ dx_field_location_t section_body; // The message body: Data
+ dx_field_location_t section_footer; // The footer
+ dx_field_location_t field_user_id; // The string value of the user-id
+ dx_field_location_t field_to; // The string value of the to field
+ dx_field_location_t body; // The body of the message
+ dx_field_location_t compose_length;
+ dx_field_location_t compose_count;
+ uint32_t length;
+ uint32_t count;
+} dx_message_content_t;
+
+typedef struct {
+ DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t
+ dx_message_content_t *content;
+ pn_delivery_t *out_delivery;
+} dx_message_pvt_t;
+
+ALLOC_DECLARE(dx_message_t);
+ALLOC_DECLARE(dx_message_content_t);
+
+#define MSG_CONTENT(m) (((dx_message_pvt_t*) m)->content)
+
+#endif
diff --git a/extras/dispatch/src/posix/threading.c b/extras/dispatch/src/posix/threading.c
new file mode 100644
index 0000000000..8edce86cdc
--- /dev/null
+++ b/extras/dispatch/src/posix/threading.c
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/ctools.h>
+#include <stdio.h>
+#include <pthread.h>
+
+struct sys_mutex_t {
+ pthread_mutex_t mutex;
+ int acquired;
+};
+
+sys_mutex_t *sys_mutex(void)
+{
+ sys_mutex_t *mutex = NEW(sys_mutex_t);
+ pthread_mutex_init(&(mutex->mutex), 0);
+ mutex->acquired = 0;
+ return mutex;
+}
+
+
+void sys_mutex_free(sys_mutex_t *mutex)
+{
+ assert(!mutex->acquired);
+ pthread_mutex_destroy(&(mutex->mutex));
+ free(mutex);
+}
+
+
+void sys_mutex_lock(sys_mutex_t *mutex)
+{
+ pthread_mutex_lock(&(mutex->mutex));
+ assert(!mutex->acquired);
+ mutex->acquired++;
+}
+
+
+void sys_mutex_unlock(sys_mutex_t *mutex)
+{
+ mutex->acquired--;
+ assert(!mutex->acquired);
+ pthread_mutex_unlock(&(mutex->mutex));
+}
+
+
+struct sys_cond_t {
+ pthread_cond_t cond;
+};
+
+
+sys_cond_t *sys_cond(void)
+{
+ sys_cond_t *cond = NEW(sys_cond_t);
+ pthread_cond_init(&(cond->cond), 0);
+ return cond;
+}
+
+
+void sys_cond_free(sys_cond_t *cond)
+{
+ pthread_cond_destroy(&(cond->cond));
+ free(cond);
+}
+
+
+void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex)
+{
+ assert(held_mutex->acquired);
+ held_mutex->acquired--;
+ pthread_cond_wait(&(cond->cond), &(held_mutex->mutex));
+ held_mutex->acquired++;
+}
+
+
+void sys_cond_signal(sys_cond_t *cond)
+{
+ pthread_cond_signal(&(cond->cond));
+}
+
+
+void sys_cond_signal_all(sys_cond_t *cond)
+{
+ pthread_cond_broadcast(&(cond->cond));
+}
+
+
+struct sys_thread_t {
+ pthread_t thread;
+};
+
+sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg)
+{
+ sys_thread_t *thread = NEW(sys_thread_t);
+ pthread_create(&(thread->thread), 0, run_function, arg);
+ return thread;
+}
+
+
+void sys_thread_free(sys_thread_t *thread)
+{
+ free(thread);
+}
+
+
+void sys_thread_join(sys_thread_t *thread)
+{
+ pthread_join(thread->thread, 0);
+}
+
diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c
new file mode 100644
index 0000000000..6ddc8f45dd
--- /dev/null
+++ b/extras/dispatch/src/router_node.c
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/message.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/router.h>
+
+static char *module="ROUTER_NODE";
+
+struct dx_router_t {
+ dx_node_t *node;
+ dx_link_list_t in_links;
+ dx_link_list_t out_links;
+ dx_message_list_t in_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ hash_t *out_hash;
+ uint64_t dtag;
+};
+
+
+typedef struct {
+ dx_link_t *link;
+ dx_message_list_t out_fifo;
+} dx_router_link_t;
+
+
+ALLOC_DECLARE(dx_router_link_t);
+ALLOC_DEFINE(dx_router_link_t);
+
+
+/**
+ * Outbound Delivery Handler
+ */
+static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ dx_message_t *msg;
+ size_t size;
+
+ sys_mutex_lock(router->lock);
+ msg = DEQ_HEAD(rlink->out_fifo);
+ if (!msg) {
+ // TODO - Recind the delivery
+ sys_mutex_unlock(router->lock);
+ return;
+ }
+
+ DEQ_REMOVE_HEAD(rlink->out_fifo);
+ size = (DEQ_SIZE(rlink->out_fifo));
+ sys_mutex_unlock(router->lock);
+
+ dx_message_send(msg, pn_link);
+
+ //
+ // If there is no incoming delivery, it was pre-settled. In this case,
+ // we must pre-settle the outgoing delivery as well.
+ //
+ if (dx_message_in_delivery(msg)) {
+ pn_delivery_set_context(delivery, (void*) msg);
+ dx_message_set_out_delivery(msg, delivery);
+ } else {
+ pn_delivery_settle(delivery);
+ dx_free_message(msg);
+ }
+
+ pn_link_advance(pn_link);
+ pn_link_offered(pn_link, size);
+}
+
+
+/**
+ * Inbound Delivery Handler
+ */
+static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_message_t *msg;
+ int valid_message = 0;
+
+ //
+ // Receive the message into a local representation. If the returned message
+ // pointer is NULL, we have not yet received a complete message.
+ //
+ sys_mutex_lock(router->lock);
+ msg = dx_message_receive(delivery);
+ sys_mutex_unlock(router->lock);
+
+ if (!msg)
+ return;
+
+ //
+ // Validate the message through the Properties section
+ //
+ valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
+
+ pn_link_advance(pn_link);
+ pn_link_flow(pn_link, 1);
+
+ if (valid_message) {
+ dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
+ dx_router_link_t *rlink;
+ if (iter) {
+ dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+ sys_mutex_lock(router->lock);
+ int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
+ dx_field_iterator_free(iter);
+
+ if (result == 0) {
+ //
+ // To field is valid and contains a known destination. Enqueue on
+ // the output fifo for the next-hop-to-destination.
+ //
+ pn_link_t* pn_outlink = dx_link_pn(rlink->link);
+ DEQ_INSERT_TAIL(rlink->out_fifo, msg);
+ pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo));
+ dx_link_activate(rlink->link);
+ } else {
+ //
+ // To field contains an unknown address. Release the message.
+ //
+ pn_delivery_update(delivery, PN_RELEASED);
+ pn_delivery_settle(delivery);
+ }
+
+ sys_mutex_unlock(router->lock);
+ }
+ } else {
+ //
+ // Message is invalid. Reject the message.
+ //
+ pn_delivery_update(delivery, PN_REJECTED);
+ pn_delivery_settle(delivery);
+ pn_delivery_set_context(delivery, 0);
+ dx_free_message(msg);
+ }
+}
+
+
+/**
+ * Delivery Disposition Handler
+ */
+static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+
+ if (pn_link_is_sender(pn_link)) {
+ pn_disposition_t disp = pn_delivery_remote_state(delivery);
+ dx_message_t *msg = pn_delivery_get_context(delivery);
+ pn_delivery_t *activate = 0;
+
+ if (msg) {
+ assert(delivery == dx_message_out_delivery(msg));
+ if (disp != 0) {
+ activate = dx_message_in_delivery(msg);
+ pn_delivery_update(activate, disp);
+ // TODO - handling of the data accompanying RECEIVED/MODIFIED
+ }
+
+ if (pn_delivery_settled(delivery)) {
+ //
+ // Downstream delivery has been settled. Propagate the settlement
+ // upstream.
+ //
+ activate = dx_message_in_delivery(msg);
+ pn_delivery_settle(activate);
+ pn_delivery_settle(delivery);
+ dx_free_message(msg);
+ }
+
+ if (activate) {
+ //
+ // Activate the upstream/incoming link so that the settlement will
+ // get pushed out.
+ //
+ dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate));
+ dx_link_activate(act_link);
+ }
+
+ return;
+ }
+ }
+
+ pn_delivery_settle(delivery);
+}
+
+
+/**
+ * New Incoming Link Handler
+ */
+static int router_incoming_link_handler(void* context, dx_link_t *link)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ dx_link_item_t *item = new_dx_link_item_t();
+ pn_link_t *pn_link = dx_link_pn(link);
+
+ if (item) {
+ DEQ_ITEM_INIT(item);
+ item->link = link;
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(router->in_links, item);
+ sys_mutex_unlock(router->lock);
+
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_flow(pn_link, 8);
+ pn_link_open(pn_link);
+ } else {
+ pn_link_close(pn_link);
+ }
+ return 0;
+}
+
+
+/**
+ * New Outgoing Link Handler
+ */
+static int router_outgoing_link_handler(void* context, dx_link_t *link)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+
+ sys_mutex_lock(router->lock);
+ dx_router_link_t *rlink = new_dx_router_link_t();
+ rlink->link = link;
+ DEQ_INIT(rlink->out_fifo);
+ dx_link_set_context(link, rlink);
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+ int result = hash_insert(router->out_hash, iter, rlink);
+ dx_field_iterator_free(iter);
+
+ if (result == 0) {
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_open(pn_link);
+ sys_mutex_unlock(router->lock);
+ dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
+ return 0;
+ }
+
+ dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt);
+ pn_link_close(pn_link);
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+/**
+ * Outgoing Link Writable Handler
+ */
+static int router_writable_link_handler(void* context, dx_link_t *link)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ int grant_delivery = 0;
+ pn_delivery_t *delivery;
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ pn_link_t *pn_link = dx_link_pn(link);
+ uint64_t tag;
+
+ sys_mutex_lock(router->lock);
+ if (DEQ_SIZE(rlink->out_fifo) > 0) {
+ grant_delivery = 1;
+ tag = router->dtag++;
+ }
+ sys_mutex_unlock(router->lock);
+
+ if (grant_delivery) {
+ pn_delivery(pn_link, pn_dtag((char*) &tag, 8));
+ delivery = pn_link_current(pn_link);
+ if (delivery) {
+ router_tx_handler(context, link, delivery);
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+/**
+ * Link Detached Handler
+ */
+static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ dx_link_item_t *item;
+
+ sys_mutex_lock(router->lock);
+ if (pn_link_is_sender(pn_link)) {
+ item = DEQ_HEAD(router->out_links);
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+ dx_router_link_t *rlink;
+ if (iter) {
+ int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
+ if (result == 0) {
+ dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+ hash_remove(router->out_hash, iter);
+ free_dx_router_link_t(rlink);
+ dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+ }
+ dx_field_iterator_free(iter);
+ }
+ }
+ else
+ item = DEQ_HEAD(router->in_links);
+
+ while (item) {
+ if (item->link == link) {
+ if (pn_link_is_sender(pn_link))
+ DEQ_REMOVE(router->out_links, item);
+ else
+ DEQ_REMOVE(router->in_links, item);
+ free_dx_link_item_t(item);
+ break;
+ }
+ item = item->next;
+ }
+
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+static void router_inbound_open_handler(void *type_context, dx_connection_t *conn)
+{
+}
+
+
+static void router_outbound_open_handler(void *type_context, dx_connection_t *conn)
+{
+}
+
+
+static void dx_router_timer_handler(void *context)
+{
+ dx_router_t *router = (dx_router_t*) context;
+
+ //
+ // Periodic processing.
+ //
+ dx_timer_schedule(router->timer, 1000);
+}
+
+
+static dx_node_type_t router_node = {"router", 0, 0,
+ router_rx_handler,
+ router_tx_handler,
+ router_disp_handler,
+ router_incoming_link_handler,
+ router_outgoing_link_handler,
+ router_writable_link_handler,
+ router_link_detach_handler,
+ 0, // node_created_handler
+ 0, // node_destroyed_handler
+ router_inbound_open_handler,
+ router_outbound_open_handler };
+static int type_registered = 0;
+
+
+dx_router_t *dx_router(dx_router_configuration_t *config)
+{
+ if (!type_registered) {
+ type_registered = 1;
+ dx_container_register_node_type(&router_node);
+ }
+
+ dx_router_t *router = NEW(dx_router_t);
+ dx_container_set_default_node_type(&router_node, (void*) router, DX_DIST_BOTH);
+
+ DEQ_INIT(router->in_links);
+ DEQ_INIT(router->out_links);
+ DEQ_INIT(router->in_fifo);
+
+ router->lock = sys_mutex();
+
+ router->timer = dx_timer(dx_router_timer_handler, (void*) router);
+ dx_timer_schedule(router->timer, 0); // Immediate
+
+ router->out_hash = hash(10, 32, 0);
+ router->dtag = 1;
+
+ return router;
+}
+
+
+void dx_router_free(dx_router_t *router)
+{
+ dx_container_set_default_node_type(0, 0, DX_DIST_BOTH);
+ sys_mutex_free(router->lock);
+ free(router);
+}
+
diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c
new file mode 100644
index 0000000000..0099393f60
--- /dev/null
+++ b/extras/dispatch/src/server.c
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/log.h>
+#include "server_private.h"
+#include "timer_private.h"
+#include "alloc_private.h"
+#include "auth.h"
+#include "work_queue.h"
+#include <stdio.h>
+#include <time.h>
+#include <signal.h>
+
+static char *module="SERVER";
+
+typedef struct dx_thread_t {
+ int thread_id;
+ volatile int running;
+ volatile int canceled;
+ int using_thread;
+ sys_thread_t *thread;
+} dx_thread_t;
+
+
+typedef struct dx_server_t {
+ int thread_count;
+ pn_driver_t *driver;
+ dx_thread_start_cb_t start_handler;
+ dx_conn_handler_cb_t conn_handler;
+ dx_signal_handler_cb_t signal_handler;
+ dx_user_fd_handler_cb_t ufd_handler;
+ void *start_context;
+ void *conn_context;
+ void *signal_context;
+ sys_cond_t *cond;
+ sys_mutex_t *lock;
+ dx_thread_t **threads;
+ work_queue_t *work_queue;
+ dx_timer_list_t pending_timers;
+ bool a_thread_is_waiting;
+ int threads_active;
+ int pause_requests;
+ int threads_paused;
+ int pause_next_sequence;
+ int pause_now_serving;
+ int pending_signal;
+} dx_server_t;
+
+
+ALLOC_DEFINE(dx_listener_t);
+ALLOC_DEFINE(dx_connector_t);
+ALLOC_DEFINE(dx_connection_t);
+ALLOC_DEFINE(dx_user_fd_t);
+
+
+/**
+ * Singleton Concurrent Proton Driver object
+ */
+static dx_server_t *dx_server = 0;
+
+
+static void signal_handler(int signum)
+{
+ dx_server->pending_signal = signum;
+ sys_cond_signal_all(dx_server->cond);
+}
+
+
+static dx_thread_t *thread(int id)
+{
+ dx_thread_t *thread = NEW(dx_thread_t);
+ if (!thread)
+ return 0;
+
+ thread->thread_id = id;
+ thread->running = 0;
+ thread->canceled = 0;
+ thread->using_thread = 0;
+
+ return thread;
+}
+
+
+static void thread_process_listeners(pn_driver_t *driver)
+{
+ pn_listener_t *listener = pn_driver_listener(driver);
+ pn_connector_t *cxtr;
+ dx_connection_t *ctx;
+
+ while (listener) {
+ dx_log(module, LOG_TRACE, "Accepting Connection");
+ cxtr = pn_listener_accept(listener);
+ ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_SASL_SERVER;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_cxtr = cxtr;
+ ctx->pn_conn = 0;
+ ctx->listener = (dx_listener_t*) pn_listener_context(listener);
+ ctx->connector = 0;
+ ctx->context = ctx->listener->context;
+ ctx->ufd = 0;
+
+ pn_connector_set_context(cxtr, ctx);
+ listener = pn_driver_listener(driver);
+ }
+}
+
+
+static void handle_signals_LH(void)
+{
+ int signum = dx_server->pending_signal;
+
+ if (signum) {
+ dx_server->pending_signal = 0;
+ if (dx_server->signal_handler) {
+ sys_mutex_unlock(dx_server->lock);
+ dx_server->signal_handler(dx_server->signal_context, signum);
+ sys_mutex_lock(dx_server->lock);
+ }
+ }
+}
+
+
+static void block_if_paused_LH(void)
+{
+ if (dx_server->pause_requests > 0) {
+ dx_server->threads_paused++;
+ sys_cond_signal_all(dx_server->cond);
+ while (dx_server->pause_requests > 0)
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+ dx_server->threads_paused--;
+ }
+}
+
+
+static void process_connector(pn_connector_t *cxtr)
+{
+ dx_connection_t *ctx = pn_connector_context(cxtr);
+ int events = 0;
+ int auth_passes = 0;
+
+ if (ctx->state == CONN_STATE_USER) {
+ dx_server->ufd_handler(ctx->ufd->context, ctx->ufd);
+ return;
+ }
+
+ do {
+ //
+ // Step the engine for pre-handler processing
+ //
+ pn_connector_process(cxtr);
+
+ //
+ // Call the handler that is appropriate for the connector's state.
+ //
+ switch (ctx->state) {
+ case CONN_STATE_CONNECTING:
+ if (!pn_connector_closed(cxtr)) {
+ ctx->state = CONN_STATE_SASL_CLIENT;
+ assert(ctx->connector);
+ ctx->connector->state = CXTR_STATE_OPEN;
+ events = 1;
+ } else {
+ ctx->state = CONN_STATE_FAILED;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_SASL_CLIENT:
+ if (auth_passes == 0) {
+ auth_client_handler(cxtr);
+ events = 1;
+ } else {
+ auth_passes++;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_SASL_SERVER:
+ if (auth_passes == 0) {
+ auth_server_handler(cxtr);
+ events = 1;
+ } else {
+ auth_passes++;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_OPENING:
+ ctx->state = CONN_STATE_OPERATIONAL;
+
+ pn_connection_t *conn = pn_connection();
+ pn_connection_set_container(conn, "dispatch"); // TODO - make unique
+ pn_connector_set_connection(cxtr, conn);
+ pn_connection_set_context(conn, ctx);
+ ctx->pn_conn = conn;
+
+ dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+ if (ctx->listener) {
+ ce = DX_CONN_EVENT_LISTENER_OPEN;
+ } else if (ctx->connector) {
+ ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(0);
+
+ dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ events = 1;
+ break;
+
+ case CONN_STATE_OPERATIONAL:
+ if (pn_connector_closed(cxtr)) {
+ dx_server->conn_handler(ctx->context,
+ DX_CONN_EVENT_CLOSE,
+ (dx_connection_t*) pn_connector_context(cxtr));
+ events = 0;
+ }
+ else
+ events = dx_server->conn_handler(ctx->context,
+ DX_CONN_EVENT_PROCESS,
+ (dx_connection_t*) pn_connector_context(cxtr));
+ break;
+
+ default:
+ break;
+ }
+ } while (events > 0);
+}
+
+
+//
+// TEMPORARY FUNCTION PROTOTYPES
+//
+void pn_driver_wait_1(pn_driver_t *d);
+int pn_driver_wait_2(pn_driver_t *d, int timeout);
+void pn_driver_wait_3(pn_driver_t *d);
+//
+// END TEMPORARY
+//
+
+static void *thread_run(void *arg)
+{
+ dx_thread_t *thread = (dx_thread_t*) arg;
+ pn_connector_t *work;
+ pn_connection_t *conn;
+ dx_connection_t *ctx;
+ int error;
+ int poll_result;
+ int timer_holdoff = 0;
+
+ if (!thread)
+ return 0;
+
+ thread->running = 1;
+
+ if (thread->canceled)
+ return 0;
+
+ //
+ // Invoke the start handler if the application supplied one.
+ // This handler can be used to set NUMA or processor affinnity for the thread.
+ //
+ if (dx_server->start_handler)
+ dx_server->start_handler(dx_server->start_context, thread->thread_id);
+
+ //
+ // Main Loop
+ //
+ while (thread->running) {
+ sys_mutex_lock(dx_server->lock);
+
+ //
+ // Check for pending signals to process
+ //
+ handle_signals_LH();
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Check to see if the server is pausing. If so, block here.
+ //
+ block_if_paused_LH();
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Service pending timers.
+ //
+ dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
+ if (timer) {
+ DEQ_REMOVE_HEAD(dx_server->pending_timers);
+
+ //
+ // Mark the timer as idle in case it reschedules itself.
+ //
+ dx_timer_idle_LH(timer);
+
+ //
+ // Release the lock and invoke the connection handler.
+ //
+ sys_mutex_unlock(dx_server->lock);
+ timer->handler(timer->context);
+ pn_driver_wakeup(dx_server->driver);
+ continue;
+ }
+
+ //
+ // Check the work queue for connectors scheduled for processing.
+ //
+ work = work_queue_get(dx_server->work_queue);
+ if (!work) {
+ //
+ // There is no pending work to do
+ //
+ if (dx_server->a_thread_is_waiting) {
+ //
+ // Another thread is waiting on the proton driver, this thread must
+ // wait on the condition variable until signaled.
+ //
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+ } else {
+ //
+ // This thread elects itself to wait on the proton driver. Set the
+ // thread-is-waiting flag so other idle threads will not interfere.
+ //
+ dx_server->a_thread_is_waiting = true;
+
+ //
+ // Ask the timer module when its next timer is scheduled to fire. We'll
+ // use this value in driver_wait as the timeout. If there are no scheduled
+ // timers, the returned value will be -1.
+ //
+ long duration = dx_timer_next_duration_LH();
+
+ //
+ // Invoke the proton driver's wait sequence. This is a bit of a hack for now
+ // and will be improved in the future. The wait process is divided into three parts,
+ // the first and third of which need to be non-reentrant, and the second of which
+ // must be reentrant (and blocks).
+ //
+ pn_driver_wait_1(dx_server->driver);
+ sys_mutex_unlock(dx_server->lock);
+
+ do {
+ error = 0;
+ poll_result = pn_driver_wait_2(dx_server->driver, duration);
+ if (poll_result == -1)
+ error = pn_driver_errno(dx_server->driver);
+ } while (error == PN_INTR);
+ if (error) {
+ dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver)));
+ exit(-1);
+ }
+
+ sys_mutex_lock(dx_server->lock);
+ pn_driver_wait_3(dx_server->driver);
+
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Visit the timer module.
+ //
+ if (poll_result == 0 || ++timer_holdoff == 100) {
+ struct timespec tv;
+ clock_gettime(CLOCK_REALTIME, &tv);
+ long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
+ dx_timer_visit_LH(milliseconds);
+ timer_holdoff = 0;
+ }
+
+ //
+ // Process listeners (incoming connections).
+ //
+ thread_process_listeners(dx_server->driver);
+
+ //
+ // Traverse the list of connectors-needing-service from the proton driver.
+ // If the connector is not already in the work queue and it is not currently
+ // being processed by another thread, put it in the work queue and signal the
+ // condition variable.
+ //
+ work = pn_driver_connector(dx_server->driver);
+ while (work) {
+ ctx = pn_connector_context(work);
+ if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
+ ctx->enqueued = 1;
+ work_queue_put(dx_server->work_queue, work);
+ sys_cond_signal(dx_server->cond);
+ }
+ work = pn_driver_connector(dx_server->driver);
+ }
+
+ //
+ // Release our exclusive claim on pn_driver_wait.
+ //
+ dx_server->a_thread_is_waiting = false;
+ }
+ }
+
+ //
+ // If we were given a connector to work on from the work queue, mark it as
+ // owned by this thread and as no longer enqueued.
+ //
+ if (work) {
+ ctx = pn_connector_context(work);
+ if (ctx->owner_thread == CONTEXT_NO_OWNER) {
+ ctx->owner_thread = thread->thread_id;
+ ctx->enqueued = 0;
+ dx_server->threads_active++;
+ } else {
+ //
+ // This connector is being processed by another thread, re-queue it.
+ //
+ work_queue_put(dx_server->work_queue, work);
+ work = 0;
+ }
+ }
+ sys_mutex_unlock(dx_server->lock);
+
+ //
+ // Process the connector that we now have exclusive access to.
+ //
+ if (work) {
+ process_connector(work);
+
+ //
+ // Check to see if the connector was closed during processing
+ //
+ if (pn_connector_closed(work)) {
+ //
+ // Connector is closed. Free the context and the connector.
+ //
+ conn = pn_connector_connection(work);
+ if (ctx->connector) {
+ ctx->connector->ctx = 0;
+ ctx->connector->state = CXTR_STATE_CONNECTING;
+ dx_timer_schedule(ctx->connector->timer, ctx->connector->delay);
+ }
+ sys_mutex_lock(dx_server->lock);
+ free_dx_connection_t(ctx);
+ pn_connector_free(work);
+ if (conn)
+ pn_connection_free(conn);
+ dx_server->threads_active--;
+ sys_mutex_unlock(dx_server->lock);
+ } else {
+ //
+ // The connector lives on. Mark it as no longer owned by this thread.
+ //
+ sys_mutex_lock(dx_server->lock);
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ dx_server->threads_active--;
+ sys_mutex_unlock(dx_server->lock);
+ }
+
+ //
+ // Wake up the proton driver to force it to reconsider its set of FDs
+ // in light of the processing that just occurred.
+ //
+ pn_driver_wakeup(dx_server->driver);
+ }
+ }
+
+ return 0;
+}
+
+
+static void thread_start(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ thread->using_thread = 1;
+ thread->thread = sys_thread(thread_run, (void*) thread);
+}
+
+
+static void thread_cancel(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ thread->running = 0;
+ thread->canceled = 1;
+}
+
+
+static void thread_join(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ if (thread->using_thread)
+ sys_thread_join(thread->thread);
+}
+
+
+static void thread_free(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ free(thread);
+}
+
+
+static void cxtr_try_open(void *context)
+{
+ dx_connector_t *ct = (dx_connector_t*) context;
+ if (ct->state != CXTR_STATE_CONNECTING)
+ return;
+
+ dx_connection_t *ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_CONNECTING;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_conn = 0;
+ ctx->listener = 0;
+ ctx->connector = ct;
+ ctx->context = ct->context;
+ ctx->user_context = 0;
+ ctx->ufd = 0;
+
+ //
+ // pn_connector is not thread safe
+ //
+ sys_mutex_lock(dx_server->lock);
+ ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx);
+ sys_mutex_unlock(dx_server->lock);
+
+ ct->ctx = ctx;
+ ct->delay = 5000;
+ dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
+}
+
+
+void dx_server_initialize(int thread_count)
+{
+ int i;
+
+ if (dx_server)
+ return; // TODO - Fail in a more dramatic way
+
+ dx_alloc_initialize();
+ dx_server = NEW(dx_server_t);
+
+ if (!dx_server)
+ return; // TODO - Fail in a more dramatic way
+
+ dx_server->thread_count = thread_count;
+ dx_server->driver = pn_driver();
+ dx_server->start_handler = 0;
+ dx_server->conn_handler = 0;
+ dx_server->signal_handler = 0;
+ dx_server->ufd_handler = 0;
+ dx_server->start_context = 0;
+ dx_server->signal_context = 0;
+ dx_server->lock = sys_mutex();
+ dx_server->cond = sys_cond();
+
+ dx_timer_initialize(dx_server->lock);
+
+ dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count);
+ for (i = 0; i < thread_count; i++)
+ dx_server->threads[i] = thread(i);
+
+ dx_server->work_queue = work_queue();
+ DEQ_INIT(dx_server->pending_timers);
+ dx_server->a_thread_is_waiting = false;
+ dx_server->threads_active = 0;
+ dx_server->pause_requests = 0;
+ dx_server->threads_paused = 0;
+ dx_server->pause_next_sequence = 0;
+ dx_server->pause_now_serving = 0;
+ dx_server->pending_signal = 0;
+}
+
+
+void dx_server_finalize(void)
+{
+ int i;
+ if (!dx_server)
+ return;
+
+ for (i = 0; i < dx_server->thread_count; i++)
+ thread_free(dx_server->threads[i]);
+
+ work_queue_free(dx_server->work_queue);
+
+ pn_driver_free(dx_server->driver);
+ sys_mutex_free(dx_server->lock);
+ sys_cond_free(dx_server->cond);
+ free(dx_server);
+ dx_server = 0;
+}
+
+
+void dx_server_set_conn_handler(dx_conn_handler_cb_t handler)
+{
+ dx_server->conn_handler = handler;
+}
+
+
+void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context)
+{
+ dx_server->signal_handler = handler;
+ dx_server->signal_context = context;
+}
+
+
+void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context)
+{
+ dx_server->start_handler = handler;
+ dx_server->start_context = context;
+}
+
+
+void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler)
+{
+ dx_server->ufd_handler = ufd_handler;
+}
+
+
+void dx_server_run(void)
+{
+ int i;
+ if (!dx_server)
+ return;
+
+ assert(dx_server->conn_handler); // Server can't run without a connection handler.
+
+ for (i = 1; i < dx_server->thread_count; i++)
+ thread_start(dx_server->threads[i]);
+
+ dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count);
+
+ thread_run((void*) dx_server->threads[0]);
+
+ for (i = 1; i < dx_server->thread_count; i++)
+ thread_join(dx_server->threads[i]);
+
+ dx_log(module, LOG_INFO, "Shut Down");
+}
+
+
+void dx_server_stop(void)
+{
+ int idx;
+
+ sys_mutex_lock(dx_server->lock);
+ for (idx = 0; idx < dx_server->thread_count; idx++)
+ thread_cancel(dx_server->threads[idx]);
+ sys_cond_signal_all(dx_server->cond);
+ pn_driver_wakeup(dx_server->driver);
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_signal(int signum)
+{
+ signal(signum, signal_handler);
+}
+
+
+void dx_server_pause(void)
+{
+ sys_mutex_lock(dx_server->lock);
+
+ //
+ // Bump the request count to stop all the threads.
+ //
+ dx_server->pause_requests++;
+ int my_sequence = dx_server->pause_next_sequence++;
+
+ //
+ // Awaken all threads that are currently blocking.
+ //
+ sys_cond_signal_all(dx_server->cond);
+ pn_driver_wakeup(dx_server->driver);
+
+ //
+ // Wait for the paused thread count plus the number of threads requesting a pause to equal
+ // the total thread count. Also, don't exit the blocking loop until now_serving equals our
+ // sequence number. This ensures that concurrent pausers don't run at the same time.
+ //
+ while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) ||
+ (my_sequence != dx_server->pause_now_serving))
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_resume(void)
+{
+ sys_mutex_lock(dx_server->lock);
+ dx_server->pause_requests--;
+ dx_server->pause_now_serving++;
+ sys_cond_signal_all(dx_server->cond);
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_activate(dx_connection_t *ctx)
+{
+ if (!ctx)
+ return;
+
+ pn_connector_t *ctor = ctx->pn_cxtr;
+ if (!ctor)
+ return;
+
+ if (!pn_connector_closed(ctor))
+ pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE);
+}
+
+
+void dx_connection_set_context(dx_connection_t *conn, void *context)
+{
+ conn->user_context = context;
+}
+
+
+void *dx_connection_get_context(dx_connection_t *conn)
+{
+ return conn->user_context;
+}
+
+
+pn_connection_t *dx_connection_pn(dx_connection_t *conn)
+{
+ return conn->pn_conn;
+}
+
+
+dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context)
+{
+ dx_listener_t *li = new_dx_listener_t();
+
+ if (!li)
+ return 0;
+
+ li->config = config;
+ li->context = context;
+ li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li);
+
+ if (!li->pn_listener) {
+ dx_log(module, LOG_ERROR, "Driver Error %d (%s)",
+ pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver));
+ free_dx_listener_t(li);
+ return 0;
+ }
+ dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port);
+
+ return li;
+}
+
+
+void dx_server_listener_free(dx_listener_t* li)
+{
+ pn_listener_free(li->pn_listener);
+ free_dx_listener_t(li);
+}
+
+
+void dx_server_listener_close(dx_listener_t* li)
+{
+ pn_listener_close(li->pn_listener);
+}
+
+
+dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context)
+{
+ dx_connector_t *ct = new_dx_connector_t();
+
+ if (!ct)
+ return 0;
+
+ ct->state = CXTR_STATE_CONNECTING;
+ ct->config = config;
+ ct->context = context;
+ ct->ctx = 0;
+ ct->timer = dx_timer(cxtr_try_open, (void*) ct);
+ ct->delay = 0;
+
+ dx_timer_schedule(ct->timer, ct->delay);
+ return ct;
+}
+
+
+void dx_server_connector_free(dx_connector_t* ct)
+{
+ // Don't free the proton connector. This will be done by the connector
+ // processing/cleanup.
+
+ if (ct->ctx) {
+ pn_connector_close(ct->ctx->pn_cxtr);
+ ct->ctx->connector = 0;
+ }
+
+ dx_timer_free(ct->timer);
+ free_dx_connector_t(ct);
+}
+
+
+dx_user_fd_t *dx_user_fd(int fd, void *context)
+{
+ dx_user_fd_t *ufd = new_dx_user_fd_t();
+
+ if (!ufd)
+ return 0;
+
+ dx_connection_t *ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_USER;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_conn = 0;
+ ctx->listener = 0;
+ ctx->connector = 0;
+ ctx->context = 0;
+ ctx->user_context = 0;
+ ctx->ufd = ufd;
+
+ ufd->context = context;
+ ufd->fd = fd;
+ ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx);
+ pn_driver_wakeup(dx_server->driver);
+
+ return ufd;
+}
+
+
+void dx_user_fd_free(dx_user_fd_t *ufd)
+{
+ pn_connector_close(ufd->pn_conn);
+ free_dx_user_fd_t(ufd);
+}
+
+
+void dx_user_fd_activate_read(dx_user_fd_t *ufd)
+{
+ pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE);
+ pn_driver_wakeup(dx_server->driver);
+}
+
+
+void dx_user_fd_activate_write(dx_user_fd_t *ufd)
+{
+ pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
+ pn_driver_wakeup(dx_server->driver);
+}
+
+
+bool dx_user_fd_is_readable(dx_user_fd_t *ufd)
+{
+ return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE);
+}
+
+
+bool dx_user_fd_is_writeable(dx_user_fd_t *ufd)
+{
+ return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
+}
+
+
+void dx_server_timer_pending_LH(dx_timer_t *timer)
+{
+ DEQ_INSERT_TAIL(dx_server->pending_timers, timer);
+}
+
+
+void dx_server_timer_cancel_LH(dx_timer_t *timer)
+{
+ DEQ_REMOVE(dx_server->pending_timers, timer);
+}
+
diff --git a/extras/dispatch/src/server_private.h b/extras/dispatch/src/server_private.h
new file mode 100644
index 0000000000..1722175e35
--- /dev/null
+++ b/extras/dispatch/src/server_private.h
@@ -0,0 +1,96 @@
+#ifndef __server_private_h__
+#define __server_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/user_fd.h>
+#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/alloc.h>
+#include <proton/driver.h>
+#include <proton/driver_extras.h>
+
+void dx_server_timer_pending_LH(dx_timer_t *timer);
+void dx_server_timer_cancel_LH(dx_timer_t *timer);
+
+
+typedef enum {
+ CONN_STATE_CONNECTING = 0,
+ CONN_STATE_SASL_CLIENT,
+ CONN_STATE_SASL_SERVER,
+ CONN_STATE_OPENING,
+ CONN_STATE_OPERATIONAL,
+ CONN_STATE_FAILED,
+ CONN_STATE_USER
+} conn_state_t;
+
+#define CONTEXT_NO_OWNER -1
+
+typedef enum {
+ CXTR_STATE_CONNECTING = 0,
+ CXTR_STATE_OPEN,
+ CXTR_STATE_FAILED
+} cxtr_state_t;
+
+
+struct dx_listener_t {
+ const dx_server_config_t *config;
+ void *context;
+ pn_listener_t *pn_listener;
+};
+
+
+struct dx_connector_t {
+ cxtr_state_t state;
+ const dx_server_config_t *config;
+ void *context;
+ dx_connection_t *ctx;
+ dx_timer_t *timer;
+ long delay;
+};
+
+
+struct dx_connection_t {
+ conn_state_t state;
+ int owner_thread;
+ int enqueued;
+ pn_connector_t *pn_cxtr;
+ pn_connection_t *pn_conn;
+ dx_listener_t *listener;
+ dx_connector_t *connector;
+ void *context; // Copy of context from listener or connector
+ void *user_context;
+ dx_user_fd_t *ufd;
+};
+
+
+struct dx_user_fd_t {
+ void *context;
+ int fd;
+ pn_connector_t *pn_conn;
+};
+
+
+ALLOC_DECLARE(dx_listener_t);
+ALLOC_DECLARE(dx_connector_t);
+ALLOC_DECLARE(dx_connection_t);
+ALLOC_DECLARE(dx_user_fd_t);
+
+
+#endif
diff --git a/extras/dispatch/src/timer.c b/extras/dispatch/src/timer.c
new file mode 100644
index 0000000000..b6b4864e26
--- /dev/null
+++ b/extras/dispatch/src/timer.c
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "timer_private.h"
+#include "server_private.h"
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/alloc.h>
+#include <assert.h>
+#include <stdio.h>
+
+static sys_mutex_t *lock;
+static dx_timer_list_t idle_timers;
+static dx_timer_list_t scheduled_timers;
+static long time_base;
+
+ALLOC_DECLARE(dx_timer_t);
+ALLOC_DEFINE(dx_timer_t);
+
+//=========================================================================
+// Private static functions
+//=========================================================================
+
+static void dx_timer_cancel_LH(dx_timer_t *timer)
+{
+ switch (timer->state) {
+ case TIMER_FREE:
+ assert(0);
+ break;
+
+ case TIMER_IDLE:
+ break;
+
+ case TIMER_SCHEDULED:
+ if (timer->next)
+ timer->next->delta_time += timer->delta_time;
+ DEQ_REMOVE(scheduled_timers, timer);
+ DEQ_INSERT_TAIL(idle_timers, timer);
+ break;
+
+ case TIMER_PENDING:
+ dx_server_timer_cancel_LH(timer);
+ break;
+ }
+
+ timer->state = TIMER_IDLE;
+}
+
+
+//=========================================================================
+// Public Functions from timer.h
+//=========================================================================
+
+dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context)
+{
+ dx_timer_t *timer = new_dx_timer_t();
+ if (!timer)
+ return 0;
+
+ DEQ_ITEM_INIT(timer);
+
+ timer->handler = cb;
+ timer->context = context;
+ timer->delta_time = 0;
+ timer->state = TIMER_IDLE;
+
+ sys_mutex_lock(lock);
+ DEQ_INSERT_TAIL(idle_timers, timer);
+ sys_mutex_unlock(lock);
+
+ return timer;
+}
+
+
+void dx_timer_free(dx_timer_t *timer)
+{
+ sys_mutex_lock(lock);
+ dx_timer_cancel_LH(timer);
+ DEQ_REMOVE(idle_timers, timer);
+ sys_mutex_unlock(lock);
+
+ timer->state = TIMER_FREE;
+ free_dx_timer_t(timer);
+}
+
+
+void dx_timer_schedule(dx_timer_t *timer, long duration)
+{
+ dx_timer_t *ptr;
+ dx_timer_t *last;
+ long total_time;
+
+ sys_mutex_lock(lock);
+ dx_timer_cancel_LH(timer); // Timer is now on the idle list
+ assert(timer->state == TIMER_IDLE);
+ DEQ_REMOVE(idle_timers, timer);
+
+ //
+ // Handle the special case of a zero-time scheduling. In this case,
+ // the timer doesn't go on the scheduled list. It goes straight to the
+ // pending list in the server.
+ //
+ if (duration == 0) {
+ timer->state = TIMER_PENDING;
+ dx_server_timer_pending_LH(timer);
+ sys_mutex_unlock(lock);
+ return;
+ }
+
+ //
+ // Find the insert point in the schedule.
+ //
+ total_time = 0;
+ ptr = DEQ_HEAD(scheduled_timers);
+ assert(!ptr || ptr->prev == 0);
+ while (ptr) {
+ total_time += ptr->delta_time;
+ if (total_time > duration)
+ break;
+ ptr = ptr->next;
+ }
+
+ //
+ // Insert the timer into the schedule and adjust the delta time
+ // of the following timer if present.
+ //
+ if (total_time <= duration) {
+ assert(ptr == 0);
+ timer->delta_time = duration - total_time;
+ DEQ_INSERT_TAIL(scheduled_timers, timer);
+ } else {
+ total_time -= ptr->delta_time;
+ timer->delta_time = duration - total_time;
+ assert(ptr->delta_time > timer->delta_time);
+ ptr->delta_time -= timer->delta_time;
+ last = ptr->prev;
+ if (last)
+ DEQ_INSERT_AFTER(scheduled_timers, timer, last);
+ else
+ DEQ_INSERT_HEAD(scheduled_timers, timer);
+ }
+
+ timer->state = TIMER_SCHEDULED;
+
+ sys_mutex_unlock(lock);
+}
+
+
+void dx_timer_cancel(dx_timer_t *timer)
+{
+ sys_mutex_lock(lock);
+ dx_timer_cancel_LH(timer);
+ sys_mutex_unlock(lock);
+}
+
+
+//=========================================================================
+// Private Functions from timer_private.h
+//=========================================================================
+
+void dx_timer_initialize(sys_mutex_t *server_lock)
+{
+ lock = server_lock;
+ DEQ_INIT(idle_timers);
+ DEQ_INIT(scheduled_timers);
+ time_base = 0;
+}
+
+
+void dx_timer_finalize(void)
+{
+ lock = 0;
+}
+
+
+long dx_timer_next_duration_LH(void)
+{
+ dx_timer_t *timer = DEQ_HEAD(scheduled_timers);
+ if (timer)
+ return timer->delta_time;
+ return -1;
+}
+
+
+void dx_timer_visit_LH(long current_time)
+{
+ long delta;
+ dx_timer_t *timer = DEQ_HEAD(scheduled_timers);
+
+ if (time_base == 0) {
+ time_base = current_time;
+ return;
+ }
+
+ delta = current_time - time_base;
+ time_base = current_time;
+
+ while (timer) {
+ assert(delta >= 0);
+ if (timer->delta_time > delta) {
+ timer->delta_time -= delta;
+ break;
+ } else {
+ DEQ_REMOVE_HEAD(scheduled_timers);
+ delta -= timer->delta_time;
+ timer->state = TIMER_PENDING;
+ dx_server_timer_pending_LH(timer);
+
+ }
+ timer = DEQ_HEAD(scheduled_timers);
+ }
+}
+
+
+void dx_timer_idle_LH(dx_timer_t *timer)
+{
+ timer->state = TIMER_IDLE;
+ DEQ_INSERT_TAIL(idle_timers, timer);
+}
+
diff --git a/extras/dispatch/src/timer_private.h b/extras/dispatch/src/timer_private.h
new file mode 100644
index 0000000000..618297b18e
--- /dev/null
+++ b/extras/dispatch/src/timer_private.h
@@ -0,0 +1,51 @@
+#ifndef __timer_private_h__
+#define __timer_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/threading.h>
+
+typedef enum {
+ TIMER_FREE,
+ TIMER_IDLE,
+ TIMER_SCHEDULED,
+ TIMER_PENDING
+} dx_timer_state_t;
+
+
+struct dx_timer_t {
+ DEQ_LINKS(dx_timer_t);
+ dx_timer_cb_t handler;
+ void *context;
+ long delta_time;
+ dx_timer_state_t state;
+};
+
+DEQ_DECLARE(dx_timer_t, dx_timer_list_t);
+
+void dx_timer_initialize(sys_mutex_t *server_lock);
+void dx_timer_finalize(void);
+long dx_timer_next_duration_LH(void);
+void dx_timer_visit_LH(long current_time);
+void dx_timer_idle_LH(dx_timer_t *timer);
+
+
+#endif
diff --git a/extras/dispatch/src/work_queue.c b/extras/dispatch/src/work_queue.c
new file mode 100644
index 0000000000..4b3c5d7fa5
--- /dev/null
+++ b/extras/dispatch/src/work_queue.c
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include "work_queue.h"
+#include <string.h>
+#include <stdio.h>
+
+#define BATCH_SIZE 100
+typedef struct work_item_t work_item_t;
+
+struct work_item_t {
+ DEQ_LINKS(work_item_t);
+ pn_connector_t *conn;
+};
+
+DEQ_DECLARE(work_item_t, work_list_t);
+
+struct work_queue_t {
+ work_list_t items;
+ work_list_t free_list;
+};
+
+static void allocate_batch(work_queue_t *w)
+{
+ int i;
+ work_item_t *batch = NEW_ARRAY(work_item_t, BATCH_SIZE);
+ if (!batch)
+ return;
+
+ memset(batch, 0, sizeof(work_item_t) * BATCH_SIZE);
+
+ for (i = 0; i < BATCH_SIZE; i++)
+ DEQ_INSERT_TAIL(w->free_list, &batch[i]);
+}
+
+
+work_queue_t *work_queue(void)
+{
+ work_queue_t *w = NEW(work_queue_t);
+ if (!w)
+ return 0;
+
+ DEQ_INIT(w->items);
+ DEQ_INIT(w->free_list);
+
+ allocate_batch(w);
+
+ return w;
+}
+
+
+void work_queue_free(work_queue_t *w)
+{
+ if (!w)
+ return;
+
+ // KEEP TRACK OF BATCHES AND FREE
+ free(w);
+}
+
+
+void work_queue_put(work_queue_t *w, pn_connector_t *conn)
+{
+ work_item_t *item;
+
+ if (!w)
+ return;
+ if (DEQ_SIZE(w->free_list) == 0)
+ allocate_batch(w);
+ if (DEQ_SIZE(w->free_list) == 0)
+ return;
+
+ item = DEQ_HEAD(w->free_list);
+ DEQ_REMOVE_HEAD(w->free_list);
+
+ item->conn = conn;
+
+ DEQ_INSERT_TAIL(w->items, item);
+}
+
+
+pn_connector_t *work_queue_get(work_queue_t *w)
+{
+ work_item_t *item;
+ pn_connector_t *conn;
+
+ if (!w)
+ return 0;
+ item = DEQ_HEAD(w->items);
+ if (!item)
+ return 0;
+
+ DEQ_REMOVE_HEAD(w->items);
+ conn = item->conn;
+ item->conn = 0;
+
+ DEQ_INSERT_TAIL(w->free_list, item);
+
+ return conn;
+}
+
+
+int work_queue_empty(work_queue_t *w)
+{
+ return !w || DEQ_SIZE(w->items) == 0;
+}
+
+
+int work_queue_depth(work_queue_t *w)
+{
+ if (!w)
+ return 0;
+ return DEQ_SIZE(w->items);
+}
+
diff --git a/extras/dispatch/src/work_queue.h b/extras/dispatch/src/work_queue.h
new file mode 100644
index 0000000000..597a484a9c
--- /dev/null
+++ b/extras/dispatch/src/work_queue.h
@@ -0,0 +1,33 @@
+#ifndef __work_queue_h__
+#define __work_queue_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/driver.h>
+
+typedef struct work_queue_t work_queue_t;
+
+work_queue_t *work_queue(void);
+void work_queue_free(work_queue_t *w);
+void work_queue_put(work_queue_t *w, pn_connector_t *conn);
+pn_connector_t *work_queue_get(work_queue_t *w);
+int work_queue_empty(work_queue_t *w);
+int work_queue_depth(work_queue_t *w);
+
+#endif