diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-01-19 21:29:33 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-01-19 21:29:33 +0000 |
commit | 89e6f4eea1d172046c806ca3c01244bb0b9cfeee (patch) | |
tree | dac0125945c56e9a8a62406c6d180cf7735014a8 /python/qpid/driver.py | |
parent | 07b7a61c7876b92b6ea0d6355d97cae4437df5c2 (diff) | |
download | qpid-python-89e6f4eea1d172046c806ca3c01244bb0b9cfeee.tar.gz |
fixed bug in destination/receiver correlation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@900967 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r-- | python/qpid/driver.py | 32 |
1 files changed, 19 insertions, 13 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 345869cc48..858e2922ea 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -101,6 +101,8 @@ class SessionState: # XXX: need to periodically exchange completion/known_completion + self.destinations = {} + def write_query(self, query, handler): id = self.sent self.write_cmd(query, lambda: handler(self.results.pop(id))) @@ -500,6 +502,8 @@ class Driver: _rcv = self._attachments.get(rcv) if _rcv is None and not rcv.closing and not rcv.closed: _rcv = Attachment(rcv) + _rcv.destination = str(rcv.id) + sst.destinations[_rcv.destination] = _rcv _rcv.canceled = False _rcv.draining = False @@ -525,7 +529,7 @@ class Driver: def do_link(type, subtype): if type == "topic": - _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) + _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) filter = _rcv.options.get("filter") if _rcv.subject is None and filter is None: @@ -543,8 +547,8 @@ class Driver: elif type == "queue": _rcv._queue = _rcv.name - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) - sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination)) + sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit)) rcv.linked = True self.resolve_declare(sst, _rcv, "receiver", do_link) @@ -554,12 +558,13 @@ class Driver: if not _rcv.canceled: def do_unlink(): del self._attachments[rcv] + del sst.destinations[_rcv.destination] rcv.closed = True if _rcv.options.get("delete") in ("always", "receiver"): - sst.write_cmd(MessageCancel(rcv.destination)) + sst.write_cmd(MessageCancel(_rcv.destination)) self.delete(sst, _rcv.name, do_unlink) else: - sst.write_cmd(MessageCancel(rcv.destination), do_unlink) + sst.write_cmd(MessageCancel(_rcv.destination), do_unlink) _rcv.canceled = True def resolve_declare(self, sst, lnk, dir, action): @@ -725,7 +730,8 @@ class Driver: sst.aborting = False for rcv in ssn.receivers: - sst.write_cmd(MessageStop(rcv.destination)) + _rcv = self._attachments[rcv] + sst.write_cmd(MessageStop(_rcv.destination)) sst.write_cmd(ExecutionSync(), do_rb) def grant(self, rcv): @@ -745,12 +751,12 @@ class Driver: delta = max(rcv.granted, rcv.received) - rcv.impending if delta is UNLIMITED: - sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) - sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) rcv.impending = UNLIMITED elif delta > 0: - sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) - sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) rcv.impending += delta elif delta < 0 and not rcv.draining: _rcv.draining = True @@ -758,7 +764,7 @@ class Driver: rcv.impending = rcv.received _rcv.draining = False self.grant(rcv) - sst.write_cmd(MessageStop(rcv.destination), do_stop) + sst.write_cmd(MessageStop(_rcv.destination), do_stop) if rcv.draining: _rcv.draining = True @@ -767,7 +773,7 @@ class Driver: rcv.granted = rcv.impending _rcv.draining = False rcv.draining = False - sst.write_cmd(MessageFlush(rcv.destination), do_flush) + sst.write_cmd(MessageFlush(_rcv.destination), do_flush) def process_receiver(self, rcv): @@ -821,7 +827,7 @@ class Driver: ssn = sst.session msg = self._decode(xfr) - rcv = ssn.receivers[int(xfr.destination)] + rcv = sst.destinations[xfr.destination].target msg._receiver = rcv if rcv.impending is not UNLIMITED: assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) |