diff options
author | Ted Ross <tross@apache.org> | 2013-04-26 16:34:33 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2013-04-26 16:34:33 +0000 |
commit | 64ab8be9c34528ef71ca5c58ff075ed57a48c9e0 (patch) | |
tree | 82c66d38e2d710d780b22c71d5afd6e00297082e | |
parent | cf474845241b0c206710dbd9c87fe2e752c512a0 (diff) | |
download | qpid-python-64ab8be9c34528ef71ca5c58ff075ed57a48c9e0.tar.gz |
NO-JIRA - Development update to Dispatch Router
- Began refactoring of the routing table to support in-process destinations and multi-hop paths.
- Added API for the internal management agent. Began integrating the agent with the router
module for communication.
- Added field parsing to handle topological addresses.
- Added tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1476280 13f79535-47bb-0310-9956-ffa450edef68
23 files changed, 856 insertions, 186 deletions
diff --git a/qpid/extras/dispatch/CMakeLists.txt b/qpid/extras/dispatch/CMakeLists.txt index e5e94fec3a..f96f62817b 100644 --- a/qpid/extras/dispatch/CMakeLists.txt +++ b/qpid/extras/dispatch/CMakeLists.txt @@ -59,7 +59,7 @@ find_library(pthread_lib pthread) find_library(rt_lib rt) find_path(proton_include proton/driver.h) -set(CMAKE_C_FLAGS "-pthread -Wall -Werror") +set(CMAKE_C_FLAGS "-pthread -Wall -Werror -std=gnu99") set(CATCH_UNDEFINED "-Wl,--no-undefined") ## diff --git a/qpid/extras/dispatch/include/qpid/dispatch.h b/qpid/extras/dispatch/include/qpid/dispatch.h index ce52d0cdbe..7b306631f3 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch.h +++ b/qpid/extras/dispatch/include/qpid/dispatch.h @@ -46,9 +46,14 @@ typedef struct dx_dispatch_t dx_dispatch_t; * \brief Initialize the Dispatch library and prepare it for operation. * * @param thread_count The number of worker threads (1 or more) that the server shall create + * @param container_name The name of the container. If NULL, a UUID will be generated. + * @param router_area The name of the router's area. If NULL, a default value will be supplied. + * @param router_id The identifying name of the router. If NULL, it will be set the same as the + * container_name. * @return A handle to be used in API calls for this instance. */ -dx_dispatch_t *dx_dispatch(int thread_count); +dx_dispatch_t *dx_dispatch(int thread_count, const char *container_name, + const char *router_area, const char *router_id); /** diff --git a/qpid/extras/dispatch/include/qpid/dispatch/agent.h b/qpid/extras/dispatch/include/qpid/dispatch/agent.h index 77863b184a..3b4eea7be2 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/agent.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/agent.h @@ -38,7 +38,7 @@ typedef struct dx_agent_class_t dx_agent_class_t; * * @param context The handler context supplied in dx_agent_register. */ -typedef void (*dx_agent_schema_cb_t)(void* context); +typedef void (*dx_agent_schema_cb_t)(void* context, const void *correlator); /** @@ -71,19 +71,19 @@ dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx, /** * */ -void dx_agent_value_string(dx_dispatch_t *dx, const void *correlator, const char *key, const char *value); -void dx_agent_value_uint(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value); -void dx_agent_value_null(dx_dispatch_t *dx, const void *correlator, const char *key); -void dx_agent_value_boolean(dx_dispatch_t *dx, const void *correlator, const char *key, bool value); -void dx_agent_value_binary(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value, size_t len); -void dx_agent_value_uuid(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value); -void dx_agent_value_timestamp(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value); +void dx_agent_value_string(const void *correlator, const char *key, const char *value); +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value); +void dx_agent_value_null(const void *correlator, const char *key); +void dx_agent_value_boolean(const void *correlator, const char *key, bool value); +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len); +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value); +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value); /** * */ -void dx_agent_value_complete(dx_dispatch_t *dx, const void *correlator, bool more); +void dx_agent_value_complete(const void *correlator, bool more); /** diff --git a/qpid/extras/dispatch/include/qpid/dispatch/error.h b/qpid/extras/dispatch/include/qpid/dispatch/error.h new file mode 100644 index 0000000000..a62f80fde4 --- /dev/null +++ b/qpid/extras/dispatch/include/qpid/dispatch/error.h @@ -0,0 +1,29 @@ +#ifndef __dispatch_error_h__ +#define __dispatch_error_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef enum { + DX_ERROR_NONE = 0, + DX_ERROR_NOT_FOUND, + DX_ERROR_ALREADY_EXISTS, + DX_ERROR_ALLOC +} dx_error_t; + +#endif diff --git a/qpid/extras/dispatch/include/qpid/dispatch/hash.h b/qpid/extras/dispatch/include/qpid/dispatch/hash.h index 7f4a4bb950..bfad142517 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/hash.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/hash.h @@ -21,6 +21,7 @@ #include <stdlib.h> #include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/error.h> typedef struct hash_t hash_t; @@ -28,10 +29,10 @@ hash_t *hash(int bucket_exponent, int batch_size, int value_is_const); void hash_free(hash_t *h); size_t hash_size(hash_t *h); -int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val); -int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val); -int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val); -int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val); -int hash_remove(hash_t *h, dx_field_iterator_t *key); +dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val); +dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val); +dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val); +dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val); +dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key); #endif diff --git a/qpid/extras/dispatch/include/qpid/dispatch/iterator.h b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h index 9844286483..f47bbf5bad 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/iterator.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h @@ -54,14 +54,42 @@ typedef struct dx_field_iterator_t dx_field_iterator_t; * ^^^^^^^^^^^^^ * node-id/node/specific * ^^^^^^^^^^^^^ + * + * ITER_VIEW_ADDRESS_HASH - Isolate the hashable part of the address depending on address syntax + * + * amqp:/_local/<local> + * L^^^^^^^ + * amqp:/_topo/<area>/<router>/<local> + * A^^^^^^ + * amqp:/_topo/<my-area>/<router>/<local> + * R^^^^^^^^ + * amqp:/_topo/<my_area>/<my-router>/<local> + * L^^^^^^^ + * amqp:/_topo/<area>/all/<local> + * A^^^^^^ + * amqp:/_topo/<my-area>/all/<local> + * L^^^^^^^ + * amqp:/_topo/all/all/<local> + * L^^^^^^^ + * amqp:/<mobile> + * M^^^^^^^^ + * */ typedef enum { ITER_VIEW_ALL, ITER_VIEW_NO_HOST, ITER_VIEW_NODE_ID, - ITER_VIEW_NODE_SPECIFIC + ITER_VIEW_NODE_SPECIFIC, + ITER_VIEW_ADDRESS_HASH } dx_iterator_view_t; + +/** + * Set the area and router names for the local router. These are used to match + * my-area and my-router in address fields. + */ +void dx_field_iterator_set_address(const char *area, const char *router); + /** * Create an iterator from a null-terminated string. * @@ -87,8 +115,10 @@ void dx_field_iterator_free(dx_field_iterator_t *iter); /** * Reset the iterator to the first octet and set a new view */ -void dx_field_iterator_reset(dx_field_iterator_t *iter, - dx_iterator_view_t view); +void dx_field_iterator_reset(dx_field_iterator_t *iter); + +void dx_field_iterator_reset_view(dx_field_iterator_t *iter, + dx_iterator_view_t view); /** * Return the current octet in the iterator's view and step to the next. @@ -103,7 +133,15 @@ int dx_field_iterator_end(dx_field_iterator_t *iter); /** * Compare an input string to the iterator's view. Return true iff they are equal. */ -int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string); +int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string); + +/** + * Return true iff the string matches the characters at the current location in the view. + * This function ignores octets beyond the length of the prefix. + * This function does not alter the position of the iterator if the prefix does not match, + * if it matches, the prefix is consumed. + */ +int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix); /** * Return a copy of the iterator's view. diff --git a/qpid/extras/dispatch/include/qpid/dispatch/log.h b/qpid/extras/dispatch/include/qpid/dispatch/log.h index cbea50f266..ad659ad493 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/log.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/log.h @@ -24,7 +24,8 @@ #define LOG_ERROR 0x00000002 #define LOG_INFO 0x00000004 -void dx_log(const char *module, int cls, const char *fmt, ...); +void dx_log_impl(const char *module, int cls, const char *file, int line, const char *fmt, ...); +#define dx_log(m, c, f, ...) dx_log_impl(m, c, __FILE__, __LINE__, f , ##__VA_ARGS__) void dx_log_set_mask(int mask); diff --git a/qpid/extras/dispatch/include/qpid/dispatch/message.h b/qpid/extras/dispatch/include/qpid/dispatch/message.h index 4c1b1920b5..8311f846d6 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/message.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/message.h @@ -156,7 +156,7 @@ void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value); void dx_message_insert_uint(dx_message_t *msg, uint32_t value); void dx_message_insert_ulong(dx_message_t *msg, uint64_t value); void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len); -void dx_message_insert_string(dx_message_t *msg, const char *start); +void dx_message_insert_string(dx_message_t *msg, const char *str); void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value); void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len); void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value); diff --git a/qpid/extras/dispatch/include/qpid/dispatch/router.h b/qpid/extras/dispatch/include/qpid/dispatch/router.h index 5ae2c8e846..58eb026775 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/router.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/router.h @@ -19,6 +19,28 @@ * under the License. */ -// TODO - Add router message-passing methods +#include <stdbool.h> + +typedef struct dx_dispatch_t dx_dispatch_t; +typedef struct dx_message_t dx_message_t; +typedef struct dx_address_t dx_address_t; + + +typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg); + + +dx_address_t *dx_router_register_address(dx_dispatch_t *dx, + bool is_local, + const char *address, + dx_router_message_cb handler, + void *context); + +void dx_router_unregister_address(dx_address_t *address); + + +void dx_router_send(dx_dispatch_t *dx, + const char *address, + dx_message_t *msg); + #endif diff --git a/qpid/extras/dispatch/router/src/main.c b/qpid/extras/dispatch/router/src/main.c index fb78d9dca0..5fb194980b 100644 --- a/qpid/extras/dispatch/router/src/main.c +++ b/qpid/extras/dispatch/router/src/main.c @@ -26,17 +26,29 @@ static int exit_with_sigint = 0; static dx_dispatch_t *dispatch; +/** + * The thread_start_handler is invoked once for each server thread at thread startup. + */ static void thread_start_handler(void* context, int thread_id) { } +/** + * This is the OS signal handler, invoked on an undetermined thread at a completely + * arbitrary point of time. It is not safe to do anything here but signal the dispatch + * server with the signal number. + */ static void signal_handler(int signum) { dx_server_signal(dispatch, signum); } +/** + * This signal handler is called cleanly by one of the server's worker threads in + * response to an earlier call to dx_server_signal. + */ static void server_signal_handler(void* context, int signum) { dx_server_pause(dispatch); @@ -94,7 +106,7 @@ int main(int argc, char **argv) { dx_log_set_mask(LOG_INFO | LOG_TRACE | LOG_ERROR); - dispatch = dx_dispatch(4); + dispatch = dx_dispatch(4, "Qpid.Dispatch", "area", "Router.A"); dx_server_set_signal_handler(dispatch, server_signal_handler, 0); dx_server_set_start_handler(dispatch, thread_start_handler, 0); diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 864d58fadc..aca9ab3560 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -18,15 +18,18 @@ */ #include "dispatch_private.h" +#include <qpid/dispatch/error.h> #include <qpid/dispatch/agent.h> +#include <qpid/dispatch/alloc.h> #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/hash.h> #include <qpid/dispatch/container.h> #include <qpid/dispatch/message.h> #include <qpid/dispatch/threading.h> #include <qpid/dispatch/timer.h> +#include <qpid/dispatch/router.h> #include <string.h> - +#include <stdio.h> typedef struct dx_agent_t { dx_server_t *server; @@ -35,6 +38,7 @@ typedef struct dx_agent_t { dx_message_list_t out_fifo; sys_mutex_t *lock; dx_timer_t *timer; + dx_address_t *address; } dx_agent_t; @@ -46,12 +50,30 @@ struct dx_agent_class_t { }; +typedef struct { + dx_agent_t *agent; + dx_message_t *response_msg; +} dx_agent_request_t; + +ALLOC_DECLARE(dx_agent_request_t); +ALLOC_DEFINE(dx_agent_request_t); + + static void dx_agent_timer_handler(void *context) { // TODO - Process the in_fifo here } +static void dx_agent_rx_handler(void *context, dx_message_t *msg) +{ + dx_agent_t *agent = (dx_agent_t*) context; + DEQ_INSERT_TAIL(agent->in_fifo, msg); + dx_timer_schedule(agent->timer, 0); + printf("dx_agent_rx_handler - inbound message\n"); +} + + dx_agent_t *dx_agent(dx_dispatch_t *dx) { dx_agent_t *agent = NEW(dx_agent_t); @@ -59,8 +81,9 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx) agent->class_hash = hash(6, 10, 1); DEQ_INIT(agent->in_fifo); DEQ_INIT(agent->out_fifo); - agent->lock = sys_mutex(); - agent->timer = dx_timer(dx, dx_agent_timer_handler, agent); + agent->lock = sys_mutex(); + agent->timer = dx_timer(dx, dx_agent_timer_handler, agent); + agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent); return agent; } @@ -68,6 +91,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx) void dx_agent_free(dx_agent_t *agent) { + dx_router_unregister_address(agent->address); sys_mutex_free(agent->lock); dx_timer_free(agent->timer); hash_free(agent->class_hash); @@ -110,42 +134,42 @@ dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx, } -void dx_agent_value_string(dx_dispatch_t *dx, const void *correlator, const char *key, const char *value) +void dx_agent_value_string(const void *correlator, const char *key, const char *value) { } -void dx_agent_value_uint(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value) +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value) { } -void dx_agent_value_null(dx_dispatch_t *dx, const void *correlator, const char *key) +void dx_agent_value_null(const void *correlator, const char *key) { } -void dx_agent_value_boolean(dx_dispatch_t *dx, const void *correlator, const char *key, bool value) +void dx_agent_value_boolean(const void *correlator, const char *key, bool value) { } -void dx_agent_value_binary(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value, size_t len) +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len) { } -void dx_agent_value_uuid(dx_dispatch_t *dx, const void *correlator, const char *key, const uint8_t *value) +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value) { } -void dx_agent_value_timestamp(dx_dispatch_t *dx, const void *correlator, const char *key, uint64_t value) +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value) { } -void dx_agent_value_complete(dx_dispatch_t *dx, const void *correlator, bool more) +void dx_agent_value_complete(const void *correlator, bool more) { } diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c index 0b31ab8ab5..e65d0c4b63 100644 --- a/qpid/extras/dispatch/src/container.c +++ b/qpid/extras/dispatch/src/container.c @@ -30,6 +30,7 @@ #include <qpid/dispatch/threading.h> #include <qpid/dispatch/iterator.h> #include <qpid/dispatch/log.h> +#include <qpid/dispatch/agent.h> static char *module="CONTAINER"; @@ -63,34 +64,44 @@ typedef struct dxc_node_type_t { } dxc_node_type_t; DEQ_DECLARE(dxc_node_type_t, dxc_node_type_list_t); +static int DX_CONTAINER_CLASS_CONTAINER = 1; +static int DX_CONTAINER_CLASS_NODE_TYPE = 2; +static int DX_CONTAINER_CLASS_NODE = 3; + +typedef struct container_class_t { + dx_container_t *container; + int class_id; +} container_class_t; struct dx_container_t { + dx_dispatch_t *dx; dx_server_t *server; hash_t *node_type_map; hash_t *node_map; sys_mutex_t *lock; dx_node_t *default_node; dxc_node_type_list_t node_type_list; + dx_agent_class_t *class_container; + dx_agent_class_t *class_node_type; + dx_agent_class_t *class_node; }; static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link) { sys_mutex_lock(container->lock); - dx_node_t *node; - int result; + dx_node_t *node = 0; const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); dx_field_iterator_t *iter; // TODO - Extract the name from the structured source if (source) { iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID); - result = hash_retrieve(container->node_map, iter, (void*) &node); + hash_retrieve(container->node_map, iter, (void*) &node); dx_field_iterator_free(iter); - } else - result = -1; + } sys_mutex_unlock(container->lock); - if (result < 0) { + if (node == 0) { if (container->default_node) node = container->default_node; else { @@ -119,21 +130,19 @@ static void setup_outgoing_link(dx_container_t *container, pn_link_t *pn_link) static void setup_incoming_link(dx_container_t *container, pn_link_t *pn_link) { sys_mutex_lock(container->lock); - dx_node_t *node; - int result; + dx_node_t *node = 0; const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); dx_field_iterator_t *iter; // TODO - Extract the name from the structured target if (target) { iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID); - result = hash_retrieve(container->node_map, iter, (void*) &node); + hash_retrieve(container->node_map, iter, (void*) &node); dx_field_iterator_free(iter); - } else - result = -1; + } sys_mutex_unlock(container->lock); - if (result < 0) { + if (node == 0) { if (container->default_node) node = container->default_node; else { @@ -404,10 +413,49 @@ static int handler(void *handler_context, void *conn_context, dx_conn_event_t ev } +static void container_schema_handler(void *context, const void *correlator) +{ +} + + +static void container_query_handler(void* context, const char *id, const void *correlator) +{ + container_class_t *cls = (container_class_t*) context; + + if (cls->class_id == DX_CONTAINER_CLASS_CONTAINER) { + dx_agent_value_uint(correlator, "node_type_count", hash_size(cls->container->node_type_map)); + dx_agent_value_uint(correlator, "node_count", hash_size(cls->container->node_map)); + if (cls->container->default_node) + dx_agent_value_string(correlator, "default_node_type", cls->container->default_node->ntype->type_name); + else + dx_agent_value_null(correlator, "default_node_type"); + dx_agent_value_complete(correlator, false); + + } else if (cls->class_id == DX_CONTAINER_CLASS_NODE_TYPE) { + + } else if (cls->class_id == DX_CONTAINER_CLASS_NODE) { + + } +} + + +dx_agent_class_t *setup_class(dx_container_t *container, const char *fqname, int id) +{ + container_class_t *cls = NEW(container_class_t); + cls->container = container; + cls->class_id = id; + + return dx_agent_register_class(container->dx, fqname, cls, + container_schema_handler, + container_query_handler); +} + + dx_container_t *dx_container(dx_dispatch_t *dx) { dx_container_t *container = NEW(dx_container_t); + container->dx = dx; container->server = dx->server; container->node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 container->node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 @@ -422,6 +470,17 @@ dx_container_t *dx_container(dx_dispatch_t *dx) } +void dx_container_setup_agent(dx_dispatch_t *dx) +{ + dx->container->class_container = + setup_class(dx->container, "org.apache.qpid.dispatch.container", DX_CONTAINER_CLASS_CONTAINER); + dx->container->class_node_type = + setup_class(dx->container, "org.apache.qpid.dispatch.container.node_type", DX_CONTAINER_CLASS_NODE_TYPE); + dx->container->class_node = + setup_class(dx->container, "org.apache.qpid.dispatch.container.node", DX_CONTAINER_CLASS_NODE); +} + + void dx_container_free(dx_container_t *container) { // TODO - Free the nodes diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c index 87eb535b8a..47a1a07330 100644 --- a/qpid/extras/dispatch/src/dispatch.c +++ b/qpid/extras/dispatch/src/dispatch.c @@ -24,27 +24,44 @@ /** * Private Function Prototypes */ -dx_server_t *dx_server(int tc); +dx_server_t *dx_server(int tc, const char *container_name); +void dx_server_setup_agent(dx_dispatch_t *dx); void dx_server_free(dx_server_t *server); dx_container_t *dx_container(dx_dispatch_t *dx); +void dx_container_setup_agent(dx_dispatch_t *dx); void dx_container_free(dx_container_t *container); -dx_router_t *dx_router(dx_dispatch_t *dx); +dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id); +void dx_router_setup_agent(dx_dispatch_t *dx); void dx_router_free(dx_router_t *router); dx_agent_t *dx_agent(dx_dispatch_t *dx); void dx_agent_free(dx_agent_t *agent); -dx_dispatch_t *dx_dispatch(int thread_count) +dx_dispatch_t *dx_dispatch(int thread_count, const char *container_name, + const char *router_area, const char *router_id) { dx_dispatch_t *dx = NEW(dx_dispatch_t); dx_alloc_initialize(); - dx->server = dx_server(thread_count); + if (!container_name) + container_name = "00000000-0000-0000-0000-000000000000"; // TODO - gen a real uuid + + if (!router_area) + router_area = "area"; + + if (!router_id) + router_id = container_name; + + dx->server = dx_server(thread_count, container_name); dx->container = dx_container(dx); - dx->router = dx_router(dx); + dx->router = dx_router(dx, router_area, router_id); dx->agent = dx_agent(dx); + dx_server_setup_agent(dx); + dx_container_setup_agent(dx); + dx_router_setup_agent(dx); + return dx; } diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c index c54d5d6fcf..19744366aa 100644 --- a/qpid/extras/dispatch/src/hash.c +++ b/qpid/extras/dispatch/src/hash.c @@ -58,6 +58,7 @@ static unsigned long hash_function(dx_field_iterator_t *iter) unsigned long hash = 5381; int c; + dx_field_iterator_reset(iter); while (!dx_field_iterator_end(iter)) { c = (int) dx_field_iterator_octet(iter); hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ @@ -101,13 +102,11 @@ size_t hash_size(hash_t *h) } -static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error) +static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists) { unsigned long idx = hash_function(key) & h->bucket_mask; hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); - *error = 0; - while (item) { if (dx_field_iterator_equal(key, item->key)) break; @@ -115,40 +114,44 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in } if (item) { - *error = -1; - return 0; + *exists = 1; + return item; } item = new_hash_item_t(); - if (!item) { - *error = -2; + if (!item) return 0; - } DEQ_ITEM_INIT(item); item->key = dx_field_iterator_copy(key); DEQ_INSERT_TAIL(h->buckets[idx].items, item); h->size++; + *exists = 0; return item; } -int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) +dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) { - int error = 0; - hash_item_t *item = hash_internal_insert(h, key, &error); + int exists = 0; + hash_item_t *item = hash_internal_insert(h, key, &exists); - if (item) - item->v.val = val; - return error; + if (!item) + return DX_ERROR_ALLOC; + + if (exists) + return DX_ERROR_ALREADY_EXISTS; + + item->v.val = val; + + return DX_ERROR_NONE; } -int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val) +dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val) { - if (!h->is_const) - return -3; + assert(h->is_const); int error = 0; hash_item_t *item = hash_internal_insert(h, key, &error); @@ -174,32 +177,33 @@ static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key) } -int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val) +dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val) { hash_item_t *item = hash_internal_retrieve(h, key); - if (item) { + if (item) *val = item->v.val; - return 0; - } - return -1; + else + *val = 0; + + return DX_ERROR_NONE; } -int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val) +dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val) { - if (!h->is_const) - return -3; + assert(h->is_const); hash_item_t *item = hash_internal_retrieve(h, key); - if (item) { + if (item) *val = item->v.val_const; - return 0; - } - return -1; + else + *val = 0; + + return DX_ERROR_NONE; } -int hash_remove(hash_t *h, dx_field_iterator_t *key) +dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key) { unsigned long idx = hash_function(key) & h->bucket_mask; hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); @@ -215,9 +219,9 @@ int hash_remove(hash_t *h, dx_field_iterator_t *key) DEQ_REMOVE(h->buckets[idx].items, item); free_hash_item_t(item); h->size--; - return 0; + return DX_ERROR_NONE; } - return -1; + return DX_ERROR_NOT_FOUND; } diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c index 6ab67f948d..92a7a1f479 100644 --- a/qpid/extras/dispatch/src/iterator.c +++ b/qpid/extras/dispatch/src/iterator.c @@ -25,19 +25,25 @@ #include <string.h> typedef enum { -MODE_TO_END, -MODE_TO_SLASH + MODE_TO_END, + MODE_TO_SLASH } parse_mode_t; +typedef struct { + dx_buffer_t *buffer; + unsigned char *cursor; + int length; +} pointer_t; + struct dx_field_iterator_t { - dx_buffer_t *start_buffer; - unsigned char *start_cursor; - int start_length; - dx_buffer_t *buffer; - unsigned char *cursor; - int length; + pointer_t start_pointer; + pointer_t view_start_pointer; + pointer_t pointer; dx_iterator_view_t view; parse_mode_t mode; + unsigned char prefix; + int at_prefix; + int view_prefix; }; @@ -46,28 +52,86 @@ ALLOC_DEFINE(dx_field_iterator_t); typedef enum { -STATE_START, -STATE_SLASH_LEFT, -STATE_SKIPPING_TO_NEXT_SLASH, -STATE_SCANNING, -STATE_COLON, -STATE_COLON_SLASH, -STATE_AT_NODE_ID + STATE_START, + STATE_SLASH_LEFT, + STATE_SKIPPING_TO_NEXT_SLASH, + STATE_SCANNING, + STATE_COLON, + STATE_COLON_SLASH, + STATE_AT_NODE_ID } state_t; +static char *my_area = ""; +static char *my_router = ""; + + +static void parse_address_view(dx_field_iterator_t *iter) +{ + // + // This function starts with an iterator view that is identical to + // ITER_VIEW_NO_HOST. We will now further refine the view in order + // to aid the router in looking up addresses. + // + + if (dx_field_iterator_prefix(iter, "_")) { + if (dx_field_iterator_prefix(iter, "local/")) { + iter->prefix = 'L'; + iter->at_prefix = 1; + iter->view_prefix = 1; + return; + } + + if (dx_field_iterator_prefix(iter, "topo/")) { + if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_area)) { + if (dx_field_iterator_prefix(iter, "all/") || dx_field_iterator_prefix(iter, my_router)) { + iter->prefix = 'L'; + iter->at_prefix = 1; + iter->view_prefix = 1; + return; + } + + iter->prefix = 'R'; + iter->at_prefix = 1; + iter->view_prefix = 1; + iter->mode = MODE_TO_SLASH; + return; + } + + iter->prefix = 'A'; + iter->at_prefix = 1; + iter->view_prefix = 1; + iter->mode = MODE_TO_SLASH; + return; + } + } + + iter->prefix = 'M'; + iter->at_prefix = 1; + iter->view_prefix = 1; +} + + static void view_initialize(dx_field_iterator_t *iter) { - if (iter->view == ITER_VIEW_ALL) { - iter->mode = MODE_TO_END; + // + // The default behavior is for the view to *not* have a prefix. + // We'll add one if it's needed later. + // + iter->at_prefix = 0; + iter->view_prefix = 0; + iter->mode = MODE_TO_END; + + if (iter->view == ITER_VIEW_ALL) return; - } // // Advance to the node-id. // - state_t state = STATE_START; - unsigned int octet; + state_t state = STATE_START; + unsigned int octet; + pointer_t save_pointer = {0,0,0}; + while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) { octet = dx_field_iterator_octet(iter); switch (state) { @@ -96,17 +160,20 @@ static void view_initialize(dx_field_iterator_t *iter) break; case STATE_COLON : - if (octet == '/') + if (octet == '/') { state = STATE_COLON_SLASH; - else + save_pointer = iter->pointer; + } else state = STATE_SCANNING; break; case STATE_COLON_SLASH : if (octet == '/') state = STATE_SKIPPING_TO_NEXT_SLASH; - else - state = STATE_SCANNING; + else { + state = STATE_AT_NODE_ID; + iter->pointer = save_pointer; + } break; case STATE_AT_NODE_ID : @@ -119,9 +186,7 @@ static void view_initialize(dx_field_iterator_t *iter) // The address string was relative, not absolute. The node-id // is at the beginning of the string. // - iter->buffer = iter->start_buffer; - iter->cursor = iter->start_cursor; - iter->length = iter->start_length; + iter->pointer = iter->start_pointer; } // @@ -137,6 +202,12 @@ static void view_initialize(dx_field_iterator_t *iter) return; } + if (iter->view == ITER_VIEW_ADDRESS_HASH) { + iter->mode = MODE_TO_END; + parse_address_view(iter); + return; + } + if (iter->view == ITER_VIEW_NODE_SPECIFIC) { iter->mode = MODE_TO_END; while (!dx_field_iterator_end(iter)) { @@ -149,17 +220,29 @@ static void view_initialize(dx_field_iterator_t *iter) } +void dx_field_iterator_set_address(const char *area, const char *router) +{ + my_area = (char*) malloc(strlen(area) + 2); + strcpy(my_area, area); + strcat(my_area, "/"); + + my_router = (char*) malloc(strlen(router) + 2); + strcpy(my_router, router); + strcat(my_router, "/"); +} + + dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view) { dx_field_iterator_t *iter = new_dx_field_iterator_t(); if (!iter) return 0; - iter->start_buffer = 0; - iter->start_cursor = (unsigned char*) text; - iter->start_length = strlen(text); + iter->start_pointer.buffer = 0; + iter->start_pointer.cursor = (unsigned char*) text; + iter->start_pointer.length = strlen(text); - dx_field_iterator_reset(iter, view); + dx_field_iterator_reset_view(iter, view); return iter; } @@ -171,11 +254,11 @@ dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, i if (!iter) return 0; - iter->start_buffer = buffer; - iter->start_cursor = dx_buffer_base(buffer) + offset; - iter->start_length = length; + iter->start_pointer.buffer = buffer; + iter->start_pointer.cursor = dx_buffer_base(buffer) + offset; + iter->start_pointer.length = length; - dx_field_iterator_reset(iter, view); + dx_field_iterator_reset_view(iter, view); return iter; } @@ -187,40 +270,52 @@ void dx_field_iterator_free(dx_field_iterator_t *iter) } -void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view) +void dx_field_iterator_reset(dx_field_iterator_t *iter) +{ + iter->pointer = iter->view_start_pointer; + iter->at_prefix = iter->view_prefix; +} + + +void dx_field_iterator_reset_view(dx_field_iterator_t *iter, dx_iterator_view_t view) { - iter->buffer = iter->start_buffer; - iter->cursor = iter->start_cursor; - iter->length = iter->start_length; - iter->view = view; + iter->pointer = iter->start_pointer; + iter->view = view; view_initialize(iter); + + iter->view_start_pointer = iter->pointer; } unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter) { - if (iter->length == 0) + if (iter->at_prefix) { + iter->at_prefix = 0; + return iter->prefix; + } + + if (iter->pointer.length == 0) return (unsigned char) 0; - unsigned char result = *(iter->cursor); + unsigned char result = *(iter->pointer.cursor); - iter->cursor++; - iter->length--; + iter->pointer.cursor++; + iter->pointer.length--; - if (iter->length > 0) { - if (iter->buffer) { - if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) { - iter->buffer = iter->buffer->next; - if (iter->buffer == 0) - iter->length = 0; - iter->cursor = dx_buffer_base(iter->buffer); + if (iter->pointer.length > 0) { + if (iter->pointer.buffer) { + if (iter->pointer.cursor - dx_buffer_base(iter->pointer.buffer) == dx_buffer_size(iter->pointer.buffer)) { + iter->pointer.buffer = iter->pointer.buffer->next; + if (iter->pointer.buffer == 0) + iter->pointer.length = 0; + iter->pointer.cursor = dx_buffer_base(iter->pointer.buffer); } } } - if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/') - iter->length = 0; + if (iter->pointer.length && iter->mode == MODE_TO_SLASH && *(iter->pointer.cursor) == '/') + iter->pointer.length = 0; return result; } @@ -228,13 +323,13 @@ unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter) int dx_field_iterator_end(dx_field_iterator_t *iter) { - return iter->length == 0; + return iter->pointer.length == 0; } -int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string) +int dx_field_iterator_equal(dx_field_iterator_t *iter, const unsigned char *string) { - dx_field_iterator_reset(iter, iter->view); + dx_field_iterator_reset(iter); while (!dx_field_iterator_end(iter) && *string) { if (*string != dx_field_iterator_octet(iter)) return 0; @@ -245,19 +340,39 @@ int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string) } +int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix) +{ + pointer_t save_pointer = iter->pointer; + unsigned char *c = (unsigned char*) prefix; + + while(*c) { + if (*c != dx_field_iterator_octet(iter)) + break; + c++; + } + + if (*c) { + iter->pointer = save_pointer; + return 0; + } + + return 1; +} + + unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter) { int length = 0; int idx = 0; unsigned char *copy; - dx_field_iterator_reset(iter, iter->view); + dx_field_iterator_reset(iter); while (!dx_field_iterator_end(iter)) { dx_field_iterator_octet(iter); length++; } - dx_field_iterator_reset(iter, iter->view); + dx_field_iterator_reset(iter); copy = (unsigned char*) malloc(length + 1); while (!dx_field_iterator_end(iter)) copy[idx++] = dx_field_iterator_octet(iter); diff --git a/qpid/extras/dispatch/src/log.c b/qpid/extras/dispatch/src/log.c index d4ec534915..c6cffe0321 100644 --- a/qpid/extras/dispatch/src/log.c +++ b/qpid/extras/dispatch/src/log.c @@ -18,11 +18,36 @@ */ #include <qpid/dispatch/log.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> #include <stdarg.h> #include <stdio.h> #include <string.h> +#include <sys/time.h> -static int mask=LOG_INFO; +#define TEXT_MAX 512 +#define LIST_MAX 1000 + +typedef struct dx_log_entry_t dx_log_entry_t; + +struct dx_log_entry_t { + DEQ_LINKS(dx_log_entry_t); + const char *module; + int cls; + const char *file; + int line; + struct timeval tv; + char text[TEXT_MAX]; +}; + +ALLOC_DECLARE(dx_log_entry_t); +ALLOC_DEFINE(dx_log_entry_t); + +DEQ_DECLARE(dx_log_entry_t, dx_log_list_t); + +static int mask = LOG_INFO; +static dx_log_list_t entries; +static int list_init = 0; static char *cls_prefix(int cls) { @@ -35,18 +60,36 @@ static char *cls_prefix(int cls) return ""; } -void dx_log(const char *module, int cls, const char *fmt, ...) +void dx_log_impl(const char *module, int cls, const char *file, int line, const char *fmt, ...) { if (!(cls & mask)) return; + if (list_init == 0) { + list_init = 1; + DEQ_INIT(entries); + } + + dx_log_entry_t *entry = new_dx_log_entry_t(); + entry->module = module; + entry->cls = cls; + entry->file = file; + entry->line = line; + gettimeofday(&entry->tv, 0); + va_list ap; - char line[128]; va_start(ap, fmt); - vsnprintf(line, 127, fmt, ap); + vsnprintf(entry->text, TEXT_MAX, fmt, ap); va_end(ap); - fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line); + fprintf(stderr, "%s (%s) %s\n", module, cls_prefix(cls), entry->text); + + DEQ_INSERT_TAIL(entries, entry); + if (DEQ_SIZE(entries) > LIST_MAX) { + entry = DEQ_HEAD(entries); + DEQ_REMOVE_HEAD(entries); + free_dx_log_entry_t(entry); + } } void dx_log_set_mask(int _mask) diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c index c914c5ca7b..fd08753f4d 100644 --- a/qpid/extras/dispatch/src/message.c +++ b/qpid/extras/dispatch/src/message.c @@ -1081,19 +1081,19 @@ void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t le } -void dx_message_insert_string(dx_message_t *msg, const char *start) +void dx_message_insert_string(dx_message_t *msg, const char *str) { dx_message_content_t *content = MSG_CONTENT(msg); - uint32_t len = strlen(start); + uint32_t len = strlen(str); if (len < 256) { dx_insert_8(content, 0xa1); // str8-utf8 dx_insert_8(content, (uint8_t) len); - dx_insert(content, (const uint8_t*) start, len); + dx_insert(content, (const uint8_t*) str, len); } else { dx_insert_8(content, 0xb1); // str32-utf8 dx_insert_32(content, len); - dx_insert(content, (const uint8_t*) start, len); + dx_insert(content, (const uint8_t*) str, len); } content->count++; } diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 0513b08a6b..65756be215 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -18,13 +18,34 @@ */ #include <stdio.h> +#include <string.h> #include <qpid/dispatch.h> #include "dispatch_private.h" -static char *module="ROUTER_NODE"; +static char *module = "ROUTER"; + +//static char *local_prefix = "_local/"; +//static char *topo_prefix = "_topo/"; + +/** + * Address Types and Processing: + * + * Address Hash Compare onReceive onEmit + * ============================================================================= + * _local/<local> L<local> handler forward + * _topo/<area>/<router>/<local> A<area> forward forward + * _topo/<my-area>/<router>/<local> R<router> forward forward + * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward + * _topo/<area>/all/<local> A<area> forward forward + * _topo/<my-area>/all/<local> L<local> forward+handler forward + * _topo/all/all/<local> L<local> forward+handler forward + * <mobile> M<mobile> forward+handler forward + */ struct dx_router_t { dx_dispatch_t *dx; + const char *router_area; + const char *router_id; dx_node_t *node; dx_link_list_t in_links; dx_link_list_t out_links; @@ -41,11 +62,32 @@ typedef struct { dx_message_list_t out_fifo; } dx_router_link_t; - ALLOC_DECLARE(dx_router_link_t); ALLOC_DEFINE(dx_router_link_t); +typedef struct { + const char *id; + dx_router_link_t *next_hop; + // list of valid origins (pointers to router_node) - (bit masks?) +} dx_router_node_t; + +ALLOC_DECLARE(dx_router_node_t); +ALLOC_DEFINE(dx_router_node_t); + + +struct dx_address_t { + int is_local; + dx_router_message_cb handler; // In-Process Consumer + void *handler_context; + dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list + dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list +}; + +ALLOC_DECLARE(dx_address_t); +ALLOC_DEFINE(dx_address_t); + + /** * Outbound Delivery Handler */ @@ -119,22 +161,42 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del if (valid_message) { dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); - dx_router_link_t *rlink; + dx_address_t *addr; if (iter) { - dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); sys_mutex_lock(router->lock); - int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); + hash_retrieve(router->out_hash, iter, (void*) &addr); dx_field_iterator_free(iter); - if (result == 0) { + if (addr) { + // + // To field is valid and contains a known destination. Handle the various + // cases for forwarding. + // + // Forward to the in-process handler for this message if there is one. + // + if (addr->handler) + addr->handler(addr->handler_context, msg); + // - // To field is valid and contains a known destination. Enqueue on - // the output fifo for the next-hop-to-destination. + // Forward to the local link for the locally-connected consumer, if present. + // TODO - Don't forward if this is a "_local" address. // - pn_link_t* pn_outlink = dx_link_pn(rlink->link); - DEQ_INSERT_TAIL(rlink->out_fifo, msg); - pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo)); - dx_link_activate(rlink->link); + if (addr->rlink) { + pn_link_t* pn_outlink = dx_link_pn(addr->rlink->link); + DEQ_INSERT_TAIL(addr->rlink->out_fifo, msg); + pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo)); + dx_link_activate(addr->rlink->link); + } + + // + // Forward to the next-hop for a remotely-connected consumer, if present. + // Don't forward if this is a "_local" address. + // + if (addr->rnode) { + // TODO + } + } else { // // To field contains an unknown address. Release the message. @@ -242,17 +304,30 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) pn_link_t *pn_link = dx_link_pn(link); const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); - sys_mutex_lock(router->lock); dx_router_link_t *rlink = new_dx_router_link_t(); rlink->link = link; DEQ_INIT(rlink->out_fifo); dx_link_set_context(link, rlink); - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - int result = hash_insert(router->out_hash, iter, rlink); + dx_address_t *addr; + + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); + + sys_mutex_lock(router->lock); + hash_retrieve(router->out_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + addr->is_local = 0; + addr->handler = 0; + addr->handler_context = 0; + addr->rlink = 0; + addr->rnode = 0; + hash_insert(router->out_hash, iter, addr); + } dx_field_iterator_free(iter); - if (result == 0) { + if (addr->rlink == 0) { + addr->rlink = rlink; pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); pn_link_open(pn_link); @@ -314,14 +389,14 @@ static int router_link_detach_handler(void* context, dx_link_t *link, int closed if (pn_link_is_sender(pn_link)) { item = DEQ_HEAD(router->out_links); - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - dx_router_link_t *rlink; + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); + dx_address_t *addr; if (iter) { - int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); - if (result == 0) { - dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + hash_retrieve(router->out_hash, iter, (void**) &addr); + if (addr) { hash_remove(router->out_hash, iter); - free_dx_router_link_t(rlink); + free_dx_router_link_t(addr->rlink); + free_dx_address_t(addr); dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); } dx_field_iterator_free(iter); @@ -383,7 +458,7 @@ static dx_node_type_t router_node = {"router", 0, 0, static int type_registered = 0; -dx_router_t *dx_router(dx_dispatch_t *dx) +dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) { if (!type_registered) { type_registered = 1; @@ -397,8 +472,10 @@ dx_router_t *dx_router(dx_dispatch_t *dx) DEQ_INIT(router->out_links); DEQ_INIT(router->in_fifo); - router->dx = dx; - router->lock = sys_mutex(); + router->dx = dx; + router->lock = sys_mutex(); + router->router_area = area; + router->router_id = id; router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); dx_timer_schedule(router->timer, 0); // Immediate @@ -406,10 +483,22 @@ dx_router_t *dx_router(dx_dispatch_t *dx) router->out_hash = hash(10, 32, 0); router->dtag = 1; + // + // Inform the field iterator module of this router's id and area. The field iterator + // uses this to offload some of the address-processing load from the router. + // + dx_field_iterator_set_address(area, id); + return router; } +void dx_router_setup_agent(dx_dispatch_t *dx) +{ + // TODO +} + + void dx_router_free(dx_router_t *router) { dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH); @@ -417,3 +506,54 @@ void dx_router_free(dx_router_t *router) free(router); } + +dx_address_t *dx_router_register_address(dx_dispatch_t *dx, + bool is_local, + const char *address, + dx_router_message_cb handler, + void *context) +{ + char addr[1000]; + dx_address_t *ad = new_dx_address_t(); + dx_field_iterator_t *iter; + int result; + + if (!ad) + return 0; + + ad->is_local = is_local; + ad->handler = handler; + ad->handler_context = context; + ad->rlink = 0; + + if (ad->is_local) + strcpy(addr, "L"); // Local Hash-Key Space + else + strcpy(addr, "M"); // Mobile Hash-Key Space + + strcat(addr, address); + iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST); + result = hash_insert(dx->router->out_hash, iter, ad); + dx_field_iterator_free(iter); + if (result != 0) { + free_dx_address_t(ad); + return 0; + } + + dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); + return ad; +} + + +void dx_router_unregister_address(dx_address_t *ad) +{ + free_dx_address_t(ad); +} + + +void dx_router_send(dx_dispatch_t *dx, + const char *address, + dx_message_t *msg) +{ +} + diff --git a/qpid/extras/dispatch/src/server.c b/qpid/extras/dispatch/src/server.c index a2d2d4980a..536af048d8 100644 --- a/qpid/extras/dispatch/src/server.c +++ b/qpid/extras/dispatch/src/server.c @@ -44,6 +44,7 @@ typedef struct dx_thread_t { struct dx_server_t { int thread_count; + const char *container_name; pn_driver_t *driver; dx_thread_start_cb_t start_handler; dx_conn_handler_cb_t conn_handler; @@ -201,7 +202,7 @@ static void process_connector(dx_server_t *dx_server, pn_connector_t *cxtr) ctx->state = CONN_STATE_OPERATIONAL; pn_connection_t *conn = pn_connection(); - pn_connection_set_container(conn, "dispatch"); // TODO - make unique + pn_connection_set_container(conn, dx_server->container_name); pn_connector_set_connection(cxtr, conn); pn_connection_set_context(conn, ctx); ctx->pn_conn = conn; @@ -557,7 +558,7 @@ static void cxtr_try_open(void *context) } -dx_server_t *dx_server(int thread_count) +dx_server_t *dx_server(int thread_count, const char *container_name) { int i; @@ -566,6 +567,7 @@ dx_server_t *dx_server(int thread_count) return 0; dx_server->thread_count = thread_count; + dx_server->container_name = container_name; dx_server->driver = pn_driver(); dx_server->start_handler = 0; dx_server->conn_handler = 0; @@ -596,6 +598,12 @@ dx_server_t *dx_server(int thread_count) } +void dx_server_setup_agent(dx_dispatch_t *dx) +{ + // TODO +} + + void dx_server_free(dx_server_t *dx_server) { int i; diff --git a/qpid/extras/dispatch/tests/CMakeLists.txt b/qpid/extras/dispatch/tests/CMakeLists.txt index b92f66939f..362aac08e7 100644 --- a/qpid/extras/dispatch/tests/CMakeLists.txt +++ b/qpid/extras/dispatch/tests/CMakeLists.txt @@ -22,8 +22,9 @@ ## set(unit_test_SOURCES alloc_test.c + field_test.c message_test.c - run_tests.c + run_unit_tests.c server_test.c timer_test.c tool_test.c @@ -32,7 +33,7 @@ set(unit_test_SOURCES add_executable(unit_tests ${unit_test_SOURCES}) target_link_libraries(unit_tests qpid-dispatch) -add_test(unit_tests_buf_512 unit_tests 512) -add_test(unit_tests_buf_10K unit_tests 10000) -add_test(unit_tests_buf_10 unit_tests 10) -add_test(unit_tests_buf_1 unit_tests 1) +add_test(unit_tests_buf_10000 unit_tests 10000) +add_test(unit_tests_buf_512 unit_tests 512) +add_test(unit_tests_buf_10 unit_tests 10) +add_test(unit_tests_buf_1 unit_tests 1) diff --git a/qpid/extras/dispatch/tests/field_test.c b/qpid/extras/dispatch/tests/field_test.c new file mode 100644 index 0000000000..59fc3c4cfb --- /dev/null +++ b/qpid/extras/dispatch/tests/field_test.c @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "test_case.h" +#include <stdio.h> +#include <string.h> +#include <qpid/dispatch/iterator.h> + +#define FAIL_TEXT_SIZE 10000 +static char fail_text[FAIL_TEXT_SIZE]; + +static char* test_view_global_dns(void *context) +{ + dx_field_iterator_t *iter = dx_field_iterator_string("amqp://host/global/sub", ITER_VIEW_ALL); + if (!dx_field_iterator_equal(iter, (unsigned char*) "amqp://host/global/sub")) + return "ITER_VIEW_ALL failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); + if (!dx_field_iterator_equal(iter, (unsigned char*) "global/sub")) + return "ITER_VIEW_NO_HOST failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_ID); + if (!dx_field_iterator_equal(iter, (unsigned char*) "global")) + return "ITER_VIEW_NODE_ID failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_SPECIFIC); + if (!dx_field_iterator_equal(iter, (unsigned char*) "sub")) + return "ITER_VIEW_NODE_SPECIFIC failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + if (!dx_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub")) + return "ITER_VIEW_ADDRESS_HASH failed"; + + return 0; +} + + +static char* test_view_global_non_dns(void *context) +{ + dx_field_iterator_t *iter = dx_field_iterator_string("amqp:/global/sub", ITER_VIEW_ALL); + if (!dx_field_iterator_equal(iter, (unsigned char*) "amqp:/global/sub")) + return "ITER_VIEW_ALL failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); + if (!dx_field_iterator_equal(iter, (unsigned char*) "global/sub")) + return "ITER_VIEW_NO_HOST failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_ID); + if (!dx_field_iterator_equal(iter, (unsigned char*) "global")) + return "ITER_VIEW_NODE_ID failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_SPECIFIC); + if (!dx_field_iterator_equal(iter, (unsigned char*) "sub")) + return "ITER_VIEW_NODE_SPECIFIC failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + if (!dx_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub")) + return "ITER_VIEW_ADDRESS_HASH failed"; + + return 0; +} + + +static char* test_view_global_no_host(void *context) +{ + dx_field_iterator_t *iter = dx_field_iterator_string("global/sub", ITER_VIEW_ALL); + if (!dx_field_iterator_equal(iter, (unsigned char*) "global/sub")) + return "ITER_VIEW_ALL failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); + if (!dx_field_iterator_equal(iter, (unsigned char*) "global/sub")) + return "ITER_VIEW_NO_HOST failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_ID); + if (!dx_field_iterator_equal(iter, (unsigned char*) "global")) + return "ITER_VIEW_NODE_ID failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_NODE_SPECIFIC); + if (!dx_field_iterator_equal(iter, (unsigned char*) "sub")) + return "ITER_VIEW_NODE_SPECIFIC failed"; + + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + if (!dx_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub")) + return "ITER_VIEW_ADDRESS_HASH failed"; + + return 0; +} + + +static char* test_view_address_hash(void *context) +{ + struct {const char *addr; const char *view;} cases[] = { + {"amqp:/_local/my-addr/sub", "Lmy-addr/sub"}, + {"amqp:/_local/my-addr", "Lmy-addr"}, + {"amqp:/_topo/area/router/local/sub", "Aarea"}, + {"amqp:/_topo/my-area/router/local/sub", "Rrouter"}, + {"amqp:/_topo/my-area/my-router/local/sub", "Llocal/sub"}, + {"amqp:/_topo/area/all/local/sub", "Aarea"}, + {"amqp:/_topo/my-area/all/local/sub", "Llocal/sub"}, + {"amqp:/_topo/all/all/local/sub", "Llocal/sub"}, + {"amqp://host:port/_local/my-addr", "Lmy-addr"}, + {0, 0} + }; + int idx; + + for (idx = 0; cases[idx].addr; idx++) { + dx_field_iterator_t *iter = dx_field_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_HASH); + if (!dx_field_iterator_equal(iter, (unsigned char*) cases[idx].view)) { + char *got = (char*) dx_field_iterator_copy(iter); + snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed. Expected '%s', got '%s'", + cases[idx].addr, cases[idx].view, got); + return fail_text; + } + } + + return 0; +} + + +int field_tests(void) +{ + int result = 0; + + dx_field_iterator_set_address("my-area", "my-router"); + + TEST_CASE(test_view_global_dns, 0); + TEST_CASE(test_view_global_non_dns, 0); + TEST_CASE(test_view_global_no_host, 0); + TEST_CASE(test_view_address_hash, 0); + + return result; +} + diff --git a/qpid/extras/dispatch/tests/run_tests.c b/qpid/extras/dispatch/tests/run_unit_tests.c index 765dad5c66..01a8ae16b3 100644 --- a/qpid/extras/dispatch/tests/run_tests.c +++ b/qpid/extras/dispatch/tests/run_unit_tests.c @@ -24,6 +24,7 @@ int timer_tests(); int alloc_tests(); int server_tests(); int message_tests(); +int field_tests(); int main(int argc, char** argv) { @@ -43,6 +44,7 @@ int main(int argc, char** argv) result += alloc_tests(); result += server_tests(); result += message_tests(); + result += field_tests(); return result; } diff --git a/qpid/extras/dispatch/tests/server_test.c b/qpid/extras/dispatch/tests/server_test.c index 5cacd47bae..4f68ecb421 100644 --- a/qpid/extras/dispatch/tests/server_test.c +++ b/qpid/extras/dispatch/tests/server_test.c @@ -116,7 +116,7 @@ static char* test_start_handler(void *context) { int i; - dx = dx_dispatch(THREAD_COUNT); + dx = dx_dispatch(THREAD_COUNT, 0, 0, 0); expected_context = (void*) 0x00112233; stored_error[0] = 0x0; @@ -139,7 +139,7 @@ static char* test_start_handler(void *context) static char *test_server_start(void *context) { - dx = dx_dispatch(THREAD_COUNT); + dx = dx_dispatch(THREAD_COUNT, 0, 0, 0); dx_server_start(dx); dx_server_stop(dx); dx_dispatch_free(dx); @@ -153,7 +153,7 @@ static char* test_user_fd(void *context) int res; dx_timer_t *timer; - dx = dx_dispatch(THREAD_COUNT); + dx = dx_dispatch(THREAD_COUNT, 0, 0, 0); dx_server_set_user_fd_handler(dx, ufd_handler); timer = dx_timer(dx, fd_test_start, 0); dx_timer_schedule(timer, 0); |