From a7cb15a4d29c9df1394db91fb1c80c75a708139a Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 17 Feb 2010 05:00:22 +0000 Subject: combined duplicate logic between link_in/link_out git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@910823 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qpid/driver.py | 246 +++++++++++++++++++++------------------------ 1 file changed, 115 insertions(+), 131 deletions(-) diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index 2eef9db06c..d4f5d3bb93 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -132,6 +132,73 @@ class SessionState: op.channel = self.channel self.driver.write_op(op) +class LinkIn: + + ADDR_NAME = "source" + DIR_NAME = "receiver" + + def init_link(self, sst, rcv, _rcv): + _rcv.destination = str(rcv.id) + sst.destinations[_rcv.destination] = _rcv + _rcv.closing = False + _rcv.draining = False + + def do_link(self, sst, rcv, _rcv, type, subtype): + if type == "topic": + _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) + filter = _rcv.options.get("filter") + if _rcv.subject is None and filter is None: + f = FILTER_DEFAULTS[subtype] + elif _rcv.subject and filter: + # XXX + raise Exception("can't supply both subject and filter") + elif _rcv.subject: + # XXX + from messaging import Pattern + f = Pattern(_rcv.subject) + else: + f = filter + f._bind(sst, _rcv.name, _rcv._queue) + elif type == "queue": + _rcv._queue = _rcv.name + + def done(): + rcv.linked = True + + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination)) + sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), done) + + def do_unlink(self, sst, rcv, _rcv, action=noop): + sst.write_cmd(MessageCancel(_rcv.destination), action) + + def del_link(self, sst, rcv, _rcv): + del sst.destinations[_rcv.destination] + +class LinkOut: + + ADDR_NAME = "target" + DIR_NAME = "sender" + + def init_link(self, sst, snd, _snd): + _snd.closing = False + + def do_link(self, sst, snd, _snd, type, subtype): + if type == "topic": + _snd._exchange = _snd.name + _snd._routing_key = _snd.subject + elif type == "queue": + _snd._exchange = "" + _snd._routing_key = _snd.name + + snd.linked = True + + def do_unlink(self, sst, snd, _snd, action=noop): + action() + + def del_link(self, sst, snd, _snd): + pass + # XXX HEADER="!4s4B" @@ -148,6 +215,9 @@ class Driver: self.log_id = "%x" % id(self.connection) self._lock = self.connection._lock + self._in = LinkIn() + self._out = LinkOut() + self._selector = Selector.default() self._attempts = 0 self._hosts = [(self.connection.host, self.connection.port)] + \ @@ -486,9 +556,9 @@ class Driver: self._sessions[sst.channel] = sst for snd in ssn.senders: - self.link_out(snd) + self.link(snd, self._out, snd.target) for rcv in ssn.receivers: - self.link_in(rcv) + self.link(rcv, self._in, rcv.source) if sst is not None and ssn.closing and not sst.detached: sst.detached = True @@ -508,135 +578,54 @@ class Driver: sst.write_op(SessionDetached(name=dtc.name)) self.do_session_detached(dtc) - def link_out(self, snd): - sst = self._attachments.get(snd.session) - _snd = self._attachments.get(snd) - if _snd is None and not snd.closing and not snd.closed: - _snd = Attachment(snd) - _snd.closing = False - - if snd.target is None: - snd.error = ("target is None",) - snd.closed = True - return - - try: - _snd.name, _snd.subject, _snd.options = address.parse(snd.target) - except address.LexError, e: - snd.error = (e,) - snd.closed = True - return - except address.ParseError, e: - snd.error = (e,) - snd.closed = True - return - - # XXX: subject - if _snd.options is None: - _snd.options = {} - - if not self.validate_options(_snd): - return + def link(self, lnk, dir, addr): + sst = self._attachments.get(lnk.session) + _lnk = self._attachments.get(lnk) - def do_link(type, subtype): - if type == "topic": - _snd._exchange = _snd.name - _snd._routing_key = _snd.subject - elif type == "queue": - _snd._exchange = "" - _snd._routing_key = _snd.name - - snd.linked = True - - self.resolve_declare(sst, _snd, "sender", do_link) - self._attachments[snd] = _snd - - if snd.linked and snd.closing and not (snd.closed or _snd.closing): - _snd.closing = True - def do_unlink(): - del self._attachments[snd] - snd.closed = True - if _snd.options.get("delete") in ("always", "sender"): - self.delete(sst, _snd.name, do_unlink) - else: - do_unlink() - elif not snd.linked and snd.closing and not snd.closed: - snd.closed = True + if _lnk is None and not lnk.closing and not lnk.closed: + _lnk = Attachment(lnk) + dir.init_link(sst, lnk, _lnk) - def link_in(self, rcv): - sst = self._attachments.get(rcv.session) - _rcv = self._attachments.get(rcv) - if _rcv is None and not rcv.closing and not rcv.closed: - _rcv = Attachment(rcv) - _rcv.destination = str(rcv.id) - sst.destinations[_rcv.destination] = _rcv - _rcv.canceled = False - _rcv.draining = False - - if rcv.source is None: - rcv.error = ("source is None",) - rcv.closed = True + err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk) + if err: + lnk.error = (err,) + lnk.closed = True return + def resolved(type, subtype): + dir.do_link(sst, lnk, _lnk, type, subtype) + + self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) + self._attachments[lnk] = _lnk + + if lnk.linked and lnk.closing and not lnk.closed: + if not _lnk.closing: + def done(): + dir.del_link(sst, lnk, _lnk) + del self._attachments[lnk] + lnk.closed = True + if _lnk.options.get("delete") in ("always", dir.DIR_NAME): + dir.do_unlink(sst, lnk, _lnk) + self.delete(sst, _lnk.name, done) + else: + dir.do_unlink(sst, lnk, _lnk, done) + _lnk.closing = True + elif not lnk.linked and lnk.closing and not lnk.closed: + lnk.closed = True + + def parse_address(self, lnk, dir, addr): + if addr is None: + return "%s is None" % dir.ADDR_NAME + else: try: - _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source) + lnk.name, lnk.subject, lnk.options = address.parse(addr) + # XXX: subject + if lnk.options is None: + lnk.options = {} except address.LexError, e: - rcv.error = (e,) - rcv.closed = True - return + return e except address.ParseError, e: - rcv.error = (e,) - rcv.closed = True - return - - # XXX: subject - if _rcv.options is None: - _rcv.options = {} - - if not self.validate_options(_rcv): - return - - def do_link(type, subtype): - if type == "topic": - _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) - filter = _rcv.options.get("filter") - if _rcv.subject is None and filter is None: - f = FILTER_DEFAULTS[subtype] - elif _rcv.subject and filter: - # XXX - raise Exception("can't supply both subject and filter") - elif _rcv.subject: - # XXX - from messaging import Pattern - f = Pattern(_rcv.subject) - else: - f = filter - f._bind(sst, _rcv.name, _rcv._queue) - elif type == "queue": - _rcv._queue = _rcv.name - - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination)) - sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit)) - rcv.linked = True - - self.resolve_declare(sst, _rcv, "receiver", do_link) - self._attachments[rcv] = _rcv - - if rcv.linked and rcv.closing and not rcv.closed: - if not _rcv.canceled: - def do_unlink(): - del self._attachments[rcv] - del sst.destinations[_rcv.destination] - rcv.closed = True - if _rcv.options.get("delete") in ("always", "receiver"): - sst.write_cmd(MessageCancel(_rcv.destination)) - self.delete(sst, _rcv.name, do_unlink) - else: - sst.write_cmd(MessageCancel(_rcv.destination), do_unlink) - _rcv.canceled = True - elif not rcv.linked and rcv.closing and not rcv.closed: - rcv.closed = True + return e POLICIES = Values("always", "sender", "receiver", "never") @@ -665,12 +654,7 @@ class Driver: def validate_options(self, lnk): ctx = Context() err = Driver.OPTS.validate(lnk.options, ctx) - if err: - lnk.target.error = ("error in options: %s" % err,) - lnk.target.closed = True - return False - else: - return True + if err: return "error in options: %s" % err def resolve_declare(self, sst, lnk, dir, action): def do_resolved(er, qr): @@ -848,7 +832,7 @@ class Driver: def grant(self, rcv): sst = self._attachments[rcv.session] _rcv = self._attachments.get(rcv) - if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining: + if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: return if rcv.granted is UNLIMITED: -- cgit v1.2.1