diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-11-16 12:05:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-11-16 12:05:50 +0000 |
commit | b08f16897e3f5c65c155d6d63c380003510430c9 (patch) | |
tree | 56434bda6a4135e0ae209a19424916dd21779232 /python/qpid/driver.py | |
parent | efc6473096622a01b2a3907093431b49d8ebfb1e (diff) | |
download | qpid-python-b08f16897e3f5c65c155d6d63c380003510430c9.tar.gz |
added address support for specifying node type and properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@880720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r-- | python/qpid/driver.py | 190 |
1 files changed, 101 insertions, 89 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 3e45045bec..21d2b432f4 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -471,43 +471,20 @@ class Driver: if _snd.options is None: _snd.options = {} - def do_link(): - snd.linked = True - - def do_queue_q(result): - if sst.detached: - return - - if result.queue: + def do_link(type, subtype): + if type == "topic": + _snd._exchange = _snd.name + _snd._routing_key = _snd.subject + elif type == "queue": _snd._exchange = "" _snd._routing_key = _snd.name - do_link() - else: - snd.error = ("no such queue: %s" % _snd.name,) - del self._attachments[snd] - snd.closed = True - - def do_exchange_q(result): - if sst.detached: - return - if result.not_found: - if _snd.options.get("create") in ("always", "sender"): - sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT)) - _snd._exchange = "" - _snd._routing_key = _snd.name - else: - sst.write_query(QueueQuery(queue=_snd.name), do_queue_q) - return - else: - _snd._exchange = _snd.name - _snd._routing_key = _snd.subject - do_link() + snd.linked = True - sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q) + self.resolve_declare(sst, _snd, "sender", do_link) self._attachments[snd] = _snd - if snd.closing and not (snd.closed or _snd.closing): + if snd.linked and snd.closing and not (snd.closed or _snd.closing): _snd.closing = True def do_unlink(): del self._attachments[snd] @@ -545,38 +522,13 @@ class Driver: if _rcv.options is None: _rcv.options = {} - def do_link(): - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) - sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) - rcv.linked = True - - def do_queue_q(result): - if sst.detached: - return - if result.queue: - _rcv._queue = _rcv.name - do_link() - else: - rcv.error = ("no such queue: %s" % _rcv.name,) - del self._attachments[rcv] - rcv.closed = True - - def do_exchange_q(result): - if sst.detached: - return - if result.not_found: - if _rcv.options.get("create") in ("always", "receiver"): - _rcv._queue = _rcv.name - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) - else: - sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q) - return - else: + def do_link(type, subtype): + if type == "topic": _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) filter = _rcv.options.get("filter") if _rcv.subject is None and filter is None: - f = FILTER_DEFAULTS[result.type] + f = FILTER_DEFAULTS[subtype] elif _rcv.subject and filter: # XXX raise Exception("can't supply both subject and filter") @@ -587,41 +539,101 @@ class Driver: else: f = filter f._bind(sst, _rcv.name, _rcv._queue) - do_link() - sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q) + elif type == "queue": + _rcv._queue = _rcv.name + + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) + sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + rcv.linked = True + + self.resolve_declare(sst, _rcv, "receiver", do_link) self._attachments[rcv] = _rcv - if rcv.closing and not rcv.closed: - if rcv.linked: - if not _rcv.canceled: - def do_unlink(): - del self._attachments[rcv] - rcv.closed = True - if _rcv.options.get("delete") in ("always", "receiver"): - sst.write_cmd(MessageCancel(rcv.destination)) - self.delete(sst, _rcv.name, do_unlink) - else: - sst.write_cmd(MessageCancel(rcv.destination), do_unlink) - _rcv.canceled = True - else: - rcv.closed = True + if rcv.linked and rcv.closing and not rcv.closed: + if not _rcv.canceled: + def do_unlink(): + del self._attachments[rcv] + rcv.closed = True + if _rcv.options.get("delete") in ("always", "receiver"): + sst.write_cmd(MessageCancel(rcv.destination)) + self.delete(sst, _rcv.name, do_unlink) + else: + sst.write_cmd(MessageCancel(rcv.destination), do_unlink) + _rcv.canceled = True + + def resolve_declare(self, sst, lnk, dir, action): + def do_resolved(er, qr): + if er.not_found and not qr.queue: + if lnk.options.get("create") in ("always", dir): + err = self.declare(sst, lnk.name, lnk.options, action) + else: + err = ("no such queue: %s" % lnk.name,) - def delete(self, sst, name, completion): - def do_queue_delq(result): - if sst.detached: - return - if result.queue: - sst.write_cmd(QueueDelete(name), completion) + if err: + tgt = lnk.target + tgt.error = err + del self._attachments[tgt] + tgt.closed = True + return + elif qr.queue: + action("queue", None) else: - completion() - def do_exchange_delq(result): - if sst.detached: - return - if result.not_found: - sst.write_query(QueueQuery(name), do_queue_delq) + action("topic", er.type) + self.resolve(sst, lnk.name, do_resolved) + + def resolve(self, sst, name, action): + args = [] + def do_result(r): + args.append(r) + def do_action(r): + do_result(r) + action(*args) + sst.write_query(ExchangeQuery(name), do_result) + sst.write_query(QueueQuery(name), do_action) + + def declare(self, sst, name, options, action): + opts = dict(options) + props = dict(opts.pop("node-properties", {})) + durable = props.pop("durable", DURABLE_DEFAULT) + type = props.pop("type", "queue") + xprops = dict(props.pop("x-properties", {})) + + if props: + return ("unrecognized option(s): %s" % "".join(props.keys()),) + + if type == "topic": + cmd = ExchangeDeclare(exchange=name, durable=durable) + elif type == "queue": + cmd = QueueDeclare(queue=name, durable=durable) + else: + return ("unrecognized type, must be topic or queue: %s" % type,) + + for f in cmd.FIELDS: + if f.name != "arguments" and xprops.has_key(f.name): + cmd[f.name] = xprops.pop(f.name) + if xprops: + cmd.arguments = xprops + + if type == "topic": + if cmd.type is None: + cmd.type = "topic" + subtype = cmd.type + else: + subtype = None + + def do_action(): + action(type, subtype) + sst.write_cmd(cmd, do_action) + + def delete(self, sst, name, action): + def do_delete(er, qr): + if not er.not_found: + sst.write_cmd(ExchangeDelete(name), action) + elif qr.queue: + sst.write_cmd(QueueDelete(name), action) else: - sst.write_cmd(ExchangeDelete(name), completion) - sst.write_query(ExchangeQuery(name), do_exchange_delq) + action() + self.resolve(sst, name, do_delete) def process(self, ssn): if ssn.closed or ssn.closing: return |