summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/python/qpid/messaging/driver.py51
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py2
2 files changed, 35 insertions, 18 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index ff801c2282..74ed038b0e 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -122,9 +122,9 @@ class SessionState:
self.destinations = {}
- def write_query(self, query, handler):
+ def write_query(self, query, handler, obj):
id = self.sent
- self.write_cmd(query, lambda: handler(self.results.pop(id)))
+ self.write_cmd(query, lambda: handler(self.results.pop(id), obj))
def apply_overrides(self, cmd, overrides):
for k, v in overrides.items():
@@ -1011,7 +1011,7 @@ class Engine:
return
self.resolve(sst, lnk.name, do_resolved, node_type=requested_type, force=declare)
- def resolve(self, sst, name, action, force=False, node_type=None):
+ def resolve(self, sst, name, action, force=False, node_type=None, delete=False):
if not force and not node_type:
try:
type, subtype = self.address_cache[name]
@@ -1020,35 +1020,50 @@ class Engine:
except KeyError:
pass
- args = []
- def do_result(r):
- args.append(r)
- def do_action(r):
- do_result(r)
- er, qr = args
- if node_type == "topic" and not er.not_found:
+ args = { "topic":None, "queue":None }
+ def do_result(r, obj):
+ args[obj] = r
+ def do_action():
+ er = args["topic"]
+ qr = args["queue"]
+ if node_type == "topic" and er and not er.not_found:
type, subtype = "topic", er.type
- elif node_type == "queue" and qr.queue:
+ elif node_type == "queue" and qr and qr.queue:
type, subtype = "queue", None
- elif er.not_found and not qr.queue:
+ elif (er and er.not_found) and qr and not qr.queue:
type, subtype = None, None
- elif qr.queue:
+ elif (qr and qr.queue):
if node_type == "topic" and force:
type, subtype = None, None
else:
type, subtype = "queue", None
- elif not er.not_found:
+ elif (er and not er.not_found):
if node_type == "queue" and force:
type, subtype = None, None
else:
type, subtype = "topic", er.type
+ elif er:
+ if er.not_found:
+ type, subtype = None, None
+ else:
+ type, subtype = "topic", er.type
else:
- type, subtype = "topic", er.type
+ type, subtype = None, None
if type is not None:
self.address_cache[name] = (type, subtype)
action(type, subtype)
- sst.write_query(ExchangeQuery(name), do_result)
- sst.write_query(QueueQuery(name), do_action)
+ def do_result_and_action(r, obj):
+ do_result(r, obj)
+ do_action()
+ if (node_type is None): # we don't know the type, let check broker
+ sst.write_query(ExchangeQuery(name), do_result, "topic")
+ sst.write_query(QueueQuery(name), do_result_and_action, "queue")
+ elif force and not delete: # we forcefully declare known type, dont ask broker
+ do_action()
+ elif node_type == "topic":
+ sst.write_query(ExchangeQuery(name), do_result_and_action, "topic")
+ else:
+ sst.write_query(QueueQuery(name), do_result_and_action, "queue")
def declare(self, sst, lnk, action, create_node):
name = lnk.name
@@ -1102,7 +1117,7 @@ class Engine:
action()
else:
raise ValueError(type)
- self.resolve(sst, name, do_delete, force=True, node_type=node_type)
+ self.resolve(sst, name, do_delete, force=True, node_type=node_type, delete=True)
def process(self, ssn):
if ssn.closed or ssn.closing: return
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 56722374e5..d0103ee32c 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -1230,6 +1230,8 @@ test-link-bindings-queue; {
assert 0, "assertion failed to trigger"
except AssertionFailed, e:
pass
+ except NotFound, e: # queue named amp.topic not found
+ pass
def testAssert2(self):
snd = self.ssn.sender("amq.topic; {assert: always}")