summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py66
1 files changed, 43 insertions, 23 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 2feba15918..ccef148c4d 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -119,6 +119,14 @@ class SessionState:
self.max_completion = cmd.id
self.write_op(cmd)
+ def write_cmds(self, cmds, action=noop):
+ if cmds:
+ for cmd in cmds[:-1]:
+ self.write_cmd(cmd)
+ self.write_cmd(cmds[-1], action)
+ else:
+ action()
+
def write_op(self, op):
op.channel = self.channel
self.driver.write_op(op)
@@ -603,22 +611,27 @@ class Driver:
def resolve_declare(self, sst, lnk, dir, action):
def do_resolved(er, qr):
+ err = None
if er.not_found and not qr.queue:
if lnk.options.get("create") in ("always", dir):
- err = self.declare(sst, lnk.name, lnk.options, action)
+ err = self.declare(sst, lnk, action)
else:
err = ("no such queue: %s" % lnk.name,)
-
- if err:
- tgt = lnk.target
- tgt.error = err
- del self._attachments[tgt]
- tgt.closed = True
- return
elif qr.queue:
- action("queue", None)
+ try:
+ cmds = self.bindings(lnk)
+ sst.write_cmds(cmds, lambda: action("queue", None))
+ except address.ParseError, e:
+ err = (e,)
else:
action("topic", er.type)
+
+ if err:
+ tgt = lnk.target
+ tgt.error = err
+ del self._attachments[tgt]
+ tgt.closed = True
+ return
self.resolve(sst, lnk.name, do_resolved)
def resolve(self, sst, name, action):
@@ -631,8 +644,9 @@ class Driver:
sst.write_query(ExchangeQuery(name), do_result)
sst.write_query(QueueQuery(name), do_action)
- def declare(self, sst, name, options, action):
- props = options.get("node-properties", {})
+ def declare(self, sst, lnk, action):
+ name = lnk.name
+ props = lnk.options.get("node-properties", {})
durable = props.get("durable", DURABLE_DEFAULT)
type = props.get("type", "queue")
xprops = props.get("x-properties", {})
@@ -641,7 +655,6 @@ class Driver:
cmd = ExchangeDeclare(exchange=name, durable=durable)
elif type == "queue":
cmd = QueueDeclare(queue=name, durable=durable)
- bindings = xprops.pop("bindings", [])
else:
raise ValueError(type)
@@ -660,18 +673,22 @@ class Driver:
cmds = [cmd]
if type == "queue":
- for b in bindings:
- try:
- n, s, o = address.parse(b)
- except address.ParseError, e:
- return (e,)
- cmds.append(ExchangeBind(name, n, s, o))
+ try:
+ cmds.extend(self.bindings(lnk))
+ except address.ParseError, e:
+ return (e,)
+
+ sst.write_cmds(cmds, lambda: action(type, subtype))
- for c in cmds[:-1]:
- sst.write_cmd(c)
- def do_action():
- action(type, subtype)
- sst.write_cmd(cmds[-1], do_action)
+ def bindings(self, lnk):
+ props = lnk.options.get("node-properties", {})
+ xprops = props.get("x-properties", {})
+ bindings = xprops.get("bindings", [])
+ cmds = []
+ for b in bindings:
+ n, s, o = address.parse(b)
+ cmds.append(ExchangeBind(lnk.name, n, s, o))
+ return cmds
def delete(self, sst, name, action):
def do_delete(er, qr):
@@ -717,6 +734,7 @@ class Driver:
if not ssn.transactional:
sst.acked.remove(m)
sst.write_cmd(MessageAccept(ids), ack_ack)
+ log.debug("SACK[%s]: %s", ssn.log_id, m)
sst.acked.extend(messages)
if ssn.committing and not sst.committing:
@@ -848,9 +866,11 @@ class Driver:
snd.acked += 1
m = snd.session.outgoing.pop(0)
sst.outgoing_idx -= 1
+ log.debug("RACK[%s]: %s", sst.session.log_id, msg)
assert msg == m
sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
payload=body), msg_acked)
+ log.debug("SENT[%s]: %s", sst.session.log_id, msg)
def do_message_transfer(self, xfr):
sst = self.get_sst(xfr)