summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-17 14:45:52 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-17 14:45:52 +0000
commitd8510ac5aecfe1d17e538b06718dabe296fc21a3 (patch)
tree83172a3b137d79d62d134ef16ebf0b334299068d
parent7e57938eb813975c83e738b5c3f6543117b34a38 (diff)
downloadqpid-python-d8510ac5aecfe1d17e538b06718dabe296fc21a3.tar.gz
added support for browsing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@910999 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/driver.py72
-rw-r--r--qpid/python/qpid/tests/messaging.py22
-rw-r--r--qpid/python/todo.txt4
3 files changed, 69 insertions, 29 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py
index 9978a27f5c..aa2ca3ccc5 100644
--- a/qpid/python/qpid/driver.py
+++ b/qpid/python/qpid/driver.py
@@ -132,10 +132,44 @@ class SessionState:
op.channel = self.channel
self.driver.write_op(op)
+POLICIES = Values("always", "sender", "receiver", "never")
+
+class Bindings:
+
+ def validate(self, o, ctx):
+ t = ctx.containers[1].get("type", "queue")
+ if t != "queue":
+ return "bindings are only permitted on nodes of type queue"
+
+COMMON_OPTS = {
+ "create": POLICIES,
+ "delete": POLICIES,
+ "assert": POLICIES,
+ "node-properties": Map({
+ "type": Values("queue", "topic"),
+ "durable": Types(bool),
+ "x-properties": Map({
+ "type": Types(basestring),
+ "bindings": And(Types(list), Bindings())
+ },
+ restricted=False)
+ })
+ }
+
+RECEIVE_MODES = Values("browse", "consume")
+
+SOURCE_OPTS = COMMON_OPTS.copy()
+SOURCE_OPTS.update({
+ "mode": RECEIVE_MODES
+ })
+
+TARGET_OPTS = COMMON_OPTS.copy()
+
class LinkIn:
ADDR_NAME = "source"
DIR_NAME = "receiver"
+ VALIDATOR = Map(SOURCE_OPTS)
def init_link(self, sst, rcv, _rcv):
_rcv.destination = str(rcv.id)
@@ -143,6 +177,8 @@ class LinkIn:
_rcv.draining = False
def do_link(self, sst, rcv, _rcv, type, subtype, action):
+ acq_mode = acquire_mode.pre_acquired
+
if type == "topic":
_rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
@@ -161,8 +197,11 @@ class LinkIn:
f._bind(sst, _rcv.name, _rcv._queue)
elif type == "queue":
_rcv._queue = _rcv.name
+ if _rcv.options.get("mode", "consume") == "browse":
+ acq_mode = acquire_mode.not_acquired
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
+ acquire_mode = acq_mode))
sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
def do_unlink(self, sst, rcv, _rcv, action=noop):
@@ -175,6 +214,7 @@ class LinkOut:
ADDR_NAME = "target"
DIR_NAME = "sender"
+ VALIDATOR = Map(TARGET_OPTS)
def init_link(self, sst, snd, _snd):
_snd.closing = False
@@ -582,7 +622,7 @@ class Driver:
_lnk.closing = False
dir.init_link(sst, lnk, _lnk)
- err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk)
+ err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
if err:
lnk.error = (err,)
lnk.closed = True
@@ -626,33 +666,9 @@ class Driver:
except address.ParseError, e:
return e
- POLICIES = Values("always", "sender", "receiver", "never")
-
- class Bindings:
-
- def validate(self, o, ctx):
- t = ctx.containers[1].get("type", "queue")
- if t != "queue":
- return "bindings are only permitted on nodes of type queue"
-
- OPTS = Map({
- "create": POLICIES,
- "delete": POLICIES,
- "assert": POLICIES,
- "node-properties": Map({
- "type": Values("queue", "topic"),
- "durable": Types(bool),
- "x-properties": Map({
- "type": Types(basestring),
- "bindings": And(Types(list), Bindings())
- },
- restricted=False)
- })
- })
-
- def validate_options(self, lnk):
+ def validate_options(self, lnk, dir):
ctx = Context()
- err = Driver.OPTS.validate(lnk.options, ctx)
+ err = dir.VALIDATOR.validate(lnk.options, ctx)
if err: return "error in options: %s" % err
def resolve_declare(self, sst, lnk, dir, action):
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py
index ce1302a4d6..ca35c56166 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging.py
@@ -580,6 +580,22 @@ class ReceiverTests(Base):
# XXX: need testClose
+ def testMode(self):
+ msgs = [self.content("testMode", 1),
+ self.content("testMode", 2),
+ self.content("testMode", 3)]
+
+ for m in msgs:
+ self.snd.send(m)
+
+ rb = self.ssn.receiver('test-receiver-queue; {mode: browse}')
+ rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
+ self.drain(rb, expected=msgs)
+ self.drain(rc, expected=msgs)
+ rb2 = self.ssn.receiver(rb.source)
+ self.assertEmpty(rb2)
+ self.drain(self.rcv, expected=[])
+
class AddressTests(Base):
def setup_connection(self):
@@ -846,6 +862,12 @@ class AddressErrorTests(Base):
self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError,
lambda e: "unrecognized characters" in str(e))
+ def testInvalidMode(self):
+ # XXX: should have specific exception for this
+ self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
+ ReceiveError,
+ lambda e: "not in ('browse', 'consume')" in str(e))
+
SENDER_Q = 'test-sender-q; {create: always, delete: always}'
class SenderTests(Base):
diff --git a/qpid/python/todo.txt b/qpid/python/todo.txt
index c8a4844370..6afe07d4f2 100644
--- a/qpid/python/todo.txt
+++ b/qpid/python/todo.txt
@@ -167,7 +167,7 @@ Address:
+ need to handle cleanup of temp queues/topics: F, NR
+ passthrough options for creating exchanges/queues: F, NR
- integration with java: NF
- - queue browsing: NF
+ - queue browsing: F, NR
- temporary queues: NF
- xquery: NF
@@ -190,3 +190,5 @@ Examples:
Miscellaneous:
- standard ping-like (drain/spout) utilities for all clients: NF
+ - caching of resolved addresses
+ - consider using separate session for query/deletion/creation of addresses