summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-01-22 20:09:58 +0000
committerRafael H. Schloming <rhs@apache.org>2010-01-22 20:09:58 +0000
commit20dba34f5764cde62d64241ff47cd5ee8a239cd9 (patch)
tree70e55aa2f024befc5d06fb20ead7352b7d9f73ae
parent6d79c93d2d895e19acaf571f5eaf52202919e6b1 (diff)
downloadqpid-python-20dba34f5764cde62d64241ff47cd5ee8a239cd9.tar.gz
made bindings additive, improved message level logging
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@902246 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--python/qpid/driver.py66
-rw-r--r--python/qpid/tests/messaging.py41
2 files changed, 84 insertions, 23 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 2feba15918..ccef148c4d 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -119,6 +119,14 @@ class SessionState:
self.max_completion = cmd.id
self.write_op(cmd)
+ def write_cmds(self, cmds, action=noop):
+ if cmds:
+ for cmd in cmds[:-1]:
+ self.write_cmd(cmd)
+ self.write_cmd(cmds[-1], action)
+ else:
+ action()
+
def write_op(self, op):
op.channel = self.channel
self.driver.write_op(op)
@@ -603,22 +611,27 @@ class Driver:
def resolve_declare(self, sst, lnk, dir, action):
def do_resolved(er, qr):
+ err = None
if er.not_found and not qr.queue:
if lnk.options.get("create") in ("always", dir):
- err = self.declare(sst, lnk.name, lnk.options, action)
+ err = self.declare(sst, lnk, action)
else:
err = ("no such queue: %s" % lnk.name,)
-
- if err:
- tgt = lnk.target
- tgt.error = err
- del self._attachments[tgt]
- tgt.closed = True
- return
elif qr.queue:
- action("queue", None)
+ try:
+ cmds = self.bindings(lnk)
+ sst.write_cmds(cmds, lambda: action("queue", None))
+ except address.ParseError, e:
+ err = (e,)
else:
action("topic", er.type)
+
+ if err:
+ tgt = lnk.target
+ tgt.error = err
+ del self._attachments[tgt]
+ tgt.closed = True
+ return
self.resolve(sst, lnk.name, do_resolved)
def resolve(self, sst, name, action):
@@ -631,8 +644,9 @@ class Driver:
sst.write_query(ExchangeQuery(name), do_result)
sst.write_query(QueueQuery(name), do_action)
- def declare(self, sst, name, options, action):
- props = options.get("node-properties", {})
+ def declare(self, sst, lnk, action):
+ name = lnk.name
+ props = lnk.options.get("node-properties", {})
durable = props.get("durable", DURABLE_DEFAULT)
type = props.get("type", "queue")
xprops = props.get("x-properties", {})
@@ -641,7 +655,6 @@ class Driver:
cmd = ExchangeDeclare(exchange=name, durable=durable)
elif type == "queue":
cmd = QueueDeclare(queue=name, durable=durable)
- bindings = xprops.pop("bindings", [])
else:
raise ValueError(type)
@@ -660,18 +673,22 @@ class Driver:
cmds = [cmd]
if type == "queue":
- for b in bindings:
- try:
- n, s, o = address.parse(b)
- except address.ParseError, e:
- return (e,)
- cmds.append(ExchangeBind(name, n, s, o))
+ try:
+ cmds.extend(self.bindings(lnk))
+ except address.ParseError, e:
+ return (e,)
+
+ sst.write_cmds(cmds, lambda: action(type, subtype))
- for c in cmds[:-1]:
- sst.write_cmd(c)
- def do_action():
- action(type, subtype)
- sst.write_cmd(cmds[-1], do_action)
+ def bindings(self, lnk):
+ props = lnk.options.get("node-properties", {})
+ xprops = props.get("x-properties", {})
+ bindings = xprops.get("bindings", [])
+ cmds = []
+ for b in bindings:
+ n, s, o = address.parse(b)
+ cmds.append(ExchangeBind(lnk.name, n, s, o))
+ return cmds
def delete(self, sst, name, action):
def do_delete(er, qr):
@@ -717,6 +734,7 @@ class Driver:
if not ssn.transactional:
sst.acked.remove(m)
sst.write_cmd(MessageAccept(ids), ack_ack)
+ log.debug("SACK[%s]: %s", ssn.log_id, m)
sst.acked.extend(messages)
if ssn.committing and not sst.committing:
@@ -848,9 +866,11 @@ class Driver:
snd.acked += 1
m = snd.session.outgoing.pop(0)
sst.outgoing_idx -= 1
+ log.debug("RACK[%s]: %s", sst.session.log_id, msg)
assert msg == m
sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
payload=body), msg_acked)
+ log.debug("SENT[%s]: %s", sst.session.log_id, msg)
def do_message_transfer(self, xfr):
sst = self.get_sst(xfr)
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index ba33ab9e80..7bcbc455af 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -718,6 +718,47 @@ test-bindings-queue; {
rcv = self.ssn.receiver("test-bindings-queue")
self.drain(rcv, expected=["one", "two", "three", "four"])
+ def testBindingsAdditive(self):
+ m1 = self.content("testBindingsAdditive", 1)
+ m2 = self.content("testBindingsAdditive", 2)
+ m3 = self.content("testBindingsAdditive", 3)
+ m4 = self.content("testBindingsAdditive", 4)
+
+ snd = self.ssn.sender("""
+test-bindings-additive-queue; {
+ create: always,
+ delete: always,
+ node-properties: {
+ x-properties: {
+ bindings: ["amq.topic/a"]
+ }
+ }
+}
+""")
+
+ snd_a = self.ssn.sender("amq.topic/a")
+ snd_b = self.ssn.sender("amq.topic/b")
+
+ snd_a.send(m1)
+ snd_b.send(m2)
+
+ rcv = self.ssn.receiver("test-bindings-additive-queue")
+ self.drain(rcv, expected=[m1])
+
+ new_snd = self.ssn.sender("""
+test-bindings-additive-queue; {
+ node-properties: {
+ x-properties: {
+ bindings: ["amq.topic/b"]
+ }
+ }
+}
+""")
+
+ new_snd.send(m3)
+ snd_b.send(m4)
+ self.drain(rcv, expected=[m3, m4])
+
NOSUCH_Q = "this-queue-should-not-exist"
UNPARSEABLE_ADDR = "name/subject; {bad options"
UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"