summaryrefslogtreecommitdiff
path: root/extras/dispatch
diff options
context:
space:
mode:
Diffstat (limited to 'extras/dispatch')
-rw-r--r--extras/dispatch/CMakeLists.txt99
-rw-r--r--extras/dispatch/include/qpid/dispatch/agent.h108
-rw-r--r--extras/dispatch/include/qpid/dispatch/alloc.h72
-rw-r--r--extras/dispatch/include/qpid/dispatch/buffer.h79
-rw-r--r--extras/dispatch/include/qpid/dispatch/container.h129
-rw-r--r--extras/dispatch/include/qpid/dispatch/ctools.h146
-rw-r--r--extras/dispatch/include/qpid/dispatch/hash.h37
-rw-r--r--extras/dispatch/include/qpid/dispatch/iovec.h32
-rw-r--r--extras/dispatch/include/qpid/dispatch/iterator.h113
-rw-r--r--extras/dispatch/include/qpid/dispatch/log.h31
-rw-r--r--extras/dispatch/include/qpid/dispatch/message.h165
-rw-r--r--extras/dispatch/include/qpid/dispatch/router.h35
-rw-r--r--extras/dispatch/include/qpid/dispatch/server.h403
-rw-r--r--extras/dispatch/include/qpid/dispatch/threading.h45
-rw-r--r--extras/dispatch/include/qpid/dispatch/timer.h86
-rw-r--r--extras/dispatch/include/qpid/dispatch/user_fd.h121
-rw-r--r--extras/dispatch/router/CMakeLists.txt31
-rw-r--r--extras/dispatch/router/src/main.c122
-rw-r--r--extras/dispatch/site/css/style.css280
-rw-r--r--extras/dispatch/site/images/arch.diabin0 -> 1352 bytes
-rw-r--r--extras/dispatch/site/images/arch.pngbin0 -> 6170 bytes
-rw-r--r--extras/dispatch/site/includes/footer.include7
-rw-r--r--extras/dispatch/site/includes/header.include6
-rw-r--r--extras/dispatch/site/includes/menu.include68
-rwxr-xr-xextras/dispatch/site/index.html101
-rw-r--r--extras/dispatch/src/agent.c151
-rw-r--r--extras/dispatch/src/alloc.c210
-rw-r--r--extras/dispatch/src/alloc_private.h26
-rw-r--r--extras/dispatch/src/auth.c75
-rw-r--r--extras/dispatch/src/auth.h27
-rw-r--r--extras/dispatch/src/buffer.c83
-rw-r--r--extras/dispatch/src/container.c616
-rw-r--r--extras/dispatch/src/hash.c223
-rw-r--r--extras/dispatch/src/iovec.c81
-rw-r--r--extras/dispatch/src/iterator.c268
-rw-r--r--extras/dispatch/src/log.c56
-rw-r--r--extras/dispatch/src/message.c1120
-rw-r--r--extras/dispatch/src/message_private.h94
-rw-r--r--extras/dispatch/src/posix/threading.c126
-rw-r--r--extras/dispatch/src/router_node.c424
-rw-r--r--extras/dispatch/src/server.c903
-rw-r--r--extras/dispatch/src/server_private.h96
-rw-r--r--extras/dispatch/src/timer.c236
-rw-r--r--extras/dispatch/src/timer_private.h51
-rw-r--r--extras/dispatch/src/work_queue.c132
-rw-r--r--extras/dispatch/src/work_queue.h33
-rw-r--r--extras/dispatch/tests/CMakeLists.txt34
-rw-r--r--extras/dispatch/tests/alloc_test.c86
-rw-r--r--extras/dispatch/tests/message_test.c119
-rw-r--r--extras/dispatch/tests/run_tests.c36
-rw-r--r--extras/dispatch/tests/server_test.c195
-rw-r--r--extras/dispatch/tests/test_case.h36
-rw-r--r--extras/dispatch/tests/timer_test.c388
-rw-r--r--extras/dispatch/tests/tool_test.c159
54 files changed, 8400 insertions, 0 deletions
diff --git a/extras/dispatch/CMakeLists.txt b/extras/dispatch/CMakeLists.txt
new file mode 100644
index 0000000000..bc1812fb6b
--- /dev/null
+++ b/extras/dispatch/CMakeLists.txt
@@ -0,0 +1,99 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License.
+##
+
+cmake_minimum_required(VERSION 2.6)
+include(CheckLibraryExists)
+include(CheckSymbolExists)
+
+project(qpid-dispatch C)
+
+set (SO_VERSION_MAJOR 0)
+set (SO_VERSION_MINOR 1)
+set (SO_VERSION "${SO_VERSION_MAJOR}.${SO_VERSION_MINOR}")
+
+if (NOT DEFINED LIB_SUFFIX)
+ get_property(LIB64 GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS)
+ if ("${LIB64}" STREQUAL "TRUE" AND ${CMAKE_SIZEOF_VOID_P} STREQUAL "8")
+ set(LIB_SUFFIX 64)
+ else()
+ set(LIB_SUFFIX "")
+ endif()
+endif()
+
+set(INCLUDE_INSTALL_DIR include CACHE PATH "Include file directory")
+set(LIB_INSTALL_DIR "lib${LIB_SUFFIX}" CACHE PATH "Library object file directory")
+set(SYSCONF_INSTALL_DIR etc CACHE PATH "System read only configuration directory")
+set(SHARE_INSTALL_DIR share CACHE PATH "Shared read only data directory")
+set(MAN_INSTALL_DIR share/man CACHE PATH "Manpage directory")
+
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${CMAKE_CURRENT_SOURCE_DIR}/src
+ ${proton_include}
+ )
+
+##
+## Find dependencies
+##
+find_library(proton_lib qpid-proton)
+find_library(pthread_lib pthread)
+find_library(rt_lib rt)
+find_path(proton_include proton/driver.h)
+
+set(CMAKE_C_FLAGS "-pthread -Wall -Werror")
+set(CATCH_UNDEFINED "-Wl,--no-undefined")
+
+##
+## Build the Multi-Threaded Server Library
+##
+set(server_SOURCES
+ src/agent.c
+ src/alloc.c
+ src/auth.c
+ src/buffer.c
+ src/container.c
+ src/hash.c
+ src/iovec.c
+ src/iterator.c
+ src/log.c
+ src/message.c
+ src/posix/threading.c
+ src/router_node.c
+ src/server.c
+ src/timer.c
+ src/work_queue.c
+ )
+
+add_library(qpid-dispatch SHARED ${server_SOURCES})
+target_link_libraries(qpid-dispatch ${proton_lib} ${pthread_lib} ${rt_lib})
+set_target_properties(qpid-dispatch PROPERTIES
+ VERSION "${SO_VERSION}"
+ SOVERSION "${SO_VERSION_MAJOR}"
+ LINK_FLAGS "${CATCH_UNDEFINED}"
+ )
+install(TARGETS qpid-dispatch
+ LIBRARY DESTINATION ${LIB_INSTALL_DIR})
+file(GLOB headers "include/qpid/dispatch/*.h")
+install(FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/qpid/dispatch)
+
+##
+## Build Tests
+##
+add_subdirectory(router)
+add_subdirectory(tests)
diff --git a/extras/dispatch/include/qpid/dispatch/agent.h b/extras/dispatch/include/qpid/dispatch/agent.h
new file mode 100644
index 0000000000..d53d24d4d4
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/agent.h
@@ -0,0 +1,108 @@
+#ifndef __dispatch_agent_h__
+#define __dispatch_agent_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stddef.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+/**
+ * \defgroup Container Management Agent
+ * @{
+ */
+
+typedef struct dx_agent_class_t dx_agent_class_t;
+
+
+/**
+ * \brief Get Schema Data Handler
+ *
+ * @param context The handler context supplied in dx_agent_register.
+ */
+typedef void (*dx_agent_schema_cb_t)(void* context);
+
+
+/**
+ * \brief Query Handler
+ *
+ * @param context The handler context supplied in dx_agent_register.
+ * @param id The identifier of the instance being queried or NULL for all instances.
+ * @param correlator The correlation handle to be used in calls to dx_agent_value_*
+ */
+typedef void (*dx_agent_query_cb_t)(void* context, const char *id, const void *correlator);
+
+
+/**
+ * \brief Initialize the agent module and prepare it for operation.
+ *
+ */
+void dx_agent_initialize();
+
+
+/**
+ * \brief Finalize the agent after it has stopped running.
+ */
+void dx_agent_finalize(void);
+
+
+/**
+ * \brief Register a class/object-type with the agent.
+ */
+dx_agent_class_t *dx_agent_register_class(const char *fqname,
+ void *context,
+ dx_agent_schema_cb_t schema_handler,
+ dx_agent_query_cb_t query_handler);
+
+/**
+ * \brief Register an event-type with the agent.
+ */
+dx_agent_class_t *dx_agent_register_event(const char *fqname,
+ void *context,
+ dx_agent_schema_cb_t schema_handler);
+
+/**
+ *
+ */
+void dx_agent_value_string(const void *correlator, const char *key, const char *value);
+void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value);
+void dx_agent_value_null(const void *correlator, const char *key);
+void dx_agent_value_boolean(const void *correlator, const char *key, bool value);
+void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len);
+void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value);
+void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value);
+
+
+/**
+ *
+ */
+void dx_agent_value_complete(const void *correlator, bool more);
+
+
+/**
+ *
+ */
+void *dx_agent_raise_event(dx_agent_class_t *event);
+
+
+/**
+ * @}
+ */
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/alloc.h b/extras/dispatch/include/qpid/dispatch/alloc.h
new file mode 100644
index 0000000000..ae4190ad89
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/alloc.h
@@ -0,0 +1,72 @@
+#ifndef __dispatch_alloc_h__
+#define __dispatch_alloc_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <qpid/dispatch/threading.h>
+
+typedef struct dx_alloc_pool_t dx_alloc_pool_t;
+
+typedef struct {
+ int transfer_batch_size;
+ int local_free_list_max;
+ int global_free_list_max;
+} dx_alloc_config_t;
+
+typedef struct {
+ uint64_t total_alloc_from_heap;
+ uint64_t total_free_to_heap;
+ uint64_t held_by_threads;
+ uint64_t batches_rebalanced_to_threads;
+ uint64_t batches_rebalanced_to_global;
+} dx_alloc_stats_t;
+
+typedef struct {
+ char *type_name;
+ size_t type_size;
+ size_t *additional_size;
+ size_t total_size;
+ dx_alloc_config_t *config;
+ dx_alloc_stats_t *stats;
+ dx_alloc_pool_t *global_pool;
+ sys_mutex_t *lock;
+} dx_alloc_type_desc_t;
+
+
+void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool);
+void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p);
+
+
+#define ALLOC_DECLARE(T) \
+ T *new_##T(); \
+ void free_##T(T *p)
+
+#define ALLOC_DEFINE_CONFIG(T,S,A,C) \
+ dx_alloc_type_desc_t __desc_##T = {#T, S, A, 0, C, 0, 0, 0}; \
+ __thread dx_alloc_pool_t *__local_pool_##T = 0; \
+ T *new_##T() { return (T*) dx_alloc(&__desc_##T, &__local_pool_##T); } \
+ void free_##T(T *p) { dx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \
+ dx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; }
+
+#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0)
+
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/buffer.h b/extras/dispatch/include/qpid/dispatch/buffer.h
new file mode 100644
index 0000000000..1c372b265d
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/buffer.h
@@ -0,0 +1,79 @@
+#ifndef __dispatch_buffer_h__
+#define __dispatch_buffer_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+
+typedef struct dx_buffer_t dx_buffer_t;
+
+DEQ_DECLARE(dx_buffer_t, dx_buffer_list_t);
+
+struct dx_buffer_t {
+ DEQ_LINKS(dx_buffer_t);
+ unsigned int size;
+};
+
+/**
+ */
+void dx_buffer_set_size(size_t size);
+
+/**
+ */
+dx_buffer_t *dx_allocate_buffer(void);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ */
+void dx_free_buffer(dx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return A pointer to the first octet in the buffer
+ */
+unsigned char *dx_buffer_base(dx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return A pointer to the first free octet in the buffer, the insert point for new data.
+ */
+unsigned char *dx_buffer_cursor(dx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets in the buffer's free space, how many octets may be inserted.
+ */
+size_t dx_buffer_capacity(dx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets of data in the buffer
+ */
+size_t dx_buffer_size(dx_buffer_t *buf);
+
+/**
+ * Notify the buffer that octets have been inserted at the buffer's cursor. This will advance the
+ * cursor by len octets.
+ *
+ * @param buf A pointer to an allocated buffer
+ * @param len The number of octets that have been appended to the buffer
+ */
+void dx_buffer_insert(dx_buffer_t *buf, size_t len);
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/container.h b/extras/dispatch/include/qpid/dispatch/container.h
new file mode 100644
index 0000000000..01a24fbbef
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/container.h
@@ -0,0 +1,129 @@
+#ifndef __dispatch_container_h__
+#define __dispatch_container_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/engine.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/ctools.h>
+
+typedef uint8_t dx_dist_mode_t;
+#define DX_DIST_COPY 0x01
+#define DX_DIST_MOVE 0x02
+#define DX_DIST_BOTH 0x03
+
+/**
+ * Node Lifetime Policy (see AMQP 3.5.9)
+ */
+typedef enum {
+ DX_LIFE_PERMANENT,
+ DX_LIFE_DELETE_CLOSE,
+ DX_LIFE_DELETE_NO_LINKS,
+ DX_LIFE_DELETE_NO_MESSAGES,
+ DX_LIFE_DELETE_NO_LINKS_MESSAGES
+} dx_lifetime_policy_t;
+
+
+/**
+ * Link Direction
+ */
+typedef enum {
+ DX_INCOMING,
+ DX_OUTGOING
+} dx_direction_t;
+
+
+typedef struct dx_node_t dx_node_t;
+typedef struct dx_link_t dx_link_t;
+
+typedef void (*dx_container_delivery_handler_t) (void *node_context, dx_link_t *link, pn_delivery_t *delivery);
+typedef int (*dx_container_link_handler_t) (void *node_context, dx_link_t *link);
+typedef int (*dx_container_link_detach_handler_t) (void *node_context, dx_link_t *link, int closed);
+typedef void (*dx_container_node_handler_t) (void *type_context, dx_node_t *node);
+typedef void (*dx_container_conn_handler_t) (void *type_context, dx_connection_t *conn);
+
+typedef struct {
+ char *type_name;
+ void *type_context;
+ int allow_dynamic_creation;
+
+ //
+ // Node-Instance Handlers
+ //
+ dx_container_delivery_handler_t rx_handler;
+ dx_container_delivery_handler_t tx_handler;
+ dx_container_delivery_handler_t disp_handler;
+ dx_container_link_handler_t incoming_handler;
+ dx_container_link_handler_t outgoing_handler;
+ dx_container_link_handler_t writable_handler;
+ dx_container_link_detach_handler_t link_detach_handler;
+
+ //
+ // Node-Type Handlers
+ //
+ dx_container_node_handler_t node_created_handler;
+ dx_container_node_handler_t node_destroyed_handler;
+ dx_container_conn_handler_t inbound_conn_open_handler;
+ dx_container_conn_handler_t outbound_conn_open_handler;
+} dx_node_type_t;
+
+void dx_container_initialize(void);
+void dx_container_finalize(void);
+
+int dx_container_register_node_type(const dx_node_type_t *nt);
+
+void dx_container_set_default_node_type(const dx_node_type_t *nt,
+ void *node_context,
+ dx_dist_mode_t supported_dist);
+
+dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
+ const char *name,
+ void *node_context,
+ dx_dist_mode_t supported_dist,
+ dx_lifetime_policy_t life_policy);
+void dx_container_destroy_node(dx_node_t *node);
+
+void dx_container_node_set_context(dx_node_t *node, void *node_context);
+dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node);
+dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node);
+
+dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char *name);
+void dx_link_set_context(dx_link_t *link, void *link_context);
+void *dx_link_get_context(dx_link_t *link);
+pn_link_t *dx_link_pn(dx_link_t *link);
+pn_terminus_t *dx_link_source(dx_link_t *link);
+pn_terminus_t *dx_link_target(dx_link_t *link);
+pn_terminus_t *dx_link_remote_source(dx_link_t *link);
+pn_terminus_t *dx_link_remote_target(dx_link_t *link);
+void dx_link_activate(dx_link_t *link);
+void dx_link_close(dx_link_t *link);
+
+
+typedef struct dx_link_item_t dx_link_item_t;
+
+struct dx_link_item_t {
+ DEQ_LINKS(dx_link_item_t);
+ dx_link_t *link;
+};
+
+ALLOC_DECLARE(dx_link_item_t);
+DEQ_DECLARE(dx_link_item_t, dx_link_list_t);
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/ctools.h b/extras/dispatch/include/qpid/dispatch/ctools.h
new file mode 100644
index 0000000000..33178a23ee
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/ctools.h
@@ -0,0 +1,146 @@
+#ifndef __dispatch_ctools_h__
+#define __dispatch_ctools_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <assert.h>
+
+#define CT_ASSERT(exp) { assert(exp); }
+
+#define NEW(t) (t*) malloc(sizeof(t))
+#define NEW_ARRAY(t,n) (t*) malloc(sizeof(t)*(n))
+#define NEW_PTR_ARRAY(t,n) (t**) malloc(sizeof(t*)*(n))
+
+#define DEQ_DECLARE(i,d) typedef struct { \
+ i *head; \
+ i *tail; \
+ i *scratch; \
+ size_t size; \
+ } d
+
+#define DEQ_LINKS(t) t *prev; t *next
+
+#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0)
+#define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0)
+#define DEQ_HEAD(d) ((d).head)
+#define DEQ_TAIL(d) ((d).tail)
+#define DEQ_SIZE(d) ((d).size)
+#define DEQ_NEXT(i) (i)->next
+#define DEQ_PREV(i) (i)->prev
+
+#define DEQ_INSERT_HEAD(d,i) \
+do { \
+ CT_ASSERT((i)->next == 0); \
+ CT_ASSERT((i)->prev == 0); \
+ if ((d).head) { \
+ (i)->next = (d).head; \
+ (d).head->prev = i; \
+ } else { \
+ (d).tail = i; \
+ (i)->next = 0; \
+ CT_ASSERT((d).size == 0); \
+ } \
+ (i)->prev = 0; \
+ (d).head = i; \
+ (d).size++; \
+} while (0)
+
+#define DEQ_INSERT_TAIL(d,i) \
+do { \
+ CT_ASSERT((i)->next == 0); \
+ CT_ASSERT((i)->prev == 0); \
+ if ((d).tail) { \
+ (i)->prev = (d).tail; \
+ (d).tail->next = i; \
+ } else { \
+ (d).head = i; \
+ (i)->prev = 0; \
+ CT_ASSERT((d).size == 0); \
+ } \
+ (i)->next = 0; \
+ (d).tail = i; \
+ (d).size++; \
+} while (0)
+
+#define DEQ_REMOVE_HEAD(d) \
+do { \
+ CT_ASSERT((d).head); \
+ if ((d).head) { \
+ (d).scratch = (d).head; \
+ (d).head = (d).head->next; \
+ if ((d).head == 0) { \
+ (d).tail = 0; \
+ CT_ASSERT((d).size == 1); \
+ } else \
+ (d).head->prev = 0; \
+ (d).size--; \
+ (d).scratch->next = 0; \
+ (d).scratch->prev = 0; \
+ } \
+} while (0)
+
+#define DEQ_REMOVE_TAIL(d) \
+do { \
+ CT_ASSERT((d).tail); \
+ if ((d).tail) { \
+ (d).scratch = (d).tail; \
+ (d).tail = (d).tail->prev; \
+ if ((d).tail == 0) { \
+ (d).head = 0; \
+ CT_ASSERT((d).size == 1); \
+ } else \
+ (d).tail->next = 0; \
+ (d).size--; \
+ (d).scratch->next = 0; \
+ (d).scratch->prev = 0; \
+ } \
+} while (0)
+
+#define DEQ_INSERT_AFTER(d,i,a) \
+do { \
+ CT_ASSERT((i)->next == 0); \
+ CT_ASSERT((i)->prev == 0); \
+ if ((a)->next) \
+ (a)->next->prev = (i); \
+ else \
+ (d).tail = (i); \
+ (i)->next = (a)->next; \
+ (i)->prev = (a); \
+ (a)->next = (i); \
+ (d).size++; \
+} while (0)
+
+#define DEQ_REMOVE(d,i) \
+do { \
+ if ((i)->next) \
+ (i)->next->prev = (i)->prev; \
+ else \
+ (d).tail = (i)->prev; \
+ if ((i)->prev) \
+ (i)->prev->next = (i)->next; \
+ else \
+ (d).head = (i)->next; \
+ (d).size--; \
+ (i)->next = 0; \
+ (i)->prev = 0; \
+ CT_ASSERT((d).size || (!(d).head && !(d).tail)); \
+} while (0)
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/hash.h b/extras/dispatch/include/qpid/dispatch/hash.h
new file mode 100644
index 0000000000..7f4a4bb950
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/hash.h
@@ -0,0 +1,37 @@
+#ifndef __dispatch_hash_h__
+#define __dispatch_hash_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdlib.h>
+#include <qpid/dispatch/iterator.h>
+
+typedef struct hash_t hash_t;
+
+hash_t *hash(int bucket_exponent, int batch_size, int value_is_const);
+void hash_free(hash_t *h);
+
+size_t hash_size(hash_t *h);
+int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val);
+int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val);
+int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val);
+int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val);
+int hash_remove(hash_t *h, dx_field_iterator_t *key);
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/iovec.h b/extras/dispatch/include/qpid/dispatch/iovec.h
new file mode 100644
index 0000000000..5b56c638ff
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/iovec.h
@@ -0,0 +1,32 @@
+#ifndef __dispatch_iovec_h__
+#define __dispatch_iovec_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <sys/uio.h>
+
+typedef struct dx_iovec_t dx_iovec_t;
+
+dx_iovec_t *dx_iovec(int vector_count);
+void dx_iovec_free(dx_iovec_t *iov);
+struct iovec *dx_iovec_array(dx_iovec_t *iov);
+int dx_iovec_count(dx_iovec_t *iov);
+
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/iterator.h b/extras/dispatch/include/qpid/dispatch/iterator.h
new file mode 100644
index 0000000000..9844286483
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/iterator.h
@@ -0,0 +1,113 @@
+#ifndef __dispatch_iterator_h__
+#define __dispatch_iterator_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/buffer.h>
+
+/**
+ * The field iterator is used to access fields within a buffer chain.
+ * It shields the user from the fact that the field may be split across
+ * one or more physical buffers.
+ */
+typedef struct dx_field_iterator_t dx_field_iterator_t;
+
+/**
+ * Iterator views allow the code traversing the field to see a transformed
+ * view of the raw field.
+ *
+ * ITER_VIEW_ALL - No transformation of the raw field data
+ *
+ * ITER_VIEW_NO_HOST - Remove the scheme and host fields from the view
+ *
+ * amqp://host.domain.com:port/node-id/node/specific
+ * ^^^^^^^^^^^^^^^^^^^^^
+ * node-id/node/specific
+ * ^^^^^^^^^^^^^^^^^^^^^
+ *
+ * ITER_VIEW_NODE_ID - Isolate the node identifier from an address
+ *
+ * amqp://host.domain.com:port/node-id/node/specific
+ * ^^^^^^^
+ * node-id/node/specific
+ * ^^^^^^^
+ *
+ * ITER_VIEW_NODE_SPECIFIC - Isolate node-specific text from an address
+ *
+ * amqp://host.domain.com:port/node-id/node/specific
+ * ^^^^^^^^^^^^^
+ * node-id/node/specific
+ * ^^^^^^^^^^^^^
+ */
+typedef enum {
+ ITER_VIEW_ALL,
+ ITER_VIEW_NO_HOST,
+ ITER_VIEW_NODE_ID,
+ ITER_VIEW_NODE_SPECIFIC
+} dx_iterator_view_t;
+
+/**
+ * Create an iterator from a null-terminated string.
+ *
+ * The "text" string must stay intact for the whole life of the iterator. The iterator
+ * does not copy the string, it references it.
+ */
+dx_field_iterator_t* dx_field_iterator_string(const char *text,
+ dx_iterator_view_t view);
+
+/**
+ * Create an iterator from a field in a buffer chain
+ */
+dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer,
+ int offset,
+ int length,
+ dx_iterator_view_t view);
+
+/**
+ * Free an iterator
+ */
+void dx_field_iterator_free(dx_field_iterator_t *iter);
+
+/**
+ * Reset the iterator to the first octet and set a new view
+ */
+void dx_field_iterator_reset(dx_field_iterator_t *iter,
+ dx_iterator_view_t view);
+
+/**
+ * Return the current octet in the iterator's view and step to the next.
+ */
+unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter);
+
+/**
+ * Return true iff the iterator has no more octets in the view.
+ */
+int dx_field_iterator_end(dx_field_iterator_t *iter);
+
+/**
+ * Compare an input string to the iterator's view. Return true iff they are equal.
+ */
+int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string);
+
+/**
+ * Return a copy of the iterator's view.
+ */
+unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter);
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/log.h b/extras/dispatch/include/qpid/dispatch/log.h
new file mode 100644
index 0000000000..cbea50f266
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/log.h
@@ -0,0 +1,31 @@
+#ifndef __dispatch_log_h__
+#define __dispatch_log_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define LOG_NONE 0x00000000
+#define LOG_TRACE 0x00000001
+#define LOG_ERROR 0x00000002
+#define LOG_INFO 0x00000004
+
+void dx_log(const char *module, int cls, const char *fmt, ...);
+
+void dx_log_set_mask(int mask);
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/message.h b/extras/dispatch/include/qpid/dispatch/message.h
new file mode 100644
index 0000000000..41983c44a1
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/message.h
@@ -0,0 +1,165 @@
+#ifndef __dispatch_message_h__
+#define __dispatch_message_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/engine.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/iovec.h>
+
+// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
+
+typedef struct dx_message_t dx_message_t;
+
+DEQ_DECLARE(dx_message_t, dx_message_list_t);
+
+struct dx_message_t {
+ DEQ_LINKS(dx_message_t);
+ // Private members not listed here.
+};
+
+typedef enum {
+ DX_DEPTH_NONE,
+ DX_DEPTH_HEADER,
+ DX_DEPTH_DELIVERY_ANNOTATIONS,
+ DX_DEPTH_MESSAGE_ANNOTATIONS,
+ DX_DEPTH_PROPERTIES,
+ DX_DEPTH_APPLICATION_PROPERTIES,
+ DX_DEPTH_BODY,
+ DX_DEPTH_ALL
+} dx_message_depth_t;
+
+
+typedef enum {
+ //
+ // Message Sections
+ //
+ DX_FIELD_HEADER,
+ DX_FIELD_DELIVERY_ANNOTATION,
+ DX_FIELD_MESSAGE_ANNOTATION,
+ DX_FIELD_PROPERTIES,
+ DX_FIELD_APPLICATION_PROPERTIES,
+ DX_FIELD_BODY,
+ DX_FIELD_FOOTER,
+
+ //
+ // Fields of the Header Section
+ //
+ DX_FIELD_DURABLE,
+ DX_FIELD_PRIORITY,
+ DX_FIELD_TTL,
+ DX_FIELD_FIRST_ACQUIRER,
+ DX_FIELD_DELIVERY_COUNT,
+
+ //
+ // Fields of the Properties Section
+ //
+ DX_FIELD_MESSAGE_ID,
+ DX_FIELD_USER_ID,
+ DX_FIELD_TO,
+ DX_FIELD_SUBJECT,
+ DX_FIELD_REPLY_TO,
+ DX_FIELD_CORRELATION_ID,
+ DX_FIELD_CONTENT_TYPE,
+ DX_FIELD_CONTENT_ENCODING,
+ DX_FIELD_ABSOLUTE_EXPIRY_TIME,
+ DX_FIELD_CREATION_TIME,
+ DX_FIELD_GROUP_ID,
+ DX_FIELD_GROUP_SEQUENCE,
+ DX_FIELD_REPLY_TO_GROUP_ID
+} dx_message_field_t;
+
+//
+// Functions for allocation
+//
+dx_message_t *dx_allocate_message(void);
+void dx_free_message(dx_message_t *qm);
+dx_message_t *dx_message_copy(dx_message_t *qm);
+int dx_message_persistent(dx_message_t *qm);
+int dx_message_in_memory(dx_message_t *qm);
+
+void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery);
+pn_delivery_t *dx_message_out_delivery(dx_message_t *msg);
+void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery);
+pn_delivery_t *dx_message_in_delivery(dx_message_t *msg);
+
+//
+// Functions for received messages
+//
+dx_message_t *dx_message_receive(pn_delivery_t *delivery);
+void dx_message_send(dx_message_t *msg, pn_link_t *link);
+
+int dx_message_check(dx_message_t *msg, dx_message_depth_t depth);
+dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field);
+dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field);
+
+pn_delivery_t *dx_message_inbound_delivery(dx_message_t *qm);
+
+//
+// Functions for composed messages
+//
+
+// Convenience Functions
+void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers);
+void dx_message_copy_header(dx_message_t *msg); // Copy received header into send-header (prior to adding annotations)
+void dx_message_copy_message_annotations(dx_message_t *msg);
+
+// Raw Functions
+void dx_message_begin_header(dx_message_t *msg);
+void dx_message_end_header(dx_message_t *msg);
+
+void dx_message_begin_delivery_annotations(dx_message_t *msg);
+void dx_message_end_delivery_annotations(dx_message_t *msg);
+
+void dx_message_begin_message_annotations(dx_message_t *msg);
+void dx_message_end_message_annotations(dx_message_t *msg);
+
+void dx_message_begin_message_properties(dx_message_t *msg);
+void dx_message_end_message_properties(dx_message_t *msg);
+
+void dx_message_begin_application_properties(dx_message_t *msg);
+void dx_message_end_application_properties(dx_message_t *msg);
+
+void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers);
+
+void dx_message_begin_body_sequence(dx_message_t *msg);
+void dx_message_end_body_sequence(dx_message_t *msg);
+
+void dx_message_begin_footer(dx_message_t *msg);
+void dx_message_end_footer(dx_message_t *msg);
+
+void dx_message_insert_null(dx_message_t *msg);
+void dx_message_insert_boolean(dx_message_t *msg, int value);
+void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value);
+void dx_message_insert_uint(dx_message_t *msg, uint32_t value);
+void dx_message_insert_ulong(dx_message_t *msg, uint64_t value);
+void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len);
+void dx_message_insert_string(dx_message_t *msg, const char *start);
+void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value);
+void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len);
+void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value);
+void dx_message_begin_list(dx_message_t* msg);
+void dx_message_end_list(dx_message_t* msg);
+void dx_message_begin_map(dx_message_t* msg);
+void dx_message_end_map(dx_message_t* msg);
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/router.h b/extras/dispatch/include/qpid/dispatch/router.h
new file mode 100644
index 0000000000..03f4aa15be
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/router.h
@@ -0,0 +1,35 @@
+#ifndef __dispatch_router_h__
+#define __dispatch_router_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/engine.h>
+#include <qpid/dispatch/container.h>
+
+typedef struct dx_router_t dx_router_t;
+
+typedef struct {
+ size_t message_limit;
+ size_t memory_limit;
+} dx_router_configuration_t;
+
+dx_router_t *dx_router(dx_router_configuration_t *config);
+void dx_router_free(dx_router_t *router);
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/server.h b/extras/dispatch/include/qpid/dispatch/server.h
new file mode 100644
index 0000000000..635e1323dd
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/server.h
@@ -0,0 +1,403 @@
+#ifndef __dispatch_server_h__
+#define __dispatch_server_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/engine.h>
+
+/**
+ * \defgroup Control Server Control Functions
+ * @{
+ */
+
+/**
+ * \brief Thread Start Handler
+ *
+ * Callback invoked when a new server thread is started. The callback is
+ * invoked on the newly created thread.
+ *
+ * This handler can be used to set processor affinity or other thread-specific
+ * tuning values.
+ *
+ * @param context The handler context supplied in dx_server_initialize.
+ * @param thread_id The integer thread identifier that uniquely identifies this thread.
+ */
+typedef void (*dx_thread_start_cb_t)(void* context, int thread_id);
+
+
+/**
+ * \brief Initialize the server module and prepare it for operation.
+ *
+ * @param thread_count The number of worker threads (1 or more) that the server shall create
+ */
+void dx_server_initialize(int thread_count);
+
+
+/**
+ * \brief Finalize the server after it has stopped running.
+ */
+void dx_server_finalize(void);
+
+
+/**
+ * \brief Set the optional thread-start handler.
+ *
+ * This handler is called once on each worker thread at the time
+ * the thread is started. This may be used to set tuning settings like processor affinity, etc.
+ *
+ * @param start_handler The thread-start handler invoked per thread on thread startup.
+ * @param context Opaque context to be passed back in the callback function.
+ */
+void dx_server_set_start_handler(dx_thread_start_cb_t start_handler, void *context);
+
+
+/**
+ * \brief Run the server threads until completion.
+ *
+ * Start the operation of the server, including launching all of the worker threads.
+ * This function does not return until after the server has been stopped. The thread
+ * that calls dx_server_run is used as one of the worker threads.
+ */
+void dx_server_run(void);
+
+
+/**
+ * \brief Stop the server
+ *
+ * Stop the server and join all of its worker threads. This function may be called from any
+ * thread. When this function returns, all of the other server threads have been closed and
+ * joined. The calling thread will be the only running thread in the process.
+ */
+void dx_server_stop(void);
+
+
+/**
+ * \brief Pause (quiesce) the server.
+ *
+ * This call blocks until all of the worker threads (except
+ * the one calling the this function) are finished processing and have been blocked. When
+ * this call returns, the calling thread is the only thread running in the process.
+ */
+void dx_server_pause(void);
+
+
+/**
+ * \brief Resume normal operation of a paused server.
+ *
+ * This call unblocks all of the worker threads
+ * so they can resume normal connection processing.
+ */
+void dx_server_resume(void);
+
+
+/**
+ * @}
+ * \defgroup Signal Server Signal Handling Functions
+ * @{
+ */
+
+
+/**
+ * \brief Signal Handler
+ *
+ * Callback for caught signals. This handler will only be invoked for signal numbers
+ * that were registered via dx_server_signal. The handler is not invoked in the context
+ * of the OS signal handler. Rather, it is invoked on one of the worker threads in an
+ * orderly sequence.
+ *
+ * @param context The handler context supplied in dx_server_initialize.
+ * @param signum The signal number that was raised.
+ */
+typedef void (*dx_signal_handler_cb_t)(void* context, int signum);
+
+
+/**
+ * Set the signal handler for the server. The signal handler is invoked cleanly on a worker thread
+ * after the server process catches an operating-system signal. The signal handler is optional and
+ * need not be set.
+ *
+ * @param signal_handler The signal handler called when a registered signal is caught.
+ * @param context Opaque context to be passed back in the callback function.
+ */
+void dx_server_set_signal_handler(dx_signal_handler_cb_t signal_handler, void *context);
+
+
+/**
+ * \brief Register a signal to be caught and handled by the signal handler.
+ *
+ * @param signum The signal number of a signal to be handled by the application.
+ */
+void dx_server_signal(int signum);
+
+
+/**
+ * @}
+ * \defgroup Connection Server AMQP Connection Handling Functions
+ * @{
+ */
+
+/**
+ * \brief Listener objects represent the desire to accept incoming transport connections.
+ */
+typedef struct dx_listener_t dx_listener_t;
+
+/**
+ * \brief Connector objects represent the desire to create and maintain an outgoing transport connection.
+ */
+typedef struct dx_connector_t dx_connector_t;
+
+/**
+ * \brief Connection objects wrap Proton connection objects.
+ */
+typedef struct dx_connection_t dx_connection_t;
+
+/**
+ * Event type for the connection callback.
+ */
+typedef enum {
+ /// The connection just opened via a listener (inbound).
+ DX_CONN_EVENT_LISTENER_OPEN,
+
+ /// The connection just opened via a connector (outbound).
+ DX_CONN_EVENT_CONNECTOR_OPEN,
+
+ /// The connection was closed at the transport level (not cleanly).
+ DX_CONN_EVENT_CLOSE,
+
+ /// The connection requires processing.
+ DX_CONN_EVENT_PROCESS
+} dx_conn_event_t;
+
+
+/**
+ * \brief Connection Event Handler
+ *
+ * Callback invoked when processing is needed on a proton connection. This callback
+ * shall be invoked on one of the server's worker threads. The server guarantees that
+ * no two threads shall be allowed to process a single connection concurrently.
+ * The implementation of this handler may assume that it has exclusive access to the
+ * connection and its subservient components (sessions, links, deliveries, etc.).
+ *
+ * @param context The handler context supplied in dx_server_{connect,listen}.
+ * @param event The event/reason for the invocation of the handler.
+ * @param conn The connection that requires processing by the handler.
+ * @return A value greater than zero if the handler did any proton processing for
+ * the connection. If no work was done, zero is returned.
+ */
+typedef int (*dx_conn_handler_cb_t)(void* context, dx_conn_event_t event, dx_connection_t *conn);
+
+
+/**
+ * \brief Set the connection event handler callback.
+ *
+ * Set the connection handler callback for the server. This callback is mandatory and must be set
+ * prior to the invocation of dx_server_run.
+ *
+ * @param conn_hander The handler for processing connection-related events.
+ */
+void dx_server_set_conn_handler(dx_conn_handler_cb_t conn_handler);
+
+
+/**
+ * \brief Set the user context for a connection.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @param context User context to be stored with the connection.
+ */
+void dx_connection_set_context(dx_connection_t *conn, void *context);
+
+
+/**
+ * \brief Get the user context from a connection.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @return The user context stored with the connection.
+ */
+void *dx_connection_get_context(dx_connection_t *conn);
+
+
+/**
+ * \brief Activate a connection for output.
+ *
+ * This function is used to request that the server activate the indicated connection.
+ * It is assumed that the connection is one that the caller does not have permission to
+ * access (i.e. it may be owned by another thread currently). An activated connection
+ * will, when writable, appear in the internal work list and be invoked for processing
+ * by a worker thread.
+ *
+ * @param conn The connection over which the application wishes to send data
+ */
+void dx_server_activate(dx_connection_t *conn);
+
+
+/**
+ * \brief Get the wrapped proton-engine connection object.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @return The proton connection object.
+ */
+pn_connection_t *dx_connection_pn(dx_connection_t *conn);
+
+
+/**
+ * \brief Configuration block for a connector or a listener.
+ */
+typedef struct dx_server_config_t {
+ /**
+ * Host name or network address to bind to a listener or use in the connector.
+ */
+ char *host;
+
+ /**
+ * Port name or number to bind to a listener or use in the connector.
+ */
+ char *port;
+
+ /**
+ * Space-separated list of SASL mechanisms to be accepted for the connection.
+ */
+ char *sasl_mechanisms;
+
+ /**
+ * If appropriate for the mechanism, the username for authentication
+ * (connector only)
+ */
+ char *sasl_username;
+
+ /**
+ * If appropriate for the mechanism, the password for authentication
+ * (connector only)
+ */
+ char *sasl_password;
+
+ /**
+ * If appropriate for the mechanism, the minimum acceptable security strength factor
+ */
+ int sasl_minssf;
+
+ /**
+ * If appropriate for the mechanism, the maximum acceptable security strength factor
+ */
+ int sasl_maxssf;
+
+ /**
+ * SSL is enabled for this connection iff non-zero.
+ */
+ int ssl_enabled;
+
+ /**
+ * Connection will take on the role of SSL server iff non-zero.
+ */
+ int ssl_server;
+
+ /**
+ * Iff non-zero AND ssl_enabled is non-zero, this listener will detect the client's use
+ * of SSL or non-SSL and conform to the client's protocol.
+ * (listener only)
+ */
+ int ssl_allow_unsecured_client;
+
+ /**
+ * Path to the file containing the PEM-formatted public certificate for the local end
+ * of the connection.
+ */
+ char *ssl_certificate_file;
+
+ /**
+ * Path to the file containing the PEM-formatted private key for the local end of the
+ * connection.
+ */
+ char *ssl_private_key_file;
+
+ /**
+ * The password used to sign the private key, or NULL if the key is not protected.
+ */
+ char *ssl_password;
+
+ /**
+ * Path to the file containing the PEM-formatted set of certificates of trusted CAs.
+ */
+ char *ssl_trusted_certificate_db;
+
+ /**
+ * Iff non-zero, require that the peer's certificate be supplied and that it be authentic
+ * according to the set of trusted CAs.
+ */
+ int ssl_require_peer_authentication;
+
+ /**
+ * Allow the connection to be redirected by the peer (via CLOSE->Redirect). This is
+ * meaningful for outgoing (connector) connections only.
+ */
+ int allow_redirect;
+} dx_server_config_t;
+
+
+/**
+ * \brief Create a listener for incoming connections.
+ *
+ * @param config Pointer to a configuration block for this listener. This block will be
+ * referenced by the server, not copied. The referenced record must remain
+ * in-scope for the life of the listener.
+ * @param context User context passed back in the connection handler.
+ * @return A pointer to the new listener, or NULL in case of failure.
+ */
+dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context);
+
+
+/**
+ * \brief Free the resources associated with a listener.
+ *
+ * @param li A listener pointer returned by dx_listen.
+ */
+void dx_listener_free(dx_listener_t* li);
+
+
+/**
+ * \brief Close a listener so it will accept no more connections.
+ *
+ * @param li A listener pointer returned by dx_listen.
+ */
+void dx_listener_close(dx_listener_t* li);
+
+
+/**
+ * \brief Create a connector for an outgoing connection.
+ *
+ * @param config Pointer to a configuration block for this connector. This block will be
+ * referenced by the server, not copied. The referenced record must remain
+ * in-scope for the life of the connector..
+ * @param context User context passed back in the connection handler.
+ * @return A pointer to the new connector, or NULL in case of failure.
+ */
+dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context);
+
+
+/**
+ * \brief Free the resources associated with a connector.
+ *
+ * @param ct A connector pointer returned by dx_connect.
+ */
+void dx_connector_free(dx_connector_t* ct);
+
+/**
+ * @}
+ */
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/threading.h b/extras/dispatch/include/qpid/dispatch/threading.h
new file mode 100644
index 0000000000..f275fc0086
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/threading.h
@@ -0,0 +1,45 @@
+#ifndef __sys_threading_h__
+#define __sys_threading_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef struct sys_mutex_t sys_mutex_t;
+
+sys_mutex_t *sys_mutex(void);
+void sys_mutex_free(sys_mutex_t *mutex);
+void sys_mutex_lock(sys_mutex_t *mutex);
+void sys_mutex_unlock(sys_mutex_t *mutex);
+
+
+typedef struct sys_cond_t sys_cond_t;
+
+sys_cond_t *sys_cond(void);
+void sys_cond_free(sys_cond_t *cond);
+void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex);
+void sys_cond_signal(sys_cond_t *cond);
+void sys_cond_signal_all(sys_cond_t *cond);
+
+
+typedef struct sys_thread_t sys_thread_t;
+
+sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg);
+void sys_thread_free(sys_thread_t *thread);
+void sys_thread_join(sys_thread_t *thread);
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/timer.h b/extras/dispatch/include/qpid/dispatch/timer.h
new file mode 100644
index 0000000000..af3a22e262
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/timer.h
@@ -0,0 +1,86 @@
+#ifndef __dispatch_timer_h__
+#define __dispatch_timer_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \defgroup Timer Server Timer Functions
+ * @{
+ */
+
+typedef struct dx_timer_t dx_timer_t;
+
+/**
+ * Timer Callback
+ *
+ * Callback invoked after a timer's interval expires and the timer fires.
+ *
+ * @param context The context supplied in dx_timer
+ */
+typedef void (*dx_timer_cb_t)(void* context);
+
+
+/**
+ * Create a new timer object.
+ *
+ * @param cb The callback function to be invoked when the timer expires.
+ * @param context An opaque, user-supplied context to be passed into the callback.
+ * @return A pointer to the new timer object or NULL if memory is exhausted.
+ */
+dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context);
+
+
+/**
+ * Free the resources for a timer object. If the timer was scheduled, it will be canceled
+ * prior to freeing. After this function returns, the callback will not be invoked for this
+ * timer.
+ *
+ * @param timer Pointer to the timer object returned by dx_timer.
+ */
+void dx_timer_free(dx_timer_t *timer);
+
+
+/**
+ * Schedule a timer to fire in the future.
+ *
+ * Note that the timer callback will never be invoked synchronously during the execution
+ * of dx_timer_schedule. Even if the interval is immediate (0), the callback invocation will
+ * be asynchronous and after the return of this function.
+ *
+ * @param timer Pointer to the timer object returned by dx_timer.
+ * @param msec The minimum number of milliseconds of delay until the timer fires.
+ * If 0 is supplied, the timer will fire immediately.
+ */
+void dx_timer_schedule(dx_timer_t *timer, long msec);
+
+
+/**
+ * Attempt to cancel a scheduled timer. Since the timer callback can be invoked on any
+ * server thread, it is always possible that a last-second cancel attempt may arrive too late
+ * to stop the timer from firing (i.e. the cancel is concurrent with the fire callback).
+ *
+ * @param timer Pointer to the timer object returned by dx_timer.
+ */
+void dx_timer_cancel(dx_timer_t *timer);
+
+/**
+ * @}
+ */
+
+#endif
diff --git a/extras/dispatch/include/qpid/dispatch/user_fd.h b/extras/dispatch/include/qpid/dispatch/user_fd.h
new file mode 100644
index 0000000000..3e5584ce2e
--- /dev/null
+++ b/extras/dispatch/include/qpid/dispatch/user_fd.h
@@ -0,0 +1,121 @@
+#ifndef __dispatch_user_fd_h__
+#define __dispatch_user_fd_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * \defgroup UserFd Server User-File-Descriptor Functions
+ * @{
+ */
+
+typedef struct dx_user_fd_t dx_user_fd_t;
+
+
+/**
+ * User_fd Handler
+ *
+ * Callback invoked when a user-managed file descriptor is available for reading or writing or there
+ * was an error on the file descriptor.
+ *
+ * @param context The handler context supplied in the dx_user_fd call.
+ * @param ufd The user_fd handle for the processable fd.
+ */
+typedef void (*dx_user_fd_handler_cb_t)(void* context, dx_user_fd_t *ufd);
+
+
+/**
+ * Set the user-fd handler callback for the server. This handler is optional, but must be supplied
+ * if the dx_server is used to manage the activation of user file descriptors.
+ */
+void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler);
+
+
+/**
+ * Create a tracker for a user-managed file descriptor.
+ *
+ * A user-fd is appropriate for use when the application opens and manages file descriptors
+ * for purposes other than AMQP communication. Registering a user fd with the dispatch server
+ * controls processing of the FD alongside the FDs used for messaging.
+ *
+ * @param fd The open file descriptor being managed by the application.
+ * @param context User context passed back in the connection handler.
+ * @return A pointer to the new user_fd.
+ */
+dx_user_fd_t *dx_user_fd(int fd, void *context);
+
+
+/**
+ * Free the resources for a user-managed FD tracker.
+ *
+ * @param ufd Structure pointer returned by dx_user_fd.
+ */
+void dx_user_fd_free(dx_user_fd_t *ufd);
+
+
+/**
+ * Activate a user-fd for read.
+ *
+ * Use this activation when the application has capacity to receive data from the user-fd. This will
+ * cause the callback set in dx_server_set_user_fd_handler to later be invoked when the
+ * file descriptor has data to read.
+ *
+ * @param ufd Structure pointer returned by dx_user_fd.
+ */
+void dx_user_fd_activate_read(dx_user_fd_t *ufd);
+
+
+/**
+ * Activate a user-fd for write.
+ *
+ * Use this activation when the application has data to write via the user-fd. This will
+ * cause the callback set in dx_server_set_user_fd_handler to later be invoked when the
+ * file descriptor is writable.
+ *
+ * @param ufd Structure pointer returned by dx_user_fd.
+ */
+void dx_user_fd_activate_write(dx_user_fd_t *ufd);
+
+
+/**
+ * Check readable status of a user-fd
+ *
+ * Note: It is possible that readable status is spurious (i.e. this function returns true
+ * but the file-descriptor is not readable and will block if not set to O_NONBLOCK).
+ * Code accordingly.
+ *
+ * @param ufd Structure pointer returned by dx_user_fd.
+ * @return true iff the user file descriptor is readable.
+ */
+bool dx_user_fd_is_readable(dx_user_fd_t *ufd);
+
+
+/**
+ * Check writable status of a user-fd
+ *
+ * @param ufd Structure pointer returned by dx_user_fd.
+ * @return true iff the user file descriptor is writable.
+ */
+bool dx_user_fd_is_writeable(dx_user_fd_t *ufd);
+
+/**
+ * @}
+ */
+
+#endif
diff --git a/extras/dispatch/router/CMakeLists.txt b/extras/dispatch/router/CMakeLists.txt
new file mode 100644
index 0000000000..efb424ee13
--- /dev/null
+++ b/extras/dispatch/router/CMakeLists.txt
@@ -0,0 +1,31 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License.
+##
+
+##
+## Build the router application
+##
+set(router_SOURCES
+ src/main.c
+ )
+
+add_executable(dispatch-router ${router_SOURCES})
+target_link_libraries(dispatch-router qpid-dispatch ${proton_lib})
+
+install(TARGETS dispatch-router RUNTIME DESTINATION bin)
+
diff --git a/extras/dispatch/router/src/main.c b/extras/dispatch/router/src/main.c
new file mode 100644
index 0000000000..0cafa6a2ca
--- /dev/null
+++ b/extras/dispatch/router/src/main.c
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <proton/driver.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/router.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+static int exit_with_sigint = 0;
+
+static void thread_start_handler(void* context, int thread_id)
+{
+}
+
+
+static void signal_handler(void* context, int signum)
+{
+ dx_server_pause();
+
+ switch (signum) {
+ case SIGINT:
+ exit_with_sigint = 1;
+
+ case SIGQUIT:
+ case SIGTERM:
+ fflush(stdout);
+ dx_server_stop();
+ break;
+
+ case SIGHUP:
+ break;
+
+ default:
+ break;
+ }
+
+ dx_server_resume();
+}
+
+
+static void startup(void *context)
+{
+ // TODO - Move this into a configuration framework
+
+ dx_server_pause();
+
+ static dx_server_config_t server_config;
+ server_config.host = "0.0.0.0";
+ server_config.port = "5672";
+ server_config.sasl_mechanisms = "ANONYMOUS";
+ server_config.ssl_enabled = 0;
+
+ dx_server_listen(&server_config, 0);
+
+ /*
+ static dx_server_config_t client_config;
+ client_config.host = "0.0.0.0";
+ client_config.port = "10000";
+ client_config.sasl_mechanisms = "ANONYMOUS";
+ client_config.ssl_enabled = 0;
+
+ dx_server_connect(&client_config, 0);
+ */
+
+ dx_server_resume();
+}
+
+
+int main(int argc, char **argv)
+{
+ dx_log_set_mask(LOG_INFO | LOG_TRACE | LOG_ERROR);
+
+ dx_server_initialize(4);
+ dx_container_initialize();
+
+ dx_server_set_signal_handler(signal_handler, 0);
+ dx_server_set_start_handler(thread_start_handler, 0);
+
+ dx_router_t *router = dx_router(0);
+
+ dx_timer_t *startup_timer = dx_timer(startup, 0);
+ dx_timer_schedule(startup_timer, 0);
+
+ dx_server_signal(SIGHUP);
+ dx_server_signal(SIGQUIT);
+ dx_server_signal(SIGTERM);
+ dx_server_signal(SIGINT);
+
+ dx_server_run();
+ dx_router_free(router);
+ dx_server_finalize();
+
+ if (exit_with_sigint) {
+ signal(SIGINT, SIG_DFL);
+ kill(getpid(), SIGINT);
+ }
+
+ return 0;
+}
+
diff --git a/extras/dispatch/site/css/style.css b/extras/dispatch/site/css/style.css
new file mode 100644
index 0000000000..b73c136d4a
--- /dev/null
+++ b/extras/dispatch/site/css/style.css
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ul {
+ list-style-type:square;
+}
+
+th {
+ text-align: left;
+ font-weight: bold;
+}
+
+body {
+ margin:0;
+ background:#FFFFFF;
+ font-family:"Verdana", sans-serif;
+}
+
+.container {
+ width:950px;
+ margin:0 auto;
+}
+
+.header {
+ height:100px;
+ width:950px;
+ background:url(images/header.png)
+}
+
+.logo {
+ text-align:center;
+ font-weight:600;
+ padding:0 0 0 0;
+ font-size:14px;
+ font-family:"Verdana", cursive;
+}
+
+.logo a {
+ color:#000000;
+ text-decoration:none;
+}
+
+.main_text_area {
+ margin-left:200px;
+}
+
+.main_text_area_top {
+ height:14px;
+ font-size:1px;
+}
+
+.main_text_area_bottom {
+ display:none;
+/* height:14px;
+ margin-bottom:4px;*/
+}
+
+.main_text_area_body {
+ padding:5px 24px;
+}
+
+.main_text_area_body p {
+ text-align:justify;
+}
+
+.main_text_area br {
+ line-height:10px;
+}
+
+.main_text_area h1 {
+ font-size:28px;
+ font-weight:600;
+ margin:0 0 24px 0;
+ color:#0c3b82;
+ font-family:"Verdana", Times, serif;
+}
+
+.main_text_area h2 {
+ font-size:24px;
+ font-weight:600;
+ margin:24px 0 8px 0;
+ color:#0c3b82;
+ font-family:"Verdana",Times, serif;
+}
+
+.main_text_area ol, .main_text_area ul {
+ padding:0;
+ margin:10px 0;
+ margin-left:20px;
+}
+
+.main_text_area li {
+/* margin-left:40px; */
+}
+
+.main_text_area, .menu_box {
+ font-size:13px;
+ line-height:17px;
+ color:#000000;
+}
+
+.main_text_area {
+ font-size:15px;
+}
+
+.main_text_area a {
+ color:#000000;
+}
+
+.main_text_area a:hover {
+ color:#000000;
+}
+
+.menu_box {
+ width:196px;
+ float:left;
+ margin-left:4px;
+}
+
+.menu_box_top {
+ background:url(images/menu_top.png) no-repeat;
+ height:14px;
+ font-size:1px;
+}
+
+.menu_box_body {
+ background:url(images/menu_body.png) repeat-y;
+ padding:5px 24px 5px 24px;
+}
+
+.menu_box_bottom {
+ background:url(images/menu_bottom.png) no-repeat;
+ height:14px;
+ font-size:1px;
+ margin-bottom:1px;
+}
+
+.menu_box h3 {
+ font-size:20px;
+ font-weight:500;
+ margin:0 0 8px 0;
+ color:#0c3b82;
+ font-family:"Verdana",Times, serif;
+}
+
+.menu_box ul {
+ margin:12px;
+ padding:0px;
+}
+
+.menu_box li {
+ list-style:square;
+}
+
+.menu_box a {
+ color:#000000;
+ text-decoration:none;
+}
+
+.menu_box a:hover {
+ color:#000000;
+ text-decoration:underline;
+}
+
+.feature_box {
+ width:698px;
+ overflow:hidden;
+}
+
+.feature_box h3 {
+ font-size:18px;
+ font-weight:600;
+ margin:0 0 8px 0;
+ color:#0c3b82;
+ font-family:"Verdana", Times, serif;
+}
+
+.feature_box_column1 {
+ width:196px;
+ float:left;
+ padding:10px 15px 10px 15px;
+ margin-left:0px;
+}
+
+.feature_box_column2 {
+ width:196px;
+ float:left;
+ padding:10px 15px 10px 15px;
+ margin-left:0px;
+}
+
+.feature_box_column3 {
+ width:196px;
+ float:left;
+ padding:10px 15px 10px 15px;
+ margin-left:0px;
+}
+
+
+.feature_box ul {
+ margin:.8em .4em;
+ padding-left:1.2em;
+ padding:0;
+ list-style-type: square;
+}
+
+.feature_box ul li {
+ font-family:"Verdana",sans-serif;
+ font-size:14px;
+ color:#000;
+ margin:.4em 0;
+}
+
+.feature_box ul li ul {
+ padding-left:1.2em;
+ margin-left:2em;
+}
+
+.feature_box a {
+ color:#000000;
+ text-decoration:none;
+}
+
+.feature_box a:hover {
+ color:#000000;
+ text-decoration:underline;
+}
+
+.footer {
+ color:#000000;
+ clear:both;
+ text-align:center;
+ font-size:11px;
+ line-height:17px;
+ height:45px;
+ padding-top:18px;
+}
+
+.footer a {
+ color:#000000;
+}
+
+.footer a:hover {
+ color:#000000;
+}
+
+.download_table {
+ width:100%;
+}
+
+.download_table_col_1 {
+ width:240px;
+}
+
+.proton_download_table_col_1 {
+ width:420px;
+}
+
+.download_table_amqp_col {
+ text-align:center;
+ width:80px;
+}
+
diff --git a/extras/dispatch/site/images/arch.dia b/extras/dispatch/site/images/arch.dia
new file mode 100644
index 0000000000..99b3185447
--- /dev/null
+++ b/extras/dispatch/site/images/arch.dia
Binary files differ
diff --git a/extras/dispatch/site/images/arch.png b/extras/dispatch/site/images/arch.png
new file mode 100644
index 0000000000..a2b7f776b9
--- /dev/null
+++ b/extras/dispatch/site/images/arch.png
Binary files differ
diff --git a/extras/dispatch/site/includes/footer.include b/extras/dispatch/site/includes/footer.include
new file mode 100644
index 0000000000..35ff04b9f2
--- /dev/null
+++ b/extras/dispatch/site/includes/footer.include
@@ -0,0 +1,7 @@
+ <div class="footer">
+ <p>
+ &#xA9; 2004-2012 The Apache Software Foundation.<br />
+ Apache Qpid, Qpid, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation.<br />
+ All other marks mentioned may be trademarks or registered trademarks of their respective owners.
+ </p>
+ </div>
diff --git a/extras/dispatch/site/includes/header.include b/extras/dispatch/site/includes/header.include
new file mode 100644
index 0000000000..244dfc4517
--- /dev/null
+++ b/extras/dispatch/site/includes/header.include
@@ -0,0 +1,6 @@
+ <div class="header">
+ <div class="logo">
+ <h1>Apache Qpid&#8482;</h1>
+ <h2>Open Source AMQP Messaging</h2>
+ </div>
+ </div>
diff --git a/extras/dispatch/site/includes/menu.include b/extras/dispatch/site/includes/menu.include
new file mode 100644
index 0000000000..7cbdbd139d
--- /dev/null
+++ b/extras/dispatch/site/includes/menu.include
@@ -0,0 +1,68 @@
+ <div class="menu_box">
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>Apache Qpid Dispatch</h3>
+ <ul>
+ <li><a href="index.html">Back to Qpid</a></li>
+ <li><a href="index.html">Home</a></li>
+ <li><a href="download.html">Download</a></li>
+ <li><a href="getting_started.html">Getting Started</a></li>
+ <li><a href="http://www.apache.org/licenses/">License</a></li>
+ <li><a href="https://cwiki.apache.org/qpid/faq.html">FAQ</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>Documentation</h3>
+ <ul>
+ <li><a href="documentation.html#doc-release">Latest Release</a></li>
+ <li><a href="documentation.html#doc-trunk">Trunk</a></li>
+ <li><a href="documentation.html#doc-archives">Archive</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>Community</h3>
+ <ul>
+ <li><a href="getting_involved.html">Getting Involved</a></li>
+ <li><a href="source_repository.html">Source Repository</a></li>
+ <li><a href="https://issues.apache.org/jira/browse/qpid">Issue Reporting</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>Developers</h3>
+ <ul>
+ <li><a href="https://cwiki.apache.org/qpid/building.html">Building Qpid</a></li>
+ <li><a href="https://cwiki.apache.org/qpid/developer-pages.html">Developer Pages</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>About AMQP</h3>
+ <ul>
+ <li><a href="amqp.html">What is AMQP?</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+
+ <div class="menu_box_top"></div>
+ <div class="menu_box_body">
+ <h3>About Apache</h3>
+ <ul>
+ <li><a href="http://www.apache.org">Home</a></li>
+ <li><a href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li>
+ <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li>
+ <li><a href="http://www.apache.org/security/">Security</a></li>
+ </ul>
+ </div>
+ <div class="menu_box_bottom"></div>
+ </div>
diff --git a/extras/dispatch/site/index.html b/extras/dispatch/site/index.html
new file mode 100755
index 0000000000..d8f1759492
--- /dev/null
+++ b/extras/dispatch/site/index.html
@@ -0,0 +1,101 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+-->
+<html xmlns="http://www.w3.org/1999/xhtml">
+ <head>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+ <title>Apache Qpid Dispatch&#8482;: A Platform for Building AMQP Infrastructure</title>
+ <link href="css/style.css" rel="stylesheet" type="text/css"/>
+ </head>
+
+ <body>
+ <div class="container">
+ <!-- begin header -->
+
+ <div class="header">
+ <div class="logo">
+ <h1>Apache Qpid Dispatch&#8482;</h1>
+ <h2>A Platform for Building AMQP Infrastructure</h2>
+ </div>
+ </div>
+
+ <!-- end header -->
+
+ <!-- begin menu -->
+ <!--#include virtual="includes/menu.include" -->
+ <!-- end menu -->
+
+ <!-- begin content -->
+ <div class="main_text_area">
+ <div class="main_text_area_top"></div>
+
+ <div class="main_text_area_body">
+
+<p>Qpid Dispatch is a library to help developers build infrastructure
+components for AMQP. Dispatch is not a general-purpose Messaging API.
+Rather, it is a foundation on which to build applications, services, and
+appliances that need direct access to the detailed constructs of AMQP.</p>
+<hr width="80%" />
+<h2>Overview</h2>
+<p>Dispatch is an extension of the Engine and Driver interfaces of
+<a href="http://qpid.apache.org/proton">Qpid Proton</a>. It neither
+uses nor exposes the Messenger interface of Proton. Rather, it
+provides a way for developers to use Proton's more detailed Engine
+facility. The following features are provided:</p>
+
+<ul>
+ <li>An asynchronous, event-oriented application environment</li>
+ <li>Safe multi-threaded use of Proton</li>
+ <li>Operating System Signal handling</li>
+ <li>Quiesce and Resume for the application's threads</li>
+ <li>Timers</li>
+ <li>Resilient outbound connections (retry/reconnect)</li>
+ <li>Polling support for the application's non-AMQP file descriptors</li>
+ <li>An AMQP Node Container that allows the developer to create
+ custom node types</li>
+</ul>
+<p />
+<hr width="80%" />
+<h2>Architecture</h2>
+<center><img src="images/arch.png" /></center>
+<ul>
+ <li><b>Proton Engine and Driver</b> provide the underlying AMQP capability</li>
+ <li><a href="doxygen/server/modules.html">Dispatch Server</a>
+ wraps Proton connections in a multi-threaded server environment</li>
+ <li><b>Dispatch Container</b> provides management of AMQP nodes (links, termini, and deliveries)</li>
+ <li><b>Dispatch Message</b> provides efficient message encode/decode, optimized for messaging intermediaries</li>
+ <li>The <b>Application</b> uses all of the above services to implement scalable and performant AMQP infrastructure</li>
+</ul>
+<hr width="80%" />
+
+ </div>
+
+ <div class="main_text_area_bottom"></div>
+ </div>
+ <!-- end content -->
+
+ <!-- begin footer -->
+ <!--#include virtual="includes/footer.include" -->
+ <!-- end footer -->
+
+ </div>
+ </body>
+</html>
diff --git a/extras/dispatch/src/agent.c b/extras/dispatch/src/agent.c
new file mode 100644
index 0000000000..a885042b45
--- /dev/null
+++ b/extras/dispatch/src/agent.c
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/agent.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/message.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/timer.h>
+#include <string.h>
+
+
+typedef struct dx_agent_t {
+ hash_t *class_hash;
+ dx_message_list_t in_fifo;
+ dx_message_list_t out_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+} dx_agent_t;
+
+static dx_agent_t *agent = 0;
+
+
+struct dx_agent_class_t {
+ char *fqname;
+ void *context;
+ dx_agent_schema_cb_t schema_handler;
+ dx_agent_query_cb_t query_handler; // 0 iff class is an event.
+};
+
+
+static void dx_agent_timer_handler(void *context)
+{
+ // TODO - Process the in_fifo here
+}
+
+
+void dx_agent_initialize()
+{
+ assert(!agent);
+ agent = NEW(dx_agent_t);
+ agent->class_hash = hash(6, 10, 1);
+ DEQ_INIT(agent->in_fifo);
+ DEQ_INIT(agent->out_fifo);
+ agent->lock = sys_mutex();
+ agent->timer = dx_timer(dx_agent_timer_handler, agent);
+}
+
+
+void dx_agent_finalize(void)
+{
+ sys_mutex_free(agent->lock);
+ dx_timer_free(agent->timer);
+ hash_free(agent->class_hash);
+ free(agent);
+ agent = 0;
+}
+
+
+dx_agent_class_t *dx_agent_register_class(const char *fqname,
+ void *context,
+ dx_agent_schema_cb_t schema_handler,
+ dx_agent_query_cb_t query_handler)
+{
+ dx_agent_class_t *cls = NEW(dx_agent_class_t);
+ assert(cls);
+ cls->fqname = (char*) malloc(strlen(fqname) + 1);
+ strcpy(cls->fqname, fqname);
+ cls->context = context;
+ cls->schema_handler = schema_handler;
+ cls->query_handler = query_handler;
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
+ int result = hash_insert_const(agent->class_hash, iter, cls);
+ dx_field_iterator_free(iter);
+ assert(result >= 0);
+
+ return cls;
+}
+
+
+dx_agent_class_t *dx_agent_register_event(const char *fqname,
+ void *context,
+ dx_agent_schema_cb_t schema_handler)
+{
+ return dx_agent_register_class(fqname, context, schema_handler, 0);
+}
+
+
+void dx_agent_value_string(const void *correlator, const char *key, const char *value)
+{
+}
+
+
+void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value)
+{
+}
+
+
+void dx_agent_value_null(const void *correlator, const char *key)
+{
+}
+
+
+void dx_agent_value_boolean(const void *correlator, const char *key, bool value)
+{
+}
+
+
+void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len)
+{
+}
+
+
+void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value)
+{
+}
+
+
+void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value)
+{
+}
+
+
+void dx_agent_value_complete(const void *correlator, bool more)
+{
+}
+
+
+void *dx_agent_raise_event(dx_agent_class_t *event)
+{
+ return 0;
+}
+
diff --git a/extras/dispatch/src/alloc.c b/extras/dispatch/src/alloc.c
new file mode 100644
index 0000000000..2b3b953aad
--- /dev/null
+++ b/extras/dispatch/src/alloc.c
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/log.h>
+#include <memory.h>
+#include <stdio.h>
+
+typedef struct item_t item_t;
+
+struct item_t {
+ DEQ_LINKS(item_t);
+ dx_alloc_type_desc_t *desc;
+};
+
+DEQ_DECLARE(item_t, item_list_t);
+
+struct dx_alloc_pool_t {
+ item_list_t free_list;
+};
+
+dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0};
+dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0};
+
+sys_mutex_t *init_lock;
+item_list_t type_list;
+
+static void dx_alloc_init(dx_alloc_type_desc_t *desc)
+{
+ sys_mutex_lock(init_lock);
+
+ desc->total_size = desc->type_size;
+ if (desc->additional_size)
+ desc->total_size += *desc->additional_size;
+
+ dx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d",
+ desc->type_name, desc->type_size, desc->total_size);
+
+ if (!desc->global_pool) {
+ if (desc->config == 0)
+ desc->config = desc->total_size > 256 ?
+ &dx_alloc_default_config_big : &dx_alloc_default_config_small;
+
+ assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size);
+
+ desc->global_pool = NEW(dx_alloc_pool_t);
+ DEQ_INIT(desc->global_pool->free_list);
+ desc->lock = sys_mutex();
+ desc->stats = NEW(dx_alloc_stats_t);
+ memset(desc->stats, 0, sizeof(dx_alloc_stats_t));
+ }
+
+ item_t *type_item = NEW(item_t);
+ DEQ_ITEM_INIT(type_item);
+ type_item->desc = desc;
+ DEQ_INSERT_TAIL(type_list, type_item);
+
+ sys_mutex_unlock(init_lock);
+}
+
+
+void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool)
+{
+ int idx;
+
+ //
+ // If the descriptor is not initialized, set it up now.
+ //
+ if (!desc->global_pool)
+ dx_alloc_init(desc);
+
+ //
+ // If this is the thread's first pass through here, allocate the
+ // thread-local pool for this type.
+ //
+ if (*tpool == 0) {
+ *tpool = NEW(dx_alloc_pool_t);
+ DEQ_INIT((*tpool)->free_list);
+ }
+
+ dx_alloc_pool_t *pool = *tpool;
+
+ //
+ // Fast case: If there's an item on the local free list, take it off the
+ // list and return it. Since everything we've touched is thread-local,
+ // there is no need to acquire a lock.
+ //
+ item_t *item = DEQ_HEAD(pool->free_list);
+ if (item) {
+ DEQ_REMOVE_HEAD(pool->free_list);
+ return &item[1];
+ }
+
+ //
+ // The local free list is empty, we need to either rebalance a batch
+ // of items from the global list or go to the heap to get new memory.
+ //
+ sys_mutex_lock(desc->lock);
+ if (DEQ_SIZE(desc->global_pool->free_list) >= desc->config->transfer_batch_size) {
+ //
+ // Rebalance a full batch from the global free list to the thread list.
+ //
+ desc->stats->batches_rebalanced_to_threads++;
+ desc->stats->held_by_threads += desc->config->transfer_batch_size;
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = DEQ_HEAD(desc->global_pool->free_list);
+ DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+ DEQ_INSERT_TAIL(pool->free_list, item);
+ }
+ } else {
+ //
+ // Allocate a full batch from the heap and put it on the thread list.
+ //
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = (item_t*) malloc(sizeof(item_t) + desc->total_size);
+ if (item == 0)
+ break;
+ DEQ_ITEM_INIT(item);
+ item->desc = desc;
+ DEQ_INSERT_TAIL(pool->free_list, item);
+ desc->stats->held_by_threads++;
+ desc->stats->total_alloc_from_heap++;
+ }
+ }
+ sys_mutex_unlock(desc->lock);
+
+ item = DEQ_HEAD(pool->free_list);
+ if (item) {
+ DEQ_REMOVE_HEAD(pool->free_list);
+ return &item[1];
+ }
+
+ return 0;
+}
+
+
+void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p)
+{
+ item_t *item = ((item_t*) p) - 1;
+ int idx;
+
+ //
+ // If this is the thread's first pass through here, allocate the
+ // thread-local pool for this type.
+ //
+ if (*tpool == 0) {
+ *tpool = NEW(dx_alloc_pool_t);
+ DEQ_INIT((*tpool)->free_list);
+ }
+
+ dx_alloc_pool_t *pool = *tpool;
+
+ DEQ_INSERT_TAIL(pool->free_list, item);
+
+ if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max)
+ return;
+
+ //
+ // We've exceeded the maximum size of the local free list. A batch must be
+ // rebalanced back to the global list.
+ //
+ sys_mutex_lock(desc->lock);
+ desc->stats->batches_rebalanced_to_global++;
+ desc->stats->held_by_threads -= desc->config->transfer_batch_size;
+ for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
+ item = DEQ_HEAD(pool->free_list);
+ DEQ_REMOVE_HEAD(pool->free_list);
+ DEQ_INSERT_TAIL(desc->global_pool->free_list, item);
+ }
+
+ //
+ // If there's a global_free_list size limit, remove items until the limit is
+ // not exceeded.
+ //
+ if (desc->config->global_free_list_max != 0) {
+ while (DEQ_SIZE(desc->global_pool->free_list) > desc->config->global_free_list_max) {
+ item = DEQ_HEAD(desc->global_pool->free_list);
+ DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+ free(item);
+ desc->stats->total_free_to_heap++;
+ }
+ }
+
+ sys_mutex_unlock(desc->lock);
+}
+
+
+void dx_alloc_initialize(void)
+{
+ init_lock = sys_mutex();
+ DEQ_INIT(type_list);
+}
+
diff --git a/extras/dispatch/src/alloc_private.h b/extras/dispatch/src/alloc_private.h
new file mode 100644
index 0000000000..fbb18ccd48
--- /dev/null
+++ b/extras/dispatch/src/alloc_private.h
@@ -0,0 +1,26 @@
+#ifndef __dispatch_alloc_private_h__
+#define __dispatch_alloc_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/alloc.h>
+
+void dx_alloc_initialize(void);
+
+#endif
diff --git a/extras/dispatch/src/auth.c b/extras/dispatch/src/auth.c
new file mode 100644
index 0000000000..f0df58f6c2
--- /dev/null
+++ b/extras/dispatch/src/auth.c
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include "auth.h"
+#include "server_private.h"
+#include <proton/sasl.h>
+
+
+void auth_client_handler(pn_connector_t *cxtr)
+{
+ pn_sasl_t *sasl = pn_connector_sasl(cxtr);
+ pn_sasl_state_t state = pn_sasl_state(sasl);
+ dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr);
+
+ if (state == PN_SASL_CONF) {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_client(sasl);
+ }
+
+ state = pn_sasl_state(sasl);
+
+ if (state == PN_SASL_PASS) {
+ ctx->state = CONN_STATE_OPENING;
+ } else if (state == PN_SASL_FAIL) {
+ ctx->state = CONN_STATE_FAILED;
+ }
+}
+
+
+void auth_server_handler(pn_connector_t *cxtr)
+{
+ pn_sasl_t *sasl = pn_connector_sasl(cxtr);
+ pn_sasl_state_t state = pn_sasl_state(sasl);
+ dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr);
+
+ while (state == PN_SASL_CONF || state == PN_SASL_STEP) {
+ if (state == PN_SASL_CONF) {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_server(sasl);
+ } else if (state == PN_SASL_STEP) {
+ const char* mechanisms = pn_sasl_remote_mechanisms(sasl);
+ if (strcmp(mechanisms, "ANONYMOUS") == 0)
+ pn_sasl_done(sasl, PN_SASL_OK);
+ else
+ pn_sasl_done(sasl, PN_SASL_AUTH);
+ }
+ state = pn_sasl_state(sasl);
+ }
+
+ if (state == PN_SASL_PASS) {
+ ctx->state = CONN_STATE_OPENING;
+ } else if (state == PN_SASL_FAIL) {
+ ctx->state = CONN_STATE_FAILED;
+ }
+}
+
+
diff --git a/extras/dispatch/src/auth.h b/extras/dispatch/src/auth.h
new file mode 100644
index 0000000000..c551c8ff76
--- /dev/null
+++ b/extras/dispatch/src/auth.h
@@ -0,0 +1,27 @@
+#ifndef __auth_h__
+#define __auth_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/driver.h>
+
+void auth_client_handler(pn_connector_t *conn);
+void auth_server_handler(pn_connector_t *conn);
+
+#endif
diff --git a/extras/dispatch/src/buffer.c b/extras/dispatch/src/buffer.c
new file mode 100644
index 0000000000..015711afd9
--- /dev/null
+++ b/extras/dispatch/src/buffer.c
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/buffer.h>
+#include <qpid/dispatch/alloc.h>
+
+static size_t buffer_size = 512;
+static int size_locked = 0;
+
+ALLOC_DECLARE(dx_buffer_t);
+ALLOC_DEFINE_CONFIG(dx_buffer_t, sizeof(dx_buffer_t), &buffer_size, 0);
+
+
+void dx_buffer_set_size(size_t size)
+{
+ assert(!size_locked);
+ buffer_size = size;
+}
+
+
+dx_buffer_t *dx_allocate_buffer(void)
+{
+ size_locked = 1;
+ dx_buffer_t *buf = new_dx_buffer_t();
+
+ DEQ_ITEM_INIT(buf);
+ buf->size = 0;
+ return buf;
+}
+
+
+void dx_free_buffer(dx_buffer_t *buf)
+{
+ free_dx_buffer_t(buf);
+}
+
+
+unsigned char *dx_buffer_base(dx_buffer_t *buf)
+{
+ return (unsigned char*) &buf[1];
+}
+
+
+unsigned char *dx_buffer_cursor(dx_buffer_t *buf)
+{
+ return ((unsigned char*) &buf[1]) + buf->size;
+}
+
+
+size_t dx_buffer_capacity(dx_buffer_t *buf)
+{
+ return buffer_size - buf->size;
+}
+
+
+size_t dx_buffer_size(dx_buffer_t *buf)
+{
+ return buf->size;
+}
+
+
+void dx_buffer_insert(dx_buffer_t *buf, size_t len)
+{
+ buf->size += len;
+ assert(buf->size <= buffer_size);
+}
+
diff --git a/extras/dispatch/src/container.c b/extras/dispatch/src/container.c
new file mode 100644
index 0000000000..68e2afa3eb
--- /dev/null
+++ b/extras/dispatch/src/container.c
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/message.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
+
+static char *module="CONTAINER";
+
+struct dx_node_t {
+ const dx_node_type_t *ntype;
+ char *name;
+ void *context;
+ dx_dist_mode_t supported_dist;
+ dx_lifetime_policy_t life_policy;
+};
+
+ALLOC_DECLARE(dx_node_t);
+ALLOC_DEFINE(dx_node_t);
+ALLOC_DEFINE(dx_link_item_t);
+
+struct dx_link_t {
+ pn_link_t *pn_link;
+ void *context;
+ dx_node_t *node;
+};
+
+ALLOC_DECLARE(dx_link_t);
+ALLOC_DEFINE(dx_link_t);
+
+typedef struct nxc_node_type_t {
+ DEQ_LINKS(struct nxc_node_type_t);
+ const dx_node_type_t *ntype;
+} nxc_node_type_t;
+DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t);
+
+
+static hash_t *node_type_map;
+static hash_t *node_map;
+static sys_mutex_t *lock;
+static dx_node_t *default_node;
+static nxc_node_type_list_t node_type_list;
+
+static void setup_outgoing_link(pn_link_t *pn_link)
+{
+ sys_mutex_lock(lock);
+ dx_node_t *node;
+ int result;
+ const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link));
+ dx_field_iterator_t *iter;
+ // TODO - Extract the name from the structured source
+
+ if (source) {
+ iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
+ result = hash_retrieve(node_map, iter, (void*) &node);
+ dx_field_iterator_free(iter);
+ } else
+ result = -1;
+ sys_mutex_unlock(lock);
+
+ if (result < 0) {
+ if (default_node)
+ node = default_node;
+ else {
+ // Reject the link
+ // TODO - When the API allows, add an error message for "no available node"
+ pn_link_close(pn_link);
+ return;
+ }
+ }
+
+ dx_link_t *link = new_dx_link_t();
+ if (!link) {
+ pn_link_close(pn_link);
+ return;
+ }
+
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+
+ pn_link_set_context(pn_link, link);
+ node->ntype->outgoing_handler(node->context, link);
+}
+
+
+static void setup_incoming_link(pn_link_t *pn_link)
+{
+ sys_mutex_lock(lock);
+ dx_node_t *node;
+ int result;
+ const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ dx_field_iterator_t *iter;
+ // TODO - Extract the name from the structured target
+
+ if (target) {
+ iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
+ result = hash_retrieve(node_map, iter, (void*) &node);
+ dx_field_iterator_free(iter);
+ } else
+ result = -1;
+ sys_mutex_unlock(lock);
+
+ if (result < 0) {
+ if (default_node)
+ node = default_node;
+ else {
+ // Reject the link
+ // TODO - When the API allows, add an error message for "no available node"
+ pn_link_close(pn_link);
+ return;
+ }
+ }
+
+ dx_link_t *link = new_dx_link_t();
+ if (!link) {
+ pn_link_close(pn_link);
+ return;
+ }
+
+ link->pn_link = pn_link;
+ link->context = 0;
+ link->node = node;
+
+ pn_link_set_context(pn_link, link);
+ node->ntype->incoming_handler(node->context, link);
+}
+
+
+static int do_writable(pn_link_t *pn_link)
+{
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ if (!link)
+ return 0;
+
+ dx_node_t *node = link->node;
+ if (!node)
+ return 0;
+
+ return node->ntype->writable_handler(node->context, link);
+}
+
+
+static void process_receive(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node) {
+ node->ntype->rx_handler(node->context, link, delivery);
+ return;
+ }
+ }
+
+ //
+ // Reject the delivery if we couldn't find a node to handle it
+ //
+ pn_link_advance(pn_link);
+ pn_link_flow(pn_link, 1);
+ pn_delivery_update(delivery, PN_REJECTED);
+ pn_delivery_settle(delivery);
+}
+
+
+static void do_send(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node) {
+ node->ntype->tx_handler(node->context, link, delivery);
+ return;
+ }
+ }
+
+ // TODO - Cancel the delivery
+}
+
+
+static void do_updated(pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+
+ if (link) {
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->disp_handler(node->context, link, delivery);
+ }
+}
+
+
+static int close_handler(void* unused, pn_connection_t *conn)
+{
+ //
+ // Close all links, passing False as the 'closed' argument. These links are not
+ // being properly 'detached'. They are being orphaned.
+ //
+ pn_link_t *pn_link = pn_link_head(conn, 0);
+ while (pn_link) {
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 0);
+ pn_link_close(pn_link);
+ free_dx_link_t(link);
+ pn_link = pn_link_next(pn_link, 0);
+ }
+
+ // teardown all sessions
+ pn_session_t *ssn = pn_session_head(conn, 0);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, 0);
+ }
+
+ // teardown the connection
+ pn_connection_close(conn);
+ return 0;
+}
+
+
+static int process_handler(void* unused, pn_connection_t *conn)
+{
+ pn_session_t *ssn;
+ pn_link_t *pn_link;
+ pn_delivery_t *delivery;
+ int event_count = 0;
+
+ // Step 1: setup the engine's connection, and any sessions and links
+ // that may be pending.
+
+ // initialize the connection if it's new
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+ pn_connection_open(conn);
+ event_count++;
+ }
+
+ // open all pending sessions
+ ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
+ while (ssn) {
+ pn_session_open(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
+ event_count++;
+ }
+
+ // configure and open any pending links
+ pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
+ while (pn_link) {
+ if (pn_link_is_sender(pn_link))
+ setup_outgoing_link(pn_link);
+ else
+ setup_incoming_link(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
+ event_count++;
+ }
+
+
+ // Step 2: Now drain all the pending deliveries from the connection's
+ // work queue and process them
+
+ delivery = pn_work_head(conn);
+ while (delivery) {
+ if (pn_delivery_readable(delivery))
+ process_receive(delivery);
+ else if (pn_delivery_writable(delivery))
+ do_send(delivery);
+
+ if (pn_delivery_updated(delivery))
+ do_updated(delivery);
+
+ delivery = pn_work_next(delivery);
+ event_count++;
+ }
+
+ //
+ // Step 2.5: Traverse all of the links on the connection looking for
+ // outgoing links with non-zero credit. Call the attached node's
+ // writable handler for such links.
+ //
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ while (pn_link) {
+ assert(pn_session_connection(pn_link_session(pn_link)) == conn);
+ if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
+ event_count += do_writable(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ }
+
+ // Step 3: Clean up any links or sessions that have been closed by the
+ // remote. If the connection has been closed remotely, clean that up
+ // also.
+
+ // teardown any terminating links
+ pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (pn_link) {
+ dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
+ dx_node_t *node = link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
+ pn_link_close(pn_link);
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ event_count++;
+ }
+
+ // teardown any terminating sessions
+ ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ while (ssn) {
+ pn_session_close(ssn);
+ ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ event_count++;
+ }
+
+ // teardown the connection if it's terminating
+ if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ pn_connection_close(conn);
+ event_count++;
+ }
+
+ return event_count;
+}
+
+
+static void open_handler(dx_connection_t *conn, dx_direction_t dir)
+{
+ const dx_node_type_t *nt;
+
+ //
+ // Note the locking structure in this function. Generally this would be unsafe, but since
+ // this particular list is only ever appended to and never has items inserted or deleted,
+ // this usage is safe in this case.
+ //
+ sys_mutex_lock(lock);
+ nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list);
+ sys_mutex_unlock(lock);
+
+ pn_connection_open(dx_connection_pn(conn));
+
+ while (nt_item) {
+ nt = nt_item->ntype;
+ if (dir == DX_INCOMING) {
+ if (nt->inbound_conn_open_handler)
+ nt->inbound_conn_open_handler(nt->type_context, conn);
+ } else {
+ if (nt->outbound_conn_open_handler)
+ nt->outbound_conn_open_handler(nt->type_context, conn);
+ }
+
+ sys_mutex_lock(lock);
+ nt_item = DEQ_NEXT(nt_item);
+ sys_mutex_unlock(lock);
+ }
+}
+
+
+static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn)
+{
+ pn_connection_t *conn = dx_connection_pn(dx_conn);
+
+ switch (event) {
+ case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break;
+ case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break;
+ case DX_CONN_EVENT_CLOSE: return close_handler(context, conn);
+ case DX_CONN_EVENT_PROCESS: return process_handler(context, conn);
+ }
+
+ return 0;
+}
+
+
+void dx_container_initialize(void)
+{
+ dx_log(module, LOG_TRACE, "Container Initializing");
+
+ node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
+ node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
+ lock = sys_mutex();
+ default_node = 0;
+ DEQ_INIT(node_type_list);
+
+ dx_server_set_conn_handler(handler);
+}
+
+
+void dx_container_finalize(void)
+{
+}
+
+
+int dx_container_register_node_type(const dx_node_type_t *nt)
+{
+ int result;
+ dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL);
+ nxc_node_type_t *nt_item = NEW(nxc_node_type_t);
+ DEQ_ITEM_INIT(nt_item);
+ nt_item->ntype = nt;
+
+ sys_mutex_lock(lock);
+ result = hash_insert_const(node_type_map, iter, nt);
+ DEQ_INSERT_TAIL(node_type_list, nt_item);
+ sys_mutex_unlock(lock);
+
+ dx_field_iterator_free(iter);
+ if (result < 0)
+ return result;
+ dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name);
+
+ return 0;
+}
+
+
+void dx_container_set_default_node_type(const dx_node_type_t *nt,
+ void *context,
+ dx_dist_mode_t supported_dist)
+{
+ if (default_node)
+ dx_container_destroy_node(default_node);
+
+ if (nt) {
+ default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT);
+ dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name);
+ } else {
+ default_node = 0;
+ dx_log(module, LOG_TRACE, "Default node removed");
+ }
+}
+
+
+dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
+ const char *name,
+ void *context,
+ dx_dist_mode_t supported_dist,
+ dx_lifetime_policy_t life_policy)
+{
+ int result;
+ dx_node_t *node = new_dx_node_t();
+ if (!node)
+ return 0;
+
+ node->ntype = nt;
+ node->name = 0;
+ node->context = context;
+ node->supported_dist = supported_dist;
+ node->life_policy = life_policy;
+
+ if (name) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
+ sys_mutex_lock(lock);
+ result = hash_insert(node_map, iter, node);
+ sys_mutex_unlock(lock);
+ dx_field_iterator_free(iter);
+ if (result < 0) {
+ free_dx_node_t(node);
+ return 0;
+ }
+
+ node->name = (char*) malloc(strlen(name) + 1);
+ strcpy(node->name, name);
+ }
+
+ if (name)
+ dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name);
+
+ return node;
+}
+
+
+void dx_container_destroy_node(dx_node_t *node)
+{
+ if (node->name) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL);
+ sys_mutex_lock(lock);
+ hash_remove(node_map, iter);
+ sys_mutex_unlock(lock);
+ dx_field_iterator_free(iter);
+ free(node->name);
+ }
+
+ free_dx_node_t(node);
+}
+
+
+void dx_container_node_set_context(dx_node_t *node, void *node_context)
+{
+ node->context = node_context;
+}
+
+
+dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node)
+{
+ return node->supported_dist;
+}
+
+
+dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node)
+{
+ return node->life_policy;
+}
+
+
+dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name)
+{
+ pn_session_t *sess = pn_session(dx_connection_pn(conn));
+ dx_link_t *link = new_dx_link_t();
+
+ if (dir == DX_OUTGOING)
+ link->pn_link = pn_sender(sess, name);
+ else
+ link->pn_link = pn_receiver(sess, name);
+ link->context = node->context;
+ link->node = node;
+
+ pn_link_set_context(link->pn_link, link);
+
+ pn_session_open(sess);
+
+ return link;
+}
+
+
+void dx_link_set_context(dx_link_t *link, void *context)
+{
+ link->context = context;
+}
+
+
+void *dx_link_get_context(dx_link_t *link)
+{
+ return link->context;
+}
+
+
+pn_link_t *dx_link_pn(dx_link_t *link)
+{
+ return link->pn_link;
+}
+
+
+pn_terminus_t *dx_link_source(dx_link_t *link)
+{
+ return pn_link_source(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_target(dx_link_t *link)
+{
+ return pn_link_target(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_remote_source(dx_link_t *link)
+{
+ return pn_link_remote_source(link->pn_link);
+}
+
+
+pn_terminus_t *dx_link_remote_target(dx_link_t *link)
+{
+ return pn_link_remote_target(link->pn_link);
+}
+
+
+void dx_link_activate(dx_link_t *link)
+{
+ if (!link || !link->pn_link)
+ return;
+
+ pn_session_t *sess = pn_link_session(link->pn_link);
+ if (!sess)
+ return;
+
+ pn_connection_t *conn = pn_session_connection(sess);
+ if (!conn)
+ return;
+
+ dx_connection_t *ctx = pn_connection_get_context(conn);
+ if (!ctx)
+ return;
+
+ dx_server_activate(ctx);
+}
+
+
+void dx_link_close(dx_link_t *link)
+{
+ pn_link_close(link->pn_link);
+}
+
+
diff --git a/extras/dispatch/src/hash.c b/extras/dispatch/src/hash.c
new file mode 100644
index 0000000000..c54d5d6fcf
--- /dev/null
+++ b/extras/dispatch/src/hash.c
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
+#include <stdio.h>
+#include <string.h>
+
+typedef struct hash_item_t {
+ DEQ_LINKS(struct hash_item_t);
+ unsigned char *key;
+ union {
+ void *val;
+ const void *val_const;
+ } v;
+} hash_item_t;
+
+ALLOC_DECLARE(hash_item_t);
+ALLOC_DEFINE(hash_item_t);
+DEQ_DECLARE(hash_item_t, items_t);
+
+
+typedef struct bucket_t {
+ items_t items;
+} bucket_t;
+
+
+struct hash_t {
+ bucket_t *buckets;
+ unsigned int bucket_count;
+ unsigned int bucket_mask;
+ int batch_size;
+ size_t size;
+ int is_const;
+};
+
+
+// djb2 hash algorithm
+static unsigned long hash_function(dx_field_iterator_t *iter)
+{
+ unsigned long hash = 5381;
+ int c;
+
+ while (!dx_field_iterator_end(iter)) {
+ c = (int) dx_field_iterator_octet(iter);
+ hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
+ }
+
+ return hash;
+}
+
+
+hash_t *hash(int bucket_exponent, int batch_size, int value_is_const)
+{
+ int i;
+ hash_t *h = NEW(hash_t);
+
+ if (!h)
+ return 0;
+
+ h->bucket_count = 1 << bucket_exponent;
+ h->bucket_mask = h->bucket_count - 1;
+ h->batch_size = batch_size;
+ h->size = 0;
+ h->is_const = value_is_const;
+ h->buckets = NEW_ARRAY(bucket_t, h->bucket_count);
+ for (i = 0; i < h->bucket_count; i++) {
+ DEQ_INIT(h->buckets[i].items);
+ }
+
+ return h;
+}
+
+
+void hash_free(hash_t *h)
+{
+ // TODO - Implement this
+}
+
+
+size_t hash_size(hash_t *h)
+{
+ return h ? h->size : 0;
+}
+
+
+static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ *error = 0;
+
+ while (item) {
+ if (dx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ if (item) {
+ *error = -1;
+ return 0;
+ }
+
+ item = new_hash_item_t();
+ if (!item) {
+ *error = -2;
+ return 0;
+ }
+
+ DEQ_ITEM_INIT(item);
+ item->key = dx_field_iterator_copy(key);
+
+ DEQ_INSERT_TAIL(h->buckets[idx].items, item);
+ h->size++;
+ return item;
+}
+
+
+int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+{
+ int error = 0;
+ hash_item_t *item = hash_internal_insert(h, key, &error);
+
+ if (item)
+ item->v.val = val;
+ return error;
+}
+
+
+int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+{
+ if (!h->is_const)
+ return -3;
+
+ int error = 0;
+ hash_item_t *item = hash_internal_insert(h, key, &error);
+
+ if (item)
+ item->v.val_const = val;
+ return error;
+}
+
+
+static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ while (item) {
+ if (dx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ return item;
+}
+
+
+int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val)
+{
+ hash_item_t *item = hash_internal_retrieve(h, key);
+ if (item) {
+ *val = item->v.val;
+ return 0;
+ }
+ return -1;
+}
+
+
+int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val)
+{
+ if (!h->is_const)
+ return -3;
+
+ hash_item_t *item = hash_internal_retrieve(h, key);
+ if (item) {
+ *val = item->v.val_const;
+ return 0;
+ }
+ return -1;
+}
+
+
+int hash_remove(hash_t *h, dx_field_iterator_t *key)
+{
+ unsigned long idx = hash_function(key) & h->bucket_mask;
+ hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
+
+ while (item) {
+ if (dx_field_iterator_equal(key, item->key))
+ break;
+ item = item->next;
+ }
+
+ if (item) {
+ free(item->key);
+ DEQ_REMOVE(h->buckets[idx].items, item);
+ free_hash_item_t(item);
+ h->size--;
+ return 0;
+ }
+
+ return -1;
+}
+
diff --git a/extras/dispatch/src/iovec.c b/extras/dispatch/src/iovec.c
new file mode 100644
index 0000000000..6ff6874440
--- /dev/null
+++ b/extras/dispatch/src/iovec.c
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/iovec.h>
+#include <qpid/dispatch/alloc.h>
+#include <string.h>
+
+#define DX_IOVEC_MAX 64
+
+struct dx_iovec_t {
+ struct iovec iov_array[DX_IOVEC_MAX];
+ struct iovec *iov;
+ int iov_count;
+};
+
+
+ALLOC_DECLARE(dx_iovec_t);
+ALLOC_DEFINE(dx_iovec_t);
+
+
+dx_iovec_t *dx_iovec(int vector_count)
+{
+ dx_iovec_t *iov = new_dx_iovec_t();
+ if (!iov)
+ return 0;
+
+ memset(iov, 0, sizeof(dx_iovec_t));
+
+ iov->iov_count = vector_count;
+ if (vector_count > DX_IOVEC_MAX)
+ iov->iov = (struct iovec*) malloc(sizeof(struct iovec) * vector_count);
+ else
+ iov->iov = &iov->iov_array[0];
+
+ return iov;
+}
+
+
+void dx_iovec_free(dx_iovec_t *iov)
+{
+ if (!iov)
+ return;
+
+ if (iov->iov && iov->iov != &iov->iov_array[0])
+ free(iov->iov);
+
+ free_dx_iovec_t(iov);
+}
+
+
+struct iovec *dx_iovec_array(dx_iovec_t *iov)
+{
+ if (!iov)
+ return 0;
+ return iov->iov;
+}
+
+
+int dx_iovec_count(dx_iovec_t *iov)
+{
+ if (!iov)
+ return 0;
+ return iov->iov_count;
+}
+
diff --git a/extras/dispatch/src/iterator.c b/extras/dispatch/src/iterator.c
new file mode 100644
index 0000000000..6ab67f948d
--- /dev/null
+++ b/extras/dispatch/src/iterator.c
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
+#include "message_private.h"
+#include <stdio.h>
+#include <string.h>
+
+typedef enum {
+MODE_TO_END,
+MODE_TO_SLASH
+} parse_mode_t;
+
+struct dx_field_iterator_t {
+ dx_buffer_t *start_buffer;
+ unsigned char *start_cursor;
+ int start_length;
+ dx_buffer_t *buffer;
+ unsigned char *cursor;
+ int length;
+ dx_iterator_view_t view;
+ parse_mode_t mode;
+};
+
+
+ALLOC_DECLARE(dx_field_iterator_t);
+ALLOC_DEFINE(dx_field_iterator_t);
+
+
+typedef enum {
+STATE_START,
+STATE_SLASH_LEFT,
+STATE_SKIPPING_TO_NEXT_SLASH,
+STATE_SCANNING,
+STATE_COLON,
+STATE_COLON_SLASH,
+STATE_AT_NODE_ID
+} state_t;
+
+
+static void view_initialize(dx_field_iterator_t *iter)
+{
+ if (iter->view == ITER_VIEW_ALL) {
+ iter->mode = MODE_TO_END;
+ return;
+ }
+
+ //
+ // Advance to the node-id.
+ //
+ state_t state = STATE_START;
+ unsigned int octet;
+ while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) {
+ octet = dx_field_iterator_octet(iter);
+ switch (state) {
+ case STATE_START :
+ if (octet == '/')
+ state = STATE_SLASH_LEFT;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_SLASH_LEFT :
+ if (octet == '/')
+ state = STATE_SKIPPING_TO_NEXT_SLASH;
+ else
+ state = STATE_AT_NODE_ID;
+ break;
+
+ case STATE_SKIPPING_TO_NEXT_SLASH :
+ if (octet == '/')
+ state = STATE_AT_NODE_ID;
+ break;
+
+ case STATE_SCANNING :
+ if (octet == ':')
+ state = STATE_COLON;
+ break;
+
+ case STATE_COLON :
+ if (octet == '/')
+ state = STATE_COLON_SLASH;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_COLON_SLASH :
+ if (octet == '/')
+ state = STATE_SKIPPING_TO_NEXT_SLASH;
+ else
+ state = STATE_SCANNING;
+ break;
+
+ case STATE_AT_NODE_ID :
+ break;
+ }
+ }
+
+ if (state != STATE_AT_NODE_ID) {
+ //
+ // The address string was relative, not absolute. The node-id
+ // is at the beginning of the string.
+ //
+ iter->buffer = iter->start_buffer;
+ iter->cursor = iter->start_cursor;
+ iter->length = iter->start_length;
+ }
+
+ //
+ // Cursor is now on the first octet of the node-id
+ //
+ if (iter->view == ITER_VIEW_NODE_ID) {
+ iter->mode = MODE_TO_SLASH;
+ return;
+ }
+
+ if (iter->view == ITER_VIEW_NO_HOST) {
+ iter->mode = MODE_TO_END;
+ return;
+ }
+
+ if (iter->view == ITER_VIEW_NODE_SPECIFIC) {
+ iter->mode = MODE_TO_END;
+ while (!dx_field_iterator_end(iter)) {
+ octet = dx_field_iterator_octet(iter);
+ if (octet == '/')
+ break;
+ }
+ return;
+ }
+}
+
+
+dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view)
+{
+ dx_field_iterator_t *iter = new_dx_field_iterator_t();
+ if (!iter)
+ return 0;
+
+ iter->start_buffer = 0;
+ iter->start_cursor = (unsigned char*) text;
+ iter->start_length = strlen(text);
+
+ dx_field_iterator_reset(iter, view);
+
+ return iter;
+}
+
+
+dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, int length, dx_iterator_view_t view)
+{
+ dx_field_iterator_t *iter = new_dx_field_iterator_t();
+ if (!iter)
+ return 0;
+
+ iter->start_buffer = buffer;
+ iter->start_cursor = dx_buffer_base(buffer) + offset;
+ iter->start_length = length;
+
+ dx_field_iterator_reset(iter, view);
+
+ return iter;
+}
+
+
+void dx_field_iterator_free(dx_field_iterator_t *iter)
+{
+ free_dx_field_iterator_t(iter);
+}
+
+
+void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view)
+{
+ iter->buffer = iter->start_buffer;
+ iter->cursor = iter->start_cursor;
+ iter->length = iter->start_length;
+ iter->view = view;
+
+ view_initialize(iter);
+}
+
+
+unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter)
+{
+ if (iter->length == 0)
+ return (unsigned char) 0;
+
+ unsigned char result = *(iter->cursor);
+
+ iter->cursor++;
+ iter->length--;
+
+ if (iter->length > 0) {
+ if (iter->buffer) {
+ if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) {
+ iter->buffer = iter->buffer->next;
+ if (iter->buffer == 0)
+ iter->length = 0;
+ iter->cursor = dx_buffer_base(iter->buffer);
+ }
+ }
+ }
+
+ if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/')
+ iter->length = 0;
+
+ return result;
+}
+
+
+int dx_field_iterator_end(dx_field_iterator_t *iter)
+{
+ return iter->length == 0;
+}
+
+
+int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string)
+{
+ dx_field_iterator_reset(iter, iter->view);
+ while (!dx_field_iterator_end(iter) && *string) {
+ if (*string != dx_field_iterator_octet(iter))
+ return 0;
+ string++;
+ }
+
+ return (dx_field_iterator_end(iter) && (*string == 0));
+}
+
+
+unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter)
+{
+ int length = 0;
+ int idx = 0;
+ unsigned char *copy;
+
+ dx_field_iterator_reset(iter, iter->view);
+ while (!dx_field_iterator_end(iter)) {
+ dx_field_iterator_octet(iter);
+ length++;
+ }
+
+ dx_field_iterator_reset(iter, iter->view);
+ copy = (unsigned char*) malloc(length + 1);
+ while (!dx_field_iterator_end(iter))
+ copy[idx++] = dx_field_iterator_octet(iter);
+ copy[idx] = '\0';
+
+ return copy;
+}
+
diff --git a/extras/dispatch/src/log.c b/extras/dispatch/src/log.c
new file mode 100644
index 0000000000..d4ec534915
--- /dev/null
+++ b/extras/dispatch/src/log.c
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/log.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+static int mask=LOG_INFO;
+
+static char *cls_prefix(int cls)
+{
+ switch (cls) {
+ case LOG_TRACE : return "TRACE";
+ case LOG_ERROR : return "ERROR";
+ case LOG_INFO : return "INFO";
+ }
+
+ return "";
+}
+
+void dx_log(const char *module, int cls, const char *fmt, ...)
+{
+ if (!(cls & mask))
+ return;
+
+ va_list ap;
+ char line[128];
+
+ va_start(ap, fmt);
+ vsnprintf(line, 127, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line);
+}
+
+void dx_log_set_mask(int _mask)
+{
+ mask = _mask;
+}
+
diff --git a/extras/dispatch/src/message.c b/extras/dispatch/src/message.c
new file mode 100644
index 0000000000..f66e79010c
--- /dev/null
+++ b/extras/dispatch/src/message.c
@@ -0,0 +1,1120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/threading.h>
+#include "message_private.h"
+#include <string.h>
+#include <stdio.h>
+
+ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0);
+ALLOC_DEFINE(dx_message_content_t);
+
+
+static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
+{
+ unsigned char *local_cursor = *cursor;
+ dx_buffer_t *local_buffer = *buffer;
+
+ int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer));
+ while (consume > 0) {
+ if (consume < remaining) {
+ local_cursor += consume;
+ consume = 0;
+ } else {
+ consume -= remaining;
+ local_buffer = local_buffer->next;
+ if (local_buffer == 0){
+ local_cursor = 0;
+ break;
+ }
+ local_cursor = dx_buffer_base(local_buffer);
+ remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer));
+ }
+ }
+
+ *cursor = local_cursor;
+ *buffer = local_buffer;
+}
+
+
+static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer)
+{
+ unsigned char result = **cursor;
+ advance(cursor, buffer, 1);
+ return result;
+}
+
+
+static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field_location_t *field)
+{
+ unsigned char tag = next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ int consume = 0;
+ switch (tag & 0xF0) {
+ case 0x40 : consume = 0; break;
+ case 0x50 : consume = 1; break;
+ case 0x60 : consume = 2; break;
+ case 0x70 : consume = 4; break;
+ case 0x80 : consume = 8; break;
+ case 0x90 : consume = 16; break;
+
+ case 0xB0 :
+ case 0xD0 :
+ case 0xF0 :
+ consume |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ consume |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ consume |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ // Fall through to the next case...
+
+ case 0xA0 :
+ case 0xC0 :
+ case 0xE0 :
+ consume |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ break;
+ }
+
+ if (field) {
+ field->buffer = *buffer;
+ field->offset = *cursor - dx_buffer_base(*buffer);
+ field->length = consume;
+ field->parsed = 1;
+ }
+
+ advance(cursor, buffer, consume);
+ return 1;
+}
+
+
+static int start_list(unsigned char **cursor, dx_buffer_t **buffer)
+{
+ unsigned char tag = next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ int length = 0;
+ int count = 0;
+
+ switch (tag) {
+ case 0x45 : // list0
+ break;
+ case 0xd0 : // list32
+ length |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ length |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ length |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ length |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ count |= ((int) next_octet(cursor, buffer)) << 24;
+ if (!(*cursor)) return 0;
+ count |= ((int) next_octet(cursor, buffer)) << 16;
+ if (!(*cursor)) return 0;
+ count |= ((int) next_octet(cursor, buffer)) << 8;
+ if (!(*cursor)) return 0;
+ count |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ break;
+
+ case 0xc0 : // list8
+ length |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+
+ count |= (int) next_octet(cursor, buffer);
+ if (!(*cursor)) return 0;
+ break;
+ }
+
+ return count;
+}
+
+
+//
+// Check the buffer chain, starting at cursor to see if it matches the pattern.
+// If the pattern matches, check the next tag to see if it's in the set of expected
+// tags. If not, return zero. If so, set the location descriptor to the good
+// tag and advance the cursor (and buffer, if needed) to the end of the matched section.
+//
+// If there is no match, don't advance the cursor.
+//
+// Return 0 if the pattern matches but the following tag is unexpected
+// Return 0 if the pattern matches and the location already has a pointer (duplicate section)
+// Return 1 if the pattern matches and we've advanced the cursor/buffer
+// Return 1 if the pattern does not match
+//
+static int dx_check_and_advance(dx_buffer_t **buffer,
+ unsigned char **cursor,
+ unsigned char *pattern,
+ int pattern_length,
+ unsigned char *expected_tags,
+ dx_field_location_t *location)
+{
+ dx_buffer_t *test_buffer = *buffer;
+ unsigned char *test_cursor = *cursor;
+
+ if (!test_cursor)
+ return 1; // no match
+
+ unsigned char *end_of_buffer = dx_buffer_base(test_buffer) + dx_buffer_size(test_buffer);
+ int idx = 0;
+
+ while (idx < pattern_length && *test_cursor == pattern[idx]) {
+ idx++;
+ test_cursor++;
+ if (test_cursor == end_of_buffer) {
+ test_buffer = test_buffer->next;
+ if (test_buffer == 0)
+ return 1; // Pattern didn't match
+ test_cursor = dx_buffer_base(test_buffer);
+ end_of_buffer = test_cursor + dx_buffer_size(test_buffer);
+ }
+ }
+
+ if (idx < pattern_length)
+ return 1; // Pattern didn't match
+
+ //
+ // Pattern matched, check the tag
+ //
+ while (*expected_tags && *test_cursor != *expected_tags)
+ expected_tags++;
+ if (*expected_tags == 0)
+ return 0; // Unexpected tag
+
+ if (location->parsed)
+ return 0; // Duplicate section
+
+ //
+ // Pattern matched and tag is expected. Mark the beginning of the section.
+ //
+ location->parsed = 1;
+ location->buffer = test_buffer;
+ location->offset = test_cursor - dx_buffer_base(test_buffer);
+ location->length = 0;
+
+ //
+ // Advance the pointers to consume the whole section.
+ //
+ int consume = 0;
+ unsigned char tag = next_octet(&test_cursor, &test_buffer);
+ if (!test_cursor) return 0;
+ switch (tag) {
+ case 0x45 : // list0
+ break;
+
+ case 0xd0 : // list32
+ case 0xd1 : // map32
+ case 0xb0 : // vbin32
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24;
+ if (!test_cursor) return 0;
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16;
+ if (!test_cursor) return 0;
+ consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8;
+ if (!test_cursor) return 0;
+ // Fall through to the next case...
+
+ case 0xc0 : // list8
+ case 0xc1 : // map8
+ case 0xa0 : // vbin8
+ consume |= (int) next_octet(&test_cursor, &test_buffer);
+ if (!test_cursor) return 0;
+ break;
+ }
+
+ if (consume)
+ advance(&test_cursor, &test_buffer, consume);
+
+ *cursor = test_cursor;
+ *buffer = test_buffer;
+ return 1;
+}
+
+
+static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len)
+{
+ dx_buffer_t *buf = DEQ_TAIL(msg->buffers);
+
+ while (len > 0) {
+ if (buf == 0 || dx_buffer_capacity(buf) == 0) {
+ buf = dx_allocate_buffer();
+ if (buf == 0)
+ return;
+ DEQ_INSERT_TAIL(msg->buffers, buf);
+ }
+
+ size_t to_copy = dx_buffer_capacity(buf);
+ if (to_copy > len)
+ to_copy = len;
+ memcpy(dx_buffer_cursor(buf), seq, to_copy);
+ dx_buffer_insert(buf, to_copy);
+ len -= to_copy;
+ seq += to_copy;
+ msg->length += to_copy;
+ }
+}
+
+
+static void dx_insert_8(dx_message_content_t *msg, uint8_t value)
+{
+ dx_insert(msg, &value, 1);
+}
+
+
+static void dx_insert_32(dx_message_content_t *msg, uint32_t value)
+{
+ uint8_t buf[4];
+ buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
+ buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16);
+ buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8);
+ buf[3] = (uint8_t) (value & 0x000000FF);
+ dx_insert(msg, buf, 4);
+}
+
+
+static void dx_insert_64(dx_message_content_t *msg, uint64_t value)
+{
+ uint8_t buf[8];
+ buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
+ buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48);
+ buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40);
+ buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32);
+ buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24);
+ buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16);
+ buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8);
+ buf[7] = (uint8_t) (value & 0x00000000000000FFL);
+ dx_insert(msg, buf, 8);
+}
+
+
+static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value)
+{
+ while (*buf) {
+ if (*cursor >= dx_buffer_size(*buf)) {
+ *buf = (*buf)->next;
+ *cursor = 0;
+ } else {
+ dx_buffer_base(*buf)[*cursor] = value;
+ (*cursor)++;
+ return;
+ }
+ }
+}
+
+
+static void dx_overwrite_32(dx_field_location_t *field, uint32_t value)
+{
+ dx_buffer_t *buf = field->buffer;
+ size_t cursor = field->offset;
+
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24));
+ dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF));
+}
+
+
+static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code)
+{
+ //
+ // Insert the short-form performative tag
+ //
+ dx_insert(msg, (const uint8_t*) "\x00\x53", 2);
+ dx_insert_8(msg, code);
+
+ //
+ // Open the list with a list32 tag
+ //
+ dx_insert_8(msg, 0xd0);
+
+ //
+ // Mark the current location to later overwrite the length
+ //
+ msg->compose_length.buffer = DEQ_TAIL(msg->buffers);
+ msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer);
+ msg->compose_length.length = 4;
+ msg->compose_length.parsed = 1;
+
+ dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
+
+ //
+ // Mark the current location to later overwrite the count
+ //
+ msg->compose_count.buffer = DEQ_TAIL(msg->buffers);
+ msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer);
+ msg->compose_count.length = 4;
+ msg->compose_count.parsed = 1;
+
+ dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
+
+ msg->length = 4; // Include the length of the count field
+ msg->count = 0;
+}
+
+
+static void dx_end_list(dx_message_content_t *msg)
+{
+ dx_overwrite_32(&msg->compose_length, msg->length);
+ dx_overwrite_32(&msg->compose_count, msg->count);
+}
+
+
+static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+
+ switch (field) {
+ case DX_FIELD_TO:
+ while (1) {
+ if (content->field_to.parsed)
+ return &content->field_to;
+
+ if (content->section_message_properties.parsed == 0)
+ break;
+
+ dx_buffer_t *buffer = content->section_message_properties.buffer;
+ unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset;
+
+ int count = start_list(&cursor, &buffer);
+ int result;
+
+ if (count < 3)
+ break;
+
+ result = traverse_field(&cursor, &buffer, 0); // message_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // user_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, &content->field_to); // to
+ if (!result) return 0;
+ }
+ break;
+
+ case DX_FIELD_BODY:
+ while (1) {
+ if (content->body.parsed)
+ return &content->body;
+
+ if (content->section_body.parsed == 0)
+ break;
+
+ dx_buffer_t *buffer = content->section_body.buffer;
+ unsigned char *cursor = dx_buffer_base(buffer) + content->section_body.offset;
+ int result;
+
+ result = traverse_field(&cursor, &buffer, &content->body);
+ if (!result) return 0;
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+
+dx_message_t *dx_allocate_message()
+{
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t();
+ if (!msg)
+ return 0;
+
+ DEQ_ITEM_INIT(msg);
+ msg->content = new_dx_message_content_t();
+ msg->out_delivery = 0;
+
+ if (msg->content == 0) {
+ free_dx_message_t((dx_message_t*) msg);
+ return 0;
+ }
+
+ memset(msg->content, 0, sizeof(dx_message_content_t));
+ msg->content->lock = sys_mutex();
+ msg->content->ref_count = 1;
+
+ return (dx_message_t*) msg;
+}
+
+
+void dx_free_message(dx_message_t *in_msg)
+{
+ uint32_t rc;
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+
+ sys_mutex_lock(content->lock);
+ rc = --content->ref_count;
+ sys_mutex_unlock(content->lock);
+
+ if (rc == 0) {
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+
+ while (buf) {
+ DEQ_REMOVE_HEAD(content->buffers);
+ dx_free_buffer(buf);
+ buf = DEQ_HEAD(content->buffers);
+ }
+
+ sys_mutex_free(content->lock);
+ free_dx_message_content_t(content);
+ }
+
+ free_dx_message_t((dx_message_t*) msg);
+}
+
+
+dx_message_t *dx_message_copy(dx_message_t *in_msg)
+{
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ dx_message_pvt_t *copy = (dx_message_pvt_t*) new_dx_message_t();
+
+ if (!copy)
+ return 0;
+
+ DEQ_ITEM_INIT(copy);
+ copy->content = content;
+ copy->out_delivery = 0;
+
+ sys_mutex_lock(content->lock);
+ content->ref_count++;
+ sys_mutex_unlock(content->lock);
+
+ return (dx_message_t*) copy;
+}
+
+
+void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+{
+ ((dx_message_pvt_t*) msg)->out_delivery = delivery;
+}
+
+
+pn_delivery_t *dx_message_out_delivery(dx_message_t *msg)
+{
+ return ((dx_message_pvt_t*) msg)->out_delivery;
+}
+
+
+void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ content->in_delivery = delivery;
+}
+
+
+pn_delivery_t *dx_message_in_delivery(dx_message_t *msg)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ return content->in_delivery;
+}
+
+
+dx_message_t *dx_message_receive(pn_delivery_t *delivery)
+{
+ pn_link_t *link = pn_delivery_link(delivery);
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery);
+ ssize_t rc;
+ dx_buffer_t *buf;
+
+ //
+ // If there is no message associated with the delivery, this is the first time
+ // we've received anything on this delivery. Allocate a message descriptor and
+ // link it and the delivery together.
+ //
+ if (!msg) {
+ msg = (dx_message_pvt_t*) dx_allocate_message();
+ pn_delivery_set_context(delivery, (void*) msg);
+
+ //
+ // Record the incoming delivery only if it is not settled. If it is
+ // settled, it should not be recorded as no future operations on it are
+ // permitted.
+ //
+ if (!pn_delivery_settled(delivery))
+ msg->content->in_delivery = delivery;
+ }
+
+ //
+ // Get a reference to the tail buffer on the message. This is the buffer into which
+ // we will store incoming message data. If there is no buffer in the message, allocate
+ // an empty one and add it to the message.
+ //
+ buf = DEQ_TAIL(msg->content->buffers);
+ if (!buf) {
+ buf = dx_allocate_buffer();
+ DEQ_INSERT_TAIL(msg->content->buffers, buf);
+ }
+
+ while (1) {
+ //
+ // Try to receive enough data to fill the remaining space in the tail buffer.
+ //
+ rc = pn_link_recv(link, (char*) dx_buffer_cursor(buf), dx_buffer_capacity(buf));
+
+ //
+ // If we receive PN_EOS, we have come to the end of the message.
+ //
+ if (rc == PN_EOS) {
+ //
+ // If the last buffer in the list is empty, remove it and free it. This
+ // will only happen if the size of the message content is an exact multiple
+ // of the buffer size.
+ //
+ if (dx_buffer_size(buf) == 0) {
+ DEQ_REMOVE_TAIL(msg->content->buffers);
+ dx_free_buffer(buf);
+ }
+ return (dx_message_t*) msg;
+ }
+
+ if (rc > 0) {
+ //
+ // We have received a positive number of bytes for the message. Advance
+ // the cursor in the buffer.
+ //
+ dx_buffer_insert(buf, rc);
+
+ //
+ // If the buffer is full, allocate a new empty buffer and append it to the
+ // tail of the message's list.
+ //
+ if (dx_buffer_capacity(buf) == 0) {
+ buf = dx_allocate_buffer();
+ DEQ_INSERT_TAIL(msg->content->buffers, buf);
+ }
+ } else
+ //
+ // We received zero bytes, and no PN_EOS. This means that we've received
+ // all of the data available up to this point, but it does not constitute
+ // the entire message. We'll be back later to finish it up.
+ //
+ break;
+ }
+
+ return 0;
+}
+
+
+void dx_message_send(dx_message_t *in_msg, pn_link_t *link)
+{
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers);
+
+ // TODO - Handle cases where annotations have been added or modified
+ while (buf) {
+ pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
+ buf = DEQ_NEXT(buf);
+ }
+}
+
+
+int dx_message_check(dx_message_t *in_msg, dx_message_depth_t depth)
+{
+
+#define LONG 10
+#define SHORT 3
+#define MSG_HDR_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70"
+#define MSG_HDR_SHORT (unsigned char*) "\x00\x53\x70"
+#define DELIVERY_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71"
+#define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71"
+#define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"
+#define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72"
+#define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
+#define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73"
+#define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"
+#define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74"
+#define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"
+#define BODY_DATA_SHORT (unsigned char*) "\x00\x53\x75"
+#define BODY_SEQUENCE_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"
+#define BODY_SEQUENCE_SHORT (unsigned char*) "\x00\x53\x76"
+#define FOOTER_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"
+#define FOOTER_SHORT (unsigned char*) "\x00\x53\x78"
+#define TAGS_LIST (unsigned char*) "\x45\xc0\xd0"
+#define TAGS_MAP (unsigned char*) "\xc1\xd1"
+#define TAGS_BINARY (unsigned char*) "\xa0\xb0"
+
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ dx_buffer_t *buffer = DEQ_HEAD(content->buffers);
+ unsigned char *cursor;
+
+ if (!buffer)
+ return 0; // Invalid - No data in the message
+
+ if (depth == DX_DEPTH_NONE)
+ return 1;
+
+ cursor = dx_buffer_base(buffer);
+
+ //
+ // MESSAGE HEADER
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header))
+ return 0;
+
+ if (depth == DX_DEPTH_HEADER)
+ return 1;
+
+ //
+ // DELIVERY ANNOTATION
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation))
+ return 0;
+
+ if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS)
+ return 1;
+
+ //
+ // MESSAGE ANNOTATION
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation))
+ return 0;
+
+ if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS)
+ return 1;
+
+ //
+ // PROPERTIES
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties))
+ return 0;
+
+ if (depth == DX_DEPTH_PROPERTIES)
+ return 1;
+
+ //
+ // APPLICATION PROPERTIES
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties))
+ return 0;
+
+ if (depth == DX_DEPTH_APPLICATION_PROPERTIES)
+ return 1;
+
+ //
+ // BODY (Note that this function expects a single data section or a single AMQP sequence)
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body))
+ return 0;
+
+ if (depth == DX_DEPTH_BODY)
+ return 1;
+
+ //
+ // FOOTER
+ //
+ if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer))
+ return 0;
+ if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer))
+ return 0;
+
+ return 1;
+}
+
+
+dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field)
+{
+ dx_field_location_t *loc = dx_message_field_location(msg, field);
+ if (!loc)
+ return 0;
+
+ return dx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL);
+}
+
+
+dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field)
+{
+ dx_field_location_t *loc = dx_message_field_location(msg, field);
+ if (!loc)
+ return 0;
+
+ //
+ // Count the number of buffers this field straddles
+ //
+ int bufcnt = 1;
+ dx_buffer_t *buf = loc->buffer;
+ size_t bufsize = dx_buffer_size(buf) - loc->offset;
+ ssize_t remaining = loc->length - bufsize;
+
+ while (remaining > 0) {
+ bufcnt++;
+ buf = buf->next;
+ if (!buf)
+ return 0;
+ remaining -= dx_buffer_size(buf);
+ }
+
+ //
+ // Allocate an iovec object big enough to hold the number of buffers
+ //
+ dx_iovec_t *iov = dx_iovec(bufcnt);
+ if (!iov)
+ return 0;
+
+ //
+ // Build out the io vectors with pointers to the segments of the field in buffers
+ //
+ bufcnt = 0;
+ buf = loc->buffer;
+ bufsize = dx_buffer_size(buf) - loc->offset;
+ void *base = dx_buffer_base(buf) + loc->offset;
+ remaining = loc->length;
+
+ while (remaining > 0) {
+ dx_iovec_array(iov)[bufcnt].iov_base = base;
+ dx_iovec_array(iov)[bufcnt].iov_len = bufsize;
+ bufcnt++;
+ remaining -= bufsize;
+ if (remaining > 0) {
+ buf = buf->next;
+ base = dx_buffer_base(buf);
+ bufsize = dx_buffer_size(buf);
+ if (bufsize > remaining)
+ bufsize = remaining;
+ }
+ }
+
+ return iov;
+}
+
+
+void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers)
+{
+ dx_message_begin_header(msg);
+ dx_message_insert_boolean(msg, 0); // durable
+ //dx_message_insert_null(msg); // priority
+ //dx_message_insert_null(msg); // ttl
+ //dx_message_insert_boolean(msg, 0); // first-acquirer
+ //dx_message_insert_uint(msg, 0); // delivery-count
+ dx_message_end_header(msg);
+
+ dx_message_begin_message_properties(msg);
+ dx_message_insert_null(msg); // message-id
+ dx_message_insert_null(msg); // user-id
+ dx_message_insert_string(msg, to); // to
+ //dx_message_insert_null(msg); // subject
+ //dx_message_insert_null(msg); // reply-to
+ //dx_message_insert_null(msg); // correlation-id
+ //dx_message_insert_null(msg); // content-type
+ //dx_message_insert_null(msg); // content-encoding
+ //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time
+ //dx_message_insert_timestamp(msg, 0); // creation-time
+ //dx_message_insert_null(msg); // group-id
+ //dx_message_insert_uint(msg, 0); // group-sequence
+ //dx_message_insert_null(msg); // reply-to-group-id
+ dx_message_end_message_properties(msg);
+
+ if (buffers)
+ dx_message_append_body_data(msg, buffers);
+}
+
+
+void dx_message_begin_header(dx_message_t *msg)
+{
+ dx_start_list_performative(MSG_CONTENT(msg), 0x70);
+}
+
+
+void dx_message_end_header(dx_message_t *msg)
+{
+ dx_end_list(MSG_CONTENT(msg));
+}
+
+
+void dx_message_begin_delivery_annotations(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_delivery_annotations(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_begin_message_annotations(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_message_annotations(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_begin_message_properties(dx_message_t *msg)
+{
+ dx_start_list_performative(MSG_CONTENT(msg), 0x73);
+}
+
+
+void dx_message_end_message_properties(dx_message_t *msg)
+{
+ dx_end_list(MSG_CONTENT(msg));
+}
+
+
+void dx_message_begin_application_properties(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_application_properties(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_buffer_t *buf = DEQ_HEAD(*buffers);
+ uint32_t len = 0;
+
+ //
+ // Calculate the size of the body to be appended.
+ //
+ while (buf) {
+ len += dx_buffer_size(buf);
+ buf = DEQ_NEXT(buf);
+ }
+
+ //
+ // Insert a DATA section performative header.
+ //
+ dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3);
+ if (len < 256) {
+ dx_insert_8(content, 0xa0); // vbin8
+ dx_insert_8(content, (uint8_t) len);
+ } else {
+ dx_insert_8(content, 0xb0); // vbin32
+ dx_insert_32(content, len);
+ }
+
+ //
+ // Move the supplied buffers to the tail of the message's buffer list.
+ //
+ buf = DEQ_HEAD(*buffers);
+ while (buf) {
+ DEQ_REMOVE_HEAD(*buffers);
+ DEQ_INSERT_TAIL(content->buffers, buf);
+ buf = DEQ_HEAD(*buffers);
+ }
+}
+
+
+void dx_message_begin_body_sequence(dx_message_t *msg)
+{
+}
+
+
+void dx_message_end_body_sequence(dx_message_t *msg)
+{
+}
+
+
+void dx_message_begin_footer(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_footer(dx_message_t *msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_insert_null(dx_message_t *msg)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_insert_8(content, 0x40);
+ content->count++;
+}
+
+
+void dx_message_insert_boolean(dx_message_t *msg, int value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (value)
+ dx_insert(content, (const uint8_t*) "\x56\x01", 2);
+ else
+ dx_insert(content, (const uint8_t*) "\x56\x00", 2);
+ content->count++;
+}
+
+
+void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_insert_8(content, 0x50);
+ dx_insert_8(content, value);
+ content->count++;
+}
+
+
+void dx_message_insert_uint(dx_message_t *msg, uint32_t value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (value == 0) {
+ dx_insert_8(content, 0x43); // uint0
+ } else if (value < 256) {
+ dx_insert_8(content, 0x52); // smalluint
+ dx_insert_8(content, (uint8_t) value);
+ } else {
+ dx_insert_8(content, 0x70); // uint
+ dx_insert_32(content, value);
+ }
+ content->count++;
+}
+
+
+void dx_message_insert_ulong(dx_message_t *msg, uint64_t value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (value == 0) {
+ dx_insert_8(content, 0x44); // ulong0
+ } else if (value < 256) {
+ dx_insert_8(content, 0x53); // smallulong
+ dx_insert_8(content, (uint8_t) value);
+ } else {
+ dx_insert_8(content, 0x80); // ulong
+ dx_insert_64(content, value);
+ }
+ content->count++;
+}
+
+
+void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (len < 256) {
+ dx_insert_8(content, 0xa0); // vbin8
+ dx_insert_8(content, (uint8_t) len);
+ } else {
+ dx_insert_8(content, 0xb0); // vbin32
+ dx_insert_32(content, len);
+ }
+ dx_insert(content, start, len);
+ content->count++;
+}
+
+
+void dx_message_insert_string(dx_message_t *msg, const char *start)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ uint32_t len = strlen(start);
+
+ if (len < 256) {
+ dx_insert_8(content, 0xa1); // str8-utf8
+ dx_insert_8(content, (uint8_t) len);
+ dx_insert(content, (const uint8_t*) start, len);
+ } else {
+ dx_insert_8(content, 0xb1); // str32-utf8
+ dx_insert_32(content, len);
+ dx_insert(content, (const uint8_t*) start, len);
+ }
+ content->count++;
+}
+
+
+void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_insert_8(content, 0x98); // uuid
+ dx_insert(content, value, 16);
+ content->count++;
+}
+
+
+void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ if (len < 256) {
+ dx_insert_8(content, 0xa3); // sym8
+ dx_insert_8(content, (uint8_t) len);
+ dx_insert(content, (const uint8_t*) start, len);
+ } else {
+ dx_insert_8(content, 0xb3); // sym32
+ dx_insert_32(content, len);
+ dx_insert(content, (const uint8_t*) start, len);
+ }
+ content->count++;
+}
+
+
+void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value)
+{
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_insert_8(content, 0x83); // timestamp
+ dx_insert_64(content, value);
+ content->count++;
+}
+
+
+void dx_message_begin_list(dx_message_t* msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_list(dx_message_t* msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_begin_map(dx_message_t* msg)
+{
+ assert(0); // Not Implemented
+}
+
+
+void dx_message_end_map(dx_message_t* msg)
+{
+ assert(0); // Not Implemented
+}
+
diff --git a/extras/dispatch/src/message_private.h b/extras/dispatch/src/message_private.h
new file mode 100644
index 0000000000..5fb18078f5
--- /dev/null
+++ b/extras/dispatch/src/message_private.h
@@ -0,0 +1,94 @@
+#ifndef __message_private_h__
+#define __message_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/message.h>
+#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/threading.h>
+
+/**
+ * Architecture of the message module:
+ *
+ * +--------------+ +----------------------+
+ * | | | |
+ * | dx_message_t |----------->| dx_message_content_t |
+ * | | +----->| |
+ * +--------------+ | +----------------------+
+ * | |
+ * +--------------+ | | +-------------+ +-------------+ +-------------+
+ * | | | +--->| dx_buffer_t |-->| dx_buffer_t |-->| dx_buffer_t |--/
+ * | dx_message_t |-----+ +-------------+ +-------------+ +-------------+
+ * | |
+ * +--------------+
+ *
+ * The message module provides chained-fixed-sized-buffer storage of message content with multiple
+ * references. If a message is received and is to be queued for multiple destinations, there is only
+ * one copy of the message content in memory but multiple lightweight references to the content.
+ *
+ */
+
+typedef struct {
+ dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
+ size_t offset; // Offset in the buffer to the first octet
+ size_t length; // Length of the field or zero if unneeded
+ int parsed; // non-zero iff the buffer chain has been parsed to find this field
+} dx_field_location_t;
+
+
+// TODO - consider using pointers to dx_field_location_t below to save memory
+// TODO - we need a second buffer list for modified annotations and header
+// There are three message scenarios:
+// 1) Received message is held and forwarded unmodified - single buffer list
+// 2) Received message is held and modified before forwarding - two buffer lists
+// 3) Message is composed internally - single buffer list
+
+typedef struct {
+ sys_mutex_t *lock;
+ uint32_t ref_count; // The number of qmessages referencing this
+ dx_buffer_list_t buffers; // The buffer chain containing the message
+ pn_delivery_t *in_delivery; // The delivery on which the message arrived
+ dx_field_location_t section_message_header; // The message header list
+ dx_field_location_t section_delivery_annotation; // The delivery annotation map
+ dx_field_location_t section_message_annotation; // The message annotation map
+ dx_field_location_t section_message_properties; // The message properties list
+ dx_field_location_t section_application_properties; // The application properties list
+ dx_field_location_t section_body; // The message body: Data
+ dx_field_location_t section_footer; // The footer
+ dx_field_location_t field_user_id; // The string value of the user-id
+ dx_field_location_t field_to; // The string value of the to field
+ dx_field_location_t body; // The body of the message
+ dx_field_location_t compose_length;
+ dx_field_location_t compose_count;
+ uint32_t length;
+ uint32_t count;
+} dx_message_content_t;
+
+typedef struct {
+ DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t
+ dx_message_content_t *content;
+ pn_delivery_t *out_delivery;
+} dx_message_pvt_t;
+
+ALLOC_DECLARE(dx_message_t);
+ALLOC_DECLARE(dx_message_content_t);
+
+#define MSG_CONTENT(m) (((dx_message_pvt_t*) m)->content)
+
+#endif
diff --git a/extras/dispatch/src/posix/threading.c b/extras/dispatch/src/posix/threading.c
new file mode 100644
index 0000000000..8edce86cdc
--- /dev/null
+++ b/extras/dispatch/src/posix/threading.c
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/ctools.h>
+#include <stdio.h>
+#include <pthread.h>
+
+struct sys_mutex_t {
+ pthread_mutex_t mutex;
+ int acquired;
+};
+
+sys_mutex_t *sys_mutex(void)
+{
+ sys_mutex_t *mutex = NEW(sys_mutex_t);
+ pthread_mutex_init(&(mutex->mutex), 0);
+ mutex->acquired = 0;
+ return mutex;
+}
+
+
+void sys_mutex_free(sys_mutex_t *mutex)
+{
+ assert(!mutex->acquired);
+ pthread_mutex_destroy(&(mutex->mutex));
+ free(mutex);
+}
+
+
+void sys_mutex_lock(sys_mutex_t *mutex)
+{
+ pthread_mutex_lock(&(mutex->mutex));
+ assert(!mutex->acquired);
+ mutex->acquired++;
+}
+
+
+void sys_mutex_unlock(sys_mutex_t *mutex)
+{
+ mutex->acquired--;
+ assert(!mutex->acquired);
+ pthread_mutex_unlock(&(mutex->mutex));
+}
+
+
+struct sys_cond_t {
+ pthread_cond_t cond;
+};
+
+
+sys_cond_t *sys_cond(void)
+{
+ sys_cond_t *cond = NEW(sys_cond_t);
+ pthread_cond_init(&(cond->cond), 0);
+ return cond;
+}
+
+
+void sys_cond_free(sys_cond_t *cond)
+{
+ pthread_cond_destroy(&(cond->cond));
+ free(cond);
+}
+
+
+void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex)
+{
+ assert(held_mutex->acquired);
+ held_mutex->acquired--;
+ pthread_cond_wait(&(cond->cond), &(held_mutex->mutex));
+ held_mutex->acquired++;
+}
+
+
+void sys_cond_signal(sys_cond_t *cond)
+{
+ pthread_cond_signal(&(cond->cond));
+}
+
+
+void sys_cond_signal_all(sys_cond_t *cond)
+{
+ pthread_cond_broadcast(&(cond->cond));
+}
+
+
+struct sys_thread_t {
+ pthread_t thread;
+};
+
+sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg)
+{
+ sys_thread_t *thread = NEW(sys_thread_t);
+ pthread_create(&(thread->thread), 0, run_function, arg);
+ return thread;
+}
+
+
+void sys_thread_free(sys_thread_t *thread)
+{
+ free(thread);
+}
+
+
+void sys_thread_join(sys_thread_t *thread)
+{
+ pthread_join(thread->thread, 0);
+}
+
diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c
new file mode 100644
index 0000000000..6ddc8f45dd
--- /dev/null
+++ b/extras/dispatch/src/router_node.c
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/message.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/hash.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/router.h>
+
+static char *module="ROUTER_NODE";
+
+struct dx_router_t {
+ dx_node_t *node;
+ dx_link_list_t in_links;
+ dx_link_list_t out_links;
+ dx_message_list_t in_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ hash_t *out_hash;
+ uint64_t dtag;
+};
+
+
+typedef struct {
+ dx_link_t *link;
+ dx_message_list_t out_fifo;
+} dx_router_link_t;
+
+
+ALLOC_DECLARE(dx_router_link_t);
+ALLOC_DEFINE(dx_router_link_t);
+
+
+/**
+ * Outbound Delivery Handler
+ */
+static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ dx_message_t *msg;
+ size_t size;
+
+ sys_mutex_lock(router->lock);
+ msg = DEQ_HEAD(rlink->out_fifo);
+ if (!msg) {
+ // TODO - Recind the delivery
+ sys_mutex_unlock(router->lock);
+ return;
+ }
+
+ DEQ_REMOVE_HEAD(rlink->out_fifo);
+ size = (DEQ_SIZE(rlink->out_fifo));
+ sys_mutex_unlock(router->lock);
+
+ dx_message_send(msg, pn_link);
+
+ //
+ // If there is no incoming delivery, it was pre-settled. In this case,
+ // we must pre-settle the outgoing delivery as well.
+ //
+ if (dx_message_in_delivery(msg)) {
+ pn_delivery_set_context(delivery, (void*) msg);
+ dx_message_set_out_delivery(msg, delivery);
+ } else {
+ pn_delivery_settle(delivery);
+ dx_free_message(msg);
+ }
+
+ pn_link_advance(pn_link);
+ pn_link_offered(pn_link, size);
+}
+
+
+/**
+ * Inbound Delivery Handler
+ */
+static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_message_t *msg;
+ int valid_message = 0;
+
+ //
+ // Receive the message into a local representation. If the returned message
+ // pointer is NULL, we have not yet received a complete message.
+ //
+ sys_mutex_lock(router->lock);
+ msg = dx_message_receive(delivery);
+ sys_mutex_unlock(router->lock);
+
+ if (!msg)
+ return;
+
+ //
+ // Validate the message through the Properties section
+ //
+ valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
+
+ pn_link_advance(pn_link);
+ pn_link_flow(pn_link, 1);
+
+ if (valid_message) {
+ dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
+ dx_router_link_t *rlink;
+ if (iter) {
+ dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+ sys_mutex_lock(router->lock);
+ int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
+ dx_field_iterator_free(iter);
+
+ if (result == 0) {
+ //
+ // To field is valid and contains a known destination. Enqueue on
+ // the output fifo for the next-hop-to-destination.
+ //
+ pn_link_t* pn_outlink = dx_link_pn(rlink->link);
+ DEQ_INSERT_TAIL(rlink->out_fifo, msg);
+ pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo));
+ dx_link_activate(rlink->link);
+ } else {
+ //
+ // To field contains an unknown address. Release the message.
+ //
+ pn_delivery_update(delivery, PN_RELEASED);
+ pn_delivery_settle(delivery);
+ }
+
+ sys_mutex_unlock(router->lock);
+ }
+ } else {
+ //
+ // Message is invalid. Reject the message.
+ //
+ pn_delivery_update(delivery, PN_REJECTED);
+ pn_delivery_settle(delivery);
+ pn_delivery_set_context(delivery, 0);
+ dx_free_message(msg);
+ }
+}
+
+
+/**
+ * Delivery Disposition Handler
+ */
+static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+{
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+
+ if (pn_link_is_sender(pn_link)) {
+ pn_disposition_t disp = pn_delivery_remote_state(delivery);
+ dx_message_t *msg = pn_delivery_get_context(delivery);
+ pn_delivery_t *activate = 0;
+
+ if (msg) {
+ assert(delivery == dx_message_out_delivery(msg));
+ if (disp != 0) {
+ activate = dx_message_in_delivery(msg);
+ pn_delivery_update(activate, disp);
+ // TODO - handling of the data accompanying RECEIVED/MODIFIED
+ }
+
+ if (pn_delivery_settled(delivery)) {
+ //
+ // Downstream delivery has been settled. Propagate the settlement
+ // upstream.
+ //
+ activate = dx_message_in_delivery(msg);
+ pn_delivery_settle(activate);
+ pn_delivery_settle(delivery);
+ dx_free_message(msg);
+ }
+
+ if (activate) {
+ //
+ // Activate the upstream/incoming link so that the settlement will
+ // get pushed out.
+ //
+ dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate));
+ dx_link_activate(act_link);
+ }
+
+ return;
+ }
+ }
+
+ pn_delivery_settle(delivery);
+}
+
+
+/**
+ * New Incoming Link Handler
+ */
+static int router_incoming_link_handler(void* context, dx_link_t *link)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ dx_link_item_t *item = new_dx_link_item_t();
+ pn_link_t *pn_link = dx_link_pn(link);
+
+ if (item) {
+ DEQ_ITEM_INIT(item);
+ item->link = link;
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(router->in_links, item);
+ sys_mutex_unlock(router->lock);
+
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_flow(pn_link, 8);
+ pn_link_open(pn_link);
+ } else {
+ pn_link_close(pn_link);
+ }
+ return 0;
+}
+
+
+/**
+ * New Outgoing Link Handler
+ */
+static int router_outgoing_link_handler(void* context, dx_link_t *link)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+
+ sys_mutex_lock(router->lock);
+ dx_router_link_t *rlink = new_dx_router_link_t();
+ rlink->link = link;
+ DEQ_INIT(rlink->out_fifo);
+ dx_link_set_context(link, rlink);
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+ int result = hash_insert(router->out_hash, iter, rlink);
+ dx_field_iterator_free(iter);
+
+ if (result == 0) {
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_open(pn_link);
+ sys_mutex_unlock(router->lock);
+ dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
+ return 0;
+ }
+
+ dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt);
+ pn_link_close(pn_link);
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+/**
+ * Outgoing Link Writable Handler
+ */
+static int router_writable_link_handler(void* context, dx_link_t *link)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ int grant_delivery = 0;
+ pn_delivery_t *delivery;
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ pn_link_t *pn_link = dx_link_pn(link);
+ uint64_t tag;
+
+ sys_mutex_lock(router->lock);
+ if (DEQ_SIZE(rlink->out_fifo) > 0) {
+ grant_delivery = 1;
+ tag = router->dtag++;
+ }
+ sys_mutex_unlock(router->lock);
+
+ if (grant_delivery) {
+ pn_delivery(pn_link, pn_dtag((char*) &tag, 8));
+ delivery = pn_link_current(pn_link);
+ if (delivery) {
+ router_tx_handler(context, link, delivery);
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+/**
+ * Link Detached Handler
+ */
+static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
+{
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ dx_link_item_t *item;
+
+ sys_mutex_lock(router->lock);
+ if (pn_link_is_sender(pn_link)) {
+ item = DEQ_HEAD(router->out_links);
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+ dx_router_link_t *rlink;
+ if (iter) {
+ int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
+ if (result == 0) {
+ dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+ hash_remove(router->out_hash, iter);
+ free_dx_router_link_t(rlink);
+ dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+ }
+ dx_field_iterator_free(iter);
+ }
+ }
+ else
+ item = DEQ_HEAD(router->in_links);
+
+ while (item) {
+ if (item->link == link) {
+ if (pn_link_is_sender(pn_link))
+ DEQ_REMOVE(router->out_links, item);
+ else
+ DEQ_REMOVE(router->in_links, item);
+ free_dx_link_item_t(item);
+ break;
+ }
+ item = item->next;
+ }
+
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+static void router_inbound_open_handler(void *type_context, dx_connection_t *conn)
+{
+}
+
+
+static void router_outbound_open_handler(void *type_context, dx_connection_t *conn)
+{
+}
+
+
+static void dx_router_timer_handler(void *context)
+{
+ dx_router_t *router = (dx_router_t*) context;
+
+ //
+ // Periodic processing.
+ //
+ dx_timer_schedule(router->timer, 1000);
+}
+
+
+static dx_node_type_t router_node = {"router", 0, 0,
+ router_rx_handler,
+ router_tx_handler,
+ router_disp_handler,
+ router_incoming_link_handler,
+ router_outgoing_link_handler,
+ router_writable_link_handler,
+ router_link_detach_handler,
+ 0, // node_created_handler
+ 0, // node_destroyed_handler
+ router_inbound_open_handler,
+ router_outbound_open_handler };
+static int type_registered = 0;
+
+
+dx_router_t *dx_router(dx_router_configuration_t *config)
+{
+ if (!type_registered) {
+ type_registered = 1;
+ dx_container_register_node_type(&router_node);
+ }
+
+ dx_router_t *router = NEW(dx_router_t);
+ dx_container_set_default_node_type(&router_node, (void*) router, DX_DIST_BOTH);
+
+ DEQ_INIT(router->in_links);
+ DEQ_INIT(router->out_links);
+ DEQ_INIT(router->in_fifo);
+
+ router->lock = sys_mutex();
+
+ router->timer = dx_timer(dx_router_timer_handler, (void*) router);
+ dx_timer_schedule(router->timer, 0); // Immediate
+
+ router->out_hash = hash(10, 32, 0);
+ router->dtag = 1;
+
+ return router;
+}
+
+
+void dx_router_free(dx_router_t *router)
+{
+ dx_container_set_default_node_type(0, 0, DX_DIST_BOTH);
+ sys_mutex_free(router->lock);
+ free(router);
+}
+
diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c
new file mode 100644
index 0000000000..0099393f60
--- /dev/null
+++ b/extras/dispatch/src/server.c
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/log.h>
+#include "server_private.h"
+#include "timer_private.h"
+#include "alloc_private.h"
+#include "auth.h"
+#include "work_queue.h"
+#include <stdio.h>
+#include <time.h>
+#include <signal.h>
+
+static char *module="SERVER";
+
+typedef struct dx_thread_t {
+ int thread_id;
+ volatile int running;
+ volatile int canceled;
+ int using_thread;
+ sys_thread_t *thread;
+} dx_thread_t;
+
+
+typedef struct dx_server_t {
+ int thread_count;
+ pn_driver_t *driver;
+ dx_thread_start_cb_t start_handler;
+ dx_conn_handler_cb_t conn_handler;
+ dx_signal_handler_cb_t signal_handler;
+ dx_user_fd_handler_cb_t ufd_handler;
+ void *start_context;
+ void *conn_context;
+ void *signal_context;
+ sys_cond_t *cond;
+ sys_mutex_t *lock;
+ dx_thread_t **threads;
+ work_queue_t *work_queue;
+ dx_timer_list_t pending_timers;
+ bool a_thread_is_waiting;
+ int threads_active;
+ int pause_requests;
+ int threads_paused;
+ int pause_next_sequence;
+ int pause_now_serving;
+ int pending_signal;
+} dx_server_t;
+
+
+ALLOC_DEFINE(dx_listener_t);
+ALLOC_DEFINE(dx_connector_t);
+ALLOC_DEFINE(dx_connection_t);
+ALLOC_DEFINE(dx_user_fd_t);
+
+
+/**
+ * Singleton Concurrent Proton Driver object
+ */
+static dx_server_t *dx_server = 0;
+
+
+static void signal_handler(int signum)
+{
+ dx_server->pending_signal = signum;
+ sys_cond_signal_all(dx_server->cond);
+}
+
+
+static dx_thread_t *thread(int id)
+{
+ dx_thread_t *thread = NEW(dx_thread_t);
+ if (!thread)
+ return 0;
+
+ thread->thread_id = id;
+ thread->running = 0;
+ thread->canceled = 0;
+ thread->using_thread = 0;
+
+ return thread;
+}
+
+
+static void thread_process_listeners(pn_driver_t *driver)
+{
+ pn_listener_t *listener = pn_driver_listener(driver);
+ pn_connector_t *cxtr;
+ dx_connection_t *ctx;
+
+ while (listener) {
+ dx_log(module, LOG_TRACE, "Accepting Connection");
+ cxtr = pn_listener_accept(listener);
+ ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_SASL_SERVER;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_cxtr = cxtr;
+ ctx->pn_conn = 0;
+ ctx->listener = (dx_listener_t*) pn_listener_context(listener);
+ ctx->connector = 0;
+ ctx->context = ctx->listener->context;
+ ctx->ufd = 0;
+
+ pn_connector_set_context(cxtr, ctx);
+ listener = pn_driver_listener(driver);
+ }
+}
+
+
+static void handle_signals_LH(void)
+{
+ int signum = dx_server->pending_signal;
+
+ if (signum) {
+ dx_server->pending_signal = 0;
+ if (dx_server->signal_handler) {
+ sys_mutex_unlock(dx_server->lock);
+ dx_server->signal_handler(dx_server->signal_context, signum);
+ sys_mutex_lock(dx_server->lock);
+ }
+ }
+}
+
+
+static void block_if_paused_LH(void)
+{
+ if (dx_server->pause_requests > 0) {
+ dx_server->threads_paused++;
+ sys_cond_signal_all(dx_server->cond);
+ while (dx_server->pause_requests > 0)
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+ dx_server->threads_paused--;
+ }
+}
+
+
+static void process_connector(pn_connector_t *cxtr)
+{
+ dx_connection_t *ctx = pn_connector_context(cxtr);
+ int events = 0;
+ int auth_passes = 0;
+
+ if (ctx->state == CONN_STATE_USER) {
+ dx_server->ufd_handler(ctx->ufd->context, ctx->ufd);
+ return;
+ }
+
+ do {
+ //
+ // Step the engine for pre-handler processing
+ //
+ pn_connector_process(cxtr);
+
+ //
+ // Call the handler that is appropriate for the connector's state.
+ //
+ switch (ctx->state) {
+ case CONN_STATE_CONNECTING:
+ if (!pn_connector_closed(cxtr)) {
+ ctx->state = CONN_STATE_SASL_CLIENT;
+ assert(ctx->connector);
+ ctx->connector->state = CXTR_STATE_OPEN;
+ events = 1;
+ } else {
+ ctx->state = CONN_STATE_FAILED;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_SASL_CLIENT:
+ if (auth_passes == 0) {
+ auth_client_handler(cxtr);
+ events = 1;
+ } else {
+ auth_passes++;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_SASL_SERVER:
+ if (auth_passes == 0) {
+ auth_server_handler(cxtr);
+ events = 1;
+ } else {
+ auth_passes++;
+ events = 0;
+ }
+ break;
+
+ case CONN_STATE_OPENING:
+ ctx->state = CONN_STATE_OPERATIONAL;
+
+ pn_connection_t *conn = pn_connection();
+ pn_connection_set_container(conn, "dispatch"); // TODO - make unique
+ pn_connector_set_connection(cxtr, conn);
+ pn_connection_set_context(conn, ctx);
+ ctx->pn_conn = conn;
+
+ dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+ if (ctx->listener) {
+ ce = DX_CONN_EVENT_LISTENER_OPEN;
+ } else if (ctx->connector) {
+ ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(0);
+
+ dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ events = 1;
+ break;
+
+ case CONN_STATE_OPERATIONAL:
+ if (pn_connector_closed(cxtr)) {
+ dx_server->conn_handler(ctx->context,
+ DX_CONN_EVENT_CLOSE,
+ (dx_connection_t*) pn_connector_context(cxtr));
+ events = 0;
+ }
+ else
+ events = dx_server->conn_handler(ctx->context,
+ DX_CONN_EVENT_PROCESS,
+ (dx_connection_t*) pn_connector_context(cxtr));
+ break;
+
+ default:
+ break;
+ }
+ } while (events > 0);
+}
+
+
+//
+// TEMPORARY FUNCTION PROTOTYPES
+//
+void pn_driver_wait_1(pn_driver_t *d);
+int pn_driver_wait_2(pn_driver_t *d, int timeout);
+void pn_driver_wait_3(pn_driver_t *d);
+//
+// END TEMPORARY
+//
+
+static void *thread_run(void *arg)
+{
+ dx_thread_t *thread = (dx_thread_t*) arg;
+ pn_connector_t *work;
+ pn_connection_t *conn;
+ dx_connection_t *ctx;
+ int error;
+ int poll_result;
+ int timer_holdoff = 0;
+
+ if (!thread)
+ return 0;
+
+ thread->running = 1;
+
+ if (thread->canceled)
+ return 0;
+
+ //
+ // Invoke the start handler if the application supplied one.
+ // This handler can be used to set NUMA or processor affinnity for the thread.
+ //
+ if (dx_server->start_handler)
+ dx_server->start_handler(dx_server->start_context, thread->thread_id);
+
+ //
+ // Main Loop
+ //
+ while (thread->running) {
+ sys_mutex_lock(dx_server->lock);
+
+ //
+ // Check for pending signals to process
+ //
+ handle_signals_LH();
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Check to see if the server is pausing. If so, block here.
+ //
+ block_if_paused_LH();
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Service pending timers.
+ //
+ dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
+ if (timer) {
+ DEQ_REMOVE_HEAD(dx_server->pending_timers);
+
+ //
+ // Mark the timer as idle in case it reschedules itself.
+ //
+ dx_timer_idle_LH(timer);
+
+ //
+ // Release the lock and invoke the connection handler.
+ //
+ sys_mutex_unlock(dx_server->lock);
+ timer->handler(timer->context);
+ pn_driver_wakeup(dx_server->driver);
+ continue;
+ }
+
+ //
+ // Check the work queue for connectors scheduled for processing.
+ //
+ work = work_queue_get(dx_server->work_queue);
+ if (!work) {
+ //
+ // There is no pending work to do
+ //
+ if (dx_server->a_thread_is_waiting) {
+ //
+ // Another thread is waiting on the proton driver, this thread must
+ // wait on the condition variable until signaled.
+ //
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+ } else {
+ //
+ // This thread elects itself to wait on the proton driver. Set the
+ // thread-is-waiting flag so other idle threads will not interfere.
+ //
+ dx_server->a_thread_is_waiting = true;
+
+ //
+ // Ask the timer module when its next timer is scheduled to fire. We'll
+ // use this value in driver_wait as the timeout. If there are no scheduled
+ // timers, the returned value will be -1.
+ //
+ long duration = dx_timer_next_duration_LH();
+
+ //
+ // Invoke the proton driver's wait sequence. This is a bit of a hack for now
+ // and will be improved in the future. The wait process is divided into three parts,
+ // the first and third of which need to be non-reentrant, and the second of which
+ // must be reentrant (and blocks).
+ //
+ pn_driver_wait_1(dx_server->driver);
+ sys_mutex_unlock(dx_server->lock);
+
+ do {
+ error = 0;
+ poll_result = pn_driver_wait_2(dx_server->driver, duration);
+ if (poll_result == -1)
+ error = pn_driver_errno(dx_server->driver);
+ } while (error == PN_INTR);
+ if (error) {
+ dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver)));
+ exit(-1);
+ }
+
+ sys_mutex_lock(dx_server->lock);
+ pn_driver_wait_3(dx_server->driver);
+
+ if (!thread->running) {
+ sys_mutex_unlock(dx_server->lock);
+ break;
+ }
+
+ //
+ // Visit the timer module.
+ //
+ if (poll_result == 0 || ++timer_holdoff == 100) {
+ struct timespec tv;
+ clock_gettime(CLOCK_REALTIME, &tv);
+ long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
+ dx_timer_visit_LH(milliseconds);
+ timer_holdoff = 0;
+ }
+
+ //
+ // Process listeners (incoming connections).
+ //
+ thread_process_listeners(dx_server->driver);
+
+ //
+ // Traverse the list of connectors-needing-service from the proton driver.
+ // If the connector is not already in the work queue and it is not currently
+ // being processed by another thread, put it in the work queue and signal the
+ // condition variable.
+ //
+ work = pn_driver_connector(dx_server->driver);
+ while (work) {
+ ctx = pn_connector_context(work);
+ if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
+ ctx->enqueued = 1;
+ work_queue_put(dx_server->work_queue, work);
+ sys_cond_signal(dx_server->cond);
+ }
+ work = pn_driver_connector(dx_server->driver);
+ }
+
+ //
+ // Release our exclusive claim on pn_driver_wait.
+ //
+ dx_server->a_thread_is_waiting = false;
+ }
+ }
+
+ //
+ // If we were given a connector to work on from the work queue, mark it as
+ // owned by this thread and as no longer enqueued.
+ //
+ if (work) {
+ ctx = pn_connector_context(work);
+ if (ctx->owner_thread == CONTEXT_NO_OWNER) {
+ ctx->owner_thread = thread->thread_id;
+ ctx->enqueued = 0;
+ dx_server->threads_active++;
+ } else {
+ //
+ // This connector is being processed by another thread, re-queue it.
+ //
+ work_queue_put(dx_server->work_queue, work);
+ work = 0;
+ }
+ }
+ sys_mutex_unlock(dx_server->lock);
+
+ //
+ // Process the connector that we now have exclusive access to.
+ //
+ if (work) {
+ process_connector(work);
+
+ //
+ // Check to see if the connector was closed during processing
+ //
+ if (pn_connector_closed(work)) {
+ //
+ // Connector is closed. Free the context and the connector.
+ //
+ conn = pn_connector_connection(work);
+ if (ctx->connector) {
+ ctx->connector->ctx = 0;
+ ctx->connector->state = CXTR_STATE_CONNECTING;
+ dx_timer_schedule(ctx->connector->timer, ctx->connector->delay);
+ }
+ sys_mutex_lock(dx_server->lock);
+ free_dx_connection_t(ctx);
+ pn_connector_free(work);
+ if (conn)
+ pn_connection_free(conn);
+ dx_server->threads_active--;
+ sys_mutex_unlock(dx_server->lock);
+ } else {
+ //
+ // The connector lives on. Mark it as no longer owned by this thread.
+ //
+ sys_mutex_lock(dx_server->lock);
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ dx_server->threads_active--;
+ sys_mutex_unlock(dx_server->lock);
+ }
+
+ //
+ // Wake up the proton driver to force it to reconsider its set of FDs
+ // in light of the processing that just occurred.
+ //
+ pn_driver_wakeup(dx_server->driver);
+ }
+ }
+
+ return 0;
+}
+
+
+static void thread_start(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ thread->using_thread = 1;
+ thread->thread = sys_thread(thread_run, (void*) thread);
+}
+
+
+static void thread_cancel(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ thread->running = 0;
+ thread->canceled = 1;
+}
+
+
+static void thread_join(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ if (thread->using_thread)
+ sys_thread_join(thread->thread);
+}
+
+
+static void thread_free(dx_thread_t *thread)
+{
+ if (!thread)
+ return;
+
+ free(thread);
+}
+
+
+static void cxtr_try_open(void *context)
+{
+ dx_connector_t *ct = (dx_connector_t*) context;
+ if (ct->state != CXTR_STATE_CONNECTING)
+ return;
+
+ dx_connection_t *ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_CONNECTING;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_conn = 0;
+ ctx->listener = 0;
+ ctx->connector = ct;
+ ctx->context = ct->context;
+ ctx->user_context = 0;
+ ctx->ufd = 0;
+
+ //
+ // pn_connector is not thread safe
+ //
+ sys_mutex_lock(dx_server->lock);
+ ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx);
+ sys_mutex_unlock(dx_server->lock);
+
+ ct->ctx = ctx;
+ ct->delay = 5000;
+ dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
+}
+
+
+void dx_server_initialize(int thread_count)
+{
+ int i;
+
+ if (dx_server)
+ return; // TODO - Fail in a more dramatic way
+
+ dx_alloc_initialize();
+ dx_server = NEW(dx_server_t);
+
+ if (!dx_server)
+ return; // TODO - Fail in a more dramatic way
+
+ dx_server->thread_count = thread_count;
+ dx_server->driver = pn_driver();
+ dx_server->start_handler = 0;
+ dx_server->conn_handler = 0;
+ dx_server->signal_handler = 0;
+ dx_server->ufd_handler = 0;
+ dx_server->start_context = 0;
+ dx_server->signal_context = 0;
+ dx_server->lock = sys_mutex();
+ dx_server->cond = sys_cond();
+
+ dx_timer_initialize(dx_server->lock);
+
+ dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count);
+ for (i = 0; i < thread_count; i++)
+ dx_server->threads[i] = thread(i);
+
+ dx_server->work_queue = work_queue();
+ DEQ_INIT(dx_server->pending_timers);
+ dx_server->a_thread_is_waiting = false;
+ dx_server->threads_active = 0;
+ dx_server->pause_requests = 0;
+ dx_server->threads_paused = 0;
+ dx_server->pause_next_sequence = 0;
+ dx_server->pause_now_serving = 0;
+ dx_server->pending_signal = 0;
+}
+
+
+void dx_server_finalize(void)
+{
+ int i;
+ if (!dx_server)
+ return;
+
+ for (i = 0; i < dx_server->thread_count; i++)
+ thread_free(dx_server->threads[i]);
+
+ work_queue_free(dx_server->work_queue);
+
+ pn_driver_free(dx_server->driver);
+ sys_mutex_free(dx_server->lock);
+ sys_cond_free(dx_server->cond);
+ free(dx_server);
+ dx_server = 0;
+}
+
+
+void dx_server_set_conn_handler(dx_conn_handler_cb_t handler)
+{
+ dx_server->conn_handler = handler;
+}
+
+
+void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context)
+{
+ dx_server->signal_handler = handler;
+ dx_server->signal_context = context;
+}
+
+
+void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context)
+{
+ dx_server->start_handler = handler;
+ dx_server->start_context = context;
+}
+
+
+void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler)
+{
+ dx_server->ufd_handler = ufd_handler;
+}
+
+
+void dx_server_run(void)
+{
+ int i;
+ if (!dx_server)
+ return;
+
+ assert(dx_server->conn_handler); // Server can't run without a connection handler.
+
+ for (i = 1; i < dx_server->thread_count; i++)
+ thread_start(dx_server->threads[i]);
+
+ dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count);
+
+ thread_run((void*) dx_server->threads[0]);
+
+ for (i = 1; i < dx_server->thread_count; i++)
+ thread_join(dx_server->threads[i]);
+
+ dx_log(module, LOG_INFO, "Shut Down");
+}
+
+
+void dx_server_stop(void)
+{
+ int idx;
+
+ sys_mutex_lock(dx_server->lock);
+ for (idx = 0; idx < dx_server->thread_count; idx++)
+ thread_cancel(dx_server->threads[idx]);
+ sys_cond_signal_all(dx_server->cond);
+ pn_driver_wakeup(dx_server->driver);
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_signal(int signum)
+{
+ signal(signum, signal_handler);
+}
+
+
+void dx_server_pause(void)
+{
+ sys_mutex_lock(dx_server->lock);
+
+ //
+ // Bump the request count to stop all the threads.
+ //
+ dx_server->pause_requests++;
+ int my_sequence = dx_server->pause_next_sequence++;
+
+ //
+ // Awaken all threads that are currently blocking.
+ //
+ sys_cond_signal_all(dx_server->cond);
+ pn_driver_wakeup(dx_server->driver);
+
+ //
+ // Wait for the paused thread count plus the number of threads requesting a pause to equal
+ // the total thread count. Also, don't exit the blocking loop until now_serving equals our
+ // sequence number. This ensures that concurrent pausers don't run at the same time.
+ //
+ while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) ||
+ (my_sequence != dx_server->pause_now_serving))
+ sys_cond_wait(dx_server->cond, dx_server->lock);
+
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_resume(void)
+{
+ sys_mutex_lock(dx_server->lock);
+ dx_server->pause_requests--;
+ dx_server->pause_now_serving++;
+ sys_cond_signal_all(dx_server->cond);
+ sys_mutex_unlock(dx_server->lock);
+}
+
+
+void dx_server_activate(dx_connection_t *ctx)
+{
+ if (!ctx)
+ return;
+
+ pn_connector_t *ctor = ctx->pn_cxtr;
+ if (!ctor)
+ return;
+
+ if (!pn_connector_closed(ctor))
+ pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE);
+}
+
+
+void dx_connection_set_context(dx_connection_t *conn, void *context)
+{
+ conn->user_context = context;
+}
+
+
+void *dx_connection_get_context(dx_connection_t *conn)
+{
+ return conn->user_context;
+}
+
+
+pn_connection_t *dx_connection_pn(dx_connection_t *conn)
+{
+ return conn->pn_conn;
+}
+
+
+dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context)
+{
+ dx_listener_t *li = new_dx_listener_t();
+
+ if (!li)
+ return 0;
+
+ li->config = config;
+ li->context = context;
+ li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li);
+
+ if (!li->pn_listener) {
+ dx_log(module, LOG_ERROR, "Driver Error %d (%s)",
+ pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver));
+ free_dx_listener_t(li);
+ return 0;
+ }
+ dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port);
+
+ return li;
+}
+
+
+void dx_server_listener_free(dx_listener_t* li)
+{
+ pn_listener_free(li->pn_listener);
+ free_dx_listener_t(li);
+}
+
+
+void dx_server_listener_close(dx_listener_t* li)
+{
+ pn_listener_close(li->pn_listener);
+}
+
+
+dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context)
+{
+ dx_connector_t *ct = new_dx_connector_t();
+
+ if (!ct)
+ return 0;
+
+ ct->state = CXTR_STATE_CONNECTING;
+ ct->config = config;
+ ct->context = context;
+ ct->ctx = 0;
+ ct->timer = dx_timer(cxtr_try_open, (void*) ct);
+ ct->delay = 0;
+
+ dx_timer_schedule(ct->timer, ct->delay);
+ return ct;
+}
+
+
+void dx_server_connector_free(dx_connector_t* ct)
+{
+ // Don't free the proton connector. This will be done by the connector
+ // processing/cleanup.
+
+ if (ct->ctx) {
+ pn_connector_close(ct->ctx->pn_cxtr);
+ ct->ctx->connector = 0;
+ }
+
+ dx_timer_free(ct->timer);
+ free_dx_connector_t(ct);
+}
+
+
+dx_user_fd_t *dx_user_fd(int fd, void *context)
+{
+ dx_user_fd_t *ufd = new_dx_user_fd_t();
+
+ if (!ufd)
+ return 0;
+
+ dx_connection_t *ctx = new_dx_connection_t();
+ ctx->state = CONN_STATE_USER;
+ ctx->owner_thread = CONTEXT_NO_OWNER;
+ ctx->enqueued = 0;
+ ctx->pn_conn = 0;
+ ctx->listener = 0;
+ ctx->connector = 0;
+ ctx->context = 0;
+ ctx->user_context = 0;
+ ctx->ufd = ufd;
+
+ ufd->context = context;
+ ufd->fd = fd;
+ ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx);
+ pn_driver_wakeup(dx_server->driver);
+
+ return ufd;
+}
+
+
+void dx_user_fd_free(dx_user_fd_t *ufd)
+{
+ pn_connector_close(ufd->pn_conn);
+ free_dx_user_fd_t(ufd);
+}
+
+
+void dx_user_fd_activate_read(dx_user_fd_t *ufd)
+{
+ pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE);
+ pn_driver_wakeup(dx_server->driver);
+}
+
+
+void dx_user_fd_activate_write(dx_user_fd_t *ufd)
+{
+ pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
+ pn_driver_wakeup(dx_server->driver);
+}
+
+
+bool dx_user_fd_is_readable(dx_user_fd_t *ufd)
+{
+ return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE);
+}
+
+
+bool dx_user_fd_is_writeable(dx_user_fd_t *ufd)
+{
+ return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
+}
+
+
+void dx_server_timer_pending_LH(dx_timer_t *timer)
+{
+ DEQ_INSERT_TAIL(dx_server->pending_timers, timer);
+}
+
+
+void dx_server_timer_cancel_LH(dx_timer_t *timer)
+{
+ DEQ_REMOVE(dx_server->pending_timers, timer);
+}
+
diff --git a/extras/dispatch/src/server_private.h b/extras/dispatch/src/server_private.h
new file mode 100644
index 0000000000..1722175e35
--- /dev/null
+++ b/extras/dispatch/src/server_private.h
@@ -0,0 +1,96 @@
+#ifndef __server_private_h__
+#define __server_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/user_fd.h>
+#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/alloc.h>
+#include <proton/driver.h>
+#include <proton/driver_extras.h>
+
+void dx_server_timer_pending_LH(dx_timer_t *timer);
+void dx_server_timer_cancel_LH(dx_timer_t *timer);
+
+
+typedef enum {
+ CONN_STATE_CONNECTING = 0,
+ CONN_STATE_SASL_CLIENT,
+ CONN_STATE_SASL_SERVER,
+ CONN_STATE_OPENING,
+ CONN_STATE_OPERATIONAL,
+ CONN_STATE_FAILED,
+ CONN_STATE_USER
+} conn_state_t;
+
+#define CONTEXT_NO_OWNER -1
+
+typedef enum {
+ CXTR_STATE_CONNECTING = 0,
+ CXTR_STATE_OPEN,
+ CXTR_STATE_FAILED
+} cxtr_state_t;
+
+
+struct dx_listener_t {
+ const dx_server_config_t *config;
+ void *context;
+ pn_listener_t *pn_listener;
+};
+
+
+struct dx_connector_t {
+ cxtr_state_t state;
+ const dx_server_config_t *config;
+ void *context;
+ dx_connection_t *ctx;
+ dx_timer_t *timer;
+ long delay;
+};
+
+
+struct dx_connection_t {
+ conn_state_t state;
+ int owner_thread;
+ int enqueued;
+ pn_connector_t *pn_cxtr;
+ pn_connection_t *pn_conn;
+ dx_listener_t *listener;
+ dx_connector_t *connector;
+ void *context; // Copy of context from listener or connector
+ void *user_context;
+ dx_user_fd_t *ufd;
+};
+
+
+struct dx_user_fd_t {
+ void *context;
+ int fd;
+ pn_connector_t *pn_conn;
+};
+
+
+ALLOC_DECLARE(dx_listener_t);
+ALLOC_DECLARE(dx_connector_t);
+ALLOC_DECLARE(dx_connection_t);
+ALLOC_DECLARE(dx_user_fd_t);
+
+
+#endif
diff --git a/extras/dispatch/src/timer.c b/extras/dispatch/src/timer.c
new file mode 100644
index 0000000000..b6b4864e26
--- /dev/null
+++ b/extras/dispatch/src/timer.c
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "timer_private.h"
+#include "server_private.h"
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/alloc.h>
+#include <assert.h>
+#include <stdio.h>
+
+static sys_mutex_t *lock;
+static dx_timer_list_t idle_timers;
+static dx_timer_list_t scheduled_timers;
+static long time_base;
+
+ALLOC_DECLARE(dx_timer_t);
+ALLOC_DEFINE(dx_timer_t);
+
+//=========================================================================
+// Private static functions
+//=========================================================================
+
+static void dx_timer_cancel_LH(dx_timer_t *timer)
+{
+ switch (timer->state) {
+ case TIMER_FREE:
+ assert(0);
+ break;
+
+ case TIMER_IDLE:
+ break;
+
+ case TIMER_SCHEDULED:
+ if (timer->next)
+ timer->next->delta_time += timer->delta_time;
+ DEQ_REMOVE(scheduled_timers, timer);
+ DEQ_INSERT_TAIL(idle_timers, timer);
+ break;
+
+ case TIMER_PENDING:
+ dx_server_timer_cancel_LH(timer);
+ break;
+ }
+
+ timer->state = TIMER_IDLE;
+}
+
+
+//=========================================================================
+// Public Functions from timer.h
+//=========================================================================
+
+dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context)
+{
+ dx_timer_t *timer = new_dx_timer_t();
+ if (!timer)
+ return 0;
+
+ DEQ_ITEM_INIT(timer);
+
+ timer->handler = cb;
+ timer->context = context;
+ timer->delta_time = 0;
+ timer->state = TIMER_IDLE;
+
+ sys_mutex_lock(lock);
+ DEQ_INSERT_TAIL(idle_timers, timer);
+ sys_mutex_unlock(lock);
+
+ return timer;
+}
+
+
+void dx_timer_free(dx_timer_t *timer)
+{
+ sys_mutex_lock(lock);
+ dx_timer_cancel_LH(timer);
+ DEQ_REMOVE(idle_timers, timer);
+ sys_mutex_unlock(lock);
+
+ timer->state = TIMER_FREE;
+ free_dx_timer_t(timer);
+}
+
+
+void dx_timer_schedule(dx_timer_t *timer, long duration)
+{
+ dx_timer_t *ptr;
+ dx_timer_t *last;
+ long total_time;
+
+ sys_mutex_lock(lock);
+ dx_timer_cancel_LH(timer); // Timer is now on the idle list
+ assert(timer->state == TIMER_IDLE);
+ DEQ_REMOVE(idle_timers, timer);
+
+ //
+ // Handle the special case of a zero-time scheduling. In this case,
+ // the timer doesn't go on the scheduled list. It goes straight to the
+ // pending list in the server.
+ //
+ if (duration == 0) {
+ timer->state = TIMER_PENDING;
+ dx_server_timer_pending_LH(timer);
+ sys_mutex_unlock(lock);
+ return;
+ }
+
+ //
+ // Find the insert point in the schedule.
+ //
+ total_time = 0;
+ ptr = DEQ_HEAD(scheduled_timers);
+ assert(!ptr || ptr->prev == 0);
+ while (ptr) {
+ total_time += ptr->delta_time;
+ if (total_time > duration)
+ break;
+ ptr = ptr->next;
+ }
+
+ //
+ // Insert the timer into the schedule and adjust the delta time
+ // of the following timer if present.
+ //
+ if (total_time <= duration) {
+ assert(ptr == 0);
+ timer->delta_time = duration - total_time;
+ DEQ_INSERT_TAIL(scheduled_timers, timer);
+ } else {
+ total_time -= ptr->delta_time;
+ timer->delta_time = duration - total_time;
+ assert(ptr->delta_time > timer->delta_time);
+ ptr->delta_time -= timer->delta_time;
+ last = ptr->prev;
+ if (last)
+ DEQ_INSERT_AFTER(scheduled_timers, timer, last);
+ else
+ DEQ_INSERT_HEAD(scheduled_timers, timer);
+ }
+
+ timer->state = TIMER_SCHEDULED;
+
+ sys_mutex_unlock(lock);
+}
+
+
+void dx_timer_cancel(dx_timer_t *timer)
+{
+ sys_mutex_lock(lock);
+ dx_timer_cancel_LH(timer);
+ sys_mutex_unlock(lock);
+}
+
+
+//=========================================================================
+// Private Functions from timer_private.h
+//=========================================================================
+
+void dx_timer_initialize(sys_mutex_t *server_lock)
+{
+ lock = server_lock;
+ DEQ_INIT(idle_timers);
+ DEQ_INIT(scheduled_timers);
+ time_base = 0;
+}
+
+
+void dx_timer_finalize(void)
+{
+ lock = 0;
+}
+
+
+long dx_timer_next_duration_LH(void)
+{
+ dx_timer_t *timer = DEQ_HEAD(scheduled_timers);
+ if (timer)
+ return timer->delta_time;
+ return -1;
+}
+
+
+void dx_timer_visit_LH(long current_time)
+{
+ long delta;
+ dx_timer_t *timer = DEQ_HEAD(scheduled_timers);
+
+ if (time_base == 0) {
+ time_base = current_time;
+ return;
+ }
+
+ delta = current_time - time_base;
+ time_base = current_time;
+
+ while (timer) {
+ assert(delta >= 0);
+ if (timer->delta_time > delta) {
+ timer->delta_time -= delta;
+ break;
+ } else {
+ DEQ_REMOVE_HEAD(scheduled_timers);
+ delta -= timer->delta_time;
+ timer->state = TIMER_PENDING;
+ dx_server_timer_pending_LH(timer);
+
+ }
+ timer = DEQ_HEAD(scheduled_timers);
+ }
+}
+
+
+void dx_timer_idle_LH(dx_timer_t *timer)
+{
+ timer->state = TIMER_IDLE;
+ DEQ_INSERT_TAIL(idle_timers, timer);
+}
+
diff --git a/extras/dispatch/src/timer_private.h b/extras/dispatch/src/timer_private.h
new file mode 100644
index 0000000000..618297b18e
--- /dev/null
+++ b/extras/dispatch/src/timer_private.h
@@ -0,0 +1,51 @@
+#ifndef __timer_private_h__
+#define __timer_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/threading.h>
+
+typedef enum {
+ TIMER_FREE,
+ TIMER_IDLE,
+ TIMER_SCHEDULED,
+ TIMER_PENDING
+} dx_timer_state_t;
+
+
+struct dx_timer_t {
+ DEQ_LINKS(dx_timer_t);
+ dx_timer_cb_t handler;
+ void *context;
+ long delta_time;
+ dx_timer_state_t state;
+};
+
+DEQ_DECLARE(dx_timer_t, dx_timer_list_t);
+
+void dx_timer_initialize(sys_mutex_t *server_lock);
+void dx_timer_finalize(void);
+long dx_timer_next_duration_LH(void);
+void dx_timer_visit_LH(long current_time);
+void dx_timer_idle_LH(dx_timer_t *timer);
+
+
+#endif
diff --git a/extras/dispatch/src/work_queue.c b/extras/dispatch/src/work_queue.c
new file mode 100644
index 0000000000..4b3c5d7fa5
--- /dev/null
+++ b/extras/dispatch/src/work_queue.c
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include "work_queue.h"
+#include <string.h>
+#include <stdio.h>
+
+#define BATCH_SIZE 100
+typedef struct work_item_t work_item_t;
+
+struct work_item_t {
+ DEQ_LINKS(work_item_t);
+ pn_connector_t *conn;
+};
+
+DEQ_DECLARE(work_item_t, work_list_t);
+
+struct work_queue_t {
+ work_list_t items;
+ work_list_t free_list;
+};
+
+static void allocate_batch(work_queue_t *w)
+{
+ int i;
+ work_item_t *batch = NEW_ARRAY(work_item_t, BATCH_SIZE);
+ if (!batch)
+ return;
+
+ memset(batch, 0, sizeof(work_item_t) * BATCH_SIZE);
+
+ for (i = 0; i < BATCH_SIZE; i++)
+ DEQ_INSERT_TAIL(w->free_list, &batch[i]);
+}
+
+
+work_queue_t *work_queue(void)
+{
+ work_queue_t *w = NEW(work_queue_t);
+ if (!w)
+ return 0;
+
+ DEQ_INIT(w->items);
+ DEQ_INIT(w->free_list);
+
+ allocate_batch(w);
+
+ return w;
+}
+
+
+void work_queue_free(work_queue_t *w)
+{
+ if (!w)
+ return;
+
+ // KEEP TRACK OF BATCHES AND FREE
+ free(w);
+}
+
+
+void work_queue_put(work_queue_t *w, pn_connector_t *conn)
+{
+ work_item_t *item;
+
+ if (!w)
+ return;
+ if (DEQ_SIZE(w->free_list) == 0)
+ allocate_batch(w);
+ if (DEQ_SIZE(w->free_list) == 0)
+ return;
+
+ item = DEQ_HEAD(w->free_list);
+ DEQ_REMOVE_HEAD(w->free_list);
+
+ item->conn = conn;
+
+ DEQ_INSERT_TAIL(w->items, item);
+}
+
+
+pn_connector_t *work_queue_get(work_queue_t *w)
+{
+ work_item_t *item;
+ pn_connector_t *conn;
+
+ if (!w)
+ return 0;
+ item = DEQ_HEAD(w->items);
+ if (!item)
+ return 0;
+
+ DEQ_REMOVE_HEAD(w->items);
+ conn = item->conn;
+ item->conn = 0;
+
+ DEQ_INSERT_TAIL(w->free_list, item);
+
+ return conn;
+}
+
+
+int work_queue_empty(work_queue_t *w)
+{
+ return !w || DEQ_SIZE(w->items) == 0;
+}
+
+
+int work_queue_depth(work_queue_t *w)
+{
+ if (!w)
+ return 0;
+ return DEQ_SIZE(w->items);
+}
+
diff --git a/extras/dispatch/src/work_queue.h b/extras/dispatch/src/work_queue.h
new file mode 100644
index 0000000000..597a484a9c
--- /dev/null
+++ b/extras/dispatch/src/work_queue.h
@@ -0,0 +1,33 @@
+#ifndef __work_queue_h__
+#define __work_queue_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/driver.h>
+
+typedef struct work_queue_t work_queue_t;
+
+work_queue_t *work_queue(void);
+void work_queue_free(work_queue_t *w);
+void work_queue_put(work_queue_t *w, pn_connector_t *conn);
+pn_connector_t *work_queue_get(work_queue_t *w);
+int work_queue_empty(work_queue_t *w);
+int work_queue_depth(work_queue_t *w);
+
+#endif
diff --git a/extras/dispatch/tests/CMakeLists.txt b/extras/dispatch/tests/CMakeLists.txt
new file mode 100644
index 0000000000..10bf1eb43a
--- /dev/null
+++ b/extras/dispatch/tests/CMakeLists.txt
@@ -0,0 +1,34 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License.
+##
+
+##
+## Build test applications
+##
+set(test_SOURCES
+ alloc_test.c
+ message_test.c
+ run_tests.c
+ server_test.c
+ timer_test.c
+ tool_test.c
+ )
+
+add_executable(run_tests ${test_SOURCES})
+target_link_libraries(run_tests qpid-dispatch)
+
diff --git a/extras/dispatch/tests/alloc_test.c b/extras/dispatch/tests/alloc_test.c
new file mode 100644
index 0000000000..2406048209
--- /dev/null
+++ b/extras/dispatch/tests/alloc_test.c
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include "alloc_private.h"
+
+typedef struct {
+ int A;
+ int B;
+} object_t;
+
+dx_alloc_config_t config = {3, 7, 10};
+
+ALLOC_DECLARE(object_t);
+ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), 0, &config);
+
+
+static char* check_stats(dx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg)
+{
+ if (stats->total_alloc_from_heap != ah) return "Incorrect alloc-from-heap";
+ if (stats->total_free_to_heap != fh) return "Incorrect free-to-heap";
+ if (stats->held_by_threads != ht) return "Incorrect held-by-threads";
+ if (stats->batches_rebalanced_to_threads != rt) return "Incorrect rebalance-to-threads";
+ if (stats->batches_rebalanced_to_global != rg) return "Incorrect rebalance-to-global";
+ return 0;
+}
+
+
+static char* test_alloc_basic(void *context)
+{
+ object_t *obj[50];
+ int idx;
+ dx_alloc_stats_t *stats;
+ char *error;
+
+ for (idx = 0; idx < 20; idx++)
+ obj[idx] = new_object_t();
+
+ stats = alloc_stats_object_t();
+ error = check_stats(stats, 21, 0, 21, 0, 0);
+ if (error) return error;
+
+ for (idx = 0; idx < 20; idx++)
+ free_object_t(obj[idx]);
+
+ error = check_stats(stats, 21, 5, 6, 0, 5);
+ if (error) return error;
+
+ for (idx = 0; idx < 20; idx++)
+ obj[idx] = new_object_t();
+
+ error = check_stats(stats, 27, 5, 21, 3, 5);
+ if (error) return error;
+
+ return 0;
+}
+
+
+int alloc_tests(void)
+{
+ int result = 0;
+ dx_alloc_initialize();
+
+ TEST_CASE(test_alloc_basic, 0);
+
+ return result;
+}
+
diff --git a/extras/dispatch/tests/message_test.c b/extras/dispatch/tests/message_test.c
new file mode 100644
index 0000000000..590b7f6ed7
--- /dev/null
+++ b/extras/dispatch/tests/message_test.c
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include "message_private.h"
+#include <qpid/dispatch/iterator.h>
+#include <proton/message.h>
+
+
+static char* test_send_to_messenger(void *context)
+{
+ dx_message_t *msg = dx_allocate_message();
+ dx_message_content_t *content = MSG_CONTENT(msg);
+
+ dx_message_compose_1(msg, "test_addr_0", 0);
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+ if (buf == 0) return "Expected a buffer in the test message";
+
+ pn_message_t *pn_msg = pn_message();
+ int result = pn_message_decode(pn_msg, (const char*) dx_buffer_base(buf), dx_buffer_size(buf));
+ if (result != 0) return "Error in pn_message_decode";
+
+ if (strcmp(pn_message_get_address(pn_msg), "test_addr_0") != 0)
+ return "Address mismatch in received message";
+
+ pn_message_free(pn_msg);
+ dx_free_message(msg);
+
+ return 0;
+}
+
+
+static char* test_receive_from_messenger(void *context)
+{
+ pn_message_t *pn_msg = pn_message();
+ pn_message_set_address(pn_msg, "test_addr_1");
+
+ dx_buffer_t *buf = dx_allocate_buffer();
+ size_t size = dx_buffer_capacity(buf);
+ int result = pn_message_encode(pn_msg, (char*) dx_buffer_cursor(buf), &size);
+ if (result != 0) return "Error in pn_message_encode";
+ dx_buffer_insert(buf, size);
+
+ dx_message_t *msg = dx_allocate_message();
+ dx_message_content_t *content = MSG_CONTENT(msg);
+
+ DEQ_INSERT_TAIL(content->buffers, buf);
+ int valid = dx_message_check(msg, DX_DEPTH_ALL);
+ if (!valid) return "dx_message_check returns 'invalid'";
+
+ dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
+ if (iter == 0) return "Expected an iterator for the 'to' field";
+
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "test_addr_1"))
+ return "Mismatched 'to' field contents";
+
+ pn_message_free(pn_msg);
+ dx_free_message(msg);
+
+ return 0;
+}
+
+
+static char* test_insufficient_check_depth(void *context)
+{
+ pn_message_t *pn_msg = pn_message();
+ pn_message_set_address(pn_msg, "test_addr_2");
+
+ dx_buffer_t *buf = dx_allocate_buffer();
+ size_t size = dx_buffer_capacity(buf);
+ int result = pn_message_encode(pn_msg, (char*) dx_buffer_cursor(buf), &size);
+ if (result != 0) return "Error in pn_message_encode";
+ dx_buffer_insert(buf, size);
+
+ dx_message_t *msg = dx_allocate_message();
+ dx_message_content_t *content = MSG_CONTENT(msg);
+
+ DEQ_INSERT_TAIL(content->buffers, buf);
+ int valid = dx_message_check(msg, DX_DEPTH_DELIVERY_ANNOTATIONS);
+ if (!valid) return "dx_message_check returns 'invalid'";
+
+ dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
+ if (iter) return "Expected no iterator for the 'to' field";
+
+ dx_free_message(msg);
+
+ return 0;
+}
+
+
+int message_tests(void)
+{
+ int result = 0;
+
+ TEST_CASE(test_send_to_messenger, 0);
+ TEST_CASE(test_receive_from_messenger, 0);
+ TEST_CASE(test_insufficient_check_depth, 0);
+
+ return result;
+}
+
diff --git a/extras/dispatch/tests/run_tests.c b/extras/dispatch/tests/run_tests.c
new file mode 100644
index 0000000000..a677c04577
--- /dev/null
+++ b/extras/dispatch/tests/run_tests.c
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+int tool_tests();
+int timer_tests();
+int alloc_tests();
+int server_tests();
+int message_tests();
+
+int main(int argc, char** argv)
+{
+ int result = 0;
+ result += tool_tests();
+ result += timer_tests();
+ result += alloc_tests();
+ result += server_tests();
+ result += message_tests();
+ return result;
+}
+
diff --git a/extras/dispatch/tests/server_test.c b/extras/dispatch/tests/server_test.c
new file mode 100644
index 0000000000..adeab62af9
--- /dev/null
+++ b/extras/dispatch/tests/server_test.c
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+#include <qpid/dispatch/timer.h>
+#include "test_case.h"
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/user_fd.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/log.h>
+
+#define THREAD_COUNT 4
+#define OCTET_COUNT 100
+
+static sys_mutex_t *test_lock;
+
+static void *expected_context;
+static int call_count;
+static int threads_seen[THREAD_COUNT];
+static char stored_error[512];
+
+static int write_count;
+static int read_count;
+static int fd[2];
+static dx_user_fd_t *ufd_write;
+static dx_user_fd_t *ufd_read;
+
+
+static void thread_start(void *context, int thread_id)
+{
+ sys_mutex_lock(test_lock);
+ if (context != expected_context && !stored_error[0])
+ sprintf(stored_error, "Unexpected Context Value: %lx", (long) context);
+ if (thread_id >= THREAD_COUNT && !stored_error[0])
+ sprintf(stored_error, "Thread_ID too large: %d", thread_id);
+ if (thread_id < 0 && !stored_error[0])
+ sprintf(stored_error, "Thread_ID negative: %d", thread_id);
+
+ call_count++;
+ if (thread_id >= 0 && thread_id < THREAD_COUNT)
+ threads_seen[thread_id]++;
+
+ if (call_count == THREAD_COUNT)
+ dx_server_stop();
+ sys_mutex_unlock(test_lock);
+}
+
+
+static int conn_handler(void *context, dx_conn_event_t event, dx_connection_t *conn)
+{
+ return 0;
+}
+
+
+static void ufd_handler(void *context, dx_user_fd_t *ufd)
+{
+ long dir = (long) context;
+ char buffer;
+ ssize_t len;
+ static int in_read = 0;
+ static int in_write = 0;
+
+ if (dir == 0) { // READ
+ in_read++;
+ assert(in_read == 1);
+ if (!dx_user_fd_is_readable(ufd_read)) {
+ sprintf(stored_error, "Expected Readable");
+ dx_server_stop();
+ } else {
+ len = read(fd[0], &buffer, 1);
+ if (len == 1) {
+ read_count++;
+ if (read_count == OCTET_COUNT)
+ dx_server_stop();
+ }
+ dx_user_fd_activate_read(ufd_read);
+ }
+ in_read--;
+ } else { // WRITE
+ in_write++;
+ assert(in_write == 1);
+ if (!dx_user_fd_is_writeable(ufd_write)) {
+ sprintf(stored_error, "Expected Writable");
+ dx_server_stop();
+ } else {
+ write(fd[1], "X", 1);
+
+ write_count++;
+ if (write_count < OCTET_COUNT)
+ dx_user_fd_activate_write(ufd_write);
+ }
+ in_write--;
+ }
+}
+
+
+static void fd_test_start(void *context)
+{
+ dx_user_fd_activate_read(ufd_read);
+}
+
+
+static char* test_start_handler(void *context)
+{
+ int i;
+
+ dx_server_initialize(THREAD_COUNT);
+
+ expected_context = (void*) 0x00112233;
+ stored_error[0] = 0x0;
+ call_count = 0;
+ for (i = 0; i < THREAD_COUNT; i++)
+ threads_seen[i] = 0;
+
+ dx_server_set_conn_handler(conn_handler);
+ dx_server_set_start_handler(thread_start, expected_context);
+ dx_server_run();
+ dx_server_finalize();
+
+ if (stored_error[0]) return stored_error;
+ if (call_count != THREAD_COUNT) return "Incorrect number of thread-start callbacks";
+ for (i = 0; i < THREAD_COUNT; i++)
+ if (threads_seen[i] != 1) return "Incorrect count on one thread ID";
+
+ return 0;
+}
+
+
+static char* test_user_fd(void *context)
+{
+ int res;
+ dx_timer_t *timer;
+
+ dx_server_initialize(THREAD_COUNT);
+ dx_server_set_conn_handler(conn_handler);
+ dx_server_set_user_fd_handler(ufd_handler);
+ timer = dx_timer(fd_test_start, 0);
+ dx_timer_schedule(timer, 0);
+
+ stored_error[0] = 0x0;
+ res = pipe2(fd, O_NONBLOCK);
+ if (res != 0) return "Error creating pipe2";
+
+ ufd_write = dx_user_fd(fd[1], (void*) 1);
+ ufd_read = dx_user_fd(fd[0], (void*) 0);
+
+ dx_server_run();
+ dx_timer_free(timer);
+ dx_server_finalize();
+ close(fd[0]);
+ close(fd[1]);
+
+ if (stored_error[0]) return stored_error;
+ if (write_count - OCTET_COUNT > 2) sprintf(stored_error, "Excessively high Write Count: %d", write_count);
+ if (read_count != OCTET_COUNT) sprintf(stored_error, "Incorrect Read Count: %d", read_count);;
+
+ if (stored_error[0]) return stored_error;
+ return 0;
+}
+
+
+int server_tests(void)
+{
+ int result = 0;
+ test_lock = sys_mutex();
+ dx_log_set_mask(LOG_NONE);
+
+ TEST_CASE(test_start_handler, 0);
+ TEST_CASE(test_user_fd, 0);
+
+ sys_mutex_free(test_lock);
+ return result;
+}
+
diff --git a/extras/dispatch/tests/test_case.h b/extras/dispatch/tests/test_case.h
new file mode 100644
index 0000000000..6e36b440a5
--- /dev/null
+++ b/extras/dispatch/tests/test_case.h
@@ -0,0 +1,36 @@
+#ifndef _nexus_test_case_h_
+#define _nexus_test_case_h_ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef char* (*testcase_t)(void *context);
+
+#define TEST_CASE(T,C) do { \
+ char *r = T(C); \
+ printf("Test Case %s.%s: ", __FUNCTION__, #T); \
+ if (r) { \
+ printf("FAIL: %s\n", r); \
+ result++; \
+ } else \
+ printf("PASS\n"); \
+} while(0);
+
+
+#endif
+
diff --git a/extras/dispatch/tests/timer_test.c b/extras/dispatch/tests/timer_test.c
new file mode 100644
index 0000000000..3d199f2aa2
--- /dev/null
+++ b/extras/dispatch/tests/timer_test.c
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <qpid/dispatch/timer.h>
+#include "alloc_private.h"
+#include "timer_private.h"
+#include "test_case.h"
+#include <qpid/dispatch/threading.h>
+
+
+static unsigned long fire_mask;
+static dx_timer_list_t pending_timers;
+static sys_mutex_t *lock;
+static long time;
+static dx_timer_t *timers[16];
+
+
+void dx_server_timer_pending_LH(dx_timer_t *timer)
+{
+ DEQ_INSERT_TAIL(pending_timers, timer);
+}
+
+
+void dx_server_timer_cancel_LH(dx_timer_t *timer)
+{
+ if (timer->state == TIMER_PENDING)
+ DEQ_REMOVE(pending_timers, timer);
+}
+
+
+static int fire_head()
+{
+ sys_mutex_lock(lock);
+ int result = DEQ_SIZE(pending_timers);
+ dx_timer_t *timer = DEQ_HEAD(pending_timers);
+ if (timer) {
+ DEQ_REMOVE_HEAD(pending_timers);
+ dx_timer_idle_LH(timer);
+ fire_mask |= (unsigned long) timer->context;
+ }
+ sys_mutex_unlock(lock);
+ return result;
+}
+
+
+static char* test_quiet(void *context)
+{
+ fire_mask = 0;
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+
+ while(fire_head());
+
+ if (fire_mask != 0)
+ return "Expected zero timers fired";
+ return 0;
+}
+
+static char* test_immediate(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ dx_timer_schedule(timers[0], 0);
+
+ if (fire_mask != 0) return "Premature firing";
+ if (fire_head() > 1) return "Too many firings";
+ if (fire_mask != 1) return "Incorrect fire mask";
+
+ return 0;
+}
+
+
+static char* test_immediate_plus_delayed(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ dx_timer_schedule(timers[0], 0);
+ dx_timer_schedule(timers[1], 5);
+
+ if (fire_mask != 0) return "Premature firing";
+ if (fire_head() > 1) return "Too many firings";
+ if (fire_mask != 1) return "Incorrect fire mask 1";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ time += 8;
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+
+ if (fire_head() < 1) return "Delayed Failed to fire";
+ if (fire_mask != 3) return "Incorrect fire mask 3";
+
+ return 0;
+}
+
+
+static char* test_single(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ dx_timer_schedule(timers[0], 2);
+ if (fire_head() > 0) return "Premature firing 1";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() > 0) return "Premature firing 2";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() < 1) return "Failed to fire";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() != 0) return "Spurious fires";
+
+ if (fire_mask != 1) return "Incorrect fire mask";
+ if (timers[0]->state != TIMER_IDLE) return "Expected idle timer state";
+
+ return 0;
+}
+
+
+static char* test_two_inorder(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ dx_timer_schedule(timers[0], 2);
+ dx_timer_schedule(timers[1], 4);
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ int count = fire_head();
+ if (count < 1) return "First failed to fire";
+ if (count > 1) return "Second fired prematurely";
+ if (fire_mask != 1) return "Incorrect fire mask 1";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() < 1) return "Second failed to fire";
+ if (fire_mask != 3) return "Incorrect fire mask 3";
+
+ return 0;
+}
+
+
+static char* test_two_reverse(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ dx_timer_schedule(timers[0], 4);
+ dx_timer_schedule(timers[1], 2);
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ int count = fire_head();
+ if (count < 1) return "First failed to fire";
+ if (count > 1) return "Second fired prematurely";
+ if (fire_mask != 2) return "Incorrect fire mask 2";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() < 1) return "Second failed to fire";
+ if (fire_mask != 3) return "Incorrect fire mask 3";
+
+ return 0;
+}
+
+
+static char* test_two_duplicate(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ dx_timer_schedule(timers[0], 2);
+ dx_timer_schedule(timers[1], 2);
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ int count = fire_head();
+ if (count != 2) return "Expected two firings";
+ fire_head();
+ if (fire_mask != 3) return "Incorrect fire mask 3";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ if (fire_head() > 0) return "Spurious timer fires";
+
+ return 0;
+}
+
+
+static char* test_separated(void *context)
+{
+ int count;
+
+ while(fire_head());
+ fire_mask = 0;
+
+ dx_timer_schedule(timers[0], 2);
+ dx_timer_schedule(timers[1], 4);
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ count = fire_head();
+ if (count < 1) return "First failed to fire";
+ if (count > 1) return "Second fired prematurely";
+ if (fire_mask != 1) return "Incorrect fire mask 1";
+
+ dx_timer_schedule(timers[2], 2);
+ dx_timer_schedule(timers[3], 4);
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ count = fire_head();
+ fire_head();
+ if (count < 1) return "Second failed to fire";
+ if (count < 2) return "Third failed to fire";
+ if (fire_mask != 7) return "Incorrect fire mask 7";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ count = fire_head();
+ if (count < 1) return "Fourth failed to fire";
+ if (fire_mask != 15) return "Incorrect fire mask 15";
+
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ count = fire_head();
+ if (count > 0) return "Spurious fire";
+
+ return 0;
+}
+
+
+static char* test_big(void *context)
+{
+ while(fire_head());
+ fire_mask = 0;
+
+ long durations[16] =
+ { 5, 8, 7, 6,
+ 14, 10, 16, 15,
+ 11, 12, 9, 12,
+ 1, 2, 3, 4};
+ unsigned long masks[18] = {
+ 0x1000,
+ 0x3000,
+ 0x7000,
+ 0xf000,
+ 0xf001,
+ 0xf009,
+ 0xf00d,
+ 0xf00f,
+ 0xf40f,
+ 0xf42f,
+ 0xf52f,
+ 0xff2f,
+ 0xff2f,
+ 0xff3f,
+ 0xffbf,
+ 0xffff,
+ 0xffff,
+ 0xffff
+ };
+
+ int i;
+ for (i = 0; i < 16; i++)
+ dx_timer_schedule(timers[i], durations[i]);
+ for (i = 0; i < 18; i++) {
+ sys_mutex_lock(lock);
+ dx_timer_visit_LH(time++);
+ sys_mutex_unlock(lock);
+ while(fire_head());
+ if (fire_mask != masks[i]) {
+ static char error[100];
+ sprintf(error, "Iteration %d: expected mask %04lx, got %04lx", i, masks[i], fire_mask);
+ return error;
+ }
+ }
+
+ return 0;
+}
+
+
+int timer_tests(void)
+{
+ int result = 0;
+ dx_alloc_initialize();
+
+ fire_mask = 0;
+ DEQ_INIT(pending_timers);
+ lock = sys_mutex();
+ dx_timer_initialize(lock);
+ time = 1;
+
+ timers[0] = dx_timer(0, (void*) 0x00000001);
+ timers[1] = dx_timer(0, (void*) 0x00000002);
+ timers[2] = dx_timer(0, (void*) 0x00000004);
+ timers[3] = dx_timer(0, (void*) 0x00000008);
+ timers[4] = dx_timer(0, (void*) 0x00000010);
+ timers[5] = dx_timer(0, (void*) 0x00000020);
+ timers[6] = dx_timer(0, (void*) 0x00000040);
+ timers[7] = dx_timer(0, (void*) 0x00000080);
+ timers[8] = dx_timer(0, (void*) 0x00000100);
+ timers[9] = dx_timer(0, (void*) 0x00000200);
+ timers[10] = dx_timer(0, (void*) 0x00000400);
+ timers[11] = dx_timer(0, (void*) 0x00000800);
+ timers[12] = dx_timer(0, (void*) 0x00001000);
+ timers[13] = dx_timer(0, (void*) 0x00002000);
+ timers[14] = dx_timer(0, (void*) 0x00004000);
+ timers[15] = dx_timer(0, (void*) 0x00008000);
+
+ TEST_CASE(test_quiet, 0);
+ TEST_CASE(test_immediate, 0);
+ TEST_CASE(test_immediate_plus_delayed, 0);
+ TEST_CASE(test_single, 0);
+ TEST_CASE(test_two_inorder, 0);
+ TEST_CASE(test_two_reverse, 0);
+ TEST_CASE(test_two_duplicate, 0);
+ TEST_CASE(test_separated, 0);
+ TEST_CASE(test_big, 0);
+
+ int i;
+ for (i = 0; i < 16; i++)
+ dx_timer_free(timers[i]);
+
+ dx_timer_finalize();
+
+ return result;
+}
+
diff --git a/extras/dispatch/tests/tool_test.c b/extras/dispatch/tests/tool_test.c
new file mode 100644
index 0000000000..7923ee3381
--- /dev/null
+++ b/extras/dispatch/tests/tool_test.c
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include <qpid/dispatch/ctools.h>
+
+typedef struct item_t {
+ DEQ_LINKS(struct item_t);
+ char letter;
+} item_t;
+
+DEQ_DECLARE(item_t, item_list_t);
+
+
+static char* list_well_formed(item_list_t list, char *key)
+{
+ item_t *ptr;
+ item_t *last = 0;
+ int size = DEQ_SIZE(list);
+ int count = 0;
+ char str[32];
+
+ ptr = DEQ_HEAD(list);
+ while (ptr) {
+ str[count] = ptr->letter;
+ count++;
+ if (DEQ_PREV(ptr) != last) return "Corrupt previous link";
+ last = ptr;
+ ptr = DEQ_NEXT(ptr);
+ }
+ str[count] = '\0';
+ if (strcmp(str, key) != 0) return "Invalid key";
+
+ if (count != size) return "Size different from number of items (forward)";
+
+ count = 0;
+ last = 0;
+ ptr = DEQ_TAIL(list);
+ while (ptr) {
+ count++;
+ if (DEQ_NEXT(ptr) != last) return "Corrupt next link";
+ last = ptr;
+ ptr = DEQ_PREV(ptr);
+ }
+
+ if (count != size) return "Size different from number of items (backward)";
+
+ return 0;
+}
+
+
+static char* test_deq_basic(void *context)
+{
+ item_list_t list;
+ item_t item[10];
+ item_t *ptr;
+ int idx;
+ char *subtest;
+
+ DEQ_INIT(list);
+ if (DEQ_SIZE(list) != 0) return "Expected zero initial size";
+
+ for (idx = 0; idx < 10; idx++) {
+ DEQ_ITEM_INIT(&item[idx]);
+ item[idx].letter = 'A' + idx;
+ DEQ_INSERT_TAIL(list, &item[idx]);
+ }
+ if (DEQ_SIZE(list) != 10) return "Expected 10 items in list";
+
+ ptr = DEQ_HEAD(list);
+ if (!ptr) return "Expected valid head item";
+ if (DEQ_PREV(ptr)) return "Head item has non-null previous link";
+ if (ptr->letter != 'A') return "Expected item A at the head";
+ if (DEQ_NEXT(ptr) == 0) return "Head item has null next link";
+ subtest = list_well_formed(list, "ABCDEFGHIJ");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE_HEAD(list);
+ if (DEQ_SIZE(list) != 9) return "Expected 9 items in list";
+ ptr = DEQ_HEAD(list);
+ if (ptr->letter != 'B') return "Expected item B at the head";
+ subtest = list_well_formed(list, "BCDEFGHIJ");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE_TAIL(list);
+ if (DEQ_SIZE(list) != 8) return "Expected 8 items in list";
+ ptr = DEQ_TAIL(list);
+ if (ptr->letter != 'I') return "Expected item I at the tail";
+ subtest = list_well_formed(list, "BCDEFGHI");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE(list, &item[4]);
+ if (DEQ_SIZE(list) != 7) return "Expected 7 items in list";
+ subtest = list_well_formed(list, "BCDFGHI");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE(list, &item[1]);
+ if (DEQ_SIZE(list) != 6) return "Expected 6 items in list";
+ subtest = list_well_formed(list, "CDFGHI");
+ if (subtest) return subtest;
+
+ DEQ_REMOVE(list, &item[8]);
+ if (DEQ_SIZE(list) != 5) return "Expected 5 items in list";
+ subtest = list_well_formed(list, "CDFGH");
+ if (subtest) return subtest;
+
+ DEQ_INSERT_HEAD(list, &item[8]);
+ if (DEQ_SIZE(list) != 6) return "Expected 6 items in list";
+ ptr = DEQ_HEAD(list);
+ if (ptr->letter != 'I') return "Expected item I at the head";
+ subtest = list_well_formed(list, "ICDFGH");
+ if (subtest) return subtest;
+
+ DEQ_INSERT_AFTER(list, &item[4], &item[7]);
+ if (DEQ_SIZE(list) != 7) return "Expected 7 items in list";
+ ptr = DEQ_TAIL(list);
+ if (ptr->letter != 'E') return "Expected item E at the head";
+ subtest = list_well_formed(list, "ICDFGHE");
+ if (subtest) return subtest;
+
+ DEQ_INSERT_AFTER(list, &item[1], &item[5]);
+ if (DEQ_SIZE(list) != 8) return "Expected 8 items in list";
+ subtest = list_well_formed(list, "ICDFBGHE");
+ if (subtest) return subtest;
+
+ if (item[0].prev || item[0].next) return "Unlisted item A has non-null pointers";
+ if (item[9].prev || item[9].next) return "Unlisted item J has non-null pointers";
+
+ return 0;
+}
+
+
+int tool_tests(void)
+{
+ int result = 0;
+
+ TEST_CASE(test_deq_basic, 0);
+
+ return result;
+}
+