diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-02-17 05:11:33 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-17 05:11:33 +0000 |
commit | 7714d05b36c95a7c59e1b84f0221bd7d95263b3a (patch) | |
tree | 22c075a382f6a5e28e9d68db7bf4034d374349d7 | |
parent | a7cb15a4d29c9df1394db91fb1c80c75a708139a (diff) | |
download | qpid-python-7714d05b36c95a7c59e1b84f0221bd7d95263b3a.tar.gz |
tweaks to link
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@910826 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/driver.py | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index d4f5d3bb93..9978a27f5c 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -140,10 +140,9 @@ class LinkIn: def init_link(self, sst, rcv, _rcv): _rcv.destination = str(rcv.id) sst.destinations[_rcv.destination] = _rcv - _rcv.closing = False _rcv.draining = False - def do_link(self, sst, rcv, _rcv, type, subtype): + def do_link(self, sst, rcv, _rcv, type, subtype, action): if type == "topic": _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) @@ -163,11 +162,8 @@ class LinkIn: elif type == "queue": _rcv._queue = _rcv.name - def done(): - rcv.linked = True - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination)) - sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), done) + sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) def do_unlink(self, sst, rcv, _rcv, action=noop): sst.write_cmd(MessageCancel(_rcv.destination), action) @@ -183,15 +179,14 @@ class LinkOut: def init_link(self, sst, snd, _snd): _snd.closing = False - def do_link(self, sst, snd, _snd, type, subtype): + def do_link(self, sst, snd, _snd, type, subtype, action): if type == "topic": _snd._exchange = _snd.name _snd._routing_key = _snd.subject elif type == "queue": _snd._exchange = "" _snd._routing_key = _snd.name - - snd.linked = True + action() def do_unlink(self, sst, snd, _snd, action=noop): action() @@ -584,6 +579,7 @@ class Driver: if _lnk is None and not lnk.closing and not lnk.closed: _lnk = Attachment(lnk) + _lnk.closing = False dir.init_link(sst, lnk, _lnk) err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk) @@ -592,23 +588,26 @@ class Driver: lnk.closed = True return + def linked(): + lnk.linked = True + def resolved(type, subtype): - dir.do_link(sst, lnk, _lnk, type, subtype) + dir.do_link(sst, lnk, _lnk, type, subtype, linked) self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) self._attachments[lnk] = _lnk if lnk.linked and lnk.closing and not lnk.closed: if not _lnk.closing: - def done(): + def unlinked(): dir.del_link(sst, lnk, _lnk) del self._attachments[lnk] lnk.closed = True if _lnk.options.get("delete") in ("always", dir.DIR_NAME): dir.do_unlink(sst, lnk, _lnk) - self.delete(sst, _lnk.name, done) + self.delete(sst, _lnk.name, unlinked) else: - dir.do_unlink(sst, lnk, _lnk, done) + dir.do_unlink(sst, lnk, _lnk, unlinked) _lnk.closing = True elif not lnk.linked and lnk.closing and not lnk.closed: lnk.closed = True |