diff options
author | Ted Ross <tross@apache.org> | 2013-08-01 20:19:24 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2013-08-01 20:19:24 +0000 |
commit | 2d0b9f9886d1b32c0f3379f0b5d6592ee98dc289 (patch) | |
tree | 8d7eb785765d3fbf3b6c75b365cbe1d65004433e | |
parent | dc9fb9d54bc1593935d3f3a90c494421ba043ffe (diff) | |
download | qpid-python-2d0b9f9886d1b32c0f3379f0b5d6592ee98dc289.tar.gz |
QPID-4967 - Updates to the router
- The router module now tracks other router nodes (neighbors and non-neighbors)
- Tracked nodes are communicated to the router_node.c fast-path
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509415 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | extras/dispatch/src/py/router/link.py | 3 | ||||
-rw-r--r-- | extras/dispatch/src/py/router/neighbor.py | 2 | ||||
-rw-r--r-- | extras/dispatch/src/py/router/node.py | 103 | ||||
-rw-r--r-- | extras/dispatch/src/py/router/router_engine.py | 25 | ||||
-rw-r--r-- | extras/dispatch/src/python_embedded.c | 1 | ||||
-rw-r--r-- | extras/dispatch/src/router_node.c | 74 | ||||
-rw-r--r-- | extras/dispatch/tests/router_engine_test.py | 7 |
7 files changed, 191 insertions, 24 deletions
diff --git a/extras/dispatch/src/py/router/link.py b/extras/dispatch/src/py/router/link.py index 1e06d161f6..ce97794b8c 100644 --- a/extras/dispatch/src/py/router/link.py +++ b/extras/dispatch/src/py/router/link.py @@ -87,6 +87,7 @@ class LinkStateEngine(object): self.collection[msg.id] = ls self.collection_changed = True ls.last_seen = now + self.container.new_node(msg.id) self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id) # Schedule LSRs for any routers referenced in this LS that we don't know about for _id in msg.ls.peers: @@ -108,6 +109,7 @@ class LinkStateEngine(object): self.collection_changed = True self._send_ra() + def set_mobile_sequence(self, seq): self.mobile_seq = seq @@ -124,6 +126,7 @@ class LinkStateEngine(object): for key in to_delete: ls = self.collection.pop(key) self.collection_changed = True + self.container.lost_node(key) self.container.log(LOG_INFO, "Expired link-state from router: %s" % key) diff --git a/extras/dispatch/src/py/router/neighbor.py b/extras/dispatch/src/py/router/neighbor.py index 55c6bab62f..8d0dfceecf 100644 --- a/extras/dispatch/src/py/router/neighbor.py +++ b/extras/dispatch/src/py/router/neighbor.py @@ -64,6 +64,7 @@ class NeighborEngine(object): if msg.is_seen(self.id): if self.link_state.add_peer(msg.id): self.link_state_changed = True + self.container.new_neighbor(msg.id) self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id) ## ## TODO - Use this function to detect area boundaries @@ -78,6 +79,7 @@ class NeighborEngine(object): self.hellos.pop(key) if self.link_state.del_peer(key): self.link_state_changed = True + self.container.lost_neighbor(key) self.container.log(LOG_INFO, "Neighbor lost: %s" % key) diff --git a/extras/dispatch/src/py/router/node.py b/extras/dispatch/src/py/router/node.py new file mode 100644 index 0000000000..482a83dfe8 --- /dev/null +++ b/extras/dispatch/src/py/router/node.py @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +try: + from dispatch import * +except ImportError: + from stubs import * + + +class NodeTracker(object): + """ + This module is responsible for tracking the set of router nodes that are known to this + router. It tracks whether they are neighbor or remote and whether they are reachable. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.nodes = {} # id => RemoteNode + + + def tick(self, now): + pass + + + def new_neighbor(self, node_id): + if node_id not in self.nodes: + self.nodes[node_id] = RemoteNode(node_id) + self.nodes[node_id].set_neighbor() + self._notify(self.nodes[node_id]) + + + def lost_neighbor(self, node_id): + node = self.nodes[node_id] + node.clear_neighbor() + self._notify(node) + if node.to_delete(): + self.nodes.pop(node_id) + + + def new_node(self, node_id): + if node_id not in self.nodes: + self.nodes[node_id] = RemoteNode(node_id) + self.nodes[node_id].set_remote() + self._notify(self.nodes[node_id]) + + + def lost_node(self, node_id): + node = self.nodes[node_id] + node.clear_remote() + self._notify(node) + if node.to_delete(): + self.nodes.pop(node_id) + + + def _notify(self, node): + if node.to_delete(): + self.container.adapter.node_updated("R%s" % node.id, 0, 0) + else: + is_neighbor = 0 + if node.neighbor: + is_neighbor = 1 + self.container.adapter.node_updated("R%s" % node.id, 1, is_neighbor) + + +class RemoteNode(object): + + def __init__(self, node_id): + self.id = node_id + self.neighbor = None + self.remote = None + + def set_neighbor(self): + self.neighbor = True + + def set_remote(self): + self.remote = True + + def clear_neighbor(self): + self.neighbor = None + + def clear_remote(self): + self.remote = None + + def to_delete(self): + return self.neighbor or self.remote + diff --git a/extras/dispatch/src/py/router/router_engine.py b/extras/dispatch/src/py/router/router_engine.py index 065204ad62..675d9c32e5 100644 --- a/extras/dispatch/src/py/router/router_engine.py +++ b/extras/dispatch/src/py/router/router_engine.py @@ -29,6 +29,7 @@ from mobile import MobileAddressEngine from routing import RoutingTableEngine from binding import BindingEngine from adapter import AdapterEngine +from node import NodeTracker ## ## Import the Dispatch adapters from the environment. If they are not found @@ -79,6 +80,7 @@ class RouterEngine: self.routing_table_engine = RoutingTableEngine(self) self.binding_engine = BindingEngine(self) self.adapter_engine = AdapterEngine(self) + self.node_tracker = NodeTracker(self) @@ -125,6 +127,7 @@ class RouterEngine: self.routing_table_engine.tick(now) self.binding_engine.tick(now) self.adapter_engine.tick(now) + self.node_tracker.tick(now) except Exception, e: self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) @@ -222,6 +225,12 @@ class RouterEngine: self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) + def node_updated(self, addr, reachable, neighbor): + """ + """ + self.router_adapter(addr, reachable, neighbor) + + ##======================================================================================== ## Interconnect between the Sub-Modules ##======================================================================================== @@ -253,3 +262,19 @@ class RouterEngine: self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) self.adapter_engine.remote_routes_changed(key_class, routes) + def new_neighbor(self, rid): + self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid) + self.node_tracker.new_neighbor(rid) + + def lost_neighbor(self, rid): + self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid) + self.node_tracker.lost_neighbor(rid) + + def new_node(self, rid): + self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid) + self.node_tracker.new_node(rid) + + def lost_node(self, rid): + self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid) + self.node_tracker.lost_node(rid) + diff --git a/extras/dispatch/src/python_embedded.c b/extras/dispatch/src/python_embedded.c index ffa5bda6b8..b497cbf7bd 100644 --- a/extras/dispatch/src/python_embedded.c +++ b/extras/dispatch/src/python_embedded.c @@ -444,7 +444,6 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg) // dx_parsed_field_t *body_map = dx_parse(body); if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) { - printf("XXXX %s\n", dx_parse_error(body_map)); dx_field_iterator_free(ap); dx_field_iterator_free(body); dx_parse_free(ap_map); diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c index e16b654d85..9938ab113e 100644 --- a/extras/dispatch/src/router_node.c +++ b/extras/dispatch/src/router_node.c @@ -20,6 +20,7 @@ #include <qpid/dispatch/python_embedded.h> #include <stdio.h> #include <string.h> +#include <stdbool.h> #include <qpid/dispatch.h> #include "dispatch_private.h" @@ -35,16 +36,16 @@ static char *local_prefix = "_local/"; /** * Address Types and Processing: * - * Address Hash Key onReceive onEmit - * ============================================================================= - * _local/<local> L<local> handler forward - * _topo/<area>/<router>/<local> A<area> forward forward - * _topo/<my-area>/<router>/<local> R<router> forward forward - * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward - * _topo/<area>/all/<local> A<area> forward forward - * _topo/<my-area>/all/<local> L<local> forward+handler forward - * _topo/all/all/<local> L<local> forward+handler forward - * <mobile> M<mobile> forward+handler forward + * Address Hash Key onReceive + * =================================================================== + * _local/<local> L<local> handler + * _topo/<area>/<router>/<local> A<area> forward + * _topo/<my-area>/<router>/<local> R<router> forward + * _topo/<my-area>/<my-router>/<local> L<local> handler + * _topo/<area>/all/<local> A<area> forward + * _topo/<my-area>/all/<local> L<local> forward handler + * _topo/all/all/<local> L<local> forward handler + * <mobile> M<mobile> forward handler */ @@ -59,6 +60,14 @@ typedef enum { } dx_link_type_t; +typedef struct dx_routed_event_t { + DEQ_LINKS(struct dx_routed_event_t); + dx_message_t *message; + bool settled; + uint64_t disposition; +} dx_routed_event_t; + + struct dx_router_link_t { DEQ_LINKS(dx_router_link_t); dx_direction_t link_direction; @@ -67,7 +76,7 @@ struct dx_router_link_t { dx_link_t *link; // [own] Link pointer dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link - dx_message_list_t out_fifo; // Message FIFO for outgoing messages + dx_message_list_t out_fifo; // Message FIFO for outgoing messages. Unused for incoming links }; ALLOC_DECLARE(dx_router_link_t); @@ -662,19 +671,19 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) router_node.type_context = router; - router->dx = dx; - router->router_area = area; - router->router_id = id; - router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); + router->dx = dx; + router->router_area = area; + router->router_id = id; + router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); DEQ_INIT(router->in_links); DEQ_INIT(router->routers); DEQ_INIT(router->in_fifo); - router->lock = sys_mutex(); - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - router->out_hash = hash(10, 32, 0); - router->dtag = 1; - router->pyRouter = 0; - router->pyTick = 0; + router->lock = sys_mutex(); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); + router->out_hash = hash(10, 32, 0); + router->dtag = 1; + router->pyRouter = 0; + router->pyTick = 0; // @@ -821,6 +830,24 @@ typedef struct { } RouterAdapter; +static PyObject* dx_router_node_updated(PyObject *self, PyObject *args) +{ + //RouterAdapter *adapter = (RouterAdapter*) self; + //dx_router_t *router = adapter->router; + const char *address; + int is_reachable; + int is_neighbor; + + if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor)) + return 0; + + // TODO + + Py_INCREF(Py_None); + return Py_None; +} + + static PyObject* dx_router_add_route(PyObject *self, PyObject *args) { //RouterAdapter *adapter = (RouterAdapter*) self; @@ -854,8 +881,9 @@ static PyObject* dx_router_del_route(PyObject *self, PyObject *args) static PyMethodDef RouterAdapter_methods[] = { - {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"}, - {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"}, + {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"}, + {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"}, + {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"}, {0, 0, 0, 0} }; diff --git a/extras/dispatch/tests/router_engine_test.py b/extras/dispatch/tests/router_engine_test.py index fbfe2a2155..0427879248 100644 --- a/extras/dispatch/tests/router_engine_test.py +++ b/extras/dispatch/tests/router_engine_test.py @@ -99,12 +99,19 @@ class NeighborTest(unittest.TestCase): def local_link_state_changed(self, link_state): self.local_link_state = link_state + def new_neighbor(self, rid): + self.neighbors[rid] = None + + def lost_neighbor(self, rid): + self.neighbors.pop(rid) + def setUp(self): self.sent = [] self.local_link_state = None self.id = "R1" self.area = "area" self.config = Configuration() + self.neighbors = {} def test_hello_sent(self): self.sent = [] |