diff options
-rw-r--r-- | qpid/python/qpid/messaging/driver.py | 51 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 2 |
2 files changed, 35 insertions, 18 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index ff801c2282..74ed038b0e 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -122,9 +122,9 @@ class SessionState: self.destinations = {} - def write_query(self, query, handler): + def write_query(self, query, handler, obj): id = self.sent - self.write_cmd(query, lambda: handler(self.results.pop(id))) + self.write_cmd(query, lambda: handler(self.results.pop(id), obj)) def apply_overrides(self, cmd, overrides): for k, v in overrides.items(): @@ -1011,7 +1011,7 @@ class Engine: return self.resolve(sst, lnk.name, do_resolved, node_type=requested_type, force=declare) - def resolve(self, sst, name, action, force=False, node_type=None): + def resolve(self, sst, name, action, force=False, node_type=None, delete=False): if not force and not node_type: try: type, subtype = self.address_cache[name] @@ -1020,35 +1020,50 @@ class Engine: except KeyError: pass - args = [] - def do_result(r): - args.append(r) - def do_action(r): - do_result(r) - er, qr = args - if node_type == "topic" and not er.not_found: + args = { "topic":None, "queue":None } + def do_result(r, obj): + args[obj] = r + def do_action(): + er = args["topic"] + qr = args["queue"] + if node_type == "topic" and er and not er.not_found: type, subtype = "topic", er.type - elif node_type == "queue" and qr.queue: + elif node_type == "queue" and qr and qr.queue: type, subtype = "queue", None - elif er.not_found and not qr.queue: + elif (er and er.not_found) and qr and not qr.queue: type, subtype = None, None - elif qr.queue: + elif (qr and qr.queue): if node_type == "topic" and force: type, subtype = None, None else: type, subtype = "queue", None - elif not er.not_found: + elif (er and not er.not_found): if node_type == "queue" and force: type, subtype = None, None else: type, subtype = "topic", er.type + elif er: + if er.not_found: + type, subtype = None, None + else: + type, subtype = "topic", er.type else: - type, subtype = "topic", er.type + type, subtype = None, None if type is not None: self.address_cache[name] = (type, subtype) action(type, subtype) - sst.write_query(ExchangeQuery(name), do_result) - sst.write_query(QueueQuery(name), do_action) + def do_result_and_action(r, obj): + do_result(r, obj) + do_action() + if (node_type is None): # we don't know the type, let check broker + sst.write_query(ExchangeQuery(name), do_result, "topic") + sst.write_query(QueueQuery(name), do_result_and_action, "queue") + elif force and not delete: # we forcefully declare known type, dont ask broker + do_action() + elif node_type == "topic": + sst.write_query(ExchangeQuery(name), do_result_and_action, "topic") + else: + sst.write_query(QueueQuery(name), do_result_and_action, "queue") def declare(self, sst, lnk, action, create_node): name = lnk.name @@ -1102,7 +1117,7 @@ class Engine: action() else: raise ValueError(type) - self.resolve(sst, name, do_delete, force=True, node_type=node_type) + self.resolve(sst, name, do_delete, force=True, node_type=node_type, delete=True) def process(self, ssn): if ssn.closed or ssn.closing: return diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 56722374e5..d0103ee32c 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -1230,6 +1230,8 @@ test-link-bindings-queue; { assert 0, "assertion failed to trigger" except AssertionFailed, e: pass + except NotFound, e: # queue named amp.topic not found + pass def testAssert2(self): snd = self.ssn.sender("amq.topic; {assert: always}") |