diff options
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
-rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 1107 |
1 files changed, 622 insertions, 485 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index d2704c4bd5..931f4551bb 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -21,17 +21,17 @@ #include <stdio.h> #include <string.h> #include <stdbool.h> +#include <stdlib.h> #include <qpid/dispatch.h> #include "dispatch_private.h" +#include "router_private.h" static char *module = "ROUTER"; -static void dx_router_python_setup(dx_router_t *router); -static void dx_pyrouter_tick(dx_router_t *router); - -static char *router_address = "_local/qdxrouter"; -static char *local_prefix = "_local/"; -//static char *topo_prefix = "_topo/"; +static char *router_role = "inter-router"; +static char *local_prefix = "_local/"; +static char *topo_prefix = "_topo/"; +static char *direct_prefix; /** * Address Types and Processing: @@ -48,86 +48,197 @@ static char *local_prefix = "_local/"; * <mobile> M<mobile> forward handler */ +ALLOC_DEFINE(dx_routed_event_t); +ALLOC_DEFINE(dx_router_link_t); +ALLOC_DEFINE(dx_router_node_t); +ALLOC_DEFINE(dx_router_ref_t); +ALLOC_DEFINE(dx_router_link_ref_t); +ALLOC_DEFINE(dx_address_t); +ALLOC_DEFINE(dx_router_conn_t); -typedef struct dx_router_link_t dx_router_link_t; -typedef struct dx_router_node_t dx_router_node_t; +void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + dx_router_link_ref_t *ref = new_dx_router_link_ref_t(); + DEQ_ITEM_INIT(ref); + ref->link = link; + link->ref = ref; + DEQ_INSERT_TAIL(*ref_list, ref); +} -typedef enum { - DX_LINK_ENDPOINT, // A link to a connected endpoint - DX_LINK_ROUTER, // A link to a peer router in the same area - DX_LINK_AREA // A link to a peer router in a different area (area boundary) -} dx_link_type_t; +void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + if (link->ref) { + DEQ_REMOVE(*ref_list, link->ref); + free_dx_router_link_ref_t(link->ref); + link->ref = 0; + } +} -typedef struct dx_routed_event_t { - DEQ_LINKS(struct dx_routed_event_t); - dx_delivery_t *delivery; - dx_message_t *message; - bool settle; - uint64_t disposition; -} dx_routed_event_t; -ALLOC_DECLARE(dx_routed_event_t); -ALLOC_DEFINE(dx_routed_event_t); -DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); - - -struct dx_router_link_t { - DEQ_LINKS(dx_router_link_t); - dx_direction_t link_direction; - dx_link_type_t link_type; - dx_address_t *owning_addr; // [ref] Address record that owns this link - dx_link_t *link; // [own] Link pointer - dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link - dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link - dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) - dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries -}; - -ALLOC_DECLARE(dx_router_link_t); -ALLOC_DEFINE(dx_router_link_t); -DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); +void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode) +{ + dx_router_ref_t *ref = new_dx_router_ref_t(); + DEQ_ITEM_INIT(ref); + ref->router = rnode; + rnode->ref_count++; + DEQ_INSERT_TAIL(*ref_list, ref); +} -struct dx_router_node_t { - DEQ_LINKS(dx_router_node_t); - const char *id; - dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node - dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node - // list of valid origins (pointers to router_node) - (bit masks?) -}; -ALLOC_DECLARE(dx_router_node_t); -ALLOC_DEFINE(dx_router_node_t); -DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); +void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode) +{ + dx_router_ref_t *ref = DEQ_HEAD(*ref_list); + while (ref) { + if (ref->router == rnode) { + DEQ_REMOVE(*ref_list, ref); + free_dx_router_ref_t(ref); + rnode->ref_count--; + break; + } + ref = DEQ_NEXT(ref); + } +} -struct dx_address_t { - dx_router_message_cb handler; // In-Process Consumer - void *handler_context; - dx_router_link_list_t rlinks; // Locally-Connected Consumers - dx_router_node_list_t rnodes; // Remotely-Connected Consumers -}; +/** + * Check an address to see if it no longer has any associated destinations. + * Depending on its policy, the address may be eligible for being closed out + * (i.e. Logging its terminal statistics and freeing its resources). + */ +void dx_router_check_addr(dx_router_t *router, dx_address_t *addr, int was_local) +{ + if (addr == 0) + return; -ALLOC_DECLARE(dx_address_t); -ALLOC_DEFINE(dx_address_t); + unsigned char *key = 0; + int to_delete = 0; + int no_more_locals = 0; + + sys_mutex_lock(router->lock); + + // + // If the address has no handlers or destinations, it should be deleted. + // + if (addr->handler == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0) + to_delete = 1; + + // + // If we have just removed a local linkage and it was the last local linkage, + // we need to notify the router module that there is no longer a local + // presence of this address. + // + if (was_local && DEQ_SIZE(addr->rlinks) == 0) + no_more_locals = 1; + + if (to_delete) { + // + // Delete the address but grab the hash key so we can use it outside the + // critical section. + // + dx_hash_remove_by_handle2(router->addr_hash, addr->hash_handle, &key); + DEQ_REMOVE(router->addrs, addr); + dx_hash_handle_free(addr->hash_handle); + free_dx_address_t(addr); + } + + // + // If we're not deleting but there are no more locals, get a copy of the hash key. + // + if (!to_delete && no_more_locals) { + const unsigned char *key_const = dx_hash_key_by_handle(addr->hash_handle); + key = (unsigned char*) malloc(strlen((const char*) key_const) + 1); + strcpy((char*) key, (const char*) key_const); + } + + sys_mutex_unlock(router->lock); + // + // If the address is mobile-class and it was just removed from a local link, + // tell the router module that it is no longer attached locally. + // + if (no_more_locals && key && key[0] == 'M') + dx_router_mobile_removed(router, (const char*) key); -struct dx_router_t { - dx_dispatch_t *dx; - const char *router_area; - const char *router_id; - dx_node_t *node; - dx_router_link_list_t in_links; - dx_router_node_list_t routers; - dx_message_list_t in_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - hash_t *out_hash; - uint64_t dtag; - PyObject *pyRouter; - PyObject *pyTick; -}; + // + // Free the key that was not freed by the hash table. + // + if (key) + free(key); +} + + +/** + * Determine whether a connection is configured in the inter-router role. + */ +static int dx_router_connection_is_inter_router(const dx_connection_t *conn) +{ + if (!conn) + return 0; + + const dx_server_config_t *cf = dx_connection_config(conn); + if (cf && strcmp(cf->role, router_role) == 0) + return 1; + + return 0; +} + + +/** + * Determine whether a terminus has router capability + */ +static int dx_router_terminus_is_router(pn_terminus_t *term) +{ + pn_data_t *cap = pn_terminus_capabilities(term); + + pn_data_rewind(cap); + pn_data_next(cap); + if (cap && pn_data_type(cap) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(cap); + if (sym.size == strlen(DX_CAPABILITY_ROUTER) && + strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0) + return 1; + } + + return 0; +} + + +static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length) +{ + static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; + char discriminator[11]; + long int rnd = random(); + int idx; + + for (idx = 0; idx < 6; idx++) + discriminator[idx] = table[(rnd >> (idx * 6)) & 63]; + discriminator[idx] = '\0'; + + snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator); +} + + +static int dx_router_find_mask_bit_LH(dx_router_t *router, dx_link_t *link) +{ + dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link); + if (shared) + return shared->mask_bit; + + int mask_bit; + if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { + dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); + } else { + dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); + return -1; + } + + shared = new_dx_router_conn_t(); + shared->mask_bit = mask_bit; + dx_link_set_conn_context(link, shared); + return mask_bit; +} /** @@ -191,7 +302,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link) DEQ_REMOVE_HEAD(to_send); // - // Get a delivery for the send. This will be the current deliver on the link. + // Get a delivery for the send. This will be the current delivery on the link. // tag++; delivery = dx_delivery(link, pn_dtag((char*) &tag, 8)); @@ -215,7 +326,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link) pn_link_advance(pn_link); event_count++; - dx_free_message(re->message); + dx_message_free(re->message); free_dx_routed_event_t(re); re = DEQ_HEAD(to_send); } @@ -250,17 +361,18 @@ static int router_writable_link_handler(void* context, dx_link_t *link) } -static void router_annotate_message(dx_router_t *router, dx_message_t *msg) +static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop) { - dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg); - dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0); + dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg); + dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0); + dx_field_iterator_t *ingress_iter = 0; dx_parsed_field_t *trace = 0; dx_parsed_field_t *ingress = 0; if (in_da) { - trace = dx_parse_value_by_key(in_da, "qdx.trace"); - ingress = dx_parse_value_by_key(in_da, "qdx.ingress"); + trace = dx_parse_value_by_key(in_da, DX_DA_TRACE); + ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS); } dx_compose_start_map(out_da); @@ -268,38 +380,48 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) // // If there is a trace field, append this router's ID to the trace. // - if (trace && dx_parse_is_list(trace)) { - dx_compose_insert_string(out_da, "qdx.trace"); - dx_compose_start_list(out_da); - - uint32_t idx = 0; - dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx); - while (trace_item) { - dx_field_iterator_t *iter = dx_parse_raw(trace_item); - dx_compose_insert_string_iterator(out_da, iter); - idx++; - trace_item = dx_parse_sub_value(trace, idx); + dx_compose_insert_string(out_da, DX_DA_TRACE); + dx_compose_start_list(out_da); + if (trace) { + if (dx_parse_is_list(trace)) { + uint32_t idx = 0; + dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx); + while (trace_item) { + dx_field_iterator_t *iter = dx_parse_raw(trace_item); + if (dx_field_iterator_equal(iter, (unsigned char*) direct_prefix)) + *drop = 1; + dx_field_iterator_reset(iter); + dx_compose_insert_string_iterator(out_da, iter); + idx++; + trace_item = dx_parse_sub_value(trace, idx); + } } - - dx_compose_insert_string(out_da, router->router_id); - dx_compose_end_list(out_da); } + dx_compose_insert_string(out_da, direct_prefix); + dx_compose_end_list(out_da); + // // If there is no ingress field, annotate the ingress as this router else // keep the original field. // - dx_compose_insert_string(out_da, "qdx.ingress"); + dx_compose_insert_string(out_da, DX_DA_INGRESS); if (ingress && dx_parse_is_scalar(ingress)) { - dx_field_iterator_t *iter = dx_parse_raw(ingress); - dx_compose_insert_string_iterator(out_da, iter); + ingress_iter = dx_parse_raw(ingress); + dx_compose_insert_string_iterator(out_da, ingress_iter); } else - dx_compose_insert_string(out_da, router->router_id); + dx_compose_insert_string(out_da, direct_prefix); dx_compose_end_map(out_da); dx_message_set_delivery_annotations(msg, out_da); dx_compose_free(out_da); + + // + // Return the iterator to the ingress field _if_ it was present. + // If we added the ingress, return NULL. + // + return ingress_iter; } @@ -318,10 +440,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // Receive the message into a local representation. If the returned message // pointer is NULL, we have not yet received a complete message. // - sys_mutex_lock(router->lock); msg = dx_message_receive(delivery); - sys_mutex_unlock(router->lock); - if (!msg) return; @@ -380,84 +499,155 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (iter) { dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - hash_retrieve(router->out_hash, iter, (void*) &addr); + + // + // Note: This function is going to need to be refactored so we can put an + // asynchronous address lookup here. In the event there is a translation + // of the address (via namespace), it will have to be done here after + // obtaining the iterator and before doing the hash lookup. + // + // Note that this lookup is only done for global/mobile class addresses. + // + + dx_hash_retrieve(router->addr_hash, iter, (void*) &addr); dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); - int is_local = dx_field_iterator_prefix(iter, local_prefix); + int is_local = dx_field_iterator_prefix(iter, local_prefix); + int is_direct = dx_field_iterator_prefix(iter, direct_prefix); dx_field_iterator_free(iter); if (addr) { // + // If the incoming link is an endpoint link, count this as an ingress delivery. + // + if (rlink->link_type == DX_LINK_ENDPOINT) + addr->deliveries_ingress++; + + // // To field is valid and contains a known destination. Handle the various // cases for forwarding. // // - // Interpret and update the delivery annotations of the message + // Interpret and update the delivery annotations of the message. As a convenience, + // this function returns the iterator to the ingress field (if it exists). // - router_annotate_message(router, msg); + int drop = 0; + dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop); // // Forward to the in-process handler for this message if there is one. The // actual invocation of the handler will occur later after we've released // the lock. // - if (addr->handler) { + if (!drop && addr->handler) { in_process_copy = dx_message_copy(msg); handler = addr->handler; handler_context = addr->handler_context; + addr->deliveries_to_container++; } // // If the address form is local (i.e. is prefixed by _local), don't forward // outside of the router process. // - if (!is_local) { + if (!drop && !is_local) { // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); fanout++; if (fanout == 1 && !dx_delivery_settled(delivery)) re->delivery = delivery; - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + addr->deliveries_egress++; + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // - // Forward to the next-hops for remote destinations. + // If the address form is direct to this router node, don't relay it on + // to any other part of the network. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; - else - dest_link = dest_node->peer_link; - if (dest_link) { - dx_routed_event_t *re = new_dx_routed_event_t(); - DEQ_ITEM_INIT(re); - re->delivery = 0; - re->message = dx_message_copy(msg); - re->settle = 0; - re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); - - fanout++; - if (fanout == 1) - re->delivery = delivery; - - dx_link_activate(dest_link->link); + if (!is_direct) { + // + // Get the mask bit associated with the ingress router for the message. + // This will be compared against the "valid_origin" masks for each + // candidate destination router. + // + int origin = -1; + if (ingress_iter) { + dx_field_iterator_reset_view(ingress_iter, ITER_VIEW_ADDRESS_HASH); + dx_address_t *origin_addr; + dx_hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr); + if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) { + dx_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes); + origin = rref->router->mask_bit; + } + } else + origin = 0; + + // + // Forward to the next-hops for remote destinations. + // + if (origin >= 0) { + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + dx_bitmask_t *link_set = dx_bitmask(0); + + // + // Loop over the target nodes for this address. Build a set of outgoing links + // for which there are valid targets. We do this to avoid sending more than one + // message down a given link. It's possible that there are multiple destinations + // for this address that are all reachable over the same link. In this case, we + // will send only one copy of the message over the link and allow a downstream + // router to fan the message out. + // + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; + else + dest_link = dest_node_ref->router->peer_link; + if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) + dx_bitmask_set_bit(link_set, dest_link->mask_bit); + dest_node_ref = DEQ_NEXT(dest_node_ref); + } + + // + // Send a copy of the message outbound on each identified link. + // + int link_bit; + while (dx_bitmask_first_set(link_set, &link_bit)) { + dx_bitmask_clear_bit(link_set, link_bit); + dest_link = router->out_links_by_mask_bit[link_bit]; + if (dest_link) { + dx_routed_event_t *re = new_dx_routed_event_t(); + DEQ_ITEM_INIT(re); + re->delivery = 0; + re->message = dx_message_copy(msg); + re->settle = 0; + re->disposition = 0; + DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + + fanout++; + if (fanout == 1 && !dx_delivery_settled(delivery)) + re->delivery = delivery; + + addr->deliveries_transit++; + dx_link_activate(dest_link->link); + } + } + + dx_bitmask_free(link_set); } - dest_node = DEQ_NEXT(dest_node); } } } @@ -470,8 +660,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del dx_delivery_free(delivery, PN_ACCEPTED); } else if (fanout == 0) { dx_delivery_free(delivery, PN_RELEASED); - } else if (fanout > 1) - dx_delivery_free(delivery, PN_ACCEPTED); + } } } else { // @@ -481,13 +670,15 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del } sys_mutex_unlock(router->lock); - dx_free_message(msg); + dx_message_free(msg); // // Invoke the in-process handler now that the lock is released. // - if (handler) - handler(handler_context, in_process_copy); + if (handler) { + handler(handler_context, in_process_copy, rlink->mask_bit); + dx_message_free(in_process_copy); + } } @@ -538,28 +729,37 @@ static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t *d */ static int router_incoming_link_handler(void* context, dx_link_t *link) { - dx_router_t *router = (dx_router_t*) context; - dx_router_link_t *rlink = new_dx_router_link_t(); - pn_link_t *pn_link = dx_link_pn(link); + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); + int is_router = dx_router_terminus_is_router(dx_link_remote_source(link)); + + if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) { + dx_log(module, LOG_WARNING, "Incoming link claims router capability but is not on an inter-router connection"); + pn_link_close(pn_link); + return 0; + } + dx_router_link_t *rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); + rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; rlink->link_direction = DX_INCOMING; - rlink->link_type = DX_LINK_ENDPOINT; rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); + rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0; + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); pn_link_flow(pn_link, 1000); pn_link_open(pn_link); @@ -579,52 +779,125 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) { dx_router_t *router = (dx_router_t*) context; pn_link_t *pn_link = dx_link_pn(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); + int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); + int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); + int propagate = 0; + dx_field_iterator_t *iter = 0; + + if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) { + dx_log(module, LOG_WARNING, "Outgoing link claims router capability but is not on an inter-router connection"); + pn_link_close(pn_link); + return 0; + } - if (!r_tgt) { + // + // If this link is not a router link and it has no source address, we can't + // accept it. + // + if (r_src == 0 && !is_router && !is_dynamic) { pn_link_close(pn_link); return 0; } - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - dx_router_link_t *rlink = new_dx_router_link_t(); - int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address); + // + // If this is an endpoint link with a source address, make sure the address is + // appropriate for endpoint links. If it is not mobile address, it cannot be + // bound to an endpoint link. + // + if(r_src && !is_router && !is_dynamic) { + iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); + unsigned char prefix = dx_field_iterator_octet(iter); + dx_field_iterator_reset(iter); + if (prefix != 'M') { + dx_field_iterator_free(iter); + pn_link_close(pn_link); + dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src); + return 0; + } + } + + // + // Create a router_link record for this link. Some of the fields will be + // modified in the different cases below. + // + dx_router_link_t *rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); - - dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0; + + if (is_router) { + // + // If this is a router link, put it in the hello_address link-list. + // + dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink); + rlink->owning_addr = router->hello_addr; + router->out_links_by_mask_bit[rlink->mask_bit] = rlink; + + } else { + // + // If this is an endpoint link, check the source. If it is dynamic, we will + // assign it an ephemeral and routable address. If it has a non-dymanic + // address, that address needs to be set up in the address list. + // + char temp_addr[1000]; // FIXME + dx_address_t *addr; + + if (is_dynamic) { + dx_router_generate_temp_addr(router, temp_addr, 1000); + iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); + pn_terminus_set_address(dx_link_source(link), temp_addr); + dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr); + } else + dx_log(module, LOG_INFO, "Registered local address: %s", r_src); + + dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + memset(addr, 0, sizeof(dx_address_t)); + DEQ_ITEM_INIT(addr); + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); + DEQ_INSERT_TAIL(router->addrs, addr); + } + + rlink->owning_addr = addr; + dx_router_add_link_ref_LH(&addr->rlinks, rlink); + + // + // If this is not a dynamic address and it is the first local subscription + // to the address, supply the address to the router module for propagation + // to other nodes. + // + propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1); } - dx_field_iterator_free(iter); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + DEQ_INSERT_TAIL(router->links, rlink); + sys_mutex_unlock(router->lock); + + if (propagate) + dx_router_mobile_added(router, iter); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + if (iter) + dx_field_iterator_free(iter); pn_link_open(pn_link); - sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); return 0; } @@ -634,40 +907,60 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) */ static int router_link_detach_handler(void* context, dx_link_t *link, int closed) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_router_t *router = (dx_router_t*) context; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link); + dx_address_t *oaddr = 0; + + if (shared) { + dx_link_set_conn_context(link, 0); + free_dx_router_conn_t(shared); + } if (!rlink) return 0; sys_mutex_lock(router->lock); - if (pn_link_is_sender(pn_link)) { - DEQ_REMOVE(rlink->owning_addr->rlinks, rlink); - - if ((rlink->owning_addr->handler == 0) && - (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) && - (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) { - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; - if (iter) { - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (addr == rlink->owning_addr) { - hash_remove(router->out_hash, iter); - free_dx_router_link_t(rlink); - free_dx_address_t(addr); - dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); - } - dx_field_iterator_free(iter); - } - } - } else { - DEQ_REMOVE(router->in_links, rlink); - free_dx_router_link_t(rlink); + + // + // If the link is outgoing, we must disassociate it from its address. + // + if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) { + dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink); + oaddr = rlink->owning_addr; } + // + // If this is an outgoing inter-router link, we must remove the by-mask-bit + // index reference to this link. + // + if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_OUTGOING) { + if (router->out_links_by_mask_bit[rlink->mask_bit] == rlink) + router->out_links_by_mask_bit[rlink->mask_bit] = 0; + else + dx_log(module, LOG_CRITICAL, "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit); + } + + // + // If this is an incoming inter-router link, we must free the mask_bit. + // + if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING) + dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit); + + // + // Remove the link from the master list-of-links. + // + DEQ_REMOVE(router->links, rlink); sys_mutex_unlock(router->lock); + + // + // Check to see if the owning address should be deleted + // + dx_router_check_addr(router, oaddr, 1); + + // TODO - wrap the free to handle the recursive items + free_dx_router_link_t(rlink); + return 0; } @@ -679,28 +972,47 @@ static void router_inbound_open_handler(void *type_context, dx_connection_t *con static void router_outbound_open_handler(void *type_context, dx_connection_t *conn) { - // TODO - Make sure this connection is annotated as an inter-router transport. - // Ignore otherwise + // + // Check the configured role of this connection. If it is not the inter-router + // role, ignore it. + // + if (!dx_router_connection_is_inter_router(conn)) { + dx_log(module, LOG_WARNING, "Outbound connection set up without inter-router role"); + return; + } dx_router_t *router = (dx_router_t*) type_context; - dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH); dx_link_t *sender; dx_link_t *receiver; dx_router_link_t *rlink; + int mask_bit = 0; + size_t clen = strlen(DX_CAPABILITY_ROUTER); // - // Create an incoming link and put it in the in-links collection. The address - // of the remote source of this link is '_local/qdxrouter'. + // Allocate a mask bit to designate the pair of links connected to the neighbor router // - receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx"); - pn_terminus_set_address(dx_link_remote_source(receiver), router_address); - pn_terminus_set_address(dx_link_target(receiver), router_address); + sys_mutex_lock(router->lock); + if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { + dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); + } else { + sys_mutex_unlock(router->lock); + dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); + return; + } - rlink = new_dx_router_link_t(); + // + // Create an incoming link with router source capability + // + receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER)); + rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_INCOMING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_INCOMING; rlink->owning_addr = 0; rlink->link = receiver; rlink->connected_link = 0; @@ -709,53 +1021,46 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co DEQ_INIT(rlink->msg_fifo); dx_link_set_context(receiver, rlink); - - sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); - sys_mutex_unlock(router->lock); + DEQ_INSERT_TAIL(router->links, rlink); // - // Create an outgoing link with a local source of '_local/qdxrouter' and place - // it in the routing table. + // Create an outgoing link with router target capability // - sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx"); - pn_terminus_set_address(dx_link_remote_target(sender), router_address); - pn_terminus_set_address(dx_link_source(sender), router_address); + sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_source(sender)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = router->hello_addr; rlink->link = sender; rlink->connected_link = 0; rlink->peer_link = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); - dx_link_set_context(sender, rlink); - - dx_address_t *addr; + // + // Add the new outgoing link to the hello_address's list of links. + // + dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink); - sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, aiter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, aiter, addr); - } + // + // Index this link from the by-maskbit index so we can later find it quickly + // when provided with the mask bit. + // + router->out_links_by_mask_bit[mask_bit] = rlink; - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + dx_link_set_context(sender, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); pn_link_open(dx_link_pn(receiver)); pn_link_open(dx_link_pn(sender)); pn_link_flow(dx_link_pn(receiver), 1000); - dx_field_iterator_free(aiter); } @@ -767,7 +1072,6 @@ static void dx_router_timer_handler(void *context) // Periodic processing. // dx_pyrouter_tick(router); - dx_timer_schedule(router->timer, 1000); } @@ -786,31 +1090,61 @@ static dx_node_type_t router_node = {"router", 0, 0, static int type_registered = 0; -dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) +dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *area, const char *id) { if (!type_registered) { type_registered = 1; dx_container_register_node_type(dx, &router_node); } + size_t dplen = 9 + strlen(area) + strlen(id); + direct_prefix = (char*) malloc(dplen); + strcpy(direct_prefix, "_topo/"); + strcat(direct_prefix, area); + strcat(direct_prefix, "/"); + strcat(direct_prefix, id); + strcat(direct_prefix, "/"); + dx_router_t *router = NEW(dx_router_t); router_node.type_context = router; + dx->router = router; router->dx = dx; + router->router_mode = mode; router->router_area = area; router->router_id = id; router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); - DEQ_INIT(router->in_links); + DEQ_INIT(router->addrs); + router->addr_hash = dx_hash(10, 32, 0); + + DEQ_INIT(router->links); DEQ_INIT(router->routers); - DEQ_INIT(router->in_fifo); - router->lock = sys_mutex(); - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - router->out_hash = hash(10, 32, 0); - router->dtag = 1; - router->pyRouter = 0; - router->pyTick = 0; + router->out_links_by_mask_bit = NEW_PTR_ARRAY(dx_router_link_t, dx_bitmask_width()); + router->routers_by_mask_bit = NEW_PTR_ARRAY(dx_router_node_t, dx_bitmask_width()); + for (int idx = 0; idx < dx_bitmask_width(); idx++) { + router->out_links_by_mask_bit[idx] = 0; + router->routers_by_mask_bit[idx] = 0; + } + + router->neighbor_free_mask = dx_bitmask(1); + router->lock = sys_mutex(); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); + router->dtag = 1; + router->pyRouter = 0; + router->pyTick = 0; + router->pyAdded = 0; + router->pyRemoved = 0; + + // + // Create addresses for all of the routers in the topology. It will be registered + // locally later in the initialization sequence. + // + if (router->router_mode == DX_ROUTER_MODE_INTERIOR) { + router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0); + router->hello_addr = dx_router_register_address(dx, "qdxhello", 0, 0); + } // // Inform the field iterator module of this router's id and area. The field iterator @@ -823,18 +1157,21 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) // dx_python_start(); - dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id); + switch (router->router_mode) { + case DX_ROUTER_MODE_STANDALONE: dx_log(module, LOG_INFO, "Router started in Standalone mode"); break; + case DX_ROUTER_MODE_INTERIOR: dx_log(module, LOG_INFO, "Router started in Interior mode, area=%s id=%s", area, id); break; + case DX_ROUTER_MODE_EDGE: dx_log(module, LOG_INFO, "Router started in Edge mode"); break; + } return router; } -void dx_router_setup_agent(dx_dispatch_t *dx) +void dx_router_setup_late(dx_dispatch_t *dx) { + dx_router_agent_setup(dx->router); dx_router_python_setup(dx->router); dx_timer_schedule(dx->router->timer, 1000); - - // TODO } @@ -849,8 +1186,7 @@ void dx_router_free(dx_router_t *router) const char *dx_router_id(const dx_dispatch_t *dx) { - dx_router_t *router = dx->router; - return router->router_id; + return direct_prefix; } @@ -869,14 +1205,16 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); + dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; + memset(addr, 0, sizeof(dx_address_t)); + DEQ_ITEM_INIT(addr); DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(router->addrs, addr); } dx_field_iterator_free(iter); @@ -885,7 +1223,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); + if (handler) + dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address); return addr; } @@ -905,34 +1244,49 @@ void dx_router_send(dx_dispatch_t *dx, dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, address, (void*) &addr); + dx_hash_retrieve(router->addr_hash, address, (void*) &addr); if (addr) { // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + addr->deliveries_from_container++; + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + addr->deliveries_egress++; + + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + dx_bitmask_t *link_set = dx_bitmask(0); + + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; + if (dest_link) + dx_bitmask_set_bit(link_set, dest_link->mask_bit); + dest_node_ref = DEQ_NEXT(dest_node_ref); + } + + int link_bit; + while (dx_bitmask_first_set(link_set, &link_bit)) { + dx_bitmask_clear_bit(link_set, link_bit); + dest_link = router->out_links_by_mask_bit[link_bit]; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -942,9 +1296,11 @@ void dx_router_send(dx_dispatch_t *dx, re->disposition = 0; DEQ_INSERT_TAIL(dest_link->msg_fifo, re); dx_link_activate(dest_link->link); + addr->deliveries_transit++; } - dest_node = DEQ_NEXT(dest_node); } + + dx_bitmask_free(link_set); } sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? } @@ -959,222 +1315,3 @@ void dx_router_send2(dx_dispatch_t *dx, dx_field_iterator_free(iter); } - -//=============================================================================== -// Python Router Adapter -//=============================================================================== - -typedef struct { - PyObject_HEAD - dx_router_t *router; -} RouterAdapter; - - -static PyObject* dx_router_node_updated(PyObject *self, PyObject *args) -{ - //RouterAdapter *adapter = (RouterAdapter*) self; - //dx_router_t *router = adapter->router; - const char *address; - int is_reachable; - int is_neighbor; - - if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor)) - return 0; - - // TODO - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_router_add_route(PyObject *self, PyObject *args) -{ - //RouterAdapter *adapter = (RouterAdapter*) self; - const char *addr; - const char *peer; - - if (!PyArg_ParseTuple(args, "ss", &addr, &peer)) - return 0; - - // TODO - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyObject* dx_router_del_route(PyObject *self, PyObject *args) -{ - //RouterAdapter *adapter = (RouterAdapter*) self; - const char *addr; - const char *peer; - - if (!PyArg_ParseTuple(args, "ss", &addr, &peer)) - return 0; - - // TODO - - Py_INCREF(Py_None); - return Py_None; -} - - -static PyMethodDef RouterAdapter_methods[] = { - {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"}, - {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"}, - {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"}, - {0, 0, 0, 0} -}; - -static PyTypeObject RouterAdapterType = { - PyObject_HEAD_INIT(0) - 0, /* ob_size*/ - "dispatch.RouterAdapter", /* tp_name*/ - sizeof(RouterAdapter), /* tp_basicsize*/ - 0, /* tp_itemsize*/ - 0, /* tp_dealloc*/ - 0, /* tp_print*/ - 0, /* tp_getattr*/ - 0, /* tp_setattr*/ - 0, /* tp_compare*/ - 0, /* tp_repr*/ - 0, /* tp_as_number*/ - 0, /* tp_as_sequence*/ - 0, /* tp_as_mapping*/ - 0, /* tp_hash */ - 0, /* tp_call*/ - 0, /* tp_str*/ - 0, /* tp_getattro*/ - 0, /* tp_setattro*/ - 0, /* tp_as_buffer*/ - Py_TPFLAGS_DEFAULT, /* tp_flags*/ - "Dispatch Router Adapter", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - RouterAdapter_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - 0, /* tp_init */ - 0, /* tp_alloc */ - 0, /* tp_new */ - 0, /* tp_free */ - 0, /* tp_is_gc */ - 0, /* tp_bases */ - 0, /* tp_mro */ - 0, /* tp_cache */ - 0, /* tp_subclasses */ - 0, /* tp_weaklist */ - 0, /* tp_del */ - 0 /* tp_version_tag */ -}; - - -static void dx_router_python_setup(dx_router_t *router) -{ - PyObject *pDispatchModule = dx_python_module(); - - RouterAdapterType.tp_new = PyType_GenericNew; - if (PyType_Ready(&RouterAdapterType) < 0) { - PyErr_Print(); - dx_log(module, LOG_CRITICAL, "Unable to initialize the Python Router Adapter"); - return; - } - - Py_INCREF(&RouterAdapterType); - PyModule_AddObject(pDispatchModule, "RouterAdapter", (PyObject*) &RouterAdapterType); - - // - // Attempt to import the Python Router module - // - PyObject* pName; - PyObject* pId; - PyObject* pArea; - PyObject* pModule; - PyObject* pClass; - PyObject* pArgs; - - pName = PyString_FromString("qpid.dispatch.router"); - pModule = PyImport_Import(pName); - Py_DECREF(pName); - if (!pModule) { - dx_log(module, LOG_CRITICAL, "Can't Locate 'router' Python module"); - return; - } - - pClass = PyObject_GetAttrString(pModule, "RouterEngine"); - if (!pClass || !PyClass_Check(pClass)) { - dx_log(module, LOG_CRITICAL, "Can't Locate 'RouterEngine' class in the 'router' module"); - return; - } - - PyObject *adapterType = PyObject_GetAttrString(pDispatchModule, "RouterAdapter"); - PyObject *adapterInstance = PyObject_CallObject(adapterType, 0); - assert(adapterInstance); - - ((RouterAdapter*) adapterInstance)->router = router; - - // - // Constructor Arguments for RouterEngine - // - pArgs = PyTuple_New(3); - - // arg 0: adapter instance - PyTuple_SetItem(pArgs, 0, adapterInstance); - - // arg 1: router_id - pId = PyString_FromString(router->router_id); - PyTuple_SetItem(pArgs, 1, pId); - - // arg 2: area id - pArea = PyString_FromString(router->router_area); - PyTuple_SetItem(pArgs, 2, pArea); - - // - // Instantiate the router - // - router->pyRouter = PyInstance_New(pClass, pArgs, 0); - Py_DECREF(pArgs); - Py_DECREF(adapterType); - - if (!router->pyRouter) { - PyErr_Print(); - dx_log(module, LOG_CRITICAL, "'RouterEngine' class cannot be instantiated"); - return; - } - - router->pyTick = PyObject_GetAttrString(router->pyRouter, "handleTimerTick"); - if (!router->pyTick || !PyCallable_Check(router->pyTick)) { - dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method"); - return; - } -} - - -static void dx_pyrouter_tick(dx_router_t *router) -{ - PyObject *pArgs; - PyObject *pValue; - - if (router->pyTick) { - pArgs = PyTuple_New(0); - pValue = PyObject_CallObject(router->pyTick, pArgs); - if (PyErr_Occurred()) { - PyErr_Print(); - } - Py_DECREF(pArgs); - if (pValue) { - Py_DECREF(pValue); - } - } -} - |