summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-01-19 21:29:33 +0000
committerRafael H. Schloming <rhs@apache.org>2010-01-19 21:29:33 +0000
commit89e6f4eea1d172046c806ca3c01244bb0b9cfeee (patch)
treedac0125945c56e9a8a62406c6d180cf7735014a8 /python/qpid/driver.py
parent07b7a61c7876b92b6ea0d6355d97cae4437df5c2 (diff)
downloadqpid-python-89e6f4eea1d172046c806ca3c01244bb0b9cfeee.tar.gz
fixed bug in destination/receiver correlation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@900967 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py32
1 files changed, 19 insertions, 13 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 345869cc48..858e2922ea 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -101,6 +101,8 @@ class SessionState:
# XXX: need to periodically exchange completion/known_completion
+ self.destinations = {}
+
def write_query(self, query, handler):
id = self.sent
self.write_cmd(query, lambda: handler(self.results.pop(id)))
@@ -500,6 +502,8 @@ class Driver:
_rcv = self._attachments.get(rcv)
if _rcv is None and not rcv.closing and not rcv.closed:
_rcv = Attachment(rcv)
+ _rcv.destination = str(rcv.id)
+ sst.destinations[_rcv.destination] = _rcv
_rcv.canceled = False
_rcv.draining = False
@@ -525,7 +529,7 @@ class Driver:
def do_link(type, subtype):
if type == "topic":
- _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+ _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
filter = _rcv.options.get("filter")
if _rcv.subject is None and filter is None:
@@ -543,8 +547,8 @@ class Driver:
elif type == "queue":
_rcv._queue = _rcv.name
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
- sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit))
rcv.linked = True
self.resolve_declare(sst, _rcv, "receiver", do_link)
@@ -554,12 +558,13 @@ class Driver:
if not _rcv.canceled:
def do_unlink():
del self._attachments[rcv]
+ del sst.destinations[_rcv.destination]
rcv.closed = True
if _rcv.options.get("delete") in ("always", "receiver"):
- sst.write_cmd(MessageCancel(rcv.destination))
+ sst.write_cmd(MessageCancel(_rcv.destination))
self.delete(sst, _rcv.name, do_unlink)
else:
- sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
+ sst.write_cmd(MessageCancel(_rcv.destination), do_unlink)
_rcv.canceled = True
def resolve_declare(self, sst, lnk, dir, action):
@@ -725,7 +730,8 @@ class Driver:
sst.aborting = False
for rcv in ssn.receivers:
- sst.write_cmd(MessageStop(rcv.destination))
+ _rcv = self._attachments[rcv]
+ sst.write_cmd(MessageStop(_rcv.destination))
sst.write_cmd(ExecutionSync(), do_rb)
def grant(self, rcv):
@@ -745,12 +751,12 @@ class Driver:
delta = max(rcv.granted, rcv.received) - rcv.impending
if delta is UNLIMITED:
- sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
- sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
rcv.impending = UNLIMITED
elif delta > 0:
- sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
- sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
rcv.impending += delta
elif delta < 0 and not rcv.draining:
_rcv.draining = True
@@ -758,7 +764,7 @@ class Driver:
rcv.impending = rcv.received
_rcv.draining = False
self.grant(rcv)
- sst.write_cmd(MessageStop(rcv.destination), do_stop)
+ sst.write_cmd(MessageStop(_rcv.destination), do_stop)
if rcv.draining:
_rcv.draining = True
@@ -767,7 +773,7 @@ class Driver:
rcv.granted = rcv.impending
_rcv.draining = False
rcv.draining = False
- sst.write_cmd(MessageFlush(rcv.destination), do_flush)
+ sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
def process_receiver(self, rcv):
@@ -821,7 +827,7 @@ class Driver:
ssn = sst.session
msg = self._decode(xfr)
- rcv = ssn.receivers[int(xfr.destination)]
+ rcv = sst.destinations[xfr.destination].target
msg._receiver = rcv
if rcv.impending is not UNLIMITED:
assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)