diff options
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r-- | python/qpid/driver.py | 66 |
1 files changed, 43 insertions, 23 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 2feba15918..ccef148c4d 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -119,6 +119,14 @@ class SessionState: self.max_completion = cmd.id self.write_op(cmd) + def write_cmds(self, cmds, action=noop): + if cmds: + for cmd in cmds[:-1]: + self.write_cmd(cmd) + self.write_cmd(cmds[-1], action) + else: + action() + def write_op(self, op): op.channel = self.channel self.driver.write_op(op) @@ -603,22 +611,27 @@ class Driver: def resolve_declare(self, sst, lnk, dir, action): def do_resolved(er, qr): + err = None if er.not_found and not qr.queue: if lnk.options.get("create") in ("always", dir): - err = self.declare(sst, lnk.name, lnk.options, action) + err = self.declare(sst, lnk, action) else: err = ("no such queue: %s" % lnk.name,) - - if err: - tgt = lnk.target - tgt.error = err - del self._attachments[tgt] - tgt.closed = True - return elif qr.queue: - action("queue", None) + try: + cmds = self.bindings(lnk) + sst.write_cmds(cmds, lambda: action("queue", None)) + except address.ParseError, e: + err = (e,) else: action("topic", er.type) + + if err: + tgt = lnk.target + tgt.error = err + del self._attachments[tgt] + tgt.closed = True + return self.resolve(sst, lnk.name, do_resolved) def resolve(self, sst, name, action): @@ -631,8 +644,9 @@ class Driver: sst.write_query(ExchangeQuery(name), do_result) sst.write_query(QueueQuery(name), do_action) - def declare(self, sst, name, options, action): - props = options.get("node-properties", {}) + def declare(self, sst, lnk, action): + name = lnk.name + props = lnk.options.get("node-properties", {}) durable = props.get("durable", DURABLE_DEFAULT) type = props.get("type", "queue") xprops = props.get("x-properties", {}) @@ -641,7 +655,6 @@ class Driver: cmd = ExchangeDeclare(exchange=name, durable=durable) elif type == "queue": cmd = QueueDeclare(queue=name, durable=durable) - bindings = xprops.pop("bindings", []) else: raise ValueError(type) @@ -660,18 +673,22 @@ class Driver: cmds = [cmd] if type == "queue": - for b in bindings: - try: - n, s, o = address.parse(b) - except address.ParseError, e: - return (e,) - cmds.append(ExchangeBind(name, n, s, o)) + try: + cmds.extend(self.bindings(lnk)) + except address.ParseError, e: + return (e,) + + sst.write_cmds(cmds, lambda: action(type, subtype)) - for c in cmds[:-1]: - sst.write_cmd(c) - def do_action(): - action(type, subtype) - sst.write_cmd(cmds[-1], do_action) + def bindings(self, lnk): + props = lnk.options.get("node-properties", {}) + xprops = props.get("x-properties", {}) + bindings = xprops.get("bindings", []) + cmds = [] + for b in bindings: + n, s, o = address.parse(b) + cmds.append(ExchangeBind(lnk.name, n, s, o)) + return cmds def delete(self, sst, name, action): def do_delete(er, qr): @@ -717,6 +734,7 @@ class Driver: if not ssn.transactional: sst.acked.remove(m) sst.write_cmd(MessageAccept(ids), ack_ack) + log.debug("SACK[%s]: %s", ssn.log_id, m) sst.acked.extend(messages) if ssn.committing and not sst.committing: @@ -848,9 +866,11 @@ class Driver: snd.acked += 1 m = snd.session.outgoing.pop(0) sst.outgoing_idx -= 1 + log.debug("RACK[%s]: %s", sst.session.log_id, msg) assert msg == m sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), payload=body), msg_acked) + log.debug("SENT[%s]: %s", sst.session.log_id, msg) def do_message_transfer(self, xfr): sst = self.get_sst(xfr) |