diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-10-21 22:04:51 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-10-21 22:04:51 +0000 |
| commit | 888581cb9781259073d190edede25e6253ec7dd9 (patch) | |
| tree | ca719eb54a498aebb5c59c527b08178491e4ad4c /qpid/extras/dispatch/python | |
| parent | 6d5d782b504677fcc4392086cb628dbbb79c800a (diff) | |
| download | qpid-python-888581cb9781259073d190edede25e6253ec7dd9.tar.gz | |
QPID-4984: WIP - Merge from trunk r.1534385.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1534394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/python')
12 files changed, 1196 insertions, 1300 deletions
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/config/schema.py b/qpid/extras/dispatch/python/qpid/dispatch/config/schema.py index 545139f0df..7b00000c99 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/config/schema.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/config/schema.py @@ -57,6 +57,7 @@ config_schema = { 'addr' : (str, 0, "M"), 'port' : (str, 1, "M"), 'label' : (str, None, "", None), + 'role' : (str, None, "", 'normal'), 'sasl-mechanisms' : (str, None, "M"), 'ssl-profile' : (str, None, "E", None), 'require-peer-auth' : (bool, None, "", True), @@ -66,11 +67,13 @@ config_schema = { 'addr' : (str, 0, "M"), 'port' : (str, 1, "M"), 'label' : (str, None, "", None), + 'role' : (str, None, "", 'normal'), 'sasl-mechanisms' : (str, None, "M"), 'ssl-profile' : (str, None, "E", None), 'allow-redirect' : (bool, None, "", True) }), 'router' : (True, { + 'mode' : (str, None, "", 'standalone'), 'router-id' : (str, None, "M"), 'area' : (str, None, "", None), 'hello-interval' : (int, None, "", 1), diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py b/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py deleted file mode 100644 index 7f7f6c9e8e..0000000000 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py +++ /dev/null @@ -1,99 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -try: - from dispatch import * -except ImportError: - from ..stubs import * - -ENTRY_OLD = 1 -ENTRY_CURRENT = 2 -ENTRY_NEW = 3 - -class AdapterEngine(object): - """ - This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop). - Key binding lists are kept in disjoint key-classes that can come from different parts of the router - (i.e. topological keys for inter-router communication and mobile keys for end users). - - For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the - routing tables to be efficiently communicated to the adapter in the form of table deltas. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.key_classes = {} # map [key_class] => (addr-key, next-hop) - - - def tick(self, now): - """ - There is no periodic processing needed for this module. - """ - pass - - - def remote_routes_changed(self, key_class, new_table): - old_table = [] - if key_class in self.key_classes: - old_table = self.key_classes[key_class] - - # flag all of the old entries - old_flags = {} - for a,b in old_table: - old_flags[(a,b)] = ENTRY_OLD - - # flag the new entries - new_flags = {} - for a,b in new_table: - new_flags[(a,b)] = ENTRY_NEW - - # calculate the differences from old to new - for a,b in new_table: - if old_table.count((a,b)) > 0: - old_flags[(a,b)] = ENTRY_CURRENT - new_flags[(a,b)] = ENTRY_CURRENT - - # make to_add and to_delete lists - to_add = [] - to_delete = [] - for (a,b),f in old_flags.items(): - if f == ENTRY_OLD: - to_delete.append((a,b)) - for (a,b),f in new_flags.items(): - if f == ENTRY_NEW: - to_add.append((a,b)) - - # set the routing table to the new contents - self.key_classes[key_class] = new_table - - # update the adapter's routing tables - # Note: Do deletions before adds to avoid overlapping routes that may cause - # messages to be duplicated. It's better to have gaps in the routing - # tables momentarily because unroutable messages are stored for retry. - for a,b in to_delete: - self.container.router_adapter.remote_unbind(a, b) - for a,b in to_add: - self.container.router_adapter.remote_bind(a, b) - - self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class) - for a,b in new_table: - self.container.log(LOG_INFO, " %s => %s" % (a, b)) - - diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py b/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py deleted file mode 100644 index a37b585e3e..0000000000 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py +++ /dev/null @@ -1,133 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -try: - from dispatch import * -except ImportError: - from ..stubs import * - - -class BindingEngine(object): - """ - This module is responsible for responding to two different events: - 1) The learning of new remote mobile addresses - 2) The change of topology (i.e. different next-hops for remote routers) - When these occur, this module converts the mobile routing table (address => router) - to a next-hop routing table (address => next-hop), compresses the keys in case there - are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.current_keys = {} - - - def tick(self, now): - pass - - - def mobile_keys_changed(self, keys): - self.current_keys = keys - next_hop_keys = self._convert_ids_to_next_hops(keys) - routing_table = self._compress_keys(next_hop_keys) - self.container.remote_routes_changed('mobile-key', routing_table) - - - def next_hops_changed(self): - next_hop_keys = self._convert_ids_to_next_hops(self.current_keys) - routing_table = self._compress_keys(next_hop_keys) - self.container.remote_routes_changed('mobile-key', routing_table) - - - def _convert_ids_to_next_hops(self, keys): - next_hops = self.container.get_next_hops() - new_keys = {} - for _id, value in keys.items(): - if _id in next_hops: - next_hop = next_hops[_id] - if next_hop not in new_keys: - new_keys[next_hop] = [] - new_keys[next_hop].extend(value) - return new_keys - - - def _compress_keys(self, keys): - trees = {} - for _id, key_list in keys.items(): - trees[_id] = TopicElementList() - for key in key_list: - trees[_id].add_key(key) - routing_table = [] - for _id, tree in trees.items(): - tree_keys = tree.get_list() - for tk in tree_keys: - routing_table.append((tk, _id)) - return routing_table - - -class TopicElementList(object): - """ - """ - def __init__(self): - self.elements = {} # map text => (terminal, sub-list) - - def __repr__(self): - return "%r" % self.elements - - def add_key(self, key): - self.add_tokens(key.split('.')) - - def add_tokens(self, tokens): - first = tokens.pop(0) - terminal = len(tokens) == 0 - - if terminal and first == '#': - ## Optimization #1A (A.B.C.D followed by A.B.#) - self.elements = {'#':(True, TopicElementList())} - return - - if '#' in self.elements: - _t,_el = self.elements['#'] - if _t: - ## Optimization #1B (A.B.# followed by A.B.C.D) - return - - if first not in self.elements: - self.elements[first] = (terminal, TopicElementList()) - else: - _t,_el = self.elements[first] - if terminal and not _t: - self.elements[first] = (terminal, _el) - - if not terminal: - _t,_el = self.elements[first] - _el.add_tokens(tokens) - - def get_list(self): - keys = [] - for token, (_t,_el) in self.elements.items(): - if _t: keys.append(token) - _el.build_list(token, keys) - return keys - - def build_list(self, prefix, keys): - for token, (_t,_el) in self.elements.items(): - if _t: keys.append("%s.%s" % (prefix, token)) - _el.build_list("%s.%s" % (prefix, token), keys) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py b/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py index f87d2ee7d2..e0fd060b82 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py @@ -18,30 +18,30 @@ # class Configuration(object): - """ - This module manages and holds the configuration and tuning parameters for a router. - """ - def __init__(self, overrides={}): - ## - ## Load default values - ## - self.values = { 'hello_interval' : 1.0, - 'hello_max_age' : 3.0, - 'ra_interval' : 30.0, - 'remote_ls_max_age' : 60.0, - 'mobile_addr_max_age' : 60.0 } + """ + This module manages and holds the configuration and tuning parameters for a router. + """ + def __init__(self, overrides={}): + ## + ## Load default values + ## + self.values = { 'hello_interval' : 1.0, + 'hello_max_age' : 3.0, + 'ra_interval' : 30.0, + 'remote_ls_max_age' : 60.0, + 'mobile_addr_max_age' : 60.0 } - ## - ## Apply supplied overrides - ## - for k, v in overrides.items(): - self.values[k] = v + ## + ## Apply supplied overrides + ## + for k, v in overrides.items(): + self.values[k] = v - def __getattr__(self, key): - if key in self.values: - return self.values[key] - raise KeyError + def __getattr__(self, key): + if key in self.values: + return self.values[key] + raise KeyError - def __repr__(self): - return "%r" % self.values + def __repr__(self): + return "%r" % self.values diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/data.py b/qpid/extras/dispatch/python/qpid/dispatch/router/data.py index 89e347a29e..812369768f 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/data.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/data.py @@ -19,257 +19,257 @@ try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * def getMandatory(data, key, cls=None): - """ - Get the value mapped to the requested key. If it's not present, raise an exception. - """ - if key in data: - value = data[key] - if cls and value.__class__ != cls: - raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) - return value - raise Exception("Mandatory protocol field missing: '%s'" % key) + """ + Get the value mapped to the requested key. If it's not present, raise an exception. + """ + if key in data: + value = data[key] + if cls and value.__class__ != cls: + raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) + return value + raise Exception("Mandatory protocol field missing: '%s'" % key) def getOptional(data, key, default=None, cls=None): - """ - Get the value mapped to the requested key. If it's not present, return the default value. - """ - if key in data: - value = data[key] - if cls and value.__class__ != cls: - raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) - return value - return default + """ + Get the value mapped to the requested key. If it's not present, return the default value. + """ + if key in data: + value = data[key] + if cls and value.__class__ != cls: + raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) + return value + return default class LinkState(object): - """ - The link-state of a single router. The link state consists of a list of neighbor routers reachable from - the reporting router. The link-state-sequence number is incremented each time the link state changes. - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None): - self.last_seen = 0 - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.peers = getMandatory(body, 'peers', list) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.peers = _peers - - def __repr__(self): - return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'peers' : self.peers} - - def add_peer(self, _id): - if self.peers.count(_id) == 0: - self.peers.append(_id) - return True - return False - - def del_peer(self, _id): - if self.peers.count(_id) > 0: - self.peers.remove(_id) - return True - return False - - def bump_sequence(self): - self.ls_seq += 1 + """ + The link-state of a single router. The link state consists of a list of neighbor routers reachable from + the reporting router. The link-state-sequence number is incremented each time the link state changes. + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None): + self.last_seen = 0 + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.peers = getMandatory(body, 'peers', list) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.peers = _peers + + def __repr__(self): + return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'peers' : self.peers} + + def add_peer(self, _id): + if self.peers.count(_id) == 0: + self.peers.append(_id) + return True + return False + + def del_peer(self, _id): + if self.peers.count(_id) > 0: + self.peers.remove(_id) + return True + return False + + def bump_sequence(self): + self.ls_seq += 1 class MessageHELLO(object): - """ - HELLO Message - scope: neighbors only - HELLO messages travel at most one hop - This message is used by directly connected routers to determine with whom they have - bidirectional connectivity. - """ - def __init__(self, body, _id=None, _area=None, _seen_peers=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.seen_peers = getMandatory(body, 'seen', list) - else: - self.id = _id - self.area = _area - self.seen_peers = _seen_peers - - def __repr__(self): - return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers) - - def get_opcode(self): - return 'HELLO' - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'seen' : self.seen_peers} - - def is_seen(self, _id): - return self.seen_peers.count(_id) > 0 + """ + HELLO Message + scope: neighbors only - HELLO messages travel at most one hop + This message is used by directly connected routers to determine with whom they have + bidirectional connectivity. + """ + def __init__(self, body, _id=None, _area=None, _seen_peers=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.seen_peers = getMandatory(body, 'seen', list) + else: + self.id = _id + self.area = _area + self.seen_peers = _seen_peers + + def __repr__(self): + return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers) + + def get_opcode(self): + return 'HELLO' + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'seen' : self.seen_peers} + + def is_seen(self, _id): + return self.seen_peers.count(_id) > 0 class MessageRA(object): - """ - Router Advertisement (RA) Message - scope: all routers in the area and all designated routers - This message is sent periodically to indicate the originating router's sequence numbers - for link-state and mobile-address-state. - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.mobile_seq = getMandatory(body, 'mobile_seq', long) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.mobile_seq = long(_mobile_seq) - - def get_opcode(self): - return 'RA' - - def __repr__(self): - return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \ - (self.id, self.area, self.ls_seq, self.mobile_seq) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'mobile_seq' : self.mobile_seq} + """ + Router Advertisement (RA) Message + scope: all routers in the area and all designated routers + This message is sent periodically to indicate the originating router's sequence numbers + for link-state and mobile-address-state. + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.mobile_seq = getMandatory(body, 'mobile_seq', long) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.mobile_seq = long(_mobile_seq) + + def get_opcode(self): + return 'RA' + + def __repr__(self): + return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \ + (self.id, self.area, self.ls_seq, self.mobile_seq) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'mobile_seq' : self.mobile_seq} class MessageLSU(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.ls = LinkState(getMandatory(body, 'ls', dict)) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.ls = _ls - - def get_opcode(self): - return 'LSU' - - def __repr__(self): - return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \ - (self.id, self.area, self.ls_seq, self.ls) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'ls' : self.ls.to_dict()} + """ + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.ls = LinkState(getMandatory(body, 'ls', dict)) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.ls = _ls + + def get_opcode(self): + return 'LSU' + + def __repr__(self): + return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \ + (self.id, self.area, self.ls_seq, self.ls) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'ls' : self.ls.to_dict()} class MessageLSR(object): - """ - """ - def __init__(self, body, _id=None, _area=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - else: - self.id = _id - self.area = _area + """ + """ + def __init__(self, body, _id=None, _area=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + else: + self.id = _id + self.area = _area - def get_opcode(self): - return 'LSR' + def get_opcode(self): + return 'LSR' - def __repr__(self): - return "LSR(id=%s area=%s)" % (self.id, self.area) + def __repr__(self): + return "LSR(id=%s area=%s)" % (self.id, self.area) - def to_dict(self): - return {'id' : self.id, - 'area' : self.area} + def to_dict(self): + return {'id' : self.id, + 'area' : self.area} class MessageMAU(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.mobile_seq = getMandatory(body, 'mobile_seq', long) - self.add_list = getOptional(body, 'add', None, list) - self.del_list = getOptional(body, 'del', None, list) - self.exist_list = getOptional(body, 'exist', None, list) - else: - self.id = _id - self.area = _area - self.mobile_seq = long(_seq) - self.add_list = _add_list - self.del_list = _del_list - self.exist_list = _exist_list - - def get_opcode(self): - return 'MAU' - - def __repr__(self): - _add = '' - _del = '' - _exist = '' - if self.add_list: _add = ' add=%r' % self.add_list - if self.del_list: _del = ' del=%r' % self.del_list - if self.exist_list: _exist = ' exist=%r' % self.exist_list - return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \ - (self.id, self.area, self.mobile_seq, _add, _del, _exist) - - def to_dict(self): - body = { 'id' : self.id, - 'area' : self.area, - 'mobile_seq' : self.mobile_seq } - if self.add_list: body['add'] = self.add_list - if self.del_list: body['del'] = self.del_list - if self.exist_list: body['exist'] = self.exist_list - return body + """ + """ + def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.mobile_seq = getMandatory(body, 'mobile_seq', long) + self.add_list = getOptional(body, 'add', None, list) + self.del_list = getOptional(body, 'del', None, list) + self.exist_list = getOptional(body, 'exist', None, list) + else: + self.id = _id + self.area = _area + self.mobile_seq = long(_seq) + self.add_list = _add_list + self.del_list = _del_list + self.exist_list = _exist_list + + def get_opcode(self): + return 'MAU' + + def __repr__(self): + _add = '' + _del = '' + _exist = '' + if self.add_list: _add = ' add=%r' % self.add_list + if self.del_list: _del = ' del=%r' % self.del_list + if self.exist_list: _exist = ' exist=%r' % self.exist_list + return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \ + (self.id, self.area, self.mobile_seq, _add, _del, _exist) + + def to_dict(self): + body = { 'id' : self.id, + 'area' : self.area, + 'mobile_seq' : self.mobile_seq } + if self.add_list: body['add'] = self.add_list + if self.del_list: body['del'] = self.del_list + if self.exist_list: body['exist'] = self.exist_list + return body class MessageMAR(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _have_seq=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.have_seq = getMandatory(body, 'have_seq', long) - else: - self.id = _id - self.area = _area - self.have_seq = long(_have_seq) - - def get_opcode(self): - return 'MAR' - - def __repr__(self): - return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'have_seq' : self.have_seq} + """ + """ + def __init__(self, body, _id=None, _area=None, _have_seq=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.have_seq = getMandatory(body, 'have_seq', long) + else: + self.id = _id + self.area = _area + self.have_seq = long(_have_seq) + + def get_opcode(self): + return 'MAR' + + def __repr__(self): + return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'have_seq' : self.have_seq} diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py index fb23177e2f..11307cd079 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py @@ -21,123 +21,124 @@ from data import MessageRA, MessageLSU, MessageLSR from time import time try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class LinkStateEngine(object): - """ - This module is responsible for running the Link State protocol and maintaining the set - of link states that are gathered from the domain. It notifies outbound when changes to - the link-state-collection are detected. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.ra_interval = self.container.config.ra_interval - self.remote_ls_max_age = self.container.config.remote_ls_max_age - self.last_ra_time = 0 - self.collection = {} - self.collection_changed = False - self.mobile_seq = 0 - self.needed_lsrs = {} - - - def tick(self, now): - self._expire_ls(now) - self._send_lsrs() - - if now - self.last_ra_time >= self.ra_interval: - self.last_ra_time = now - self._send_ra() - - if self.collection_changed: - self.collection_changed = False - self.container.log(LOG_INFO, "New Link-State Collection:") - for a,b in self.collection.items(): - self.container.log(LOG_INFO, " %s => %r" % (a, b.peers)) - self.container.ls_collection_changed(self.collection) - - - def handle_ra(self, msg, now): - if msg.id == self.id: - return - if msg.id in self.collection: - ls = self.collection[msg.id] - ls.last_seen = now - if ls.ls_seq < msg.ls_seq: - self.needed_lsrs[(msg.area, msg.id)] = None - else: - self.needed_lsrs[(msg.area, msg.id)] = None - - - def handle_lsu(self, msg, now): - if msg.id == self.id: - return - if msg.id in self.collection: - ls = self.collection[msg.id] - if ls.ls_seq < msg.ls_seq: - ls = msg.ls - self.collection[msg.id] = ls + """ + This module is responsible for running the Link State protocol and maintaining the set + of link states that are gathered from the domain. It notifies outbound when changes to + the link-state-collection are detected. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.ra_interval = self.container.config.ra_interval + self.remote_ls_max_age = self.container.config.remote_ls_max_age + self.last_ra_time = 0 + self.collection = {} + self.collection_changed = False + self.mobile_seq = 0 + self.needed_lsrs = {} + + + def tick(self, now): + self._expire_ls(now) + self._send_lsrs() + + if now - self.last_ra_time >= self.ra_interval: + self.last_ra_time = now + self._send_ra() + + if self.collection_changed: + self.collection_changed = False + self.container.log(LOG_INFO, "New Link-State Collection:") + for a,b in self.collection.items(): + self.container.log(LOG_INFO, " %s => %r" % (a, b.peers)) + self.container.ls_collection_changed(self.collection) + + + def handle_ra(self, msg, now): + if msg.id == self.id: + return + if msg.id in self.collection: + ls = self.collection[msg.id] + ls.last_seen = now + if ls.ls_seq < msg.ls_seq: + self.needed_lsrs[(msg.area, msg.id)] = None + else: + self.needed_lsrs[(msg.area, msg.id)] = None + + + def handle_lsu(self, msg, now): + if msg.id == self.id: + return + if msg.id in self.collection: + ls = self.collection[msg.id] + if ls.ls_seq < msg.ls_seq: + ls = msg.ls + self.collection[msg.id] = ls + self.collection_changed = True + ls.last_seen = now + else: + ls = msg.ls + self.collection[msg.id] = ls + self.collection_changed = True + ls.last_seen = now + self.container.new_node(msg.id) + self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id) + # Schedule LSRs for any routers referenced in this LS that we don't know about + for _id in msg.ls.peers: + if _id not in self.collection: + self.container.new_node(_id) + self.needed_lsrs[(msg.area, _id)] = None + + + def handle_lsr(self, msg, now): + if msg.id == self.id: + return + if self.id not in self.collection: + return + my_ls = self.collection[self.id] + self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) + + + def new_local_link_state(self, link_state): + self.collection[self.id] = link_state self.collection_changed = True - ls.last_seen = now - else: - ls = msg.ls - self.collection[msg.id] = ls - self.collection_changed = True - ls.last_seen = now - self.container.new_node(msg.id) - self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id) - # Schedule LSRs for any routers referenced in this LS that we don't know about - for _id in msg.ls.peers: - if _id not in self.collection: - self.needed_lsrs[(msg.area, _id)] = None - - - def handle_lsr(self, msg, now): - if msg.id == self.id: - return - if self.id not in self.collection: - return - my_ls = self.collection[self.id] - self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) - - - def new_local_link_state(self, link_state): - self.collection[self.id] = link_state - self.collection_changed = True - self._send_ra() - - - def set_mobile_sequence(self, seq): - self.mobile_seq = seq - - - def get_collection(self): - return self.collection - - - def _expire_ls(self, now): - to_delete = [] - for key, ls in self.collection.items(): - if key != self.id and now - ls.last_seen > self.remote_ls_max_age: - to_delete.append(key) - for key in to_delete: - ls = self.collection.pop(key) - self.collection_changed = True - self.container.lost_node(key) - self.container.log(LOG_INFO, "Expired link-state from router: %s" % key) - - - def _send_lsrs(self): - for (_area, _id) in self.needed_lsrs.keys(): - self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area)) - self.needed_lsrs = {} - - - def _send_ra(self): - ls_seq = 0 - if self.id in self.collection: - ls_seq = self.collection[self.id].ls_seq - self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) + self._send_ra() + + + def set_mobile_sequence(self, seq): + self.mobile_seq = seq + + + def get_collection(self): + return self.collection + + + def _expire_ls(self, now): + to_delete = [] + for key, ls in self.collection.items(): + if key != self.id and now - ls.last_seen > self.remote_ls_max_age: + to_delete.append(key) + for key in to_delete: + ls = self.collection.pop(key) + self.collection_changed = True + self.container.lost_node(key) + self.container.log(LOG_INFO, "Expired link-state from router: %s" % key) + + + def _send_lsrs(self): + for (_area, _id) in self.needed_lsrs.keys(): + self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageLSR(None, self.id, self.area)) + self.needed_lsrs = {} + + + def _send_ra(self): + ls_seq = 0 + if self.id in self.collection: + ls_seq = self.collection[self.id].ls_seq + self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py index ce80f38780..117cf98c22 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py @@ -20,169 +20,170 @@ from data import MessageRA, MessageMAR, MessageMAU try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class MobileAddressEngine(object): - """ - This module is responsible for maintaining an up-to-date list of mobile addresses in the domain. - It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses. - Note that this routing table maps from the mobile address to the remote router where that address - is directly bound. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.mobile_addr_max_age = self.container.config.mobile_addr_max_age - self.mobile_seq = 0 - self.local_keys = [] - self.added_keys = [] - self.deleted_keys = [] - self.remote_lists = {} # map router_id => (sequence, list of keys) - self.remote_last_seen = {} # map router_id => time of last seen advertizement/update - self.remote_changed = False - self.needed_mars = {} - - - def tick(self, now): - self._expire_remotes(now) - self._send_mars() - - ## - ## If local keys have changed, collect the changes and send a MAU with the diffs - ## Note: it is important that the differential-MAU be sent before a RA is sent - ## - if len(self.added_keys) > 0 or len(self.deleted_keys) > 0: - self.mobile_seq += 1 - self.container.send('_topo.%s.all' % self.area, - MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys)) - self.local_keys.extend(self.added_keys) - for key in self.deleted_keys: - self.local_keys.remove(key) - self.added_keys = [] - self.deleted_keys = [] - self.container.mobile_sequence_changed(self.mobile_seq) - - ## - ## If remotes have changed, start the process of updating local bindings - ## - if self.remote_changed: - self.remote_changed = False - self._update_remote_keys() - - - def add_local_address(self, key): """ + This module is responsible for maintaining an up-to-date list of mobile addresses in the domain. + It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses. + Note that this routing table maps from the mobile address to the remote router where that address + is directly bound. """ - if self.local_keys.count(key) == 0: - if self.added_keys.count(key) == 0: - self.added_keys.append(key) - else: - if self.deleted_keys.count(key) > 0: - self.deleted_keys.remove(key) - - - def del_local_address(self, key): - """ - """ - if self.local_keys.count(key) > 0: - if self.deleted_keys.count(key) == 0: - self.deleted_keys.append(key) - else: - if self.added_keys.count(key) > 0: - self.added_keys.remove(key) - - - def handle_ra(self, msg, now): - if msg.id == self.id: - return - - if msg.mobile_seq == 0: - return - - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - self.remote_last_seen[msg.id] = now - if _seq < msg.mobile_seq: - self.needed_mars[(msg.id, msg.area, _seq)] = None - else: - self.needed_mars[(msg.id, msg.area, 0)] = None - - - def handle_mau(self, msg, now): - ## - ## If the MAU is differential, we can only use it if its sequence is exactly one greater - ## than our stored sequence. If not, we will ignore the content and schedule a MAR. - ## - ## If the MAU is absolute, we can use it in all cases. - ## - if msg.id == self.id: - return - - if msg.exist_list: - ## - ## Absolute MAU - ## - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - if _seq >= msg.mobile_seq: # ignore duplicates - return - self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list) - self.remote_last_seen[msg.id] = now - self.remote_changed = True - else: - ## - ## Differential MAU - ## - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - if _seq == msg.mobile_seq: # ignore duplicates - return - self.remote_last_seen[msg.id] = now - if _seq + 1 == msg.mobile_seq: - ## - ## This is one greater than our stored value, incorporate the deltas - ## - if msg.add_list and msg.add_list.__class__ == list: - _list.extend(msg.add_list) - if msg.del_list and msg.del_list.__class__ == list: - for key in msg.del_list: - _list.remove(key) - self.remote_lists[msg.id] = (msg.mobile_seq, _list) - self.remote_changed = True + def __init__(self, container, node_tracker): + self.container = container + self.node_tracker = node_tracker + self.id = self.container.id + self.area = self.container.area + self.mobile_addr_max_age = self.container.config.mobile_addr_max_age + self.mobile_seq = 0 + self.local_addrs = [] + self.added_addrs = [] + self.deleted_addrs = [] + self.remote_lists = {} # map router_id => (sequence, list of addrs) + self.remote_last_seen = {} # map router_id => time of last seen advertizement/update + self.needed_mars = {} + + + def tick(self, now): + self._expire_remotes(now) + self._send_mars() + + ## + ## If local addrs have changed, collect the changes and send a MAU with the diffs + ## Note: it is important that the differential-MAU be sent before a RA is sent + ## + if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0: + self.mobile_seq += 1 + self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area, + MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_addrs, self.deleted_addrs)) + self.local_addrs.extend(self.added_addrs) + for addr in self.deleted_addrs: + self.local_addrs.remove(addr) + self.added_addrs = [] + self.deleted_addrs = [] + self.container.mobile_sequence_changed(self.mobile_seq) + + + def add_local_address(self, addr): + """ + """ + if self.local_addrs.count(addr) == 0: + if self.added_addrs.count(addr) == 0: + self.added_addrs.append(addr) else: - self.needed_mars[(msg.id, msg.area, _seq)] = None - else: - self.needed_mars[(msg.id, msg.area, 0)] = None - + if self.deleted_addrs.count(addr) > 0: + self.deleted_addrs.remove(addr) - def handle_mar(self, msg, now): - if msg.id == self.id: - return - if msg.have_seq < self.mobile_seq: - self.container.send('_topo.%s.%s' % (msg.area, msg.id), - MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys)) - - def _update_remote_keys(self): - keys = {} - for _id,(seq,key_list) in self.remote_lists.items(): - keys[_id] = key_list - self.container.mobile_keys_changed(keys) + def del_local_address(self, addr): + """ + """ + if self.local_addrs.count(addr) > 0: + if self.deleted_addrs.count(addr) == 0: + self.deleted_addrs.append(addr) + else: + if self.added_addrs.count(addr) > 0: + self.added_addrs.remove(addr) - def _expire_remotes(self, now): - for _id, t in self.remote_last_seen.items(): - if now - t > self.mobile_addr_max_age: - self.remote_lists.pop(_id) - self.remote_last_seen.pop(_id) - self.remote_changed = True + def handle_ra(self, msg, now): + if msg.id == self.id: + return + if msg.mobile_seq == 0: + return - def _send_mars(self): - for _id, _area, _seq in self.needed_mars.keys(): - self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) - self.needed_mars = {} + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + self.remote_last_seen[msg.id] = now + if _seq < msg.mobile_seq: + self.needed_mars[(msg.id, msg.area, _seq)] = None + else: + self.needed_mars[(msg.id, msg.area, 0)] = None + + + def handle_mau(self, msg, now): + ## + ## If the MAU is differential, we can only use it if its sequence is exactly one greater + ## than our stored sequence. If not, we will ignore the content and schedule a MAR. + ## + ## If the MAU is absolute, we can use it in all cases. + ## + if msg.id == self.id: + return + + if msg.exist_list: + ## + ## Absolute MAU + ## + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + if _seq >= msg.mobile_seq: # ignore duplicates + return + self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list) + self.remote_last_seen[msg.id] = now + (add_list, del_list) = self.node_tracker.overwrite_addresses(msg.id, msg.exist_list) + self._activate_remotes(msg.id, add_list, del_list) + else: + ## + ## Differential MAU + ## + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + if _seq == msg.mobile_seq: # ignore duplicates + return + self.remote_last_seen[msg.id] = now + if _seq + 1 == msg.mobile_seq: + ## + ## This is one greater than our stored value, incorporate the deltas + ## + if msg.add_list and msg.add_list.__class__ == list: + _list.extend(msg.add_list) + if msg.del_list and msg.del_list.__class__ == list: + for addr in msg.del_list: + _list.remove(addr) + self.remote_lists[msg.id] = (msg.mobile_seq, _list) + if msg.add_list: + self.node_tracker.add_addresses(msg.id, msg.add_list) + if msg.del_list: + self.node_tracker.del_addresses(msg.id, msg.del_list) + self._activate_remotes(msg.id, msg.add_list, msg.del_list) + else: + self.needed_mars[(msg.id, msg.area, _seq)] = None + else: + self.needed_mars[(msg.id, msg.area, 0)] = None + + + def handle_mar(self, msg, now): + if msg.id == self.id: + return + if msg.have_seq < self.mobile_seq: + self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id), + MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_addrs)) + + + def _expire_remotes(self, now): + for _id, t in self.remote_last_seen.items(): + if now - t > self.mobile_addr_max_age: + self.remote_lists.pop(_id) + self.remote_last_seen.pop(_id) + self.remote_changed = True + + + def _send_mars(self): + for _id, _area, _seq in self.needed_mars.keys(): + self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) + self.needed_mars = {} + + + def _activate_remotes(self, _id, added, deleted): + bit = self.node_tracker.maskbit_for_node(_id) + if added: + for a in added: + self.container.router_adapter.map_destination(a, bit) + if deleted: + for d in deleted: + self.container.router_adapter.unmap_destination(d, bit) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py index ea8bacd660..37b7888efe 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py @@ -21,63 +21,63 @@ from data import LinkState, MessageHELLO from time import time try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class NeighborEngine(object): - """ - This module is responsible for maintaining this router's link-state. It runs the HELLO protocol - with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the - link-state) changes. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.last_hello_time = 0.0 - self.hello_interval = container.config.hello_interval - self.hello_max_age = container.config.hello_max_age - self.hellos = {} - self.link_state_changed = False - self.link_state = LinkState(None, self.id, self.area, 0, []) + """ + This module is responsible for maintaining this router's link-state. It runs the HELLO protocol + with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the + link-state) changes. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.last_hello_time = 0.0 + self.hello_interval = container.config.hello_interval + self.hello_max_age = container.config.hello_max_age + self.hellos = {} + self.link_state_changed = False + self.link_state = LinkState(None, self.id, self.area, 0, []) - def tick(self, now): - self._expire_hellos(now) + def tick(self, now): + self._expire_hellos(now) - if now - self.last_hello_time >= self.hello_interval: - self.last_hello_time = now - self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys())) + if now - self.last_hello_time >= self.hello_interval: + self.last_hello_time = now + self.container.send('amqp:/_local/qdxhello', MessageHELLO(None, self.id, self.area, self.hellos.keys())) - if self.link_state_changed: - self.link_state_changed = False - self.link_state.bump_sequence() - self.container.local_link_state_changed(self.link_state) + if self.link_state_changed: + self.link_state_changed = False + self.link_state.bump_sequence() + self.container.local_link_state_changed(self.link_state) - def handle_hello(self, msg, now): - if msg.id == self.id: - return - self.hellos[msg.id] = now - if msg.is_seen(self.id): - if self.link_state.add_peer(msg.id): - self.link_state_changed = True - self.container.new_neighbor(msg.id) - self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id) - ## - ## TODO - Use this function to detect area boundaries - ## + def handle_hello(self, msg, now, link_id): + if msg.id == self.id: + return + self.hellos[msg.id] = now + if msg.is_seen(self.id): + if self.link_state.add_peer(msg.id): + self.link_state_changed = True + self.container.new_neighbor(msg.id, link_id) + self.container.log(LOG_INFO, "New neighbor established: %s on link: %d" % (msg.id, link_id)) + ## + ## TODO - Use this function to detect area boundaries + ## - def _expire_hellos(self, now): - to_delete = [] - for key, last_seen in self.hellos.items(): - if now - last_seen > self.hello_max_age: - to_delete.append(key) - for key in to_delete: - self.hellos.pop(key) - if self.link_state.del_peer(key): - self.link_state_changed = True - self.container.lost_neighbor(key) - self.container.log(LOG_INFO, "Neighbor lost: %s" % key) + def _expire_hellos(self, now): + to_delete = [] + for key, last_seen in self.hellos.items(): + if now - last_seen > self.hello_max_age: + to_delete.append(key) + for key in to_delete: + self.hellos.pop(key) + if self.link_state.del_peer(key): + self.link_state_changed = True + self.container.lost_neighbor(key) + self.container.log(LOG_INFO, "Neighbor lost: %s" % key) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py index c90f7f4232..ef697428da 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py @@ -18,86 +18,157 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class NodeTracker(object): - """ - This module is responsible for tracking the set of router nodes that are known to this - router. It tracks whether they are neighbor or remote and whether they are reachable. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.nodes = {} # id => RemoteNode - - - def tick(self, now): - pass - - - def new_neighbor(self, node_id): - if node_id not in self.nodes: - self.nodes[node_id] = RemoteNode(node_id) - self.nodes[node_id].set_neighbor() - self._notify(self.nodes[node_id]) - - - def lost_neighbor(self, node_id): - node = self.nodes[node_id] - node.clear_neighbor() - self._notify(node) - if node.to_delete(): - self.nodes.pop(node_id) - - - def new_node(self, node_id): - if node_id not in self.nodes: - self.nodes[node_id] = RemoteNode(node_id) - self.nodes[node_id].set_remote() - self._notify(self.nodes[node_id]) - - - def lost_node(self, node_id): - node = self.nodes[node_id] - node.clear_remote() - self._notify(node) - if node.to_delete(): - self.nodes.pop(node_id) - - - def _notify(self, node): - if node.to_delete(): - self.container.adapter.node_updated("R%s" % node.id, 0, 0) - else: - is_neighbor = 0 - if node.neighbor: - is_neighbor = 1 - self.container.adapter.node_updated("R%s" % node.id, 1, is_neighbor) + """ + This module is responsible for tracking the set of router nodes that are known to this + router. It tracks whether they are neighbor or remote and whether they are reachable. + + This module is also responsible for assigning a unique mask bit value to each router. + The mask bit is used in the main router to represent sets of valid destinations for addresses. + """ + def __init__(self, container, max_routers): + self.container = container + self.max_routers = max_routers + self.nodes = {} # id => RemoteNode + self.maskbits = [] + self.next_maskbit = 1 # Reserve bit '0' to represent this router + for i in range(max_routers): + self.maskbits.append(None) + self.maskbits[0] = True + + + def tick(self, now): + pass + + + def new_neighbor(self, node_id, link_maskbit): + """ + A node, designated by node_id, has been discovered as a neighbor over a link with + a maskbit of link_maskbit. + """ + if node_id in self.nodes: + node = self.nodes[node_id] + if node.neighbor: + return + self.container.del_remote_router(node.maskbit) + node.neighbor = True + else: + node = RemoteNode(node_id, self._allocate_maskbit(), True) + self.nodes[node_id] = node + self.container.add_neighbor_router(self._address(node_id), node.maskbit, link_maskbit) + + + def lost_neighbor(self, node_id): + """ + We have lost contact with a neighboring node node_id. + """ + node = self.nodes[node_id] + node.neighbor = False + self.container.del_neighbor_router(node.maskbit) + if node.remote: + self.container.add_remote_router(self._address(node.id), node.maskbit) + else: + self._free_maskbit(node.maskbit) + self.nodes.pop(node_id) + + + def new_node(self, node_id): + """ + A node, designated by node_id, has been discovered through the an advertisement from a + remote peer. + """ + if node_id not in self.nodes: + node = RemoteNode(node_id, self._allocate_maskbit(), False) + self.nodes[node_id] = node + self.container.add_remote_router(self._address(node.id), node.maskbit) + else: + node = self.nodes[node_id] + node.remote = True + + + def lost_node(self, node_id): + """ + A remote node, node_id, has not been heard from for too long and is being deemed lost. + """ + node = self.nodes[node_id] + if node.remote: + node.remote = False + if not node.neighbor: + self.container.del_remote_router(node.maskbit) + self._free_maskbit(node.maskbit) + self.nodes.pop(node_id) + + + def maskbit_for_node(self, node_id): + """ + """ + node = self.nodes[node_id] + if node: + return node.maskbit + return None + + + def add_addresses(self, node_id, addrs): + node = self.nodes[node_id] + for a in addrs: + node.addrs[a] = 1 + + + def del_addresses(self, node_id, addrs): + node = self.nodes[node_id] + for a in addrs: + node.addrs.pop(a) + + + def overwrite_addresses(self, node_id, addrs): + node = self.nodes[node_id] + added = [] + deleted = [] + for a in addrs: + if a not in node.addrs.keys(): + added.append(a) + for a in node.addrs.keys(): + if a not in addrs: + deleted.append(a) + for a in addrs: + node.addrs[a] = 1 + return (added, deleted) + + + def _allocate_maskbit(self): + if self.next_maskbit == None: + raise Exception("Exceeded Maximum Router Count") + result = self.next_maskbit + self.next_maskbit = None + self.maskbits[result] = True + for n in range(result + 1, self.max_routers): + if self.maskbits[n] == None: + self.next_maskbit = n + break + return result + + + def _free_maskbit(self, i): + self.maskbits[i] = None + if self.next_maskbit == None or i < self.next_maskbit: + self.next_maskbit = i + + + def _address(self, node_id): + return "amqp:/_topo/%s/%s" % (self.container.area, node_id) class RemoteNode(object): - def __init__(self, node_id): - self.id = node_id - self.neighbor = None - self.remote = None - - def set_neighbor(self): - self.neighbor = True - - def set_remote(self): - self.remote = True - - def clear_neighbor(self): - self.neighbor = None - - def clear_remote(self): - self.remote = None - - def to_delete(self): - return self.neighbor or self.remote + def __init__(self, node_id, maskbit, neighbor): + self.id = node_id + self.maskbit = maskbit + self.neighbor = neighbor + self.remote = not neighbor + self.addrs = {} # Address => Count at Node (1 only for the present) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/path.py b/qpid/extras/dispatch/python/qpid/dispatch/router/path.py index c051dbe7fc..da04474e75 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/path.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/path.py @@ -18,185 +18,218 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class PathEngine(object): - """ - This module is responsible for computing the next-hop for every router/area in the domain - based on the collection of link states that have been gathered. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.recalculate = False - self.collection = None - - - def tick(self, now_unused): - if self.recalculate: - self.recalculate = False - self._calculate_routes() - - - def ls_collection_changed(self, collection): - self.recalculate = True - self.collection = collection - - - def _calculate_tree_from_root(self, root): - ## - ## Make a copy of the current collection of link-states that contains - ## an empty link-state for nodes that are known-peers but are not in the - ## collection currently. This is needed to establish routes to those nodes - ## so we can trade link-state information with them. - ## - link_states = {} - for _id, ls in self.collection.items(): - link_states[_id] = ls.peers - for p in ls.peers: - if p not in link_states: - link_states[p] = [] - - ## - ## Setup Dijkstra's Algorithm - ## - cost = {} - prev = {} - for _id in link_states: - cost[_id] = None # infinite - prev[_id] = None # undefined - cost[root] = 0 # no cost to the root node - unresolved = NodeSet(cost) - - ## - ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found. - ## - while not unresolved.empty(): - u = unresolved.lowest_cost() - if cost[u] == None: - # There are no more reachable nodes in unresolved - break - for v in link_states[u]: - if unresolved.contains(v): - alt = cost[u] + 1 # TODO - Use link cost instead of 1 - if cost[v] == None or alt < cost[v]: - cost[v] = alt - prev[v] = u - unresolved.set_cost(v, alt) - - ## - ## Remove unreachable nodes from the map. Note that this will also remove the - ## root node (has no previous node) from the map. - ## - for u, val in prev.items(): - if not val: - prev.pop(u) - - ## - ## Return previous-node map. This is a map of all reachable, remote nodes to - ## their predecessor node. - ## - return prev - - - def _calculate_routes(self): - ## - ## Generate the shortest-path tree with the local node as root - ## - prev = self._calculate_tree_from_root(self.id) - nodes = prev.keys() - - ## - ## Distill the path tree into a map of next hops for each node - ## - next_hops = {} - while len(nodes) > 0: - u = nodes[0] # pick any destination - path = [u] - nodes.remove(u) - v = prev[u] - while v != self.id: # build a list of nodes in the path back to the root - if v in nodes: - path.append(v) - nodes.remove(v) - u = v - v = prev[u] - for w in path: # mark each node in the path as reachable via the next hop - next_hops[w] = u - - ## - ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest - ## for which the path from origin to dest passes through us. This is the set - ## of valid origins for forwarding to the destination. - ## - - self.container.next_hops_changed(next_hops) + """ + This module is responsible for computing the next-hop for every router/area in the domain + based on the collection of link states that have been gathered. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.recalculate = False + self.collection = None -class NodeSet(object): - """ - This data structure is an ordered list of node IDs, sorted in increasing order by their cost. - Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and - repeatable ordering. - """ - def __init__(self, cost_map): - self.nodes = [] - for _id, cost in cost_map.items(): - ## - ## Assume that nodes are either unreachable (cost = None) or local (cost = 0) - ## during this initialization. - ## - if cost == 0: - self.nodes.insert(0, (_id, cost)) - else: + def tick(self, now_unused): + if self.recalculate: + self.recalculate = False + self._calculate_routes() + + + def ls_collection_changed(self, collection): + self.recalculate = True + self.collection = collection + + + def _calculate_tree_from_root(self, root): ## - ## There is no need to sort unreachable nodes by ID + ## Make a copy of the current collection of link-states that contains + ## a fake link-state for nodes that are known-peers but are not in the + ## collection currently. This is needed to establish routes to those nodes + ## so we can trade link-state information with them. ## - self.nodes.append((_id, cost)) + link_states = {} + for _id, ls in self.collection.items(): + link_states[_id] = ls.peers + for p in ls.peers: + if p not in link_states: + link_states[p] = [_id] + ## + ## Setup Dijkstra's Algorithm + ## + cost = {} + prev = {} + for _id in link_states: + cost[_id] = None # infinite + prev[_id] = None # undefined + cost[root] = 0 # no cost to the root node + unresolved = NodeSet(cost) - def __repr__(self): - return self.nodes.__repr__() + ## + ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found. + ## + while not unresolved.empty(): + u = unresolved.lowest_cost() + if cost[u] == None: + # There are no more reachable nodes in unresolved + break + for v in link_states[u]: + if unresolved.contains(v): + alt = cost[u] + 1 # TODO - Use link cost instead of 1 + if cost[v] == None or alt < cost[v]: + cost[v] = alt + prev[v] = u + unresolved.set_cost(v, alt) + + ## + ## Remove unreachable nodes from the map. Note that this will also remove the + ## root node (has no previous node) from the map. + ## + for u, val in prev.items(): + if not val: + prev.pop(u) + ## + ## Return previous-node map. This is a map of all reachable, remote nodes to + ## their predecessor node. + ## + return prev - def empty(self): - return len(self.nodes) == 0 + def _calculate_valid_origins(self, nodeset): + ## + ## Calculate the tree from each origin, determine the set of origins-per-dest + ## for which the path from origin to dest passes through us. This is the set + ## of valid origins for forwarding to the destination. + ## + valid_origin = {} # Map of destination => List of Valid Origins + for node in nodeset: + if node != self.id: + valid_origin[node] = [] + + for root in valid_origin.keys(): + prev = self._calculate_tree_from_root(root) + nodes = prev.keys() + while len(nodes) > 0: + u = nodes[0] + path = [u] + nodes.remove(u) + v = prev[u] + while v != root: + if v in nodes: + if v != self.id: + path.append(v) + nodes.remove(v) + if v == self.id: + valid_origin[root].extend(path) + u = v + v = prev[u] + return valid_origin + + + def _calculate_routes(self): + ## + ## Generate the shortest-path tree with the local node as root + ## + prev = self._calculate_tree_from_root(self.id) + nodes = prev.keys() - def contains(self, _id): - for a, b in self.nodes: - if a == _id: - return True - return False + ## + ## Distill the path tree into a map of next hops for each node + ## + next_hops = {} + while len(nodes) > 0: + u = nodes[0] # pick any destination + path = [u] + nodes.remove(u) + v = prev[u] + while v != self.id: # build a list of nodes in the path back to the root + if v in nodes: + path.append(v) + nodes.remove(v) + u = v + v = prev[u] + for w in path: # mark each node in the path as reachable via the next hop + next_hops[w] = u + + self.container.next_hops_changed(next_hops) + ## + ## Calculate the valid origins for remote routers + ## + valid_origin = self._calculate_valid_origins(prev.keys()) + self.container.valid_origins_changed(valid_origin) - def lowest_cost(self): - """ - Remove and return the lowest cost node ID. - """ - _id, cost = self.nodes.pop(0) - return _id - def set_cost(self, _id, new_cost): +class NodeSet(object): """ - Set the cost for an ID in the NodeSet and re-insert the ID so that the list - remains sorted in increasing cost order. + This data structure is an ordered list of node IDs, sorted in increasing order by their cost. + Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and + repeatable ordering. """ - index = 0 - for i, c in self.nodes: - if i == _id: - break - index += 1 - self.nodes.pop(index) - - index = 0 - for i, c in self.nodes: - if c == None or new_cost < c or (new_cost == c and _id < i): - break - index += 1 - - self.nodes.insert(index, (_id, new_cost)) + def __init__(self, cost_map): + self.nodes = [] + for _id, cost in cost_map.items(): + ## + ## Assume that nodes are either unreachable (cost = None) or local (cost = 0) + ## during this initialization. + ## + if cost == 0: + self.nodes.insert(0, (_id, cost)) + else: + ## + ## There is no need to sort unreachable nodes by ID + ## + self.nodes.append((_id, cost)) + + + def __repr__(self): + return self.nodes.__repr__() + + + def empty(self): + return len(self.nodes) == 0 + + + def contains(self, _id): + for a, b in self.nodes: + if a == _id: + return True + return False + + + def lowest_cost(self): + """ + Remove and return the lowest cost node ID. + """ + _id, cost = self.nodes.pop(0) + return _id + + + def set_cost(self, _id, new_cost): + """ + Set the cost for an ID in the NodeSet and re-insert the ID so that the list + remains sorted in increasing cost order. + """ + index = 0 + for i, c in self.nodes: + if i == _id: + break + index += 1 + self.nodes.pop(index) + + index = 0 + for i, c in self.nodes: + if c == None or new_cost < c or (new_cost == c and _id < i): + break + index += 1 + + self.nodes.insert(index, (_id, new_cost)) + diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py index 18b48379c5..f8b2dd9c94 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py @@ -27,254 +27,267 @@ from link import LinkStateEngine from path import PathEngine from mobile import MobileAddressEngine from routing import RoutingTableEngine -from binding import BindingEngine -from adapter import AdapterEngine from node import NodeTracker +import sys +import traceback + ## ## Import the Dispatch adapters from the environment. If they are not found ## (i.e. we are in a test bench, etc.), load the stub versions. ## try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class RouterEngine: - """ - """ - - def __init__(self, router_adapter, router_id=None, area='area', config_override={}): - """ - Initialize an instance of a router for a domain. - """ - ## - ## Record important information about this router instance - ## - self.domain = "domain" - self.router_adapter = router_adapter - self.log_adapter = LogAdapter("dispatch.router") - self.io_adapter = IoAdapter(self, "qdxrouter") - - if router_id: - self.id = router_id - else: - self.id = str(uuid4()) - self.area = area - self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id)) - - ## - ## Setup configuration - ## - self.config = Configuration(config_override) - self.log(LOG_INFO, "Config: %r" % self.config) - - ## - ## Launch the sub-module engines - ## - self.neighbor_engine = NeighborEngine(self) - self.link_state_engine = LinkStateEngine(self) - self.path_engine = PathEngine(self) - self.mobile_address_engine = MobileAddressEngine(self) - self.routing_table_engine = RoutingTableEngine(self) - self.binding_engine = BindingEngine(self) - self.adapter_engine = AdapterEngine(self) - self.node_tracker = NodeTracker(self) - - - - ##======================================================================================== - ## Adapter Entry Points - invoked from the adapter - ##======================================================================================== - def getId(self): - """ - Return the router's ID - """ - return self.id - - - def addLocalAddress(self, key): - """ - """ - try: - if key.find('_topo') == 0 or key.find('_local') == 0: - return - self.mobile_address_engine.add_local_address(key) - except Exception, e: - self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) - - def delLocalAddress(self, key): - """ - """ - try: - if key.find('_topo') == 0 or key.find('_local') == 0: - return - self.mobile_address_engine.del_local_address(key) - except Exception, e: - self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) - - - def handleTimerTick(self): - """ - """ - try: - now = time() - self.neighbor_engine.tick(now) - self.link_state_engine.tick(now) - self.path_engine.tick(now) - self.mobile_address_engine.tick(now) - self.routing_table_engine.tick(now) - self.binding_engine.tick(now) - self.adapter_engine.tick(now) - self.node_tracker.tick(now) - except Exception, e: - self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) - - - def handleControlMessage(self, opcode, body): - """ - """ - try: - now = time() - if opcode == 'HELLO': - msg = MessageHELLO(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.neighbor_engine.handle_hello(msg, now) - - elif opcode == 'RA': - msg = MessageRA(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_ra(msg, now) - self.mobile_address_engine.handle_ra(msg, now) - - elif opcode == 'LSU': - msg = MessageLSU(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_lsu(msg, now) - - elif opcode == 'LSR': - msg = MessageLSR(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_lsr(msg, now) - - elif opcode == 'MAU': - msg = MessageMAU(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mau(msg, now) - - elif opcode == 'MAR': - msg = MessageMAR(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mar(msg, now) - - except Exception, e: - self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) - - - def receive(self, message_properties, body): - """ - This is the IoAdapter message-receive handler - """ - try: - self.handleControlMessage(message_properties['opcode'], body) - except Exception, e: - self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" % - (message_properties, body, e)) - - def getRouterData(self, kind): """ """ - if kind == 'help': - return { 'help' : "Get list of supported values for kind", - 'link-state' : "This router's link state", - 'link-state-set' : "The set of link states from known routers", - 'next-hops' : "Next hops to each known router", - 'topo-table' : "Topological routing table", - 'mobile-table' : "Mobile key routing table" - } - if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() - if kind == 'next-hops' : return self.routing_table_engine.next_hops - if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']} - if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']} - if kind == 'link-state-set' : - copy = {} - for _id,_ls in self.link_state_engine.collection.items(): - copy[_id] = _ls.to_dict() - return copy - - return {'notice':'Use kind="help" to get a list of possibilities'} - - - ##======================================================================================== - ## Adapter Calls - outbound calls to Dispatch - ##======================================================================================== - def log(self, level, text): - """ - Emit a log message to the host's event log - """ - self.log_adapter.log(level, text) - - - def send(self, dest, msg): - """ - Send a control message to another router. - """ - app_props = {'opcode' : msg.get_opcode() } - self.io_adapter.send(dest, app_props, msg.to_dict()) - self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) - - - def node_updated(self, addr, reachable, neighbor): - """ - """ - self.router_adapter(addr, reachable, neighbor) - - - ##======================================================================================== - ## Interconnect between the Sub-Modules - ##======================================================================================== - def local_link_state_changed(self, link_state): - self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state) - self.link_state_engine.new_local_link_state(link_state) - - def ls_collection_changed(self, collection): - self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection) - self.path_engine.ls_collection_changed(collection) - - def next_hops_changed(self, next_hop_table): - self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) - self.routing_table_engine.next_hops_changed(next_hop_table) - self.binding_engine.next_hops_changed() - - def mobile_sequence_changed(self, mobile_seq): - self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) - self.link_state_engine.set_mobile_sequence(mobile_seq) - - def mobile_keys_changed(self, keys): - self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys) - self.binding_engine.mobile_keys_changed(keys) - - def get_next_hops(self): - return self.routing_table_engine.get_next_hops() - - def remote_routes_changed(self, key_class, routes): - self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) - self.adapter_engine.remote_routes_changed(key_class, routes) - - def new_neighbor(self, rid): - self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid) - self.node_tracker.new_neighbor(rid) - - def lost_neighbor(self, rid): - self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid) - self.node_tracker.lost_neighbor(rid) - - def new_node(self, rid): - self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid) - self.node_tracker.new_node(rid) - def lost_node(self, rid): - self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid) - self.node_tracker.lost_node(rid) + def __init__(self, router_adapter, router_id, area, max_routers, config_override={}): + """ + Initialize an instance of a router for a domain. + """ + ## + ## Record important information about this router instance + ## + self.domain = "domain" + self.router_adapter = router_adapter + self.log_adapter = LogAdapter("dispatch.router") + self.io_adapter = IoAdapter(self, ("qdxrouter", "qdxhello")) + self.max_routers = max_routers + self.id = router_id + self.area = area + self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s max_routers=%d" % + (self.area, self.id, self.max_routers)) + + ## + ## Setup configuration + ## + self.config = Configuration(config_override) + self.log(LOG_INFO, "Config: %r" % self.config) + + ## + ## Launch the sub-module engines + ## + self.node_tracker = NodeTracker(self, self.max_routers) + self.neighbor_engine = NeighborEngine(self) + self.link_state_engine = LinkStateEngine(self) + self.path_engine = PathEngine(self) + self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker) + self.routing_table_engine = RoutingTableEngine(self, self.node_tracker) + + + + ##======================================================================================== + ## Adapter Entry Points - invoked from the adapter + ##======================================================================================== + def getId(self): + """ + Return the router's ID + """ + return self.id + + + def addressAdded(self, addr): + """ + """ + try: + if addr.find('Mtemp.') == 0: + return + if addr.find('M') == 0: + self.mobile_address_engine.add_local_address(addr[1:]) + except Exception, e: + self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) + + + def addressRemoved(self, addr): + """ + """ + try: + if addr.find('Mtemp.') == 0: + return + if addr.find('M') == 0: + self.mobile_address_engine.del_local_address(addr[1:]) + except Exception, e: + self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) + + + def handleTimerTick(self): + """ + """ + try: + now = time() + self.neighbor_engine.tick(now) + self.link_state_engine.tick(now) + self.path_engine.tick(now) + self.mobile_address_engine.tick(now) + self.routing_table_engine.tick(now) + self.node_tracker.tick(now) + except Exception, e: + self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) + + + def handleControlMessage(self, opcode, body, link_id): + """ + """ + try: + now = time() + if opcode == 'HELLO': + msg = MessageHELLO(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.neighbor_engine.handle_hello(msg, now, link_id) + + elif opcode == 'RA': + msg = MessageRA(body) + self.log(LOG_DEBUG, "RCVD: %r" % msg) + self.link_state_engine.handle_ra(msg, now) + self.mobile_address_engine.handle_ra(msg, now) + + elif opcode == 'LSU': + msg = MessageLSU(body) + self.log(LOG_DEBUG, "RCVD: %r" % msg) + self.link_state_engine.handle_lsu(msg, now) + + elif opcode == 'LSR': + msg = MessageLSR(body) + self.log(LOG_DEBUG, "RCVD: %r" % msg) + self.link_state_engine.handle_lsr(msg, now) + + elif opcode == 'MAU': + msg = MessageMAU(body) + self.log(LOG_DEBUG, "RCVD: %r" % msg) + self.mobile_address_engine.handle_mau(msg, now) + + elif opcode == 'MAR': + msg = MessageMAR(body) + self.log(LOG_DEBUG, "RCVD: %r" % msg) + self.mobile_address_engine.handle_mar(msg, now) + + except Exception, e: + self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback) + + + def receive(self, message_properties, body, link_id): + """ + This is the IoAdapter message-receive handler + """ + try: + #self.log(LOG_DEBUG, "Raw Receive: mp=%r body=%r link_id=%r" % (message_properties, body, link_id)) + self.handleControlMessage(message_properties['opcode'], body, link_id) + except Exception, e: + self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" % + (message_properties, body, e)) + + + def getRouterData(self, kind): + """ + """ + if kind == 'help': + return { 'help' : "Get list of supported values for kind", + 'link-state' : "This router's link state", + 'link-state-set' : "The set of link states from known routers", + 'next-hops' : "Next hops to each known router" + } + if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() + if kind == 'next-hops' : return self.routing_table_engine.next_hops + if kind == 'link-state-set' : + copy = {} + for _id,_ls in self.link_state_engine.collection.items(): + copy[_id] = _ls.to_dict() + return copy + + return {'notice':'Use kind="help" to get a list of possibilities'} + + + ##======================================================================================== + ## Adapter Calls - outbound calls to Dispatch + ##======================================================================================== + def log(self, level, text): + """ + Emit a log message to the host's event log + """ + self.log_adapter.log(level, text) + + + def send(self, dest, msg): + """ + Send a control message to another router. + """ + app_props = {'opcode' : msg.get_opcode() } + self.io_adapter.send(dest, app_props, msg.to_dict()) + if "qdxhello" in dest: + self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) + else: + self.log(LOG_DEBUG, "SENT: %r dest=%s" % (msg, dest)) + + + def node_updated(self, addr, reachable, neighbor): + """ + """ + self.router_adapter(addr, reachable, neighbor) + + + ##======================================================================================== + ## Interconnect between the Sub-Modules + ##======================================================================================== + def local_link_state_changed(self, link_state): + self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state) + self.link_state_engine.new_local_link_state(link_state) + + def ls_collection_changed(self, collection): + self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection) + self.path_engine.ls_collection_changed(collection) + + def next_hops_changed(self, next_hop_table): + self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) + self.routing_table_engine.next_hops_changed(next_hop_table) + + def valid_origins_changed(self, valid_origins): + self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins) + self.routing_table_engine.valid_origins_changed(valid_origins) + + def mobile_sequence_changed(self, mobile_seq): + self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) + self.link_state_engine.set_mobile_sequence(mobile_seq) + + def get_next_hops(self): + return self.routing_table_engine.get_next_hops() + + def new_neighbor(self, rid, link_id): + self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id)) + self.node_tracker.new_neighbor(rid, link_id) + + def lost_neighbor(self, rid): + self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid) + self.node_tracker.lost_neighbor(rid) + + def new_node(self, rid): + self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid) + self.node_tracker.new_node(rid) + + def lost_node(self, rid): + self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid) + self.node_tracker.lost_node(rid) + + def add_neighbor_router(self, address, router_bit, link_bit): + self.log(LOG_DEBUG, "Event: add_neighbor_router: address=%s, router_bit=%d, link_bit=%d" % \ + (address, router_bit, link_bit)) + self.router_adapter.add_neighbor_router(address, router_bit, link_bit) + + def del_neighbor_router(self, router_bit): + self.log(LOG_DEBUG, "Event: del_neighbor_router: router_bit=%d" % router_bit) + self.router_adapter.del_neighbor_router(router_bit) + + def add_remote_router(self, address, router_bit): + self.log(LOG_DEBUG, "Event: add_remote_router: address=%s, router_bit=%d" % (address, router_bit)) + self.router_adapter.add_remote_router(address, router_bit) + + def del_remote_router(self, router_bit): + self.log(LOG_DEBUG, "Event: del_remote_router: router_bit=%d" % router_bit) + self.router_adapter.del_remote_router(router_bit) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py index 8030582177..a4b3e5484a 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py @@ -18,39 +18,45 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class RoutingTableEngine(object): - """ - This module is responsible for converting the set of next hops to remote routers to a routing - table in the "topological" address class. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.next_hops = {} - - - def tick(self, now): - pass - - - def next_hops_changed(self, next_hops): - # Convert next_hops into routing table - self.next_hops = next_hops - new_table = [] - for _id, next_hop in next_hops.items(): - new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop)) - pair = ('_topo.%s.all' % (self.area), next_hop) - if new_table.count(pair) == 0: - new_table.append(pair) - - self.container.remote_routes_changed('topological', new_table) - - - def get_next_hops(self): - return self.next_hops + """ + This module is responsible for converting the set of next hops to remote routers to a routing + table in the "topological" address class. + """ + def __init__(self, container, node_tracker): + self.container = container + self.node_tracker = node_tracker + self.id = self.container.id + self.area = self.container.area + self.next_hops = {} + + + def tick(self, now): + pass + + + def next_hops_changed(self, next_hops): + # Convert next_hops into routing table + self.next_hops = next_hops + for _id, next_hop in next_hops.items(): + mb_id = self.node_tracker.maskbit_for_node(_id) + mb_nh = self.node_tracker.maskbit_for_node(next_hop) + self.container.router_adapter.set_next_hop(mb_id, mb_nh) + + + def valid_origins_changed(self, valid_origins): + for _id, vo in valid_origins.items(): + mb_id = self.node_tracker.maskbit_for_node(_id) + mb_vo = [] + for o in vo: + mb_vo.append(self.node_tracker.maskbit_for_node(o)) + self.container.router_adapter.set_valid_origins(mb_id, mb_vo) + + + def get_next_hops(self): + return self.next_hops |
