diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-03-23 14:57:28 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-03-23 14:57:28 +0000 |
commit | fe99d24c4084b2cb2479b820a5b44378d1812ffc (patch) | |
tree | 82dd1a8c229d9d86e266870769b6460fad7dd2e0 | |
parent | 166b7426164e35992d43d88b5c9e61c0482a41bd (diff) | |
download | qpid-python-fe99d24c4084b2cb2479b820a5b44378d1812ffc.tar.gz |
Several updates to address options including:
- renamed node-properties to node
- added link to permit durable links (with names)
- split x-properties into x-declare, x-subscribe, and x-bindings
- removed automatic passthrough of unrecognized options (as this was confusing)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@926604 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qpid/brokertest.py | 4 | ||||
-rw-r--r-- | python/qpid/messaging/driver.py | 154 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 41 | ||||
-rw-r--r-- | python/qpid/tests/messaging/__init__.py | 4 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 157 | ||||
-rw-r--r-- | python/qpid/validator.py | 14 |
6 files changed, 234 insertions, 140 deletions
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index 25642d0c88..192228a74a 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -303,8 +303,8 @@ class Broker(Popen): c.close() def _prep_sender(self, queue, durable, xprops): - s = queue + "; {create:always, node-properties:{durable:" + str(durable) - if xprops != None: s += ", x-properties:{" + xprops + "}" + s = queue + "; {create:always, node:{durable:" + str(durable) + if xprops != None: s += ", x-declare:{" + xprops + "}" return s + "}}" def send_message(self, queue, message, durable=True, xprops=None, session=None): diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 383845f214..ba53d94e33 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -33,7 +33,7 @@ from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector from qpid.util import connect -from qpid.validator import And, Context, Map, Types, Values +from qpid.validator import And, Context, List, Map, Types, Values from threading import Condition, Thread log = getLogger("qpid.messaging") @@ -78,9 +78,8 @@ class Pattern: sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#"))) -FILTER_DEFAULTS = { - "topic": Pattern("*"), - "amq.failover": Pattern("DUMMY") +SUBJECT_DEFAULTS = { + "topic": "#" } # XXX @@ -130,7 +129,14 @@ class SessionState: id = self.sent self.write_cmd(query, lambda: handler(self.results.pop(id))) - def write_cmd(self, cmd, action=noop): + def apply_overrides(self, cmd, overrides): + for k, v in overrides.items(): + cmd[k.replace('-', '_')] = v + + def write_cmd(self, cmd, action=noop, overrides=None): + if overrides: + self.apply_overrides(cmd, overrides) + if action != noop: cmd.sync = True if self.detached: @@ -154,28 +160,36 @@ class SessionState: self.driver.write_op(op) POLICIES = Values("always", "sender", "receiver", "never") +RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", + "exactly-once") -class Bindings: - - def validate(self, o, ctx): - t = ctx.containers[1].get("type", "queue") - if t != "queue": - return "bindings are only permitted on nodes of type queue" +DECLARE = Map({}, restricted=False) +BINDINGS = List(Map({ + "exchange": Types(basestring), + "queue": Types(basestring), + "key": Types(basestring), + "arguments": Map({}, restricted=False) + })) COMMON_OPTS = { - "create": POLICIES, - "delete": POLICIES, - "assert": POLICIES, - "node-properties": Map({ - "type": Values("queue", "topic"), - "durable": Types(bool), - "x-properties": Map({ - "type": Types(basestring), - "bindings": And(Types(list), Bindings()) - }, - restricted=False) - }) - } + "create": POLICIES, + "delete": POLICIES, + "assert": POLICIES, + "node": Map({ + "type": Values("queue", "topic"), + "durable": Types(bool), + "x-declare": DECLARE, + "x-bindings": BINDINGS + }), + "link": Map({ + "name": Types(basestring), + "durable": Types(bool), + "reliability": RELIABILITY, + "x-declare": DECLARE, + "x-bindings": BINDINGS, + "x-subscribe": Map({}, restricted=False) + }) + } RECEIVE_MODES = Values("browse", "consume") @@ -196,36 +210,46 @@ class LinkIn: _rcv.destination = str(rcv.id) sst.destinations[_rcv.destination] = _rcv _rcv.draining = False + _rcv.on_unlink = [] def do_link(self, sst, rcv, _rcv, type, subtype, action): + link_opts = _rcv.options.get("link", {}) + # XXX: default? + reliability = link_opts.get("reliability", "unreliable") + declare = link_opts.get("x-declare", {}) + subscribe = link_opts.get("x-subscribe", {}) acq_mode = acquire_mode.pre_acquired if type == "topic": - _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) - filter = _rcv.options.get("filter") - if _rcv.subject is None and filter is None: - f = FILTER_DEFAULTS[subtype] - elif _rcv.subject and filter: - # XXX - raise Exception("can't supply both subject and filter") - elif _rcv.subject: - # XXX - f = Pattern(_rcv.subject) - else: - f = filter - f._bind(sst, _rcv.name, _rcv._queue) + default_name = "%s.%s" % (rcv.session.name, _rcv.destination) + _rcv._queue = link_opts.get("name", default_name) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, + durable=link_opts.get("durable", False), + exclusive=True, + auto_delete=(reliability == "unreliable")), + overrides=declare) + _rcv.on_unlink = [QueueDelete(_rcv._queue)] + subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype) + sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject)) + bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject) elif type == "queue": _rcv._queue = _rcv.name if _rcv.options.get("mode", "consume") == "browse": acq_mode = acquire_mode.not_acquired + bindings = get_bindings(link_opts, queue=_rcv._queue) + sst.write_cmds(bindings) sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, - acquire_mode = acq_mode)) + acquire_mode = acq_mode), + overrides=subscribe) sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) def do_unlink(self, sst, rcv, _rcv, action=noop): - sst.write_cmd(MessageCancel(_rcv.destination), action) + link_opts = _rcv.options.get("link", {}) + reliability = link_opts.get("reliability") + cmds = [MessageCancel(_rcv.destination)] + cmds.extend(_rcv.on_unlink) + sst.write_cmds(cmds, action) def del_link(self, sst, rcv, _rcv): del sst.destinations[_rcv.destination] @@ -240,13 +264,16 @@ class LinkOut: _snd.closing = False def do_link(self, sst, snd, _snd, type, subtype, action): + link_opts = _snd.options.get("link", {}) if type == "topic": _snd._exchange = _snd.name _snd._routing_key = _snd.subject + bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject) elif type == "queue": _snd._exchange = "" _snd._routing_key = _snd.name - action() + bindings = get_bindings(link_opts, queue=_snd.name) + sst.write_cmds(bindings, action) def do_unlink(self, sst, snd, _snd, action=noop): action() @@ -437,6 +464,17 @@ class Driver: DEFAULT_DISPOSITION = Disposition(None) +def get_bindings(opts, queue=None, exchange=None, key=None): + bindings = opts.get("x-bindings", []) + cmds = [] + for b in bindings: + exchange = b.get("exchange", exchange) + queue = b.get("queue", queue) + key = b.get("key", key) + args = b.get("arguments", {}) + cmds.append(ExchangeBind(queue, exchange, key, args)) + return cmds + class Engine: def __init__(self, connection): @@ -785,12 +823,6 @@ class Engine: err = self.declare(sst, lnk, action) else: err = ("no such queue: %s" % lnk.name,) - elif type == "queue": - try: - cmds = self.bindings(lnk) - sst.write_cmds(cmds, lambda: action(type, subtype)) - except address.ParseError, e: - err = (e,) else: action(type, subtype) @@ -831,23 +863,21 @@ class Engine: def declare(self, sst, lnk, action): name = lnk.name - props = lnk.options.get("node-properties", {}) + props = lnk.options.get("node", {}) durable = props.get("durable", DURABLE_DEFAULT) type = props.get("type", "queue") - xprops = props.get("x-properties", {}) + declare = props.get("x-declare", {}) if type == "topic": cmd = ExchangeDeclare(exchange=name, durable=durable) + bindings = get_bindings(props, exchange=name) elif type == "queue": cmd = QueueDeclare(queue=name, durable=durable) + bindings = get_bindings(props, queue=name) else: raise ValueError(type) - for f in cmd.FIELDS: - if f.name != "arguments" and xprops.has_key(f.name): - cmd[f.name] = xprops.pop(f.name) - if xprops: - cmd.arguments = xprops + sst.apply_overrides(cmd, declare) if type == "topic": if cmd.type is None: @@ -857,11 +887,7 @@ class Engine: subtype = None cmds = [cmd] - if type == "queue": - try: - cmds.extend(self.bindings(lnk)) - except address.ParseError, e: - return (e,) + cmds.extend(bindings) def declared(): self.address_cache[name] = (type, subtype) @@ -869,16 +895,6 @@ class Engine: sst.write_cmds(cmds, declared) - def bindings(self, lnk): - props = lnk.options.get("node-properties", {}) - xprops = props.get("x-properties", {}) - bindings = xprops.get("bindings", []) - cmds = [] - for b in bindings: - n, s, o = address.parse(b) - cmds.append(ExchangeBind(lnk.name, n, s, o)) - return cmds - def delete(self, sst, name, action): def deleted(): del self.address_cache[name] diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 53df51dfd8..af2b1a8007 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -295,14 +295,29 @@ class Session: create: <create-policy>, delete: <delete-policy>, assert: <assert-policy>, - node-properties: { + node: { type: <node-type>, durable: <node-durability>, - x-properties: { - bindings: ["<exchange>/<key>", ...], - <passthrough-key>: <passthrough-value> - } + x-declare: { ... <queue-declare overrides> ... } + x-bindings: [<binding_1>, ... <binding_n>] } + link: { + name: <link-name>, + durable: <link-durability>, + reliability: <link-reliability>, + x-declare: { ... <queue-declare overrides> ... } + x-bindings: [<binding_1>, ... <binding_n>] + x-subscribe: { ... <message-subscribe overrides> ... } + } + } + + Bindings are specified as a map with the following options:: + + { + exchange: <exchange>, + queue: <queue>, + key: <key>, + arguments: <arguments> } The create, delete, and assert policies specify who should perfom @@ -316,14 +331,12 @@ class Session: The node-type is one of: - I{topic}: a topic node will default to the topic exchange, - x-properties may be used to specify other exchange types + x-declare may be used to specify other exchange types - I{queue}: this is the default node-type - The x-properties map permits arbitrary additional keys and values to - be specified. These keys and values are passed through when creating - a node or asserting facts about an existing node. Any passthrough - keys and values that do not match a standard field of the underlying - exchange or queue declare command will be sent in the arguments map. + The x-declare map permits protocol specific keys and values to be + specified. These keys and values are passed through when creating a + node or asserting facts about an existing node. Examples -------- @@ -353,18 +366,18 @@ class Session: You can customize the properties of the queue:: - my-queue; {create: always, node-properties: {durable: True}} + my-queue; {create: always, node: {durable: True}} You can create a topic instead if you want:: - my-queue; {create: always, node-properties: {type: topic}} + my-queue; {create: always, node: {type: topic}} You can assert that the address resolves to a node with particular properties:: my-transient-topic; { assert: always, - node-properties: { + node: { type: topic, durable: False } diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py index 0744932944..c3581efb9d 100644 --- a/python/qpid/tests/messaging/__init__.py +++ b/python/qpid/tests/messaging/__init__.py @@ -59,8 +59,8 @@ class Base(Test): else: return "%s[%s, %s]" % (base, count, self.test_id) - def message(self, base, count = None): - return Message(self.content(base, count)) + def message(self, base, count = None, **kwargs): + return Message(content=self.content(base, count), **kwargs) def ping(self, ssn): PING_Q = 'ping-queue; {create: always, delete: always}' diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index 3f2c823914..5d4fc1646b 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -247,9 +247,9 @@ class SessionTests(Base): test-reject-queue; { create: always, delete: always, - node-properties: { - x-properties: { - alternate_exchange: 'amq.topic' + node: { + x-declare: { + alternate-exchange: 'amq.topic' } } } @@ -515,7 +515,7 @@ class ReceiverTests(Base): snd = self.ssn.sender("""test-double-close; { create: always, delete: sender, - node-properties: { + node: { type: topic } } @@ -571,9 +571,9 @@ class AddressTests(Base): assert "error in options: %s" % error == str(e), e def testIllegalKey(self): - self.badOption("{create: always, node-properties: " + self.badOption("{create: always, node: " "{this-property-does-not-exist: 3}}", - "node-properties: this-property-does-not-exist: " + "node: this-property-does-not-exist: " "illegal key") def testWrongValue(self): @@ -581,23 +581,17 @@ class AddressTests(Base): "('always', 'sender', 'receiver', 'never')") def testWrongType1(self): - self.badOption("{node-properties: asdf}", - "node-properties: asdf is not a map") + self.badOption("{node: asdf}", + "node: asdf is not a map") def testWrongType2(self): - self.badOption("{node-properties: {durable: []}}", - "node-properties: durable: [] is not a bool") - - def testNonQueueBindings(self): - self.badOption("{node-properties: {type: topic, x-properties: " - "{bindings: []}}}", - "node-properties: x-properties: bindings: " - "bindings are only permitted on nodes of type queue") + self.badOption("{node: {durable: []}}", + "node: durable: [] is not a bool") def testCreateQueue(self): snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " - "node-properties: {type: queue, durable: False, " - "x-properties: {auto_delete: true}}}") + "node: {type: queue, durable: False, " + "x-declare: {auto_delete: true}}}") content = self.content("testCreateQueue") snd.send(content) rcv = self.ssn.receiver("test-create-queue") @@ -607,10 +601,10 @@ class AddressTests(Base): addr = """test-create-exchange; { create: always, delete: always, - node-properties: { + node: { type: topic, durable: False, - x-properties: {auto_delete: true, %s} + x-declare: {auto_delete: true, %s} } }""" % props snd = self.ssn.sender(addr) @@ -677,15 +671,15 @@ class AddressTests(Base): # XXX: need to figure out close after error self.conn._remove_session(self.ssn) - def testBindings(self): + def testNodeBindingsQueue(self): snd = self.ssn.sender(""" -test-bindings-queue; { +test-node-bindings-queue; { create: always, delete: always, - node-properties: { - x-properties: { - bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"] - } + node: { + x-bindings: [{exchange: "amq.topic", key: "a.#"}, + {exchange: "amq.direct", key: "b"}, + {exchange: "amq.topic", key: "c.*"}] } } """) @@ -696,49 +690,80 @@ test-bindings-queue; { snd_a.send("two") snd_b.send("three") snd_c.send("four") - rcv = self.ssn.receiver("test-bindings-queue") + rcv = self.ssn.receiver("test-node-bindings-queue") self.drain(rcv, expected=["one", "two", "three", "four"]) - def testBindingsAdditive(self): - m1 = self.content("testBindingsAdditive", 1) - m2 = self.content("testBindingsAdditive", 2) - m3 = self.content("testBindingsAdditive", 3) - m4 = self.content("testBindingsAdditive", 4) - + def testNodeBindingsTopic(self): + rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}") + rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}") + rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}") + rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}") snd = self.ssn.sender(""" -test-bindings-additive-queue; { +test-node-bindings-topic; { create: always, delete: always, - node-properties: { - x-properties: { - bindings: ["amq.topic/a"] - } + node: { + type: topic, + x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"}, + {queue: test-node-bindings-topic-queue-a, key: "a.#"}, + {queue: test-node-bindings-topic-queue-b, key: "b"}, + {queue: test-node-bindings-topic-queue-c, key: "c.*"}] } } """) + m1 = Message("one") + m2 = Message(subject="a.foo", content="two") + m3 = Message(subject="b", content="three") + m4 = Message(subject="c.bar", content="four") + snd.send(m1) + snd.send(m2) + snd.send(m3) + snd.send(m4) + self.drain(rcv, expected=[m1, m2, m3, m4]) + self.drain(rcv_a, expected=[m2]) + self.drain(rcv_b, expected=[m3]) + self.drain(rcv_c, expected=[m4]) + + def testLinkBindings(self): + m_a = self.message("testLinkBindings", 1, subject="a") + m_b = self.message("testLinkBindings", 2, subject="b") + + self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}") + snd = self.ssn.sender("amq.topic") + + snd.send(m_a) + snd.send(m_b) + snd.close() - snd_a = self.ssn.sender("amq.topic/a") - snd_b = self.ssn.sender("amq.topic/b") + rcv = self.ssn.receiver("test-link-bindings-queue") + self.assertEmpty(rcv) + + snd = self.ssn.sender(""" +amq.topic; { + link: { + x-bindings: [{queue: test-link-bindings-queue, key: a}] + } +} +""") - snd_a.send(m1) - snd_b.send(m2) + snd.send(m_a) + snd.send(m_b) - rcv = self.ssn.receiver("test-bindings-additive-queue") - self.drain(rcv, expected=[m1]) + self.drain(rcv, expected=[m_a]) + rcv.close() - new_snd = self.ssn.sender(""" -test-bindings-additive-queue; { - node-properties: { - x-properties: { - bindings: ["amq.topic/b"] - } + rcv = self.ssn.receiver(""" +test-link-bindings-queue; { + link: { + x-bindings: [{exchange: "amq.topic", key: b}] } } """) - new_snd.send(m3) - snd_b.send(m4) - self.drain(rcv, expected=[m3, m4]) + snd.send(m_a) + snd.send(m_b) + + self.drain(rcv, expected=[m_a, m_b]) def testSubjectOverride(self): snd = self.ssn.sender("amq.topic/a") @@ -764,6 +789,32 @@ test-bindings-additive-queue; { assert e2.subject == "b", "subject: %s" % e2.subject self.assertEmpty(rcv) + def doReliabilityTest(self, reliability, messages, expected): + snd = self.ssn.sender("amq.topic") + rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability) + for m in messages: + snd.send(m) + self.conn.disconnect() + self.conn.connect() + self.drain(rcv, expected=expected) + + def testReliabilityUnreliable(self): + msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)] + self.doReliabilityTest("unreliable", msgs, []) + + def testReliabilityAtLeastOnce(self): + msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)] + self.doReliabilityTest("at-least-once", msgs, msgs) + + def testLinkName(self): + msgs = [self.message("testLinkName", i) for i in range(3)] + snd = self.ssn.sender("amq.topic") + trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}") + qrcv = self.ssn.receiver("test-link-name") + for m in msgs: + snd.send(m) + self.drain(qrcv, expected=msgs) + NOSUCH_Q = "this-queue-should-not-exist" UNPARSEABLE_ADDR = "name/subject; {bad options" UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" diff --git a/python/qpid/validator.py b/python/qpid/validator.py index 7bd62b68f8..d234642b3e 100644 --- a/python/qpid/validator.py +++ b/python/qpid/validator.py @@ -54,6 +54,20 @@ class Types: else: return "%s is not one of: %s" % (o, ", ".join([t.__name__ for t in self.types])) +class List: + + def __init__(self, condition): + self.condition = condition + + def validate(self, o, ctx): + if not isinstance(o, list): + return "%s is not a list" % o + + ctx.push(o) + for v in o: + err = self.condition.validate(v, ctx) + if err: return err + class Map: def __init__(self, map, restricted=True): |