summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src/router_node.c
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
-rw-r--r--qpid/extras/dispatch/src/router_node.c1107
1 files changed, 622 insertions, 485 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index d2704c4bd5..931f4551bb 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -21,17 +21,17 @@
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
+#include <stdlib.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
+#include "router_private.h"
static char *module = "ROUTER";
-static void dx_router_python_setup(dx_router_t *router);
-static void dx_pyrouter_tick(dx_router_t *router);
-
-static char *router_address = "_local/qdxrouter";
-static char *local_prefix = "_local/";
-//static char *topo_prefix = "_topo/";
+static char *router_role = "inter-router";
+static char *local_prefix = "_local/";
+static char *topo_prefix = "_topo/";
+static char *direct_prefix;
/**
* Address Types and Processing:
@@ -48,86 +48,197 @@ static char *local_prefix = "_local/";
* <mobile> M<mobile> forward handler
*/
+ALLOC_DEFINE(dx_routed_event_t);
+ALLOC_DEFINE(dx_router_link_t);
+ALLOC_DEFINE(dx_router_node_t);
+ALLOC_DEFINE(dx_router_ref_t);
+ALLOC_DEFINE(dx_router_link_ref_t);
+ALLOC_DEFINE(dx_address_t);
+ALLOC_DEFINE(dx_router_conn_t);
-typedef struct dx_router_link_t dx_router_link_t;
-typedef struct dx_router_node_t dx_router_node_t;
+void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+{
+ dx_router_link_ref_t *ref = new_dx_router_link_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->link = link;
+ link->ref = ref;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
-typedef enum {
- DX_LINK_ENDPOINT, // A link to a connected endpoint
- DX_LINK_ROUTER, // A link to a peer router in the same area
- DX_LINK_AREA // A link to a peer router in a different area (area boundary)
-} dx_link_type_t;
+void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+{
+ if (link->ref) {
+ DEQ_REMOVE(*ref_list, link->ref);
+ free_dx_router_link_ref_t(link->ref);
+ link->ref = 0;
+ }
+}
-typedef struct dx_routed_event_t {
- DEQ_LINKS(struct dx_routed_event_t);
- dx_delivery_t *delivery;
- dx_message_t *message;
- bool settle;
- uint64_t disposition;
-} dx_routed_event_t;
-ALLOC_DECLARE(dx_routed_event_t);
-ALLOC_DEFINE(dx_routed_event_t);
-DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
-
-
-struct dx_router_link_t {
- DEQ_LINKS(dx_router_link_t);
- dx_direction_t link_direction;
- dx_link_type_t link_type;
- dx_address_t *owning_addr; // [ref] Address record that owns this link
- dx_link_t *link; // [own] Link pointer
- dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
- dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
- dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
- dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
-};
-
-ALLOC_DECLARE(dx_router_link_t);
-ALLOC_DEFINE(dx_router_link_t);
-DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
+void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = new_dx_router_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->router = rnode;
+ rnode->ref_count++;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
-struct dx_router_node_t {
- DEQ_LINKS(dx_router_node_t);
- const char *id;
- dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
- dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
- // list of valid origins (pointers to router_node) - (bit masks?)
-};
-ALLOC_DECLARE(dx_router_node_t);
-ALLOC_DEFINE(dx_router_node_t);
-DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
+void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = DEQ_HEAD(*ref_list);
+ while (ref) {
+ if (ref->router == rnode) {
+ DEQ_REMOVE(*ref_list, ref);
+ free_dx_router_ref_t(ref);
+ rnode->ref_count--;
+ break;
+ }
+ ref = DEQ_NEXT(ref);
+ }
+}
-struct dx_address_t {
- dx_router_message_cb handler; // In-Process Consumer
- void *handler_context;
- dx_router_link_list_t rlinks; // Locally-Connected Consumers
- dx_router_node_list_t rnodes; // Remotely-Connected Consumers
-};
+/**
+ * Check an address to see if it no longer has any associated destinations.
+ * Depending on its policy, the address may be eligible for being closed out
+ * (i.e. Logging its terminal statistics and freeing its resources).
+ */
+void dx_router_check_addr(dx_router_t *router, dx_address_t *addr, int was_local)
+{
+ if (addr == 0)
+ return;
-ALLOC_DECLARE(dx_address_t);
-ALLOC_DEFINE(dx_address_t);
+ unsigned char *key = 0;
+ int to_delete = 0;
+ int no_more_locals = 0;
+
+ sys_mutex_lock(router->lock);
+
+ //
+ // If the address has no handlers or destinations, it should be deleted.
+ //
+ if (addr->handler == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0)
+ to_delete = 1;
+
+ //
+ // If we have just removed a local linkage and it was the last local linkage,
+ // we need to notify the router module that there is no longer a local
+ // presence of this address.
+ //
+ if (was_local && DEQ_SIZE(addr->rlinks) == 0)
+ no_more_locals = 1;
+
+ if (to_delete) {
+ //
+ // Delete the address but grab the hash key so we can use it outside the
+ // critical section.
+ //
+ dx_hash_remove_by_handle2(router->addr_hash, addr->hash_handle, &key);
+ DEQ_REMOVE(router->addrs, addr);
+ dx_hash_handle_free(addr->hash_handle);
+ free_dx_address_t(addr);
+ }
+
+ //
+ // If we're not deleting but there are no more locals, get a copy of the hash key.
+ //
+ if (!to_delete && no_more_locals) {
+ const unsigned char *key_const = dx_hash_key_by_handle(addr->hash_handle);
+ key = (unsigned char*) malloc(strlen((const char*) key_const) + 1);
+ strcpy((char*) key, (const char*) key_const);
+ }
+
+ sys_mutex_unlock(router->lock);
+ //
+ // If the address is mobile-class and it was just removed from a local link,
+ // tell the router module that it is no longer attached locally.
+ //
+ if (no_more_locals && key && key[0] == 'M')
+ dx_router_mobile_removed(router, (const char*) key);
-struct dx_router_t {
- dx_dispatch_t *dx;
- const char *router_area;
- const char *router_id;
- dx_node_t *node;
- dx_router_link_list_t in_links;
- dx_router_node_list_t routers;
- dx_message_list_t in_fifo;
- sys_mutex_t *lock;
- dx_timer_t *timer;
- hash_t *out_hash;
- uint64_t dtag;
- PyObject *pyRouter;
- PyObject *pyTick;
-};
+ //
+ // Free the key that was not freed by the hash table.
+ //
+ if (key)
+ free(key);
+}
+
+
+/**
+ * Determine whether a connection is configured in the inter-router role.
+ */
+static int dx_router_connection_is_inter_router(const dx_connection_t *conn)
+{
+ if (!conn)
+ return 0;
+
+ const dx_server_config_t *cf = dx_connection_config(conn);
+ if (cf && strcmp(cf->role, router_role) == 0)
+ return 1;
+
+ return 0;
+}
+
+
+/**
+ * Determine whether a terminus has router capability
+ */
+static int dx_router_terminus_is_router(pn_terminus_t *term)
+{
+ pn_data_t *cap = pn_terminus_capabilities(term);
+
+ pn_data_rewind(cap);
+ pn_data_next(cap);
+ if (cap && pn_data_type(cap) == PN_SYMBOL) {
+ pn_bytes_t sym = pn_data_get_symbol(cap);
+ if (sym.size == strlen(DX_CAPABILITY_ROUTER) &&
+ strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0)
+ return 1;
+ }
+
+ return 0;
+}
+
+
+static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length)
+{
+ static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
+ char discriminator[11];
+ long int rnd = random();
+ int idx;
+
+ for (idx = 0; idx < 6; idx++)
+ discriminator[idx] = table[(rnd >> (idx * 6)) & 63];
+ discriminator[idx] = '\0';
+
+ snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator);
+}
+
+
+static int dx_router_find_mask_bit_LH(dx_router_t *router, dx_link_t *link)
+{
+ dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+ if (shared)
+ return shared->mask_bit;
+
+ int mask_bit;
+ if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
+ dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
+ } else {
+ dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count");
+ return -1;
+ }
+
+ shared = new_dx_router_conn_t();
+ shared->mask_bit = mask_bit;
+ dx_link_set_conn_context(link, shared);
+ return mask_bit;
+}
/**
@@ -191,7 +302,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
DEQ_REMOVE_HEAD(to_send);
//
- // Get a delivery for the send. This will be the current deliver on the link.
+ // Get a delivery for the send. This will be the current delivery on the link.
//
tag++;
delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
@@ -215,7 +326,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
pn_link_advance(pn_link);
event_count++;
- dx_free_message(re->message);
+ dx_message_free(re->message);
free_dx_routed_event_t(re);
re = DEQ_HEAD(to_send);
}
@@ -250,17 +361,18 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
}
-static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop)
{
- dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
- dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
+ dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_field_iterator_t *ingress_iter = 0;
dx_parsed_field_t *trace = 0;
dx_parsed_field_t *ingress = 0;
if (in_da) {
- trace = dx_parse_value_by_key(in_da, "qdx.trace");
- ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+ trace = dx_parse_value_by_key(in_da, DX_DA_TRACE);
+ ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS);
}
dx_compose_start_map(out_da);
@@ -268,38 +380,48 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
//
// If there is a trace field, append this router's ID to the trace.
//
- if (trace && dx_parse_is_list(trace)) {
- dx_compose_insert_string(out_da, "qdx.trace");
- dx_compose_start_list(out_da);
-
- uint32_t idx = 0;
- dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
- while (trace_item) {
- dx_field_iterator_t *iter = dx_parse_raw(trace_item);
- dx_compose_insert_string_iterator(out_da, iter);
- idx++;
- trace_item = dx_parse_sub_value(trace, idx);
+ dx_compose_insert_string(out_da, DX_DA_TRACE);
+ dx_compose_start_list(out_da);
+ if (trace) {
+ if (dx_parse_is_list(trace)) {
+ uint32_t idx = 0;
+ dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
+ while (trace_item) {
+ dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+ if (dx_field_iterator_equal(iter, (unsigned char*) direct_prefix))
+ *drop = 1;
+ dx_field_iterator_reset(iter);
+ dx_compose_insert_string_iterator(out_da, iter);
+ idx++;
+ trace_item = dx_parse_sub_value(trace, idx);
+ }
}
-
- dx_compose_insert_string(out_da, router->router_id);
- dx_compose_end_list(out_da);
}
+ dx_compose_insert_string(out_da, direct_prefix);
+ dx_compose_end_list(out_da);
+
//
// If there is no ingress field, annotate the ingress as this router else
// keep the original field.
//
- dx_compose_insert_string(out_da, "qdx.ingress");
+ dx_compose_insert_string(out_da, DX_DA_INGRESS);
if (ingress && dx_parse_is_scalar(ingress)) {
- dx_field_iterator_t *iter = dx_parse_raw(ingress);
- dx_compose_insert_string_iterator(out_da, iter);
+ ingress_iter = dx_parse_raw(ingress);
+ dx_compose_insert_string_iterator(out_da, ingress_iter);
} else
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_map(out_da);
dx_message_set_delivery_annotations(msg, out_da);
dx_compose_free(out_da);
+
+ //
+ // Return the iterator to the ingress field _if_ it was present.
+ // If we added the ingress, return NULL.
+ //
+ return ingress_iter;
}
@@ -318,10 +440,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
// Receive the message into a local representation. If the returned message
// pointer is NULL, we have not yet received a complete message.
//
- sys_mutex_lock(router->lock);
msg = dx_message_receive(delivery);
- sys_mutex_unlock(router->lock);
-
if (!msg)
return;
@@ -380,84 +499,155 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
if (iter) {
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- hash_retrieve(router->out_hash, iter, (void*) &addr);
+
+ //
+ // Note: This function is going to need to be refactored so we can put an
+ // asynchronous address lookup here. In the event there is a translation
+ // of the address (via namespace), it will have to be done here after
+ // obtaining the iterator and before doing the hash lookup.
+ //
+ // Note that this lookup is only done for global/mobile class addresses.
+ //
+
+ dx_hash_retrieve(router->addr_hash, iter, (void*) &addr);
dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
- int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_direct = dx_field_iterator_prefix(iter, direct_prefix);
dx_field_iterator_free(iter);
if (addr) {
//
+ // If the incoming link is an endpoint link, count this as an ingress delivery.
+ //
+ if (rlink->link_type == DX_LINK_ENDPOINT)
+ addr->deliveries_ingress++;
+
+ //
// To field is valid and contains a known destination. Handle the various
// cases for forwarding.
//
//
- // Interpret and update the delivery annotations of the message
+ // Interpret and update the delivery annotations of the message. As a convenience,
+ // this function returns the iterator to the ingress field (if it exists).
//
- router_annotate_message(router, msg);
+ int drop = 0;
+ dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop);
//
// Forward to the in-process handler for this message if there is one. The
// actual invocation of the handler will occur later after we've released
// the lock.
//
- if (addr->handler) {
+ if (!drop && addr->handler) {
in_process_copy = dx_message_copy(msg);
handler = addr->handler;
handler_context = addr->handler_context;
+ addr->deliveries_to_container++;
}
//
// If the address form is local (i.e. is prefixed by _local), don't forward
// outside of the router process.
//
- if (!is_local) {
+ if (!drop && !is_local) {
//
// Forward to all of the local links receiving this address.
//
- dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
- while (dest_link) {
+ dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+ while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = dx_message_copy(msg);
re->settle = 0;
re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
fanout++;
if (fanout == 1 && !dx_delivery_settled(delivery))
re->delivery = delivery;
- dx_link_activate(dest_link->link);
- dest_link = DEQ_NEXT(dest_link);
+ addr->deliveries_egress++;
+ dx_link_activate(dest_link_ref->link->link);
+ dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
- // Forward to the next-hops for remote destinations.
+ // If the address form is direct to this router node, don't relay it on
+ // to any other part of the network.
//
- dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
- while (dest_node) {
- if (dest_node->next_hop)
- dest_link = dest_node->next_hop->peer_link;
- else
- dest_link = dest_node->peer_link;
- if (dest_link) {
- dx_routed_event_t *re = new_dx_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = dx_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
-
- fanout++;
- if (fanout == 1)
- re->delivery = delivery;
-
- dx_link_activate(dest_link->link);
+ if (!is_direct) {
+ //
+ // Get the mask bit associated with the ingress router for the message.
+ // This will be compared against the "valid_origin" masks for each
+ // candidate destination router.
+ //
+ int origin = -1;
+ if (ingress_iter) {
+ dx_field_iterator_reset_view(ingress_iter, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *origin_addr;
+ dx_hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
+ if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
+ dx_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
+ origin = rref->router->mask_bit;
+ }
+ } else
+ origin = 0;
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ if (origin >= 0) {
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ dx_bitmask_t *link_set = dx_bitmask(0);
+
+ //
+ // Loop over the target nodes for this address. Build a set of outgoing links
+ // for which there are valid targets. We do this to avoid sending more than one
+ // message down a given link. It's possible that there are multiple destinations
+ // for this address that are all reachable over the same link. In this case, we
+ // will send only one copy of the message over the link and allow a downstream
+ // router to fan the message out.
+ //
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
+ else
+ dest_link = dest_node_ref->router->peer_link;
+ if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin))
+ dx_bitmask_set_bit(link_set, dest_link->mask_bit);
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
+ }
+
+ //
+ // Send a copy of the message outbound on each identified link.
+ //
+ int link_bit;
+ while (dx_bitmask_first_set(link_set, &link_bit)) {
+ dx_bitmask_clear_bit(link_set, link_bit);
+ dest_link = router->out_links_by_mask_bit[link_bit];
+ if (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1 && !dx_delivery_settled(delivery))
+ re->delivery = delivery;
+
+ addr->deliveries_transit++;
+ dx_link_activate(dest_link->link);
+ }
+ }
+
+ dx_bitmask_free(link_set);
}
- dest_node = DEQ_NEXT(dest_node);
}
}
}
@@ -470,8 +660,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_delivery_free(delivery, PN_ACCEPTED);
} else if (fanout == 0) {
dx_delivery_free(delivery, PN_RELEASED);
- } else if (fanout > 1)
- dx_delivery_free(delivery, PN_ACCEPTED);
+ }
}
} else {
//
@@ -481,13 +670,15 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
}
sys_mutex_unlock(router->lock);
- dx_free_message(msg);
+ dx_message_free(msg);
//
// Invoke the in-process handler now that the lock is released.
//
- if (handler)
- handler(handler_context, in_process_copy);
+ if (handler) {
+ handler(handler_context, in_process_copy, rlink->mask_bit);
+ dx_message_free(in_process_copy);
+ }
}
@@ -538,28 +729,37 @@ static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t *d
*/
static int router_incoming_link_handler(void* context, dx_link_t *link)
{
- dx_router_t *router = (dx_router_t*) context;
- dx_router_link_t *rlink = new_dx_router_link_t();
- pn_link_t *pn_link = dx_link_pn(link);
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ int is_router = dx_router_terminus_is_router(dx_link_remote_source(link));
+
+ if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) {
+ dx_log(module, LOG_WARNING, "Incoming link claims router capability but is not on an inter-router connection");
+ pn_link_close(pn_link);
+ return 0;
+ }
+ dx_router_link_t *rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
+ rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
rlink->link_direction = DX_INCOMING;
- rlink->link_type = DX_LINK_ENDPOINT;
rlink->owning_addr = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
+ rlink->ref = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(link, rlink);
sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, rlink);
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0;
+ DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link));
+ pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
pn_link_flow(pn_link, 1000);
pn_link_open(pn_link);
@@ -579,52 +779,125 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
{
dx_router_t *router = (dx_router_t*) context;
pn_link_t *pn_link = dx_link_pn(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ const char *r_src = pn_terminus_get_address(dx_link_remote_source(link));
+ int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link));
+ int is_router = dx_router_terminus_is_router(dx_link_remote_target(link));
+ int propagate = 0;
+ dx_field_iterator_t *iter = 0;
+
+ if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) {
+ dx_log(module, LOG_WARNING, "Outgoing link claims router capability but is not on an inter-router connection");
+ pn_link_close(pn_link);
+ return 0;
+ }
- if (!r_tgt) {
+ //
+ // If this link is not a router link and it has no source address, we can't
+ // accept it.
+ //
+ if (r_src == 0 && !is_router && !is_dynamic) {
pn_link_close(pn_link);
return 0;
}
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
- dx_router_link_t *rlink = new_dx_router_link_t();
- int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
+ //
+ // If this is an endpoint link with a source address, make sure the address is
+ // appropriate for endpoint links. If it is not mobile address, it cannot be
+ // bound to an endpoint link.
+ //
+ if(r_src && !is_router && !is_dynamic) {
+ iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
+ unsigned char prefix = dx_field_iterator_octet(iter);
+ dx_field_iterator_reset(iter);
+ if (prefix != 'M') {
+ dx_field_iterator_free(iter);
+ pn_link_close(pn_link);
+ dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src);
+ return 0;
+ }
+ }
+
+ //
+ // Create a router_link record for this link. Some of the fields will be
+ // modified in the different cases below.
+ //
+ dx_router_link_t *rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_OUTGOING;
rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+ rlink->link_direction = DX_OUTGOING;
+ rlink->owning_addr = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
+ rlink->ref = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(link, rlink);
-
- dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
+ pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link));
+ pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (!addr) {
- addr = new_dx_address_t();
- addr->handler = 0;
- addr->handler_context = 0;
- DEQ_INIT(addr->rlinks);
- DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, iter, addr);
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0;
+
+ if (is_router) {
+ //
+ // If this is a router link, put it in the hello_address link-list.
+ //
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
+ rlink->owning_addr = router->hello_addr;
+ router->out_links_by_mask_bit[rlink->mask_bit] = rlink;
+
+ } else {
+ //
+ // If this is an endpoint link, check the source. If it is dynamic, we will
+ // assign it an ephemeral and routable address. If it has a non-dymanic
+ // address, that address needs to be set up in the address list.
+ //
+ char temp_addr[1000]; // FIXME
+ dx_address_t *addr;
+
+ if (is_dynamic) {
+ dx_router_generate_temp_addr(router, temp_addr, 1000);
+ iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+ pn_terminus_set_address(dx_link_source(link), temp_addr);
+ dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr);
+ } else
+ dx_log(module, LOG_INFO, "Registered local address: %s", r_src);
+
+ dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ memset(addr, 0, sizeof(dx_address_t));
+ DEQ_ITEM_INIT(addr);
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+ }
+
+ rlink->owning_addr = addr;
+ dx_router_add_link_ref_LH(&addr->rlinks, rlink);
+
+ //
+ // If this is not a dynamic address and it is the first local subscription
+ // to the address, supply the address to the router module for propagation
+ // to other nodes.
+ //
+ propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
}
- dx_field_iterator_free(iter);
- rlink->owning_addr = addr;
- DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
+ sys_mutex_unlock(router->lock);
+
+ if (propagate)
+ dx_router_mobile_added(router, iter);
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ if (iter)
+ dx_field_iterator_free(iter);
pn_link_open(pn_link);
- sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
return 0;
}
@@ -634,40 +907,60 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
*/
static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = dx_link_pn(link);
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ dx_router_t *router = (dx_router_t*) context;
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+ dx_address_t *oaddr = 0;
+
+ if (shared) {
+ dx_link_set_conn_context(link, 0);
+ free_dx_router_conn_t(shared);
+ }
if (!rlink)
return 0;
sys_mutex_lock(router->lock);
- if (pn_link_is_sender(pn_link)) {
- DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
-
- if ((rlink->owning_addr->handler == 0) &&
- (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
- (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
- if (iter) {
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (addr == rlink->owning_addr) {
- hash_remove(router->out_hash, iter);
- free_dx_router_link_t(rlink);
- free_dx_address_t(addr);
- dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
- }
- dx_field_iterator_free(iter);
- }
- }
- } else {
- DEQ_REMOVE(router->in_links, rlink);
- free_dx_router_link_t(rlink);
+
+ //
+ // If the link is outgoing, we must disassociate it from its address.
+ //
+ if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) {
+ dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink);
+ oaddr = rlink->owning_addr;
}
+ //
+ // If this is an outgoing inter-router link, we must remove the by-mask-bit
+ // index reference to this link.
+ //
+ if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_OUTGOING) {
+ if (router->out_links_by_mask_bit[rlink->mask_bit] == rlink)
+ router->out_links_by_mask_bit[rlink->mask_bit] = 0;
+ else
+ dx_log(module, LOG_CRITICAL, "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit);
+ }
+
+ //
+ // If this is an incoming inter-router link, we must free the mask_bit.
+ //
+ if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING)
+ dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
+
+ //
+ // Remove the link from the master list-of-links.
+ //
+ DEQ_REMOVE(router->links, rlink);
sys_mutex_unlock(router->lock);
+
+ //
+ // Check to see if the owning address should be deleted
+ //
+ dx_router_check_addr(router, oaddr, 1);
+
+ // TODO - wrap the free to handle the recursive items
+ free_dx_router_link_t(rlink);
+
return 0;
}
@@ -679,28 +972,47 @@ static void router_inbound_open_handler(void *type_context, dx_connection_t *con
static void router_outbound_open_handler(void *type_context, dx_connection_t *conn)
{
- // TODO - Make sure this connection is annotated as an inter-router transport.
- // Ignore otherwise
+ //
+ // Check the configured role of this connection. If it is not the inter-router
+ // role, ignore it.
+ //
+ if (!dx_router_connection_is_inter_router(conn)) {
+ dx_log(module, LOG_WARNING, "Outbound connection set up without inter-router role");
+ return;
+ }
dx_router_t *router = (dx_router_t*) type_context;
- dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
dx_link_t *sender;
dx_link_t *receiver;
dx_router_link_t *rlink;
+ int mask_bit = 0;
+ size_t clen = strlen(DX_CAPABILITY_ROUTER);
//
- // Create an incoming link and put it in the in-links collection. The address
- // of the remote source of this link is '_local/qdxrouter'.
+ // Allocate a mask bit to designate the pair of links connected to the neighbor router
//
- receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
- pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
- pn_terminus_set_address(dx_link_target(receiver), router_address);
+ sys_mutex_lock(router->lock);
+ if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
+ dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
+ } else {
+ sys_mutex_unlock(router->lock);
+ dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count");
+ return;
+ }
- rlink = new_dx_router_link_t();
+ //
+ // Create an incoming link with router source capability
+ //
+ receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1);
+ // TODO - We don't want to have to cast away the constness of the literal string here!
+ // See PROTON-429
+ pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER));
+ rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_INCOMING;
+ rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
+ rlink->link_direction = DX_INCOMING;
rlink->owning_addr = 0;
rlink->link = receiver;
rlink->connected_link = 0;
@@ -709,53 +1021,46 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(receiver, rlink);
-
- sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, rlink);
- sys_mutex_unlock(router->lock);
+ DEQ_INSERT_TAIL(router->links, rlink);
//
- // Create an outgoing link with a local source of '_local/qdxrouter' and place
- // it in the routing table.
+ // Create an outgoing link with router target capability
//
- sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
- pn_terminus_set_address(dx_link_remote_target(sender), router_address);
- pn_terminus_set_address(dx_link_source(sender), router_address);
+ sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2);
+ // TODO - We don't want to have to cast away the constness of the literal string here!
+ // See PROTON-429
+ pn_data_put_symbol(pn_terminus_capabilities(dx_link_source(sender)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
rlink = new_dx_router_link_t();
-
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_OUTGOING;
+ rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
+ rlink->link_direction = DX_OUTGOING;
+ rlink->owning_addr = router->hello_addr;
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
- dx_link_set_context(sender, rlink);
-
- dx_address_t *addr;
+ //
+ // Add the new outgoing link to the hello_address's list of links.
+ //
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
- sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, aiter, (void**) &addr);
- if (!addr) {
- addr = new_dx_address_t();
- addr->handler = 0;
- addr->handler_context = 0;
- DEQ_INIT(addr->rlinks);
- DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, aiter, addr);
- }
+ //
+ // Index this link from the by-maskbit index so we can later find it quickly
+ // when provided with the mask bit.
+ //
+ router->out_links_by_mask_bit[mask_bit] = rlink;
- rlink->owning_addr = addr;
- DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ dx_link_set_context(sender, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
pn_link_open(dx_link_pn(receiver));
pn_link_open(dx_link_pn(sender));
pn_link_flow(dx_link_pn(receiver), 1000);
- dx_field_iterator_free(aiter);
}
@@ -767,7 +1072,6 @@ static void dx_router_timer_handler(void *context)
// Periodic processing.
//
dx_pyrouter_tick(router);
-
dx_timer_schedule(router->timer, 1000);
}
@@ -786,31 +1090,61 @@ static dx_node_type_t router_node = {"router", 0, 0,
static int type_registered = 0;
-dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
+dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *area, const char *id)
{
if (!type_registered) {
type_registered = 1;
dx_container_register_node_type(dx, &router_node);
}
+ size_t dplen = 9 + strlen(area) + strlen(id);
+ direct_prefix = (char*) malloc(dplen);
+ strcpy(direct_prefix, "_topo/");
+ strcat(direct_prefix, area);
+ strcat(direct_prefix, "/");
+ strcat(direct_prefix, id);
+ strcat(direct_prefix, "/");
+
dx_router_t *router = NEW(dx_router_t);
router_node.type_context = router;
+ dx->router = router;
router->dx = dx;
+ router->router_mode = mode;
router->router_area = area;
router->router_id = id;
router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
- DEQ_INIT(router->in_links);
+ DEQ_INIT(router->addrs);
+ router->addr_hash = dx_hash(10, 32, 0);
+
+ DEQ_INIT(router->links);
DEQ_INIT(router->routers);
- DEQ_INIT(router->in_fifo);
- router->lock = sys_mutex();
- router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
- router->out_hash = hash(10, 32, 0);
- router->dtag = 1;
- router->pyRouter = 0;
- router->pyTick = 0;
+ router->out_links_by_mask_bit = NEW_PTR_ARRAY(dx_router_link_t, dx_bitmask_width());
+ router->routers_by_mask_bit = NEW_PTR_ARRAY(dx_router_node_t, dx_bitmask_width());
+ for (int idx = 0; idx < dx_bitmask_width(); idx++) {
+ router->out_links_by_mask_bit[idx] = 0;
+ router->routers_by_mask_bit[idx] = 0;
+ }
+
+ router->neighbor_free_mask = dx_bitmask(1);
+ router->lock = sys_mutex();
+ router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
+ router->dtag = 1;
+ router->pyRouter = 0;
+ router->pyTick = 0;
+ router->pyAdded = 0;
+ router->pyRemoved = 0;
+
+ //
+ // Create addresses for all of the routers in the topology. It will be registered
+ // locally later in the initialization sequence.
+ //
+ if (router->router_mode == DX_ROUTER_MODE_INTERIOR) {
+ router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0);
+ router->hello_addr = dx_router_register_address(dx, "qdxhello", 0, 0);
+ }
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -823,18 +1157,21 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
//
dx_python_start();
- dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id);
+ switch (router->router_mode) {
+ case DX_ROUTER_MODE_STANDALONE: dx_log(module, LOG_INFO, "Router started in Standalone mode"); break;
+ case DX_ROUTER_MODE_INTERIOR: dx_log(module, LOG_INFO, "Router started in Interior mode, area=%s id=%s", area, id); break;
+ case DX_ROUTER_MODE_EDGE: dx_log(module, LOG_INFO, "Router started in Edge mode"); break;
+ }
return router;
}
-void dx_router_setup_agent(dx_dispatch_t *dx)
+void dx_router_setup_late(dx_dispatch_t *dx)
{
+ dx_router_agent_setup(dx->router);
dx_router_python_setup(dx->router);
dx_timer_schedule(dx->router->timer, 1000);
-
- // TODO
}
@@ -849,8 +1186,7 @@ void dx_router_free(dx_router_t *router)
const char *dx_router_id(const dx_dispatch_t *dx)
{
- dx_router_t *router = dx->router;
- return router->router_id;
+ return direct_prefix;
}
@@ -869,14 +1205,16 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, iter, (void**) &addr);
+ dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
- addr->handler = 0;
- addr->handler_context = 0;
+ memset(addr, 0, sizeof(dx_address_t));
+ DEQ_ITEM_INIT(addr);
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, iter, addr);
+ dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_ITEM_INIT(addr);
+ DEQ_INSERT_TAIL(router->addrs, addr);
}
dx_field_iterator_free(iter);
@@ -885,7 +1223,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
+ if (handler)
+ dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address);
return addr;
}
@@ -905,34 +1244,49 @@ void dx_router_send(dx_dispatch_t *dx,
dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, address, (void*) &addr);
+ dx_hash_retrieve(router->addr_hash, address, (void*) &addr);
if (addr) {
//
// Forward to all of the local links receiving this address.
//
- dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
- while (dest_link) {
+ addr->deliveries_from_container++;
+ dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+ while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = dx_message_copy(msg);
re->settle = 0;
re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
- dx_link_activate(dest_link->link);
- dest_link = DEQ_NEXT(dest_link);
+ dx_link_activate(dest_link_ref->link->link);
+ addr->deliveries_egress++;
+
+ dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// Forward to the next-hops for remote destinations.
//
- dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
- while (dest_node) {
- if (dest_node->next_hop)
- dest_link = dest_node->next_hop->peer_link;
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ dx_bitmask_t *link_set = dx_bitmask(0);
+
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
else
- dest_link = dest_node->peer_link;
+ dest_link = dest_node_ref->router->peer_link;
+ if (dest_link)
+ dx_bitmask_set_bit(link_set, dest_link->mask_bit);
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
+ }
+
+ int link_bit;
+ while (dx_bitmask_first_set(link_set, &link_bit)) {
+ dx_bitmask_clear_bit(link_set, link_bit);
+ dest_link = router->out_links_by_mask_bit[link_bit];
if (dest_link) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
@@ -942,9 +1296,11 @@ void dx_router_send(dx_dispatch_t *dx,
re->disposition = 0;
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
dx_link_activate(dest_link->link);
+ addr->deliveries_transit++;
}
- dest_node = DEQ_NEXT(dest_node);
}
+
+ dx_bitmask_free(link_set);
}
sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
}
@@ -959,222 +1315,3 @@ void dx_router_send2(dx_dispatch_t *dx,
dx_field_iterator_free(iter);
}
-
-//===============================================================================
-// Python Router Adapter
-//===============================================================================
-
-typedef struct {
- PyObject_HEAD
- dx_router_t *router;
-} RouterAdapter;
-
-
-static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
-{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
- const char *address;
- int is_reachable;
- int is_neighbor;
-
- if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor))
- return 0;
-
- // TODO
-
- Py_INCREF(Py_None);
- return Py_None;
-}
-
-
-static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
-{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- const char *addr;
- const char *peer;
-
- if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
- return 0;
-
- // TODO
-
- Py_INCREF(Py_None);
- return Py_None;
-}
-
-
-static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
-{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- const char *addr;
- const char *peer;
-
- if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
- return 0;
-
- // TODO
-
- Py_INCREF(Py_None);
- return Py_None;
-}
-
-
-static PyMethodDef RouterAdapter_methods[] = {
- {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"},
- {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
- {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
- {0, 0, 0, 0}
-};
-
-static PyTypeObject RouterAdapterType = {
- PyObject_HEAD_INIT(0)
- 0, /* ob_size*/
- "dispatch.RouterAdapter", /* tp_name*/
- sizeof(RouterAdapter), /* tp_basicsize*/
- 0, /* tp_itemsize*/
- 0, /* tp_dealloc*/
- 0, /* tp_print*/
- 0, /* tp_getattr*/
- 0, /* tp_setattr*/
- 0, /* tp_compare*/
- 0, /* tp_repr*/
- 0, /* tp_as_number*/
- 0, /* tp_as_sequence*/
- 0, /* tp_as_mapping*/
- 0, /* tp_hash */
- 0, /* tp_call*/
- 0, /* tp_str*/
- 0, /* tp_getattro*/
- 0, /* tp_setattro*/
- 0, /* tp_as_buffer*/
- Py_TPFLAGS_DEFAULT, /* tp_flags*/
- "Dispatch Router Adapter", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- RouterAdapter_methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- 0, /* tp_init */
- 0, /* tp_alloc */
- 0, /* tp_new */
- 0, /* tp_free */
- 0, /* tp_is_gc */
- 0, /* tp_bases */
- 0, /* tp_mro */
- 0, /* tp_cache */
- 0, /* tp_subclasses */
- 0, /* tp_weaklist */
- 0, /* tp_del */
- 0 /* tp_version_tag */
-};
-
-
-static void dx_router_python_setup(dx_router_t *router)
-{
- PyObject *pDispatchModule = dx_python_module();
-
- RouterAdapterType.tp_new = PyType_GenericNew;
- if (PyType_Ready(&RouterAdapterType) < 0) {
- PyErr_Print();
- dx_log(module, LOG_CRITICAL, "Unable to initialize the Python Router Adapter");
- return;
- }
-
- Py_INCREF(&RouterAdapterType);
- PyModule_AddObject(pDispatchModule, "RouterAdapter", (PyObject*) &RouterAdapterType);
-
- //
- // Attempt to import the Python Router module
- //
- PyObject* pName;
- PyObject* pId;
- PyObject* pArea;
- PyObject* pModule;
- PyObject* pClass;
- PyObject* pArgs;
-
- pName = PyString_FromString("qpid.dispatch.router");
- pModule = PyImport_Import(pName);
- Py_DECREF(pName);
- if (!pModule) {
- dx_log(module, LOG_CRITICAL, "Can't Locate 'router' Python module");
- return;
- }
-
- pClass = PyObject_GetAttrString(pModule, "RouterEngine");
- if (!pClass || !PyClass_Check(pClass)) {
- dx_log(module, LOG_CRITICAL, "Can't Locate 'RouterEngine' class in the 'router' module");
- return;
- }
-
- PyObject *adapterType = PyObject_GetAttrString(pDispatchModule, "RouterAdapter");
- PyObject *adapterInstance = PyObject_CallObject(adapterType, 0);
- assert(adapterInstance);
-
- ((RouterAdapter*) adapterInstance)->router = router;
-
- //
- // Constructor Arguments for RouterEngine
- //
- pArgs = PyTuple_New(3);
-
- // arg 0: adapter instance
- PyTuple_SetItem(pArgs, 0, adapterInstance);
-
- // arg 1: router_id
- pId = PyString_FromString(router->router_id);
- PyTuple_SetItem(pArgs, 1, pId);
-
- // arg 2: area id
- pArea = PyString_FromString(router->router_area);
- PyTuple_SetItem(pArgs, 2, pArea);
-
- //
- // Instantiate the router
- //
- router->pyRouter = PyInstance_New(pClass, pArgs, 0);
- Py_DECREF(pArgs);
- Py_DECREF(adapterType);
-
- if (!router->pyRouter) {
- PyErr_Print();
- dx_log(module, LOG_CRITICAL, "'RouterEngine' class cannot be instantiated");
- return;
- }
-
- router->pyTick = PyObject_GetAttrString(router->pyRouter, "handleTimerTick");
- if (!router->pyTick || !PyCallable_Check(router->pyTick)) {
- dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method");
- return;
- }
-}
-
-
-static void dx_pyrouter_tick(dx_router_t *router)
-{
- PyObject *pArgs;
- PyObject *pValue;
-
- if (router->pyTick) {
- pArgs = PyTuple_New(0);
- pValue = PyObject_CallObject(router->pyTick, pArgs);
- if (PyErr_Occurred()) {
- PyErr_Print();
- }
- Py_DECREF(pArgs);
- if (pValue) {
- Py_DECREF(pValue);
- }
- }
-}
-