summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-11-16 12:05:50 +0000
committerRafael H. Schloming <rhs@apache.org>2009-11-16 12:05:50 +0000
commitb08f16897e3f5c65c155d6d63c380003510430c9 (patch)
tree56434bda6a4135e0ae209a19424916dd21779232 /python/qpid/driver.py
parentefc6473096622a01b2a3907093431b49d8ebfb1e (diff)
downloadqpid-python-b08f16897e3f5c65c155d6d63c380003510430c9.tar.gz
added address support for specifying node type and properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@880720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py190
1 files changed, 101 insertions, 89 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 3e45045bec..21d2b432f4 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -471,43 +471,20 @@ class Driver:
if _snd.options is None:
_snd.options = {}
- def do_link():
- snd.linked = True
-
- def do_queue_q(result):
- if sst.detached:
- return
-
- if result.queue:
+ def do_link(type, subtype):
+ if type == "topic":
+ _snd._exchange = _snd.name
+ _snd._routing_key = _snd.subject
+ elif type == "queue":
_snd._exchange = ""
_snd._routing_key = _snd.name
- do_link()
- else:
- snd.error = ("no such queue: %s" % _snd.name,)
- del self._attachments[snd]
- snd.closed = True
-
- def do_exchange_q(result):
- if sst.detached:
- return
- if result.not_found:
- if _snd.options.get("create") in ("always", "sender"):
- sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
- _snd._exchange = ""
- _snd._routing_key = _snd.name
- else:
- sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
- return
- else:
- _snd._exchange = _snd.name
- _snd._routing_key = _snd.subject
- do_link()
+ snd.linked = True
- sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
+ self.resolve_declare(sst, _snd, "sender", do_link)
self._attachments[snd] = _snd
- if snd.closing and not (snd.closed or _snd.closing):
+ if snd.linked and snd.closing and not (snd.closed or _snd.closing):
_snd.closing = True
def do_unlink():
del self._attachments[snd]
@@ -545,38 +522,13 @@ class Driver:
if _rcv.options is None:
_rcv.options = {}
- def do_link():
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
- sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
- rcv.linked = True
-
- def do_queue_q(result):
- if sst.detached:
- return
- if result.queue:
- _rcv._queue = _rcv.name
- do_link()
- else:
- rcv.error = ("no such queue: %s" % _rcv.name,)
- del self._attachments[rcv]
- rcv.closed = True
-
- def do_exchange_q(result):
- if sst.detached:
- return
- if result.not_found:
- if _rcv.options.get("create") in ("always", "receiver"):
- _rcv._queue = _rcv.name
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
- else:
- sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
- return
- else:
+ def do_link(type, subtype):
+ if type == "topic":
_rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
filter = _rcv.options.get("filter")
if _rcv.subject is None and filter is None:
- f = FILTER_DEFAULTS[result.type]
+ f = FILTER_DEFAULTS[subtype]
elif _rcv.subject and filter:
# XXX
raise Exception("can't supply both subject and filter")
@@ -587,41 +539,101 @@ class Driver:
else:
f = filter
f._bind(sst, _rcv.name, _rcv._queue)
- do_link()
- sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
+ elif type == "queue":
+ _rcv._queue = _rcv.name
+
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ rcv.linked = True
+
+ self.resolve_declare(sst, _rcv, "receiver", do_link)
self._attachments[rcv] = _rcv
- if rcv.closing and not rcv.closed:
- if rcv.linked:
- if not _rcv.canceled:
- def do_unlink():
- del self._attachments[rcv]
- rcv.closed = True
- if _rcv.options.get("delete") in ("always", "receiver"):
- sst.write_cmd(MessageCancel(rcv.destination))
- self.delete(sst, _rcv.name, do_unlink)
- else:
- sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
- _rcv.canceled = True
- else:
- rcv.closed = True
+ if rcv.linked and rcv.closing and not rcv.closed:
+ if not _rcv.canceled:
+ def do_unlink():
+ del self._attachments[rcv]
+ rcv.closed = True
+ if _rcv.options.get("delete") in ("always", "receiver"):
+ sst.write_cmd(MessageCancel(rcv.destination))
+ self.delete(sst, _rcv.name, do_unlink)
+ else:
+ sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
+ _rcv.canceled = True
+
+ def resolve_declare(self, sst, lnk, dir, action):
+ def do_resolved(er, qr):
+ if er.not_found and not qr.queue:
+ if lnk.options.get("create") in ("always", dir):
+ err = self.declare(sst, lnk.name, lnk.options, action)
+ else:
+ err = ("no such queue: %s" % lnk.name,)
- def delete(self, sst, name, completion):
- def do_queue_delq(result):
- if sst.detached:
- return
- if result.queue:
- sst.write_cmd(QueueDelete(name), completion)
+ if err:
+ tgt = lnk.target
+ tgt.error = err
+ del self._attachments[tgt]
+ tgt.closed = True
+ return
+ elif qr.queue:
+ action("queue", None)
else:
- completion()
- def do_exchange_delq(result):
- if sst.detached:
- return
- if result.not_found:
- sst.write_query(QueueQuery(name), do_queue_delq)
+ action("topic", er.type)
+ self.resolve(sst, lnk.name, do_resolved)
+
+ def resolve(self, sst, name, action):
+ args = []
+ def do_result(r):
+ args.append(r)
+ def do_action(r):
+ do_result(r)
+ action(*args)
+ sst.write_query(ExchangeQuery(name), do_result)
+ sst.write_query(QueueQuery(name), do_action)
+
+ def declare(self, sst, name, options, action):
+ opts = dict(options)
+ props = dict(opts.pop("node-properties", {}))
+ durable = props.pop("durable", DURABLE_DEFAULT)
+ type = props.pop("type", "queue")
+ xprops = dict(props.pop("x-properties", {}))
+
+ if props:
+ return ("unrecognized option(s): %s" % "".join(props.keys()),)
+
+ if type == "topic":
+ cmd = ExchangeDeclare(exchange=name, durable=durable)
+ elif type == "queue":
+ cmd = QueueDeclare(queue=name, durable=durable)
+ else:
+ return ("unrecognized type, must be topic or queue: %s" % type,)
+
+ for f in cmd.FIELDS:
+ if f.name != "arguments" and xprops.has_key(f.name):
+ cmd[f.name] = xprops.pop(f.name)
+ if xprops:
+ cmd.arguments = xprops
+
+ if type == "topic":
+ if cmd.type is None:
+ cmd.type = "topic"
+ subtype = cmd.type
+ else:
+ subtype = None
+
+ def do_action():
+ action(type, subtype)
+ sst.write_cmd(cmd, do_action)
+
+ def delete(self, sst, name, action):
+ def do_delete(er, qr):
+ if not er.not_found:
+ sst.write_cmd(ExchangeDelete(name), action)
+ elif qr.queue:
+ sst.write_cmd(QueueDelete(name), action)
else:
- sst.write_cmd(ExchangeDelete(name), completion)
- sst.write_query(ExchangeQuery(name), do_exchange_delq)
+ action()
+ self.resolve(sst, name, do_delete)
def process(self, ssn):
if ssn.closed or ssn.closing: return