diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-02-17 14:45:52 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-17 14:45:52 +0000 |
commit | d8510ac5aecfe1d17e538b06718dabe296fc21a3 (patch) | |
tree | 83172a3b137d79d62d134ef16ebf0b334299068d | |
parent | 7e57938eb813975c83e738b5c3f6543117b34a38 (diff) | |
download | qpid-python-d8510ac5aecfe1d17e538b06718dabe296fc21a3.tar.gz |
added support for browsing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@910999 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/driver.py | 72 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging.py | 22 | ||||
-rw-r--r-- | qpid/python/todo.txt | 4 |
3 files changed, 69 insertions, 29 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index 9978a27f5c..aa2ca3ccc5 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -132,10 +132,44 @@ class SessionState: op.channel = self.channel self.driver.write_op(op) +POLICIES = Values("always", "sender", "receiver", "never") + +class Bindings: + + def validate(self, o, ctx): + t = ctx.containers[1].get("type", "queue") + if t != "queue": + return "bindings are only permitted on nodes of type queue" + +COMMON_OPTS = { + "create": POLICIES, + "delete": POLICIES, + "assert": POLICIES, + "node-properties": Map({ + "type": Values("queue", "topic"), + "durable": Types(bool), + "x-properties": Map({ + "type": Types(basestring), + "bindings": And(Types(list), Bindings()) + }, + restricted=False) + }) + } + +RECEIVE_MODES = Values("browse", "consume") + +SOURCE_OPTS = COMMON_OPTS.copy() +SOURCE_OPTS.update({ + "mode": RECEIVE_MODES + }) + +TARGET_OPTS = COMMON_OPTS.copy() + class LinkIn: ADDR_NAME = "source" DIR_NAME = "receiver" + VALIDATOR = Map(SOURCE_OPTS) def init_link(self, sst, rcv, _rcv): _rcv.destination = str(rcv.id) @@ -143,6 +177,8 @@ class LinkIn: _rcv.draining = False def do_link(self, sst, rcv, _rcv, type, subtype, action): + acq_mode = acquire_mode.pre_acquired + if type == "topic": _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) @@ -161,8 +197,11 @@ class LinkIn: f._bind(sst, _rcv.name, _rcv._queue) elif type == "queue": _rcv._queue = _rcv.name + if _rcv.options.get("mode", "consume") == "browse": + acq_mode = acquire_mode.not_acquired - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination)) + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, + acquire_mode = acq_mode)) sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) def do_unlink(self, sst, rcv, _rcv, action=noop): @@ -175,6 +214,7 @@ class LinkOut: ADDR_NAME = "target" DIR_NAME = "sender" + VALIDATOR = Map(TARGET_OPTS) def init_link(self, sst, snd, _snd): _snd.closing = False @@ -582,7 +622,7 @@ class Driver: _lnk.closing = False dir.init_link(sst, lnk, _lnk) - err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk) + err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) if err: lnk.error = (err,) lnk.closed = True @@ -626,33 +666,9 @@ class Driver: except address.ParseError, e: return e - POLICIES = Values("always", "sender", "receiver", "never") - - class Bindings: - - def validate(self, o, ctx): - t = ctx.containers[1].get("type", "queue") - if t != "queue": - return "bindings are only permitted on nodes of type queue" - - OPTS = Map({ - "create": POLICIES, - "delete": POLICIES, - "assert": POLICIES, - "node-properties": Map({ - "type": Values("queue", "topic"), - "durable": Types(bool), - "x-properties": Map({ - "type": Types(basestring), - "bindings": And(Types(list), Bindings()) - }, - restricted=False) - }) - }) - - def validate_options(self, lnk): + def validate_options(self, lnk, dir): ctx = Context() - err = Driver.OPTS.validate(lnk.options, ctx) + err = dir.VALIDATOR.validate(lnk.options, ctx) if err: return "error in options: %s" % err def resolve_declare(self, sst, lnk, dir, action): diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index ce1302a4d6..ca35c56166 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -580,6 +580,22 @@ class ReceiverTests(Base): # XXX: need testClose + def testMode(self): + msgs = [self.content("testMode", 1), + self.content("testMode", 2), + self.content("testMode", 3)] + + for m in msgs: + self.snd.send(m) + + rb = self.ssn.receiver('test-receiver-queue; {mode: browse}') + rc = self.ssn.receiver('test-receiver-queue; {mode: consume}') + self.drain(rb, expected=msgs) + self.drain(rc, expected=msgs) + rb2 = self.ssn.receiver(rb.source) + self.assertEmpty(rb2) + self.drain(self.rcv, expected=[]) + class AddressTests(Base): def setup_connection(self): @@ -846,6 +862,12 @@ class AddressErrorTests(Base): self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError, lambda e: "unrecognized characters" in str(e)) + def testInvalidMode(self): + # XXX: should have specific exception for this + self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}', + ReceiveError, + lambda e: "not in ('browse', 'consume')" in str(e)) + SENDER_Q = 'test-sender-q; {create: always, delete: always}' class SenderTests(Base): diff --git a/qpid/python/todo.txt b/qpid/python/todo.txt index c8a4844370..6afe07d4f2 100644 --- a/qpid/python/todo.txt +++ b/qpid/python/todo.txt @@ -167,7 +167,7 @@ Address: + need to handle cleanup of temp queues/topics: F, NR + passthrough options for creating exchanges/queues: F, NR - integration with java: NF - - queue browsing: NF + - queue browsing: F, NR - temporary queues: NF - xquery: NF @@ -190,3 +190,5 @@ Examples: Miscellaneous: - standard ping-like (drain/spout) utilities for all clients: NF + - caching of resolved addresses + - consider using separate session for query/deletion/creation of addresses |