summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-08-01 20:19:24 +0000
committerTed Ross <tross@apache.org>2013-08-01 20:19:24 +0000
commit2d0b9f9886d1b32c0f3379f0b5d6592ee98dc289 (patch)
tree8d7eb785765d3fbf3b6c75b365cbe1d65004433e
parentdc9fb9d54bc1593935d3f3a90c494421ba043ffe (diff)
downloadqpid-python-2d0b9f9886d1b32c0f3379f0b5d6592ee98dc289.tar.gz
QPID-4967 - Updates to the router
- The router module now tracks other router nodes (neighbors and non-neighbors) - Tracked nodes are communicated to the router_node.c fast-path git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509415 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--extras/dispatch/src/py/router/link.py3
-rw-r--r--extras/dispatch/src/py/router/neighbor.py2
-rw-r--r--extras/dispatch/src/py/router/node.py103
-rw-r--r--extras/dispatch/src/py/router/router_engine.py25
-rw-r--r--extras/dispatch/src/python_embedded.c1
-rw-r--r--extras/dispatch/src/router_node.c74
-rw-r--r--extras/dispatch/tests/router_engine_test.py7
7 files changed, 191 insertions, 24 deletions
diff --git a/extras/dispatch/src/py/router/link.py b/extras/dispatch/src/py/router/link.py
index 1e06d161f6..ce97794b8c 100644
--- a/extras/dispatch/src/py/router/link.py
+++ b/extras/dispatch/src/py/router/link.py
@@ -87,6 +87,7 @@ class LinkStateEngine(object):
self.collection[msg.id] = ls
self.collection_changed = True
ls.last_seen = now
+ self.container.new_node(msg.id)
self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id)
# Schedule LSRs for any routers referenced in this LS that we don't know about
for _id in msg.ls.peers:
@@ -108,6 +109,7 @@ class LinkStateEngine(object):
self.collection_changed = True
self._send_ra()
+
def set_mobile_sequence(self, seq):
self.mobile_seq = seq
@@ -124,6 +126,7 @@ class LinkStateEngine(object):
for key in to_delete:
ls = self.collection.pop(key)
self.collection_changed = True
+ self.container.lost_node(key)
self.container.log(LOG_INFO, "Expired link-state from router: %s" % key)
diff --git a/extras/dispatch/src/py/router/neighbor.py b/extras/dispatch/src/py/router/neighbor.py
index 55c6bab62f..8d0dfceecf 100644
--- a/extras/dispatch/src/py/router/neighbor.py
+++ b/extras/dispatch/src/py/router/neighbor.py
@@ -64,6 +64,7 @@ class NeighborEngine(object):
if msg.is_seen(self.id):
if self.link_state.add_peer(msg.id):
self.link_state_changed = True
+ self.container.new_neighbor(msg.id)
self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id)
##
## TODO - Use this function to detect area boundaries
@@ -78,6 +79,7 @@ class NeighborEngine(object):
self.hellos.pop(key)
if self.link_state.del_peer(key):
self.link_state_changed = True
+ self.container.lost_neighbor(key)
self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
diff --git a/extras/dispatch/src/py/router/node.py b/extras/dispatch/src/py/router/node.py
new file mode 100644
index 0000000000..482a83dfe8
--- /dev/null
+++ b/extras/dispatch/src/py/router/node.py
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+try:
+ from dispatch import *
+except ImportError:
+ from stubs import *
+
+
+class NodeTracker(object):
+ """
+ This module is responsible for tracking the set of router nodes that are known to this
+ router. It tracks whether they are neighbor or remote and whether they are reachable.
+ """
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.nodes = {} # id => RemoteNode
+
+
+ def tick(self, now):
+ pass
+
+
+ def new_neighbor(self, node_id):
+ if node_id not in self.nodes:
+ self.nodes[node_id] = RemoteNode(node_id)
+ self.nodes[node_id].set_neighbor()
+ self._notify(self.nodes[node_id])
+
+
+ def lost_neighbor(self, node_id):
+ node = self.nodes[node_id]
+ node.clear_neighbor()
+ self._notify(node)
+ if node.to_delete():
+ self.nodes.pop(node_id)
+
+
+ def new_node(self, node_id):
+ if node_id not in self.nodes:
+ self.nodes[node_id] = RemoteNode(node_id)
+ self.nodes[node_id].set_remote()
+ self._notify(self.nodes[node_id])
+
+
+ def lost_node(self, node_id):
+ node = self.nodes[node_id]
+ node.clear_remote()
+ self._notify(node)
+ if node.to_delete():
+ self.nodes.pop(node_id)
+
+
+ def _notify(self, node):
+ if node.to_delete():
+ self.container.adapter.node_updated("R%s" % node.id, 0, 0)
+ else:
+ is_neighbor = 0
+ if node.neighbor:
+ is_neighbor = 1
+ self.container.adapter.node_updated("R%s" % node.id, 1, is_neighbor)
+
+
+class RemoteNode(object):
+
+ def __init__(self, node_id):
+ self.id = node_id
+ self.neighbor = None
+ self.remote = None
+
+ def set_neighbor(self):
+ self.neighbor = True
+
+ def set_remote(self):
+ self.remote = True
+
+ def clear_neighbor(self):
+ self.neighbor = None
+
+ def clear_remote(self):
+ self.remote = None
+
+ def to_delete(self):
+ return self.neighbor or self.remote
+
diff --git a/extras/dispatch/src/py/router/router_engine.py b/extras/dispatch/src/py/router/router_engine.py
index 065204ad62..675d9c32e5 100644
--- a/extras/dispatch/src/py/router/router_engine.py
+++ b/extras/dispatch/src/py/router/router_engine.py
@@ -29,6 +29,7 @@ from mobile import MobileAddressEngine
from routing import RoutingTableEngine
from binding import BindingEngine
from adapter import AdapterEngine
+from node import NodeTracker
##
## Import the Dispatch adapters from the environment. If they are not found
@@ -79,6 +80,7 @@ class RouterEngine:
self.routing_table_engine = RoutingTableEngine(self)
self.binding_engine = BindingEngine(self)
self.adapter_engine = AdapterEngine(self)
+ self.node_tracker = NodeTracker(self)
@@ -125,6 +127,7 @@ class RouterEngine:
self.routing_table_engine.tick(now)
self.binding_engine.tick(now)
self.adapter_engine.tick(now)
+ self.node_tracker.tick(now)
except Exception, e:
self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
@@ -222,6 +225,12 @@ class RouterEngine:
self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
+ def node_updated(self, addr, reachable, neighbor):
+ """
+ """
+ self.router_adapter(addr, reachable, neighbor)
+
+
##========================================================================================
## Interconnect between the Sub-Modules
##========================================================================================
@@ -253,3 +262,19 @@ class RouterEngine:
self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
self.adapter_engine.remote_routes_changed(key_class, routes)
+ def new_neighbor(self, rid):
+ self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid)
+ self.node_tracker.new_neighbor(rid)
+
+ def lost_neighbor(self, rid):
+ self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
+ self.node_tracker.lost_neighbor(rid)
+
+ def new_node(self, rid):
+ self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
+ self.node_tracker.new_node(rid)
+
+ def lost_node(self, rid):
+ self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
+ self.node_tracker.lost_node(rid)
+
diff --git a/extras/dispatch/src/python_embedded.c b/extras/dispatch/src/python_embedded.c
index ffa5bda6b8..b497cbf7bd 100644
--- a/extras/dispatch/src/python_embedded.c
+++ b/extras/dispatch/src/python_embedded.c
@@ -444,7 +444,6 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg)
//
dx_parsed_field_t *body_map = dx_parse(body);
if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) {
- printf("XXXX %s\n", dx_parse_error(body_map));
dx_field_iterator_free(ap);
dx_field_iterator_free(body);
dx_parse_free(ap_map);
diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c
index e16b654d85..9938ab113e 100644
--- a/extras/dispatch/src/router_node.c
+++ b/extras/dispatch/src/router_node.c
@@ -20,6 +20,7 @@
#include <qpid/dispatch/python_embedded.h>
#include <stdio.h>
#include <string.h>
+#include <stdbool.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
@@ -35,16 +36,16 @@ static char *local_prefix = "_local/";
/**
* Address Types and Processing:
*
- * Address Hash Key onReceive onEmit
- * =============================================================================
- * _local/<local> L<local> handler forward
- * _topo/<area>/<router>/<local> A<area> forward forward
- * _topo/<my-area>/<router>/<local> R<router> forward forward
- * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward
- * _topo/<area>/all/<local> A<area> forward forward
- * _topo/<my-area>/all/<local> L<local> forward+handler forward
- * _topo/all/all/<local> L<local> forward+handler forward
- * <mobile> M<mobile> forward+handler forward
+ * Address Hash Key onReceive
+ * ===================================================================
+ * _local/<local> L<local> handler
+ * _topo/<area>/<router>/<local> A<area> forward
+ * _topo/<my-area>/<router>/<local> R<router> forward
+ * _topo/<my-area>/<my-router>/<local> L<local> handler
+ * _topo/<area>/all/<local> A<area> forward
+ * _topo/<my-area>/all/<local> L<local> forward handler
+ * _topo/all/all/<local> L<local> forward handler
+ * <mobile> M<mobile> forward handler
*/
@@ -59,6 +60,14 @@ typedef enum {
} dx_link_type_t;
+typedef struct dx_routed_event_t {
+ DEQ_LINKS(struct dx_routed_event_t);
+ dx_message_t *message;
+ bool settled;
+ uint64_t disposition;
+} dx_routed_event_t;
+
+
struct dx_router_link_t {
DEQ_LINKS(dx_router_link_t);
dx_direction_t link_direction;
@@ -67,7 +76,7 @@ struct dx_router_link_t {
dx_link_t *link; // [own] Link pointer
dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
- dx_message_list_t out_fifo; // Message FIFO for outgoing messages
+ dx_message_list_t out_fifo; // Message FIFO for outgoing messages. Unused for incoming links
};
ALLOC_DECLARE(dx_router_link_t);
@@ -662,19 +671,19 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
router_node.type_context = router;
- router->dx = dx;
- router->router_area = area;
- router->router_id = id;
- router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
+ router->dx = dx;
+ router->router_area = area;
+ router->router_id = id;
+ router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
DEQ_INIT(router->in_links);
DEQ_INIT(router->routers);
DEQ_INIT(router->in_fifo);
- router->lock = sys_mutex();
- router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
- router->out_hash = hash(10, 32, 0);
- router->dtag = 1;
- router->pyRouter = 0;
- router->pyTick = 0;
+ router->lock = sys_mutex();
+ router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
+ router->out_hash = hash(10, 32, 0);
+ router->dtag = 1;
+ router->pyRouter = 0;
+ router->pyTick = 0;
//
@@ -821,6 +830,24 @@ typedef struct {
} RouterAdapter;
+static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
+{
+ //RouterAdapter *adapter = (RouterAdapter*) self;
+ //dx_router_t *router = adapter->router;
+ const char *address;
+ int is_reachable;
+ int is_neighbor;
+
+ if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor))
+ return 0;
+
+ // TODO
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
@@ -854,8 +881,9 @@ static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
static PyMethodDef RouterAdapter_methods[] = {
- {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
- {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
+ {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"},
+ {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
+ {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
{0, 0, 0, 0}
};
diff --git a/extras/dispatch/tests/router_engine_test.py b/extras/dispatch/tests/router_engine_test.py
index fbfe2a2155..0427879248 100644
--- a/extras/dispatch/tests/router_engine_test.py
+++ b/extras/dispatch/tests/router_engine_test.py
@@ -99,12 +99,19 @@ class NeighborTest(unittest.TestCase):
def local_link_state_changed(self, link_state):
self.local_link_state = link_state
+ def new_neighbor(self, rid):
+ self.neighbors[rid] = None
+
+ def lost_neighbor(self, rid):
+ self.neighbors.pop(rid)
+
def setUp(self):
self.sent = []
self.local_link_state = None
self.id = "R1"
self.area = "area"
self.config = Configuration()
+ self.neighbors = {}
def test_hello_sent(self):
self.sent = []