diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-01-22 20:09:58 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-01-22 20:09:58 +0000 |
commit | 20dba34f5764cde62d64241ff47cd5ee8a239cd9 (patch) | |
tree | 70e55aa2f024befc5d06fb20ead7352b7d9f73ae | |
parent | 6d79c93d2d895e19acaf571f5eaf52202919e6b1 (diff) | |
download | qpid-python-20dba34f5764cde62d64241ff47cd5ee8a239cd9.tar.gz |
made bindings additive, improved message level logging
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@902246 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qpid/driver.py | 66 | ||||
-rw-r--r-- | python/qpid/tests/messaging.py | 41 |
2 files changed, 84 insertions, 23 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 2feba15918..ccef148c4d 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -119,6 +119,14 @@ class SessionState: self.max_completion = cmd.id self.write_op(cmd) + def write_cmds(self, cmds, action=noop): + if cmds: + for cmd in cmds[:-1]: + self.write_cmd(cmd) + self.write_cmd(cmds[-1], action) + else: + action() + def write_op(self, op): op.channel = self.channel self.driver.write_op(op) @@ -603,22 +611,27 @@ class Driver: def resolve_declare(self, sst, lnk, dir, action): def do_resolved(er, qr): + err = None if er.not_found and not qr.queue: if lnk.options.get("create") in ("always", dir): - err = self.declare(sst, lnk.name, lnk.options, action) + err = self.declare(sst, lnk, action) else: err = ("no such queue: %s" % lnk.name,) - - if err: - tgt = lnk.target - tgt.error = err - del self._attachments[tgt] - tgt.closed = True - return elif qr.queue: - action("queue", None) + try: + cmds = self.bindings(lnk) + sst.write_cmds(cmds, lambda: action("queue", None)) + except address.ParseError, e: + err = (e,) else: action("topic", er.type) + + if err: + tgt = lnk.target + tgt.error = err + del self._attachments[tgt] + tgt.closed = True + return self.resolve(sst, lnk.name, do_resolved) def resolve(self, sst, name, action): @@ -631,8 +644,9 @@ class Driver: sst.write_query(ExchangeQuery(name), do_result) sst.write_query(QueueQuery(name), do_action) - def declare(self, sst, name, options, action): - props = options.get("node-properties", {}) + def declare(self, sst, lnk, action): + name = lnk.name + props = lnk.options.get("node-properties", {}) durable = props.get("durable", DURABLE_DEFAULT) type = props.get("type", "queue") xprops = props.get("x-properties", {}) @@ -641,7 +655,6 @@ class Driver: cmd = ExchangeDeclare(exchange=name, durable=durable) elif type == "queue": cmd = QueueDeclare(queue=name, durable=durable) - bindings = xprops.pop("bindings", []) else: raise ValueError(type) @@ -660,18 +673,22 @@ class Driver: cmds = [cmd] if type == "queue": - for b in bindings: - try: - n, s, o = address.parse(b) - except address.ParseError, e: - return (e,) - cmds.append(ExchangeBind(name, n, s, o)) + try: + cmds.extend(self.bindings(lnk)) + except address.ParseError, e: + return (e,) + + sst.write_cmds(cmds, lambda: action(type, subtype)) - for c in cmds[:-1]: - sst.write_cmd(c) - def do_action(): - action(type, subtype) - sst.write_cmd(cmds[-1], do_action) + def bindings(self, lnk): + props = lnk.options.get("node-properties", {}) + xprops = props.get("x-properties", {}) + bindings = xprops.get("bindings", []) + cmds = [] + for b in bindings: + n, s, o = address.parse(b) + cmds.append(ExchangeBind(lnk.name, n, s, o)) + return cmds def delete(self, sst, name, action): def do_delete(er, qr): @@ -717,6 +734,7 @@ class Driver: if not ssn.transactional: sst.acked.remove(m) sst.write_cmd(MessageAccept(ids), ack_ack) + log.debug("SACK[%s]: %s", ssn.log_id, m) sst.acked.extend(messages) if ssn.committing and not sst.committing: @@ -848,9 +866,11 @@ class Driver: snd.acked += 1 m = snd.session.outgoing.pop(0) sst.outgoing_idx -= 1 + log.debug("RACK[%s]: %s", sst.session.log_id, msg) assert msg == m sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), payload=body), msg_acked) + log.debug("SENT[%s]: %s", sst.session.log_id, msg) def do_message_transfer(self, xfr): sst = self.get_sst(xfr) diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index ba33ab9e80..7bcbc455af 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -718,6 +718,47 @@ test-bindings-queue; { rcv = self.ssn.receiver("test-bindings-queue") self.drain(rcv, expected=["one", "two", "three", "four"]) + def testBindingsAdditive(self): + m1 = self.content("testBindingsAdditive", 1) + m2 = self.content("testBindingsAdditive", 2) + m3 = self.content("testBindingsAdditive", 3) + m4 = self.content("testBindingsAdditive", 4) + + snd = self.ssn.sender(""" +test-bindings-additive-queue; { + create: always, + delete: always, + node-properties: { + x-properties: { + bindings: ["amq.topic/a"] + } + } +} +""") + + snd_a = self.ssn.sender("amq.topic/a") + snd_b = self.ssn.sender("amq.topic/b") + + snd_a.send(m1) + snd_b.send(m2) + + rcv = self.ssn.receiver("test-bindings-additive-queue") + self.drain(rcv, expected=[m1]) + + new_snd = self.ssn.sender(""" +test-bindings-additive-queue; { + node-properties: { + x-properties: { + bindings: ["amq.topic/b"] + } + } +} +""") + + new_snd.send(m3) + snd_b.send(m4) + self.drain(rcv, expected=[m3, m4]) + NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" |