summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-04-26 16:34:33 +0000
committerTed Ross <tross@apache.org>2013-04-26 16:34:33 +0000
commit64ab8be9c34528ef71ca5c58ff075ed57a48c9e0 (patch)
tree82c66d38e2d710d780b22c71d5afd6e00297082e
parentcf474845241b0c206710dbd9c87fe2e752c512a0 (diff)
downloadqpid-python-64ab8be9c34528ef71ca5c58ff075ed57a48c9e0.tar.gz
NO-JIRA - Development update to Dispatch Router
- Began refactoring of the routing table to support in-process destinations and multi-hop paths. - Added API for the internal management agent. Began integrating the agent with the router module for communication. - Added field parsing to handle topological addresses. - Added tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1476280 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/dispatch/CMakeLists.txt2
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch.h7
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/agent.h18
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/error.h29
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/hash.h11
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/iterator.h46
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/log.h3
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/message.h2
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/router.h24
-rw-r--r--qpid/extras/dispatch/router/src/main.c14
-rw-r--r--qpid/extras/dispatch/src/agent.c46
-rw-r--r--qpid/extras/dispatch/src/container.c83
-rw-r--r--qpid/extras/dispatch/src/dispatch.c27
-rw-r--r--qpid/extras/dispatch/src/hash.c68
-rw-r--r--qpid/extras/dispatch/src/iterator.c231
-rw-r--r--qpid/extras/dispatch/src/log.c53
-rw-r--r--qpid/extras/dispatch/src/message.c8
-rw-r--r--qpid/extras/dispatch/src/router_node.c190
-rw-r--r--qpid/extras/dispatch/src/server.c12
-rw-r--r--qpid/extras/dispatch/tests/CMakeLists.txt11
-rw-r--r--qpid/extras/dispatch/tests/field_test.c149
-rw-r--r--qpid/extras/dispatch/tests/run_unit_tests.c (renamed from qpid/extras/dispatch/tests/run_tests.c)2
-rw-r--r--qpid/extras/dispatch/tests/server_test.c6
23 files changed, 856 insertions, 186 deletions
diff --git a/qpid/extras/dispatch/CMakeLists.txt b/qpid/extras/dispatch/CMakeLists.txt
index e5e94fec3a..f96f62817b 100644
--- a/qpid/extras/dispatch/CMakeLists.txt
+++ b/qpid/extras/dispatch/CMakeLists.txt
@@ -59,7 +59,7 @@ find_library(pthread_lib pthread)
find_library(rt_lib rt)
find_path(proton_include proton/driver.h)
-set(CMAKE_C_FLAGS "-pthread -Wall -Werror")
+set(CMAKE_C_FLAGS "-pthread -Wall -Werror -std=gnu99")
set(CATCH_UNDEFINED "-Wl,--no-undefined")
##
diff --git a/qpid/extras/dispatch/include/qpid/dispatch.h b/qpid/extras/dispatch/include/qpid/dispatch.h
index ce52d0cdbe..7b306631f3 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch.h
@@ -46,9 +46,14 @@ typedef struct dx_dispatch_t dx_dispatch_t;
* \brief Initialize the Dispatch library and prepare it for operation.
*
* @param thread_count The number of worker threads (1 or more) that the server shall create
+ * @param container_name The name of the container. If NULL, a UUID will be generated.
+ * @param router_area The name of the router's area. If NULL, a default value will be supplied.
+ * @param router_id The identifying name of the router. If NULL, it will be set the same as the
+ * container_name.
* @return A handle to be used in API calls for this instance.
*/
-dx_dispatch_t *dx_dispatch(int thread_count);
+dx_dispatch_t *dx_dispatch(int thread_count, const char *container_name,
+ const char *router_area, const char *router_id);
/**
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/agent.h b/qpid/extras/dispatch/include/qpid/dispatch/agent.h
index 77863b184a..3b4eea7be2 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/agent.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/agent.h
@@ -38,7 +38,7 @@ typedef struct dx_agent_class_t dx_agent_class_t;
*
* @param context The handler context supplied in dx_agent_register.
*/
-typedef void (*dx_agent_schema_cb_t)(void* context);
+typedef void (*dx_agent_schema_cb_t)(void* context, const void *correlator);
/**
@@ -71,19 +71,19 @@ dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx,
/**
*
*/
-void dx_agent_value_string(dx_dispatch_t *dx, const void *correlator, const char *key, const char *value);
-void dx_agent_value_uint(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value);
-void dx_agent_value_null(dx_dispatch_t *dx, const void *correlator, const char *key);
-void dx_agent_value_boolean(dx_dispatch_t *dx, const void *correlator, const char *key, bool value);
-void dx_agent_value_binary(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value, size_t len);
-void dx_agent_value_uuid(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value);
-void dx_agent_value_timestamp(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value);
+void dx_agent_value_string(const void *correlator, const char *key, const char *value);
+void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value);
+void dx_agent_value_null(const void *correlator, const char *key);
+void dx_agent_value_boolean(const void *correlator, const char *key, bool value);
+void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len);
+void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value);
+void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value);
/**
*
*/
-void dx_agent_value_complete(dx_dispatch_t *dx, const void *correlator, bool more);
+void dx_agent_value_complete(const void *correlator, bool more);
/**
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/error.h b/qpid/extras/dispatch/include/qpid/dispatch/error.h
new file mode 100644
index 0000000000..a62f80fde4
--- /dev/null
+++ b/qpid/extras/dispatch/include/qpid/dispatch/error.h
@@ -0,0 +1,29 @@
+#ifndef __dispatch_error_h__
+#define __dispatch_error_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef enum {
+ DX_ERROR_NONE = 0,
+ DX_ERROR_NOT_FOUND,
+ DX_ERROR_ALREADY_EXISTS,
+ DX_ERROR_ALLOC
+} dx_error_t;
+
+#endif
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/hash.h b/qpid/extras/dispatch/include/qpid/dispatch/hash.h
index 7f4a4bb950..bfad142517 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/hash.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/hash.h
@@ -21,6 +21,7 @@
#include <stdlib.h>
#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/error.h>
typedef struct hash_t hash_t;
@@ -28,10 +29,10 @@ hash_t *hash(int bucket_exponent, int batch_size, int value_is_const);
void hash_free(hash_t *h);
size_t hash_size(hash_t *h);
-int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val);
-int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val);
-int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val);
-int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val);
-int hash_remove(hash_t *h, dx_field_iterator_t *key);
+dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val);
+dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val);
+dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val);
+dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val);
+dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key);
#endif
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/iterator.h b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h
index 9844286483..f47bbf5bad 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/iterator.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h
@@ -54,14 +54,42 @@ typedef struct dx_field_iterator_t dx_field_iterator_t;
* ^^^^^^^^^^^^^
* node-id/node/specific
* ^^^^^^^^^^^^^
+ *
+ * ITER_VIEW_ADDRESS_HASH - Isolate the hashable part of the address depending on address syntax
+ *
+ * amqp:/_local/<local>
+ * L^^^^^^^
+ * amqp:/_topo/<area>/<router>/<local>
+ * A^^^^^^
+ * amqp:/_topo/<my-area>/<router>/<local>
+ * R^^^^^^^^
+ * amqp:/_topo/<my_area>/<my-router>/<local>
+ * L^^^^^^^
+ * amqp:/_topo/<area>/all/<local>
+ * A^^^^^^
+ * amqp:/_topo/<my-area>/all/<local>
+ * L^^^^^^^
+ * amqp:/_topo/all/all/<local>
+ * L^^^^^^^
+ * amqp:/<mobile>
+ * M^^^^^^^^
+ *
*/
typedef enum {
ITER_VIEW_ALL,
ITER_VIEW_NO_HOST,
ITER_VIEW_NODE_ID,
- ITER_VIEW_NODE_SPECIFIC
+ ITER_VIEW_NODE_SPECIFIC,
+ ITER_VIEW_ADDRESS_HASH
} dx_iterator_view_t;
+
+/**
+ * Set the area and router names for the local router. These are used to match
+ * my-area and my-router in address fields.
+ */
+void dx_field_iterator_set_address(const char *area, const char *router);
+
/**
* Create an iterator from a null-terminated string.
*
@@ -87,8 +115,10 @@ void dx_field_iterator_free(dx_field_iterator_t *iter);
/**
* Reset the iterator to the first octet and set a new view
*/
-void dx_field_iterator_reset(dx_field_iterator_t *iter,
- dx_iterator_view_t view);
+void dx_field_iterator_reset(dx_field_iterator_t *iter);
+
+void dx_field_iterator_reset_view(dx_field_iterator_t *iter,
+ dx_iterator_view_t view);
/**
* Return the current octet in the iterator's view and step to the next.
@@ -103,7 +133,15 @@ int dx_field_iterator_end(dx_field_iterator_t *iter);
/**
* Compare an input string to the iterator's view. Return true iff they are equal.
*/
-int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string);
+int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string);
+
+/**
+ * Return true iff the string matches the characters at the current location in the view.
+ * This function ignores octets beyond the length of the prefix.
+ * This function does not alter the position of the iterator if the prefix does not match,
+ * if it matches, the prefix is consumed.
+ */
+int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix);
/**
* Return a copy of the iterator's view.
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/log.h b/qpid/extras/dispatch/include/qpid/dispatch/log.h
index cbea50f266..ad659ad493 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/log.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/log.h
@@ -24,7 +24,8 @@
#define LOG_ERROR 0x00000002
#define LOG_INFO 0x00000004
-void dx_log(const char *module, int cls, const char *fmt, ...);
+void dx_log_impl(const char *module, int cls, const char *file, int line, const char *fmt, ...);
+#define dx_log(m, c, f, ...) dx_log_impl(m, c, __FILE__, __LINE__, f , ##__VA_ARGS__)
void dx_log_set_mask(int mask);
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/message.h b/qpid/extras/dispatch/include/qpid/dispatch/message.h
index 4c1b1920b5..8311f846d6 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/message.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/message.h
@@ -156,7 +156,7 @@ void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value);
void dx_message_insert_uint(dx_message_t *msg, uint32_t value);
void dx_message_insert_ulong(dx_message_t *msg, uint64_t value);
void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len);
-void dx_message_insert_string(dx_message_t *msg, const char *start);
+void dx_message_insert_string(dx_message_t *msg, const char *str);
void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value);
void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len);
void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value);
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/router.h b/qpid/extras/dispatch/include/qpid/dispatch/router.h
index 5ae2c8e846..58eb026775 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/router.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/router.h
@@ -19,6 +19,28 @@
* under the License.
*/
-// TODO - Add router message-passing methods
+#include <stdbool.h>
+
+typedef struct dx_dispatch_t dx_dispatch_t;
+typedef struct dx_message_t dx_message_t;
+typedef struct dx_address_t dx_address_t;
+
+
+typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg);
+
+
+dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
+ bool is_local,
+ const char *address,
+ dx_router_message_cb handler,
+ void *context);
+
+void dx_router_unregister_address(dx_address_t *address);
+
+
+void dx_router_send(dx_dispatch_t *dx,
+ const char *address,
+ dx_message_t *msg);
+
#endif
diff --git a/qpid/extras/dispatch/router/src/main.c b/qpid/extras/dispatch/router/src/main.c
index fb78d9dca0..5fb194980b 100644
--- a/qpid/extras/dispatch/router/src/main.c
+++ b/qpid/extras/dispatch/router/src/main.c
@@ -26,17 +26,29 @@
static int exit_with_sigint = 0;
static dx_dispatch_t *dispatch;
+/**
+ * The thread_start_handler is invoked once for each server thread at thread startup.
+ */
static void thread_start_handler(void* context, int thread_id)
{
}
+/**
+ * This is the OS signal handler, invoked on an undetermined thread at a completely
+ * arbitrary point of time. It is not safe to do anything here but signal the dispatch
+ * server with the signal number.
+ */
static void signal_handler(int signum)
{
dx_server_signal(dispatch, signum);
}
+/**
+ * This signal handler is called cleanly by one of the server's worker threads in
+ * response to an earlier call to dx_server_signal.
+ */
static void server_signal_handler(void* context, int signum)
{
dx_server_pause(dispatch);
@@ -94,7 +106,7 @@ int main(int argc, char **argv)
{
dx_log_set_mask(LOG_INFO | LOG_TRACE | LOG_ERROR);
- dispatch = dx_dispatch(4);
+ dispatch = dx_dispatch(4, "Qpid.Dispatch", "area", "Router.A");
dx_server_set_signal_handler(dispatch, server_signal_handler, 0);
dx_server_set_start_handler(dispatch, thread_start_handler, 0);
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 864d58fadc..aca9ab3560 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -18,15 +18,18 @@
*/
#include "dispatch_private.h"
+#include <qpid/dispatch/error.h>
#include <qpid/dispatch/agent.h>
+#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/hash.h>
#include <qpid/dispatch/container.h>
#include <qpid/dispatch/message.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/timer.h>
+#include <qpid/dispatch/router.h>
#include <string.h>
-
+#include <stdio.h>
typedef struct dx_agent_t {
dx_server_t *server;
@@ -35,6 +38,7 @@ typedef struct dx_agent_t {
dx_message_list_t out_fifo;
sys_mutex_t *lock;
dx_timer_t *timer;
+ dx_address_t *address;
} dx_agent_t;
@@ -46,12 +50,30 @@ struct dx_agent_class_t {
};
+typedef struct {
+ dx_agent_t *agent;
+ dx_message_t *response_msg;
+} dx_agent_request_t;
+
+ALLOC_DECLARE(dx_agent_request_t);
+ALLOC_DEFINE(dx_agent_request_t);
+
+
static void dx_agent_timer_handler(void *context)
{
// TODO - Process the in_fifo here
}
+static void dx_agent_rx_handler(void *context, dx_message_t *msg)
+{
+ dx_agent_t *agent = (dx_agent_t*) context;
+ DEQ_INSERT_TAIL(agent->in_fifo, msg);
+ dx_timer_schedule(agent->timer, 0);
+ printf("dx_agent_rx_handler - inbound message\n");
+}
+
+
dx_agent_t *dx_agent(dx_dispatch_t *dx)
{
dx_agent_t *agent = NEW(dx_agent_t);
@@ -59,8 +81,9 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
agent->class_hash = hash(6, 10, 1);
DEQ_INIT(agent->in_fifo);
DEQ_INIT(agent->out_fifo);
- agent->lock = sys_mutex();
- agent->timer = dx_timer(dx, dx_agent_timer_handler, agent);
+ agent->lock = sys_mutex();
+ agent->timer = dx_timer(dx, dx_agent_timer_handler, agent);
+ agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
return agent;
}
@@ -68,6 +91,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
void dx_agent_free(dx_agent_t *agent)
{
+ dx_router_unregister_address(agent->address);
sys_mutex_free(agent->lock);
dx_timer_free(agent->timer);
hash_free(agent->class_hash);
@@ -110,42 +134,42 @@ dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx,
}
-void dx_agent_value_string(dx_dispatch_t *dx, const void *correlator, const char *key, const char *value)
+void dx_agent_value_string(const void *correlator, const char *key, const char *value)
{
}
-void dx_agent_value_uint(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value)
+void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value)
{
}
-void dx_agent_value_null(dx_dispatch_t *dx, const void *correlator, const char *key)
+void dx_agent_value_null(const void *correlator, const char *key)
{
}
-void dx_agent_value_boolean(dx_dispatch_t *dx, const void *correlator, const char *key, bool value)
+void dx_agent_value_boolean(const void *correlator, const char *key, bool value)
{
}
-void dx_agent_value_binary(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value, size_t len)
+void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len)
{
}
-void dx_agent_value_uuid(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value)
+void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value)
{
}
-void dx_agent_value_timestamp(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value)
+void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value)
{
}
-void dx_agent_value_complete(dx_dispatch_t *dx, const void *correlator, bool more)
+void dx_agent_value_complete(const void *correlator, bool more)
{
}
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index 0b31ab8ab5..e65d0c4b63 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -30,6 +30,7 @@
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/agent.h>
static char *module="CONTAINER";
@@ -63,34 +64,44 @@ typedef struct dxc_node_type_t {
} dxc_node_type_t;
DEQ_DECLARE(dxc_node_type_t, dxc_node_type_list_t);
+static int DX_CONTAINER_CLASS_CONTAINER = 1;
+static int DX_CONTAINER_CLASS_NODE_TYPE = 2;
+static int DX_CONTAINER_CLASS_NODE = 3;
+
+typedef struct container_class_t {
+ dx_container_t *container;
+ int class_id;
+} container_class_t;
struct dx_container_t {
+ dx_dispatch_t *dx;
dx_server_t *server;
hash_t *node_type_map;
hash_t *node_map;
sys_mutex_t *lock;
dx_node_t *default_node;
dxc_node_type_list_t node_type_list;
+ dx_agent_class_t *class_container;
+ dx_agent_class_t *class_node_type;
+ dx_agent_class_t *class_node;
};
static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link)
{
sys_mutex_lock(container->lock);
- dx_node_t *node;
- int result;
+ dx_node_t *node = 0;
const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link));
dx_field_iterator_t *iter;
// TODO - Extract the name from the structured source
if (source) {
iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
- result = hash_retrieve(container->node_map, iter, (void*) &node);
+ hash_retrieve(container->node_map, iter, (void*) &node);
dx_field_iterator_free(iter);
- } else
- result = -1;
+ }
sys_mutex_unlock(container->lock);
- if (result < 0) {
+ if (node == 0) {
if (container->default_node)
node = container->default_node;
else {
@@ -119,21 +130,19 @@ static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link)
static void setup_incoming_link(dx_container_t *container, pn_link_t *pn_link)
{
sys_mutex_lock(container->lock);
- dx_node_t *node;
- int result;
+ dx_node_t *node = 0;
const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
dx_field_iterator_t *iter;
// TODO - Extract the name from the structured target
if (target) {
iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
- result = hash_retrieve(container->node_map, iter, (void*) &node);
+ hash_retrieve(container->node_map, iter, (void*) &node);
dx_field_iterator_free(iter);
- } else
- result = -1;
+ }
sys_mutex_unlock(container->lock);
- if (result < 0) {
+ if (node == 0) {
if (container->default_node)
node = container->default_node;
else {
@@ -404,10 +413,49 @@ static int handler(void *handler_context, void *conn_context, dx_conn_event_t ev
}
+static void container_schema_handler(void *context, const void *correlator)
+{
+}
+
+
+static void container_query_handler(void* context, const char *id, const void *correlator)
+{
+ container_class_t *cls = (container_class_t*) context;
+
+ if (cls->class_id == DX_CONTAINER_CLASS_CONTAINER) {
+ dx_agent_value_uint(correlator, "node_type_count", hash_size(cls->container->node_type_map));
+ dx_agent_value_uint(correlator, "node_count", hash_size(cls->container->node_map));
+ if (cls->container->default_node)
+ dx_agent_value_string(correlator, "default_node_type", cls->container->default_node->ntype->type_name);
+ else
+ dx_agent_value_null(correlator, "default_node_type");
+ dx_agent_value_complete(correlator, false);
+
+ } else if (cls->class_id == DX_CONTAINER_CLASS_NODE_TYPE) {
+
+ } else if (cls->class_id == DX_CONTAINER_CLASS_NODE) {
+
+ }
+}
+
+
+dx_agent_class_t *setup_class(dx_container_t *container, const char *fqname, int id)
+{
+ container_class_t *cls = NEW(container_class_t);
+ cls->container = container;
+ cls->class_id = id;
+
+ return dx_agent_register_class(container->dx, fqname, cls,
+ container_schema_handler,
+ container_query_handler);
+}
+
+
dx_container_t *dx_container(dx_dispatch_t *dx)
{
dx_container_t *container = NEW(dx_container_t);
+ container->dx = dx;
container->server = dx->server;
container->node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
container->node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
@@ -422,6 +470,17 @@ dx_container_t *dx_container(dx_dispatch_t *dx)
}
+void dx_container_setup_agent(dx_dispatch_t *dx)
+{
+ dx->container->class_container =
+ setup_class(dx->container, "org.apache.qpid.dispatch.container", DX_CONTAINER_CLASS_CONTAINER);
+ dx->container->class_node_type =
+ setup_class(dx->container, "org.apache.qpid.dispatch.container.node_type", DX_CONTAINER_CLASS_NODE_TYPE);
+ dx->container->class_node =
+ setup_class(dx->container, "org.apache.qpid.dispatch.container.node", DX_CONTAINER_CLASS_NODE);
+}
+
+
void dx_container_free(dx_container_t *container)
{
// TODO - Free the nodes
diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c
index 87eb535b8a..47a1a07330 100644
--- a/qpid/extras/dispatch/src/dispatch.c
+++ b/qpid/extras/dispatch/src/dispatch.c
@@ -24,27 +24,44 @@
/**
* Private Function Prototypes
*/
-dx_server_t *dx_server(int tc);
+dx_server_t *dx_server(int tc, const char *container_name);
+void dx_server_setup_agent(dx_dispatch_t *dx);
void dx_server_free(dx_server_t *server);
dx_container_t *dx_container(dx_dispatch_t *dx);
+void dx_container_setup_agent(dx_dispatch_t *dx);
void dx_container_free(dx_container_t *container);
-dx_router_t *dx_router(dx_dispatch_t *dx);
+dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id);
+void dx_router_setup_agent(dx_dispatch_t *dx);
void dx_router_free(dx_router_t *router);
dx_agent_t *dx_agent(dx_dispatch_t *dx);
void dx_agent_free(dx_agent_t *agent);
-dx_dispatch_t *dx_dispatch(int thread_count)
+dx_dispatch_t *dx_dispatch(int thread_count, const char *container_name,
+ const char *router_area, const char *router_id)
{
dx_dispatch_t *dx = NEW(dx_dispatch_t);
dx_alloc_initialize();
- dx->server = dx_server(thread_count);
+ if (!container_name)
+ container_name = "00000000-0000-0000-0000-000000000000"; // TODO - gen a real uuid
+
+ if (!router_area)
+ router_area = "area";
+
+ if (!router_id)
+ router_id = container_name;
+
+ dx->server = dx_server(thread_count, container_name);
dx->container = dx_container(dx);
- dx->router = dx_router(dx);
+ dx->router = dx_router(dx, router_area, router_id);
dx->agent = dx_agent(dx);
+ dx_server_setup_agent(dx);
+ dx_container_setup_agent(dx);
+ dx_router_setup_agent(dx);
+
return dx;
}
diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c
index c54d5d6fcf..19744366aa 100644
--- a/qpid/extras/dispatch/src/hash.c
+++ b/qpid/extras/dispatch/src/hash.c
@@ -58,6 +58,7 @@ static unsigned long hash_function(dx_field_iterator_t *iter)
unsigned long hash = 5381;
int c;
+ dx_field_iterator_reset(iter);
while (!dx_field_iterator_end(iter)) {
c = (int) dx_field_iterator_octet(iter);
hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
@@ -101,13 +102,11 @@ size_t hash_size(hash_t *h)
}
-static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error)
+static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists)
{
unsigned long idx = hash_function(key) & h->bucket_mask;
hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
- *error = 0;
-
while (item) {
if (dx_field_iterator_equal(key, item->key))
break;
@@ -115,40 +114,44 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in
}
if (item) {
- *error = -1;
- return 0;
+ *exists = 1;
+ return item;
}
item = new_hash_item_t();
- if (!item) {
- *error = -2;
+ if (!item)
return 0;
- }
DEQ_ITEM_INIT(item);
item->key = dx_field_iterator_copy(key);
DEQ_INSERT_TAIL(h->buckets[idx].items, item);
h->size++;
+ *exists = 0;
return item;
}
-int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
{
- int error = 0;
- hash_item_t *item = hash_internal_insert(h, key, &error);
+ int exists = 0;
+ hash_item_t *item = hash_internal_insert(h, key, &exists);
- if (item)
- item->v.val = val;
- return error;
+ if (!item)
+ return DX_ERROR_ALLOC;
+
+ if (exists)
+ return DX_ERROR_ALREADY_EXISTS;
+
+ item->v.val = val;
+
+ return DX_ERROR_NONE;
}
-int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
{
- if (!h->is_const)
- return -3;
+ assert(h->is_const);
int error = 0;
hash_item_t *item = hash_internal_insert(h, key, &error);
@@ -174,32 +177,33 @@ static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key)
}
-int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val)
+dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val)
{
hash_item_t *item = hash_internal_retrieve(h, key);
- if (item) {
+ if (item)
*val = item->v.val;
- return 0;
- }
- return -1;
+ else
+ *val = 0;
+
+ return DX_ERROR_NONE;
}
-int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val)
+dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val)
{
- if (!h->is_const)
- return -3;
+ assert(h->is_const);
hash_item_t *item = hash_internal_retrieve(h, key);
- if (item) {
+ if (item)
*val = item->v.val_const;
- return 0;
- }
- return -1;
+ else
+ *val = 0;
+
+ return DX_ERROR_NONE;
}
-int hash_remove(hash_t *h, dx_field_iterator_t *key)
+dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key)
{
unsigned long idx = hash_function(key) & h->bucket_mask;
hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
@@ -215,9 +219,9 @@ int hash_remove(hash_t *h, dx_field_iterator_t *key)
DEQ_REMOVE(h->buckets[idx].items, item);
free_hash_item_t(item);
h->size--;
- return 0;
+ return DX_ERROR_NONE;
}
- return -1;
+ return DX_ERROR_NOT_FOUND;
}
diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c
index 6ab67f948d..92a7a1f479 100644
--- a/qpid/extras/dispatch/src/iterator.c
+++ b/qpid/extras/dispatch/src/iterator.c
@@ -25,19 +25,25 @@
#include <string.h>
typedef enum {
-MODE_TO_END,
-MODE_TO_SLASH
+ MODE_TO_END,
+ MODE_TO_SLASH
} parse_mode_t;
+typedef struct {
+ dx_buffer_t *buffer;
+ unsigned char *cursor;
+ int length;
+} pointer_t;
+
struct dx_field_iterator_t {
- dx_buffer_t *start_buffer;
- unsigned char *start_cursor;
- int start_length;
- dx_buffer_t *buffer;
- unsigned char *cursor;
- int length;
+ pointer_t start_pointer;
+ pointer_t view_start_pointer;
+ pointer_t pointer;
dx_iterator_view_t view;
parse_mode_t mode;
+ unsigned char prefix;
+ int at_prefix;
+ int view_prefix;
};
@@ -46,28 +52,86 @@ ALLOC_DEFINE(dx_field_iterator_t);
typedef enum {
-STATE_START,
-STATE_SLASH_LEFT,
-STATE_SKIPPING_TO_NEXT_SLASH,
-STATE_SCANNING,
-STATE_COLON,
-STATE_COLON_SLASH,
-STATE_AT_NODE_ID
+ STATE_START,
+ STATE_SLASH_LEFT,
+ STATE_SKIPPING_TO_NEXT_SLASH,
+ STATE_SCANNING,
+ STATE_COLON,
+ STATE_COLON_SLASH,
+ STATE_AT_NODE_ID
} state_t;
+static char *my_area = "";
+static char *my_router = "";
+
+
+static void parse_address_view(dx_field_iterator_t *iter)
+{
+ //
+ // This function starts with an iterator view that is identical to
+ // ITER_VIEW_NO_HOST. We will now further refine the view in order
+ // to aid the router in looking up addresses.
+ //
+
+ if (dx_field_iterator_prefix(iter, "_")) {
+ if (dx_field_iterator_prefix(iter, "local/")) {
+ iter->prefix = 'L';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+ return;
+ }
+
+ if (dx_field_iterator_prefix(iter, "topo/")) {
+ if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_area)) {
+ if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_router)) {
+ iter->prefix = 'L';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+ return;
+ }
+
+ iter->prefix = 'R';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+ iter->mode = MODE_TO_SLASH;
+ return;
+ }
+
+ iter->prefix = 'A';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+ iter->mode = MODE_TO_SLASH;
+ return;
+ }
+ }
+
+ iter->prefix = 'M';
+ iter->at_prefix = 1;
+ iter->view_prefix = 1;
+}
+
+
static void view_initialize(dx_field_iterator_t *iter)
{
- if (iter->view == ITER_VIEW_ALL) {
- iter->mode = MODE_TO_END;
+ //
+ // The default behavior is for the view to *not* have a prefix.
+ // We'll add one if it's needed later.
+ //
+ iter->at_prefix = 0;
+ iter->view_prefix = 0;
+ iter->mode = MODE_TO_END;
+
+ if (iter->view == ITER_VIEW_ALL)
return;
- }
//
// Advance to the node-id.
//
- state_t state = STATE_START;
- unsigned int octet;
+ state_t state = STATE_START;
+ unsigned int octet;
+ pointer_t save_pointer = {0,0,0};
+
while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) {
octet = dx_field_iterator_octet(iter);
switch (state) {
@@ -96,17 +160,20 @@ static void view_initialize(dx_field_iterator_t *iter)
break;
case STATE_COLON :
- if (octet == '/')
+ if (octet == '/') {
state = STATE_COLON_SLASH;
- else
+ save_pointer = iter->pointer;
+ } else
state = STATE_SCANNING;
break;
case STATE_COLON_SLASH :
if (octet == '/')
state = STATE_SKIPPING_TO_NEXT_SLASH;
- else
- state = STATE_SCANNING;
+ else {
+ state = STATE_AT_NODE_ID;
+ iter->pointer = save_pointer;
+ }
break;
case STATE_AT_NODE_ID :
@@ -119,9 +186,7 @@ static void view_initialize(dx_field_iterator_t *iter)
// The address string was relative, not absolute. The node-id
// is at the beginning of the string.
//
- iter->buffer = iter->start_buffer;
- iter->cursor = iter->start_cursor;
- iter->length = iter->start_length;
+ iter->pointer = iter->start_pointer;
}
//
@@ -137,6 +202,12 @@ static void view_initialize(dx_field_iterator_t *iter)
return;
}
+ if (iter->view == ITER_VIEW_ADDRESS_HASH) {
+ iter->mode = MODE_TO_END;
+ parse_address_view(iter);
+ return;
+ }
+
if (iter->view == ITER_VIEW_NODE_SPECIFIC) {
iter->mode = MODE_TO_END;
while (!dx_field_iterator_end(iter)) {
@@ -149,17 +220,29 @@ static void view_initialize(dx_field_iterator_t *iter)
}
+void dx_field_iterator_set_address(const char *area, const char *router)
+{
+ my_area = (char*) malloc(strlen(area) + 2);
+ strcpy(my_area, area);
+ strcat(my_area, "/");
+
+ my_router = (char*) malloc(strlen(router) + 2);
+ strcpy(my_router, router);
+ strcat(my_router, "/");
+}
+
+
dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view)
{
dx_field_iterator_t *iter = new_dx_field_iterator_t();
if (!iter)
return 0;
- iter->start_buffer = 0;
- iter->start_cursor = (unsigned char*) text;
- iter->start_length = strlen(text);
+ iter->start_pointer.buffer = 0;
+ iter->start_pointer.cursor = (unsigned char*) text;
+ iter->start_pointer.length = strlen(text);
- dx_field_iterator_reset(iter, view);
+ dx_field_iterator_reset_view(iter, view);
return iter;
}
@@ -171,11 +254,11 @@ dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, i
if (!iter)
return 0;
- iter->start_buffer = buffer;
- iter->start_cursor = dx_buffer_base(buffer) + offset;
- iter->start_length = length;
+ iter->start_pointer.buffer = buffer;
+ iter->start_pointer.cursor = dx_buffer_base(buffer) + offset;
+ iter->start_pointer.length = length;
- dx_field_iterator_reset(iter, view);
+ dx_field_iterator_reset_view(iter, view);
return iter;
}
@@ -187,40 +270,52 @@ void dx_field_iterator_free(dx_field_iterator_t *iter)
}
-void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view)
+void dx_field_iterator_reset(dx_field_iterator_t *iter)
+{
+ iter->pointer = iter->view_start_pointer;
+ iter->at_prefix = iter->view_prefix;
+}
+
+
+void dx_field_iterator_reset_view(dx_field_iterator_t *iter, dx_iterator_view_t view)
{
- iter->buffer = iter->start_buffer;
- iter->cursor = iter->start_cursor;
- iter->length = iter->start_length;
- iter->view = view;
+ iter->pointer = iter->start_pointer;
+ iter->view = view;
view_initialize(iter);
+
+ iter->view_start_pointer = iter->pointer;
}
unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter)
{
- if (iter->length == 0)
+ if (iter->at_prefix) {
+ iter->at_prefix = 0;
+ return iter->prefix;
+ }
+
+ if (iter->pointer.length == 0)
return (unsigned char) 0;
- unsigned char result = *(iter->cursor);
+ unsigned char result = *(iter->pointer.cursor);
- iter->cursor++;
- iter->length--;
+ iter->pointer.cursor++;
+ iter->pointer.length--;
- if (iter->length > 0) {
- if (iter->buffer) {
- if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) {
- iter->buffer = iter->buffer->next;
- if (iter->buffer == 0)
- iter->length = 0;
- iter->cursor = dx_buffer_base(iter->buffer);
+ if (iter->pointer.length > 0) {
+ if (iter->pointer.buffer) {
+ if (iter->pointer.cursor - dx_buffer_base(iter->pointer.buffer) == dx_buffer_size(iter->pointer.buffer)) {
+ iter->pointer.buffer = iter->pointer.buffer->next;
+ if (iter->pointer.buffer == 0)
+ iter->pointer.length = 0;
+ iter->pointer.cursor = dx_buffer_base(iter->pointer.buffer);
}
}
}
- if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/')
- iter->length = 0;
+ if (iter->pointer.length && iter->mode == MODE_TO_SLASH && *(iter->pointer.cursor) == '/')
+ iter->pointer.length = 0;
return result;
}
@@ -228,13 +323,13 @@ unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter)
int dx_field_iterator_end(dx_field_iterator_t *iter)
{
- return iter->length == 0;
+ return iter->pointer.length == 0;
}
-int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string)
+int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string)
{
- dx_field_iterator_reset(iter, iter->view);
+ dx_field_iterator_reset(iter);
while (!dx_field_iterator_end(iter) && *string) {
if (*string != dx_field_iterator_octet(iter))
return 0;
@@ -245,19 +340,39 @@ int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string)
}
+int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix)
+{
+ pointer_t save_pointer = iter->pointer;
+ unsigned char *c = (unsigned char*) prefix;
+
+ while(*c) {
+ if (*c != dx_field_iterator_octet(iter))
+ break;
+ c++;
+ }
+
+ if (*c) {
+ iter->pointer = save_pointer;
+ return 0;
+ }
+
+ return 1;
+}
+
+
unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter)
{
int length = 0;
int idx = 0;
unsigned char *copy;
- dx_field_iterator_reset(iter, iter->view);
+ dx_field_iterator_reset(iter);
while (!dx_field_iterator_end(iter)) {
dx_field_iterator_octet(iter);
length++;
}
- dx_field_iterator_reset(iter, iter->view);
+ dx_field_iterator_reset(iter);
copy = (unsigned char*) malloc(length + 1);
while (!dx_field_iterator_end(iter))
copy[idx++] = dx_field_iterator_octet(iter);
diff --git a/qpid/extras/dispatch/src/log.c b/qpid/extras/dispatch/src/log.c
index d4ec534915..c6cffe0321 100644
--- a/qpid/extras/dispatch/src/log.c
+++ b/qpid/extras/dispatch/src/log.c
@@ -18,11 +18,36 @@
*/
#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/alloc.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
+#include <sys/time.h>
-static int mask=LOG_INFO;
+#define TEXT_MAX 512
+#define LIST_MAX 1000
+
+typedef struct dx_log_entry_t dx_log_entry_t;
+
+struct dx_log_entry_t {
+ DEQ_LINKS(dx_log_entry_t);
+ const char *module;
+ int cls;
+ const char *file;
+ int line;
+ struct timeval tv;
+ char text[TEXT_MAX];
+};
+
+ALLOC_DECLARE(dx_log_entry_t);
+ALLOC_DEFINE(dx_log_entry_t);
+
+DEQ_DECLARE(dx_log_entry_t, dx_log_list_t);
+
+static int mask = LOG_INFO;
+static dx_log_list_t entries;
+static int list_init = 0;
static char *cls_prefix(int cls)
{
@@ -35,18 +60,36 @@ static char *cls_prefix(int cls)
return "";
}
-void dx_log(const char *module, int cls, const char *fmt, ...)
+void dx_log_impl(const char *module, int cls, const char *file, int line, const char *fmt, ...)
{
if (!(cls & mask))
return;
+ if (list_init == 0) {
+ list_init = 1;
+ DEQ_INIT(entries);
+ }
+
+ dx_log_entry_t *entry = new_dx_log_entry_t();
+ entry->module = module;
+ entry->cls = cls;
+ entry->file = file;
+ entry->line = line;
+ gettimeofday(&entry->tv, 0);
+
va_list ap;
- char line[128];
va_start(ap, fmt);
- vsnprintf(line, 127, fmt, ap);
+ vsnprintf(entry->text, TEXT_MAX, fmt, ap);
va_end(ap);
- fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line);
+ fprintf(stderr, "%s (%s) %s\n", module, cls_prefix(cls), entry->text);
+
+ DEQ_INSERT_TAIL(entries, entry);
+ if (DEQ_SIZE(entries) > LIST_MAX) {
+ entry = DEQ_HEAD(entries);
+ DEQ_REMOVE_HEAD(entries);
+ free_dx_log_entry_t(entry);
+ }
}
void dx_log_set_mask(int _mask)
diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c
index c914c5ca7b..fd08753f4d 100644
--- a/qpid/extras/dispatch/src/message.c
+++ b/qpid/extras/dispatch/src/message.c
@@ -1081,19 +1081,19 @@ void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t le
}
-void dx_message_insert_string(dx_message_t *msg, const char *start)
+void dx_message_insert_string(dx_message_t *msg, const char *str)
{
dx_message_content_t *content = MSG_CONTENT(msg);
- uint32_t len = strlen(start);
+ uint32_t len = strlen(str);
if (len < 256) {
dx_insert_8(content, 0xa1); // str8-utf8
dx_insert_8(content, (uint8_t) len);
- dx_insert(content, (const uint8_t*) start, len);
+ dx_insert(content, (const uint8_t*) str, len);
} else {
dx_insert_8(content, 0xb1); // str32-utf8
dx_insert_32(content, len);
- dx_insert(content, (const uint8_t*) start, len);
+ dx_insert(content, (const uint8_t*) str, len);
}
content->count++;
}
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 0513b08a6b..65756be215 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -18,13 +18,34 @@
*/
#include <stdio.h>
+#include <string.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
-static char *module="ROUTER_NODE";
+static char *module = "ROUTER";
+
+//static char *local_prefix = "_local/";
+//static char *topo_prefix = "_topo/";
+
+/**
+ * Address Types and Processing:
+ *
+ * Address Hash Compare onReceive onEmit
+ * =============================================================================
+ * _local/<local> L<local> handler forward
+ * _topo/<area>/<router>/<local> A<area> forward forward
+ * _topo/<my-area>/<router>/<local> R<router> forward forward
+ * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward
+ * _topo/<area>/all/<local> A<area> forward forward
+ * _topo/<my-area>/all/<local> L<local> forward+handler forward
+ * _topo/all/all/<local> L<local> forward+handler forward
+ * <mobile> M<mobile> forward+handler forward
+ */
struct dx_router_t {
dx_dispatch_t *dx;
+ const char *router_area;
+ const char *router_id;
dx_node_t *node;
dx_link_list_t in_links;
dx_link_list_t out_links;
@@ -41,11 +62,32 @@ typedef struct {
dx_message_list_t out_fifo;
} dx_router_link_t;
-
ALLOC_DECLARE(dx_router_link_t);
ALLOC_DEFINE(dx_router_link_t);
+typedef struct {
+ const char *id;
+ dx_router_link_t *next_hop;
+ // list of valid origins (pointers to router_node) - (bit masks?)
+} dx_router_node_t;
+
+ALLOC_DECLARE(dx_router_node_t);
+ALLOC_DEFINE(dx_router_node_t);
+
+
+struct dx_address_t {
+ int is_local;
+ dx_router_message_cb handler; // In-Process Consumer
+ void *handler_context;
+ dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list
+ dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list
+};
+
+ALLOC_DECLARE(dx_address_t);
+ALLOC_DEFINE(dx_address_t);
+
+
/**
* Outbound Delivery Handler
*/
@@ -119,22 +161,42 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
if (valid_message) {
dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
- dx_router_link_t *rlink;
+ dx_address_t *addr;
if (iter) {
- dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
- int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
+ hash_retrieve(router->out_hash, iter, (void*) &addr);
dx_field_iterator_free(iter);
- if (result == 0) {
+ if (addr) {
+ //
+ // To field is valid and contains a known destination. Handle the various
+ // cases for forwarding.
+ //
+ // Forward to the in-process handler for this message if there is one.
+ //
+ if (addr->handler)
+ addr->handler(addr->handler_context, msg);
+
//
- // To field is valid and contains a known destination. Enqueue on
- // the output fifo for the next-hop-to-destination.
+ // Forward to the local link for the locally-connected consumer, if present.
+ // TODO - Don't forward if this is a "_local" address.
//
- pn_link_t* pn_outlink = dx_link_pn(rlink->link);
- DEQ_INSERT_TAIL(rlink->out_fifo, msg);
- pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo));
- dx_link_activate(rlink->link);
+ if (addr->rlink) {
+ pn_link_t* pn_outlink = dx_link_pn(addr->rlink->link);
+ DEQ_INSERT_TAIL(addr->rlink->out_fifo, msg);
+ pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
+ dx_link_activate(addr->rlink->link);
+ }
+
+ //
+ // Forward to the next-hop for a remotely-connected consumer, if present.
+ // Don't forward if this is a "_local" address.
+ //
+ if (addr->rnode) {
+ // TODO
+ }
+
} else {
//
// To field contains an unknown address. Release the message.
@@ -242,17 +304,30 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
pn_link_t *pn_link = dx_link_pn(link);
const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
- sys_mutex_lock(router->lock);
dx_router_link_t *rlink = new_dx_router_link_t();
rlink->link = link;
DEQ_INIT(rlink->out_fifo);
dx_link_set_context(link, rlink);
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
- int result = hash_insert(router->out_hash, iter, rlink);
+ dx_address_t *addr;
+
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ addr->is_local = 0;
+ addr->handler = 0;
+ addr->handler_context = 0;
+ addr->rlink = 0;
+ addr->rnode = 0;
+ hash_insert(router->out_hash, iter, addr);
+ }
dx_field_iterator_free(iter);
- if (result == 0) {
+ if (addr->rlink == 0) {
+ addr->rlink = rlink;
pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
pn_link_open(pn_link);
@@ -314,14 +389,14 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed
if (pn_link_is_sender(pn_link)) {
item = DEQ_HEAD(router->out_links);
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
- dx_router_link_t *rlink;
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *addr;
if (iter) {
- int result = hash_retrieve(router->out_hash, iter, (void*) &rlink);
- if (result == 0) {
- dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST);
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (addr) {
hash_remove(router->out_hash, iter);
- free_dx_router_link_t(rlink);
+ free_dx_router_link_t(addr->rlink);
+ free_dx_address_t(addr);
dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
}
dx_field_iterator_free(iter);
@@ -383,7 +458,7 @@ static dx_node_type_t router_node = {"router", 0, 0,
static int type_registered = 0;
-dx_router_t *dx_router(dx_dispatch_t *dx)
+dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
{
if (!type_registered) {
type_registered = 1;
@@ -397,8 +472,10 @@ dx_router_t *dx_router(dx_dispatch_t *dx)
DEQ_INIT(router->out_links);
DEQ_INIT(router->in_fifo);
- router->dx = dx;
- router->lock = sys_mutex();
+ router->dx = dx;
+ router->lock = sys_mutex();
+ router->router_area = area;
+ router->router_id = id;
router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
dx_timer_schedule(router->timer, 0); // Immediate
@@ -406,10 +483,22 @@ dx_router_t *dx_router(dx_dispatch_t *dx)
router->out_hash = hash(10, 32, 0);
router->dtag = 1;
+ //
+ // Inform the field iterator module of this router's id and area. The field iterator
+ // uses this to offload some of the address-processing load from the router.
+ //
+ dx_field_iterator_set_address(area, id);
+
return router;
}
+void dx_router_setup_agent(dx_dispatch_t *dx)
+{
+ // TODO
+}
+
+
void dx_router_free(dx_router_t *router)
{
dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH);
@@ -417,3 +506,54 @@ void dx_router_free(dx_router_t *router)
free(router);
}
+
+dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
+ bool is_local,
+ const char *address,
+ dx_router_message_cb handler,
+ void *context)
+{
+ char addr[1000];
+ dx_address_t *ad = new_dx_address_t();
+ dx_field_iterator_t *iter;
+ int result;
+
+ if (!ad)
+ return 0;
+
+ ad->is_local = is_local;
+ ad->handler = handler;
+ ad->handler_context = context;
+ ad->rlink = 0;
+
+ if (ad->is_local)
+ strcpy(addr, "L"); // Local Hash-Key Space
+ else
+ strcpy(addr, "M"); // Mobile Hash-Key Space
+
+ strcat(addr, address);
+ iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST);
+ result = hash_insert(dx->router->out_hash, iter, ad);
+ dx_field_iterator_free(iter);
+ if (result != 0) {
+ free_dx_address_t(ad);
+ return 0;
+ }
+
+ dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
+ return ad;
+}
+
+
+void dx_router_unregister_address(dx_address_t *ad)
+{
+ free_dx_address_t(ad);
+}
+
+
+void dx_router_send(dx_dispatch_t *dx,
+ const char *address,
+ dx_message_t *msg)
+{
+}
+
diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c
index a2d2d4980a..536af048d8 100644
--- a/qpid/extras/dispatch/src/server.c
+++ b/qpid/extras/dispatch/src/server.c
@@ -44,6 +44,7 @@ typedef struct dx_thread_t {
struct dx_server_t {
int thread_count;
+ const char *container_name;
pn_driver_t *driver;
dx_thread_start_cb_t start_handler;
dx_conn_handler_cb_t conn_handler;
@@ -201,7 +202,7 @@ static void process_connector(dx_server_t *dx_server, pn_connector_t *cxtr)
ctx->state = CONN_STATE_OPERATIONAL;
pn_connection_t *conn = pn_connection();
- pn_connection_set_container(conn, "dispatch"); // TODO - make unique
+ pn_connection_set_container(conn, dx_server->container_name);
pn_connector_set_connection(cxtr, conn);
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
@@ -557,7 +558,7 @@ static void cxtr_try_open(void *context)
}
-dx_server_t *dx_server(int thread_count)
+dx_server_t *dx_server(int thread_count, const char *container_name)
{
int i;
@@ -566,6 +567,7 @@ dx_server_t *dx_server(int thread_count)
return 0;
dx_server->thread_count = thread_count;
+ dx_server->container_name = container_name;
dx_server->driver = pn_driver();
dx_server->start_handler = 0;
dx_server->conn_handler = 0;
@@ -596,6 +598,12 @@ dx_server_t *dx_server(int thread_count)
}
+void dx_server_setup_agent(dx_dispatch_t *dx)
+{
+ // TODO
+}
+
+
void dx_server_free(dx_server_t *dx_server)
{
int i;
diff --git a/qpid/extras/dispatch/tests/CMakeLists.txt b/qpid/extras/dispatch/tests/CMakeLists.txt
index b92f66939f..362aac08e7 100644
--- a/qpid/extras/dispatch/tests/CMakeLists.txt
+++ b/qpid/extras/dispatch/tests/CMakeLists.txt
@@ -22,8 +22,9 @@
##
set(unit_test_SOURCES
alloc_test.c
+ field_test.c
message_test.c
- run_tests.c
+ run_unit_tests.c
server_test.c
timer_test.c
tool_test.c
@@ -32,7 +33,7 @@ set(unit_test_SOURCES
add_executable(unit_tests ${unit_test_SOURCES})
target_link_libraries(unit_tests qpid-dispatch)
-add_test(unit_tests_buf_512 unit_tests 512)
-add_test(unit_tests_buf_10K unit_tests 10000)
-add_test(unit_tests_buf_10 unit_tests 10)
-add_test(unit_tests_buf_1 unit_tests 1)
+add_test(unit_tests_buf_10000 unit_tests 10000)
+add_test(unit_tests_buf_512 unit_tests 512)
+add_test(unit_tests_buf_10 unit_tests 10)
+add_test(unit_tests_buf_1 unit_tests 1)
diff --git a/qpid/extras/dispatch/tests/field_test.c b/qpid/extras/dispatch/tests/field_test.c
new file mode 100644
index 0000000000..59fc3c4cfb
--- /dev/null
+++ b/qpid/extras/dispatch/tests/field_test.c
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include <qpid/dispatch/iterator.h>
+
+#define FAIL_TEXT_SIZE 10000
+static char fail_text[FAIL_TEXT_SIZE];
+
+static char* test_view_global_dns(void *context)
+{
+ dx_field_iterator_t *iter = dx_field_iterator_string("amqp://host/global/sub", ITER_VIEW_ALL);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "amqp://host/global/sub"))
+ return "ITER_VIEW_ALL failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "global/sub"))
+ return "ITER_VIEW_NO_HOST failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_ID);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "global"))
+ return "ITER_VIEW_NODE_ID failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_SPECIFIC);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "sub"))
+ return "ITER_VIEW_NODE_SPECIFIC failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+ return "ITER_VIEW_ADDRESS_HASH failed";
+
+ return 0;
+}
+
+
+static char* test_view_global_non_dns(void *context)
+{
+ dx_field_iterator_t *iter = dx_field_iterator_string("amqp:/global/sub", ITER_VIEW_ALL);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "amqp:/global/sub"))
+ return "ITER_VIEW_ALL failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "global/sub"))
+ return "ITER_VIEW_NO_HOST failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_ID);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "global"))
+ return "ITER_VIEW_NODE_ID failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_SPECIFIC);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "sub"))
+ return "ITER_VIEW_NODE_SPECIFIC failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+ return "ITER_VIEW_ADDRESS_HASH failed";
+
+ return 0;
+}
+
+
+static char* test_view_global_no_host(void *context)
+{
+ dx_field_iterator_t *iter = dx_field_iterator_string("global/sub", ITER_VIEW_ALL);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "global/sub"))
+ return "ITER_VIEW_ALL failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "global/sub"))
+ return "ITER_VIEW_NO_HOST failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_ID);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "global"))
+ return "ITER_VIEW_NODE_ID failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_SPECIFIC);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "sub"))
+ return "ITER_VIEW_NODE_SPECIFIC failed";
+
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+ return "ITER_VIEW_ADDRESS_HASH failed";
+
+ return 0;
+}
+
+
+static char* test_view_address_hash(void *context)
+{
+ struct {const char *addr; const char *view;} cases[] = {
+ {"amqp:/_local/my-addr/sub", "Lmy-addr/sub"},
+ {"amqp:/_local/my-addr", "Lmy-addr"},
+ {"amqp:/_topo/area/router/local/sub", "Aarea"},
+ {"amqp:/_topo/my-area/router/local/sub", "Rrouter"},
+ {"amqp:/_topo/my-area/my-router/local/sub", "Llocal/sub"},
+ {"amqp:/_topo/area/all/local/sub", "Aarea"},
+ {"amqp:/_topo/my-area/all/local/sub", "Llocal/sub"},
+ {"amqp:/_topo/all/all/local/sub", "Llocal/sub"},
+ {"amqp://host:port/_local/my-addr", "Lmy-addr"},
+ {0, 0}
+ };
+ int idx;
+
+ for (idx = 0; cases[idx].addr; idx++) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_HASH);
+ if (!dx_field_iterator_equal(iter, (unsigned char*) cases[idx].view)) {
+ char *got = (char*) dx_field_iterator_copy(iter);
+ snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed. Expected '%s', got '%s'",
+ cases[idx].addr, cases[idx].view, got);
+ return fail_text;
+ }
+ }
+
+ return 0;
+}
+
+
+int field_tests(void)
+{
+ int result = 0;
+
+ dx_field_iterator_set_address("my-area", "my-router");
+
+ TEST_CASE(test_view_global_dns, 0);
+ TEST_CASE(test_view_global_non_dns, 0);
+ TEST_CASE(test_view_global_no_host, 0);
+ TEST_CASE(test_view_address_hash, 0);
+
+ return result;
+}
+
diff --git a/qpid/extras/dispatch/tests/run_tests.c b/qpid/extras/dispatch/tests/run_unit_tests.c
index 765dad5c66..01a8ae16b3 100644
--- a/qpid/extras/dispatch/tests/run_tests.c
+++ b/qpid/extras/dispatch/tests/run_unit_tests.c
@@ -24,6 +24,7 @@ int timer_tests();
int alloc_tests();
int server_tests();
int message_tests();
+int field_tests();
int main(int argc, char** argv)
{
@@ -43,6 +44,7 @@ int main(int argc, char** argv)
result += alloc_tests();
result += server_tests();
result += message_tests();
+ result += field_tests();
return result;
}
diff --git a/qpid/extras/dispatch/tests/server_test.c b/qpid/extras/dispatch/tests/server_test.c
index 5cacd47bae..4f68ecb421 100644
--- a/qpid/extras/dispatch/tests/server_test.c
+++ b/qpid/extras/dispatch/tests/server_test.c
@@ -116,7 +116,7 @@ static char* test_start_handler(void *context)
{
int i;
- dx = dx_dispatch(THREAD_COUNT);
+ dx = dx_dispatch(THREAD_COUNT, 0, 0, 0);
expected_context = (void*) 0x00112233;
stored_error[0] = 0x0;
@@ -139,7 +139,7 @@ static char* test_start_handler(void *context)
static char *test_server_start(void *context)
{
- dx = dx_dispatch(THREAD_COUNT);
+ dx = dx_dispatch(THREAD_COUNT, 0, 0, 0);
dx_server_start(dx);
dx_server_stop(dx);
dx_dispatch_free(dx);
@@ -153,7 +153,7 @@ static char* test_user_fd(void *context)
int res;
dx_timer_t *timer;
- dx = dx_dispatch(THREAD_COUNT);
+ dx = dx_dispatch(THREAD_COUNT, 0, 0, 0);
dx_server_set_user_fd_handler(dx, ufd_handler);
timer = dx_timer(dx, fd_test_start, 0);
dx_timer_schedule(timer, 0);