summaryrefslogtreecommitdiff
path: root/python/qpid/messaging/driver.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r--python/qpid/messaging/driver.py154
1 files changed, 85 insertions, 69 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 383845f214..ba53d94e33 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -33,7 +33,7 @@ from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
from qpid.util import connect
-from qpid.validator import And, Context, Map, Types, Values
+from qpid.validator import And, Context, List, Map, Types, Values
from threading import Condition, Thread
log = getLogger("qpid.messaging")
@@ -78,9 +78,8 @@ class Pattern:
sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
binding_key=self.value.replace("*", "#")))
-FILTER_DEFAULTS = {
- "topic": Pattern("*"),
- "amq.failover": Pattern("DUMMY")
+SUBJECT_DEFAULTS = {
+ "topic": "#"
}
# XXX
@@ -130,7 +129,14 @@ class SessionState:
id = self.sent
self.write_cmd(query, lambda: handler(self.results.pop(id)))
- def write_cmd(self, cmd, action=noop):
+ def apply_overrides(self, cmd, overrides):
+ for k, v in overrides.items():
+ cmd[k.replace('-', '_')] = v
+
+ def write_cmd(self, cmd, action=noop, overrides=None):
+ if overrides:
+ self.apply_overrides(cmd, overrides)
+
if action != noop:
cmd.sync = True
if self.detached:
@@ -154,28 +160,36 @@ class SessionState:
self.driver.write_op(op)
POLICIES = Values("always", "sender", "receiver", "never")
+RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
+ "exactly-once")
-class Bindings:
-
- def validate(self, o, ctx):
- t = ctx.containers[1].get("type", "queue")
- if t != "queue":
- return "bindings are only permitted on nodes of type queue"
+DECLARE = Map({}, restricted=False)
+BINDINGS = List(Map({
+ "exchange": Types(basestring),
+ "queue": Types(basestring),
+ "key": Types(basestring),
+ "arguments": Map({}, restricted=False)
+ }))
COMMON_OPTS = {
- "create": POLICIES,
- "delete": POLICIES,
- "assert": POLICIES,
- "node-properties": Map({
- "type": Values("queue", "topic"),
- "durable": Types(bool),
- "x-properties": Map({
- "type": Types(basestring),
- "bindings": And(Types(list), Bindings())
- },
- restricted=False)
- })
- }
+ "create": POLICIES,
+ "delete": POLICIES,
+ "assert": POLICIES,
+ "node": Map({
+ "type": Values("queue", "topic"),
+ "durable": Types(bool),
+ "x-declare": DECLARE,
+ "x-bindings": BINDINGS
+ }),
+ "link": Map({
+ "name": Types(basestring),
+ "durable": Types(bool),
+ "reliability": RELIABILITY,
+ "x-declare": DECLARE,
+ "x-bindings": BINDINGS,
+ "x-subscribe": Map({}, restricted=False)
+ })
+ }
RECEIVE_MODES = Values("browse", "consume")
@@ -196,36 +210,46 @@ class LinkIn:
_rcv.destination = str(rcv.id)
sst.destinations[_rcv.destination] = _rcv
_rcv.draining = False
+ _rcv.on_unlink = []
def do_link(self, sst, rcv, _rcv, type, subtype, action):
+ link_opts = _rcv.options.get("link", {})
+ # XXX: default?
+ reliability = link_opts.get("reliability", "unreliable")
+ declare = link_opts.get("x-declare", {})
+ subscribe = link_opts.get("x-subscribe", {})
acq_mode = acquire_mode.pre_acquired
if type == "topic":
- _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
- filter = _rcv.options.get("filter")
- if _rcv.subject is None and filter is None:
- f = FILTER_DEFAULTS[subtype]
- elif _rcv.subject and filter:
- # XXX
- raise Exception("can't supply both subject and filter")
- elif _rcv.subject:
- # XXX
- f = Pattern(_rcv.subject)
- else:
- f = filter
- f._bind(sst, _rcv.name, _rcv._queue)
+ default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
+ _rcv._queue = link_opts.get("name", default_name)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue,
+ durable=link_opts.get("durable", False),
+ exclusive=True,
+ auto_delete=(reliability == "unreliable")),
+ overrides=declare)
+ _rcv.on_unlink = [QueueDelete(_rcv._queue)]
+ subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
+ sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
+ bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
elif type == "queue":
_rcv._queue = _rcv.name
if _rcv.options.get("mode", "consume") == "browse":
acq_mode = acquire_mode.not_acquired
+ bindings = get_bindings(link_opts, queue=_rcv._queue)
+ sst.write_cmds(bindings)
sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
- acquire_mode = acq_mode))
+ acquire_mode = acq_mode),
+ overrides=subscribe)
sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
def do_unlink(self, sst, rcv, _rcv, action=noop):
- sst.write_cmd(MessageCancel(_rcv.destination), action)
+ link_opts = _rcv.options.get("link", {})
+ reliability = link_opts.get("reliability")
+ cmds = [MessageCancel(_rcv.destination)]
+ cmds.extend(_rcv.on_unlink)
+ sst.write_cmds(cmds, action)
def del_link(self, sst, rcv, _rcv):
del sst.destinations[_rcv.destination]
@@ -240,13 +264,16 @@ class LinkOut:
_snd.closing = False
def do_link(self, sst, snd, _snd, type, subtype, action):
+ link_opts = _snd.options.get("link", {})
if type == "topic":
_snd._exchange = _snd.name
_snd._routing_key = _snd.subject
+ bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
elif type == "queue":
_snd._exchange = ""
_snd._routing_key = _snd.name
- action()
+ bindings = get_bindings(link_opts, queue=_snd.name)
+ sst.write_cmds(bindings, action)
def do_unlink(self, sst, snd, _snd, action=noop):
action()
@@ -437,6 +464,17 @@ class Driver:
DEFAULT_DISPOSITION = Disposition(None)
+def get_bindings(opts, queue=None, exchange=None, key=None):
+ bindings = opts.get("x-bindings", [])
+ cmds = []
+ for b in bindings:
+ exchange = b.get("exchange", exchange)
+ queue = b.get("queue", queue)
+ key = b.get("key", key)
+ args = b.get("arguments", {})
+ cmds.append(ExchangeBind(queue, exchange, key, args))
+ return cmds
+
class Engine:
def __init__(self, connection):
@@ -785,12 +823,6 @@ class Engine:
err = self.declare(sst, lnk, action)
else:
err = ("no such queue: %s" % lnk.name,)
- elif type == "queue":
- try:
- cmds = self.bindings(lnk)
- sst.write_cmds(cmds, lambda: action(type, subtype))
- except address.ParseError, e:
- err = (e,)
else:
action(type, subtype)
@@ -831,23 +863,21 @@ class Engine:
def declare(self, sst, lnk, action):
name = lnk.name
- props = lnk.options.get("node-properties", {})
+ props = lnk.options.get("node", {})
durable = props.get("durable", DURABLE_DEFAULT)
type = props.get("type", "queue")
- xprops = props.get("x-properties", {})
+ declare = props.get("x-declare", {})
if type == "topic":
cmd = ExchangeDeclare(exchange=name, durable=durable)
+ bindings = get_bindings(props, exchange=name)
elif type == "queue":
cmd = QueueDeclare(queue=name, durable=durable)
+ bindings = get_bindings(props, queue=name)
else:
raise ValueError(type)
- for f in cmd.FIELDS:
- if f.name != "arguments" and xprops.has_key(f.name):
- cmd[f.name] = xprops.pop(f.name)
- if xprops:
- cmd.arguments = xprops
+ sst.apply_overrides(cmd, declare)
if type == "topic":
if cmd.type is None:
@@ -857,11 +887,7 @@ class Engine:
subtype = None
cmds = [cmd]
- if type == "queue":
- try:
- cmds.extend(self.bindings(lnk))
- except address.ParseError, e:
- return (e,)
+ cmds.extend(bindings)
def declared():
self.address_cache[name] = (type, subtype)
@@ -869,16 +895,6 @@ class Engine:
sst.write_cmds(cmds, declared)
- def bindings(self, lnk):
- props = lnk.options.get("node-properties", {})
- xprops = props.get("x-properties", {})
- bindings = xprops.get("bindings", [])
- cmds = []
- for b in bindings:
- n, s, o = address.parse(b)
- cmds.append(ExchangeBind(lnk.name, n, s, o))
- return cmds
-
def delete(self, sst, name, action):
def deleted():
del self.address_cache[name]