diff options
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r-- | python/qpid/messaging/driver.py | 154 |
1 files changed, 85 insertions, 69 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 383845f214..ba53d94e33 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -33,7 +33,7 @@ from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector from qpid.util import connect -from qpid.validator import And, Context, Map, Types, Values +from qpid.validator import And, Context, List, Map, Types, Values from threading import Condition, Thread log = getLogger("qpid.messaging") @@ -78,9 +78,8 @@ class Pattern: sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#"))) -FILTER_DEFAULTS = { - "topic": Pattern("*"), - "amq.failover": Pattern("DUMMY") +SUBJECT_DEFAULTS = { + "topic": "#" } # XXX @@ -130,7 +129,14 @@ class SessionState: id = self.sent self.write_cmd(query, lambda: handler(self.results.pop(id))) - def write_cmd(self, cmd, action=noop): + def apply_overrides(self, cmd, overrides): + for k, v in overrides.items(): + cmd[k.replace('-', '_')] = v + + def write_cmd(self, cmd, action=noop, overrides=None): + if overrides: + self.apply_overrides(cmd, overrides) + if action != noop: cmd.sync = True if self.detached: @@ -154,28 +160,36 @@ class SessionState: self.driver.write_op(op) POLICIES = Values("always", "sender", "receiver", "never") +RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", + "exactly-once") -class Bindings: - - def validate(self, o, ctx): - t = ctx.containers[1].get("type", "queue") - if t != "queue": - return "bindings are only permitted on nodes of type queue" +DECLARE = Map({}, restricted=False) +BINDINGS = List(Map({ + "exchange": Types(basestring), + "queue": Types(basestring), + "key": Types(basestring), + "arguments": Map({}, restricted=False) + })) COMMON_OPTS = { - "create": POLICIES, - "delete": POLICIES, - "assert": POLICIES, - "node-properties": Map({ - "type": Values("queue", "topic"), - "durable": Types(bool), - "x-properties": Map({ - "type": Types(basestring), - "bindings": And(Types(list), Bindings()) - }, - restricted=False) - }) - } + "create": POLICIES, + "delete": POLICIES, + "assert": POLICIES, + "node": Map({ + "type": Values("queue", "topic"), + "durable": Types(bool), + "x-declare": DECLARE, + "x-bindings": BINDINGS + }), + "link": Map({ + "name": Types(basestring), + "durable": Types(bool), + "reliability": RELIABILITY, + "x-declare": DECLARE, + "x-bindings": BINDINGS, + "x-subscribe": Map({}, restricted=False) + }) + } RECEIVE_MODES = Values("browse", "consume") @@ -196,36 +210,46 @@ class LinkIn: _rcv.destination = str(rcv.id) sst.destinations[_rcv.destination] = _rcv _rcv.draining = False + _rcv.on_unlink = [] def do_link(self, sst, rcv, _rcv, type, subtype, action): + link_opts = _rcv.options.get("link", {}) + # XXX: default? + reliability = link_opts.get("reliability", "unreliable") + declare = link_opts.get("x-declare", {}) + subscribe = link_opts.get("x-subscribe", {}) acq_mode = acquire_mode.pre_acquired if type == "topic": - _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) - filter = _rcv.options.get("filter") - if _rcv.subject is None and filter is None: - f = FILTER_DEFAULTS[subtype] - elif _rcv.subject and filter: - # XXX - raise Exception("can't supply both subject and filter") - elif _rcv.subject: - # XXX - f = Pattern(_rcv.subject) - else: - f = filter - f._bind(sst, _rcv.name, _rcv._queue) + default_name = "%s.%s" % (rcv.session.name, _rcv.destination) + _rcv._queue = link_opts.get("name", default_name) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, + durable=link_opts.get("durable", False), + exclusive=True, + auto_delete=(reliability == "unreliable")), + overrides=declare) + _rcv.on_unlink = [QueueDelete(_rcv._queue)] + subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype) + sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject)) + bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject) elif type == "queue": _rcv._queue = _rcv.name if _rcv.options.get("mode", "consume") == "browse": acq_mode = acquire_mode.not_acquired + bindings = get_bindings(link_opts, queue=_rcv._queue) + sst.write_cmds(bindings) sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, - acquire_mode = acq_mode)) + acquire_mode = acq_mode), + overrides=subscribe) sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) def do_unlink(self, sst, rcv, _rcv, action=noop): - sst.write_cmd(MessageCancel(_rcv.destination), action) + link_opts = _rcv.options.get("link", {}) + reliability = link_opts.get("reliability") + cmds = [MessageCancel(_rcv.destination)] + cmds.extend(_rcv.on_unlink) + sst.write_cmds(cmds, action) def del_link(self, sst, rcv, _rcv): del sst.destinations[_rcv.destination] @@ -240,13 +264,16 @@ class LinkOut: _snd.closing = False def do_link(self, sst, snd, _snd, type, subtype, action): + link_opts = _snd.options.get("link", {}) if type == "topic": _snd._exchange = _snd.name _snd._routing_key = _snd.subject + bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject) elif type == "queue": _snd._exchange = "" _snd._routing_key = _snd.name - action() + bindings = get_bindings(link_opts, queue=_snd.name) + sst.write_cmds(bindings, action) def do_unlink(self, sst, snd, _snd, action=noop): action() @@ -437,6 +464,17 @@ class Driver: DEFAULT_DISPOSITION = Disposition(None) +def get_bindings(opts, queue=None, exchange=None, key=None): + bindings = opts.get("x-bindings", []) + cmds = [] + for b in bindings: + exchange = b.get("exchange", exchange) + queue = b.get("queue", queue) + key = b.get("key", key) + args = b.get("arguments", {}) + cmds.append(ExchangeBind(queue, exchange, key, args)) + return cmds + class Engine: def __init__(self, connection): @@ -785,12 +823,6 @@ class Engine: err = self.declare(sst, lnk, action) else: err = ("no such queue: %s" % lnk.name,) - elif type == "queue": - try: - cmds = self.bindings(lnk) - sst.write_cmds(cmds, lambda: action(type, subtype)) - except address.ParseError, e: - err = (e,) else: action(type, subtype) @@ -831,23 +863,21 @@ class Engine: def declare(self, sst, lnk, action): name = lnk.name - props = lnk.options.get("node-properties", {}) + props = lnk.options.get("node", {}) durable = props.get("durable", DURABLE_DEFAULT) type = props.get("type", "queue") - xprops = props.get("x-properties", {}) + declare = props.get("x-declare", {}) if type == "topic": cmd = ExchangeDeclare(exchange=name, durable=durable) + bindings = get_bindings(props, exchange=name) elif type == "queue": cmd = QueueDeclare(queue=name, durable=durable) + bindings = get_bindings(props, queue=name) else: raise ValueError(type) - for f in cmd.FIELDS: - if f.name != "arguments" and xprops.has_key(f.name): - cmd[f.name] = xprops.pop(f.name) - if xprops: - cmd.arguments = xprops + sst.apply_overrides(cmd, declare) if type == "topic": if cmd.type is None: @@ -857,11 +887,7 @@ class Engine: subtype = None cmds = [cmd] - if type == "queue": - try: - cmds.extend(self.bindings(lnk)) - except address.ParseError, e: - return (e,) + cmds.extend(bindings) def declared(): self.address_cache[name] = (type, subtype) @@ -869,16 +895,6 @@ class Engine: sst.write_cmds(cmds, declared) - def bindings(self, lnk): - props = lnk.options.get("node-properties", {}) - xprops = props.get("x-properties", {}) - bindings = xprops.get("bindings", []) - cmds = [] - for b in bindings: - n, s, o = address.parse(b) - cmds.append(ExchangeBind(lnk.name, n, s, o)) - return cmds - def delete(self, sst, name, action): def deleted(): del self.address_cache[name] |