summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-11-16 12:05:50 +0000
committerRafael H. Schloming <rhs@apache.org>2009-11-16 12:05:50 +0000
commitb08f16897e3f5c65c155d6d63c380003510430c9 (patch)
tree56434bda6a4135e0ae209a19424916dd21779232 /python
parentefc6473096622a01b2a3907093431b49d8ebfb1e (diff)
downloadqpid-python-b08f16897e3f5c65c155d6d63c380003510430c9.tar.gz
added address support for specifying node type and properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@880720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/address.py2
-rw-r--r--python/qpid/driver.py190
-rw-r--r--python/qpid/ops.py4
-rw-r--r--python/qpid/tests/address.py27
-rw-r--r--python/qpid/tests/messaging.py41
5 files changed, 170 insertions, 94 deletions
diff --git a/python/qpid/address.py b/python/qpid/address.py
index 93336763f4..909a9e42e1 100644
--- a/python/qpid/address.py
+++ b/python/qpid/address.py
@@ -38,7 +38,7 @@ SEMI = Type("SEMI", r";")
SLASH = Type("SLASH", r"/")
COMMA = Type("COMMA", r",")
NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
-ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*')
+ID = Type("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?')
STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
ESC = Type("ESC", r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]")
SYM = Type("SYM", r"[.#*%@$^!+-]")
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 3e45045bec..21d2b432f4 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -471,43 +471,20 @@ class Driver:
if _snd.options is None:
_snd.options = {}
- def do_link():
- snd.linked = True
-
- def do_queue_q(result):
- if sst.detached:
- return
-
- if result.queue:
+ def do_link(type, subtype):
+ if type == "topic":
+ _snd._exchange = _snd.name
+ _snd._routing_key = _snd.subject
+ elif type == "queue":
_snd._exchange = ""
_snd._routing_key = _snd.name
- do_link()
- else:
- snd.error = ("no such queue: %s" % _snd.name,)
- del self._attachments[snd]
- snd.closed = True
-
- def do_exchange_q(result):
- if sst.detached:
- return
- if result.not_found:
- if _snd.options.get("create") in ("always", "sender"):
- sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
- _snd._exchange = ""
- _snd._routing_key = _snd.name
- else:
- sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
- return
- else:
- _snd._exchange = _snd.name
- _snd._routing_key = _snd.subject
- do_link()
+ snd.linked = True
- sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
+ self.resolve_declare(sst, _snd, "sender", do_link)
self._attachments[snd] = _snd
- if snd.closing and not (snd.closed or _snd.closing):
+ if snd.linked and snd.closing and not (snd.closed or _snd.closing):
_snd.closing = True
def do_unlink():
del self._attachments[snd]
@@ -545,38 +522,13 @@ class Driver:
if _rcv.options is None:
_rcv.options = {}
- def do_link():
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
- sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
- rcv.linked = True
-
- def do_queue_q(result):
- if sst.detached:
- return
- if result.queue:
- _rcv._queue = _rcv.name
- do_link()
- else:
- rcv.error = ("no such queue: %s" % _rcv.name,)
- del self._attachments[rcv]
- rcv.closed = True
-
- def do_exchange_q(result):
- if sst.detached:
- return
- if result.not_found:
- if _rcv.options.get("create") in ("always", "receiver"):
- _rcv._queue = _rcv.name
- sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
- else:
- sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
- return
- else:
+ def do_link(type, subtype):
+ if type == "topic":
_rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
filter = _rcv.options.get("filter")
if _rcv.subject is None and filter is None:
- f = FILTER_DEFAULTS[result.type]
+ f = FILTER_DEFAULTS[subtype]
elif _rcv.subject and filter:
# XXX
raise Exception("can't supply both subject and filter")
@@ -587,41 +539,101 @@ class Driver:
else:
f = filter
f._bind(sst, _rcv.name, _rcv._queue)
- do_link()
- sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
+ elif type == "queue":
+ _rcv._queue = _rcv.name
+
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ rcv.linked = True
+
+ self.resolve_declare(sst, _rcv, "receiver", do_link)
self._attachments[rcv] = _rcv
- if rcv.closing and not rcv.closed:
- if rcv.linked:
- if not _rcv.canceled:
- def do_unlink():
- del self._attachments[rcv]
- rcv.closed = True
- if _rcv.options.get("delete") in ("always", "receiver"):
- sst.write_cmd(MessageCancel(rcv.destination))
- self.delete(sst, _rcv.name, do_unlink)
- else:
- sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
- _rcv.canceled = True
- else:
- rcv.closed = True
+ if rcv.linked and rcv.closing and not rcv.closed:
+ if not _rcv.canceled:
+ def do_unlink():
+ del self._attachments[rcv]
+ rcv.closed = True
+ if _rcv.options.get("delete") in ("always", "receiver"):
+ sst.write_cmd(MessageCancel(rcv.destination))
+ self.delete(sst, _rcv.name, do_unlink)
+ else:
+ sst.write_cmd(MessageCancel(rcv.destination), do_unlink)
+ _rcv.canceled = True
+
+ def resolve_declare(self, sst, lnk, dir, action):
+ def do_resolved(er, qr):
+ if er.not_found and not qr.queue:
+ if lnk.options.get("create") in ("always", dir):
+ err = self.declare(sst, lnk.name, lnk.options, action)
+ else:
+ err = ("no such queue: %s" % lnk.name,)
- def delete(self, sst, name, completion):
- def do_queue_delq(result):
- if sst.detached:
- return
- if result.queue:
- sst.write_cmd(QueueDelete(name), completion)
+ if err:
+ tgt = lnk.target
+ tgt.error = err
+ del self._attachments[tgt]
+ tgt.closed = True
+ return
+ elif qr.queue:
+ action("queue", None)
else:
- completion()
- def do_exchange_delq(result):
- if sst.detached:
- return
- if result.not_found:
- sst.write_query(QueueQuery(name), do_queue_delq)
+ action("topic", er.type)
+ self.resolve(sst, lnk.name, do_resolved)
+
+ def resolve(self, sst, name, action):
+ args = []
+ def do_result(r):
+ args.append(r)
+ def do_action(r):
+ do_result(r)
+ action(*args)
+ sst.write_query(ExchangeQuery(name), do_result)
+ sst.write_query(QueueQuery(name), do_action)
+
+ def declare(self, sst, name, options, action):
+ opts = dict(options)
+ props = dict(opts.pop("node-properties", {}))
+ durable = props.pop("durable", DURABLE_DEFAULT)
+ type = props.pop("type", "queue")
+ xprops = dict(props.pop("x-properties", {}))
+
+ if props:
+ return ("unrecognized option(s): %s" % "".join(props.keys()),)
+
+ if type == "topic":
+ cmd = ExchangeDeclare(exchange=name, durable=durable)
+ elif type == "queue":
+ cmd = QueueDeclare(queue=name, durable=durable)
+ else:
+ return ("unrecognized type, must be topic or queue: %s" % type,)
+
+ for f in cmd.FIELDS:
+ if f.name != "arguments" and xprops.has_key(f.name):
+ cmd[f.name] = xprops.pop(f.name)
+ if xprops:
+ cmd.arguments = xprops
+
+ if type == "topic":
+ if cmd.type is None:
+ cmd.type = "topic"
+ subtype = cmd.type
+ else:
+ subtype = None
+
+ def do_action():
+ action(type, subtype)
+ sst.write_cmd(cmd, do_action)
+
+ def delete(self, sst, name, action):
+ def do_delete(er, qr):
+ if not er.not_found:
+ sst.write_cmd(ExchangeDelete(name), action)
+ elif qr.queue:
+ sst.write_cmd(QueueDelete(name), action)
else:
- sst.write_cmd(ExchangeDelete(name), completion)
- sst.write_query(ExchangeQuery(name), do_exchange_delq)
+ action()
+ self.resolve(sst, name, do_delete)
def process(self, ssn):
if ssn.closed or ssn.closing: return
diff --git a/python/qpid/ops.py b/python/qpid/ops.py
index 61853522e9..a8ba826857 100644
--- a/python/qpid/ops.py
+++ b/python/qpid/ops.py
@@ -259,8 +259,8 @@ COMPOUND = {}
COMMANDS = {}
CONTROLS = {}
-for name, bases, dict in types:
- t = type(name, bases, dict)
+for name, bases, _dict in types:
+ t = type(name, bases, _dict)
vars[name] = t
if issubclass(t, Command):
diff --git a/python/qpid/tests/address.py b/python/qpid/tests/address.py
index 75ce7b8e92..065e2ca8de 100644
--- a/python/qpid/tests/address.py
+++ b/python/qpid/tests/address.py
@@ -18,10 +18,35 @@
#
from qpid.tests import Test
-from qpid.address import parse, ParseError
+from qpid.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE
class AddressTests(Test):
+ def lex(self, addr, *types):
+ toks = [t.type for t in lex(addr) if t.type not in (WSPACE, EOF)]
+ assert list(types) == toks, "expected %s, got %s" % (types, toks)
+
+ def testDashInId1(self):
+ self.lex("foo-bar", ID)
+
+ def testDashInId2(self):
+ self.lex("foo-3", ID)
+
+ def testDashAlone1(self):
+ self.lex("foo - bar", ID, SYM, ID)
+
+ def testDashAlone2(self):
+ self.lex("foo - 3", ID, SYM, NUMBER)
+
+ def testLeadingDash(self):
+ self.lex("-foo", SYM, ID)
+
+ def testTrailingDash(self):
+ self.lex("foo-", ID, SYM)
+
+ def testNegativeNum(self):
+ self.lex("-3", NUMBER)
+
def valid(self, addr, name=None, subject=None, options=None):
expected = (name, subject, options)
got = parse(addr)
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index 474c9ab630..34ff57babe 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -201,7 +201,7 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
def testReceiver(self):
- rcv = self.ssn.receiver('test-rcv-queue; {create: always, delete: always}')
+ rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
rcv2 = self.ssn.receiver(rcv.source)
assert rcv is not rcv2
rcv2.close()
@@ -212,6 +212,7 @@ class SessionTests(Base):
msg = rcv.fetch(0)
assert msg.content == content
self.ssn.acknowledge(msg)
+ snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
def testNextReceiver(self):
ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
@@ -551,6 +552,44 @@ class AddressTests(Base):
def setup_session(self):
return self.conn.session()
+ def testBadOption(self):
+ snd = self.ssn.sender("test-bad-option; {create: always, node-properties: {this-property-does-not-exist: 3}}")
+ try:
+ snd.send("ping")
+ except SendError, e:
+ assert "unrecognized option" in str(e)
+
+ def testCreateQueue(self):
+ snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
+ "node-properties: {type: queue, durable: False, "
+ "x-properties: {auto_delete: true}}}")
+ content = self.content("testCreateQueue")
+ snd.send(content)
+ rcv = self.ssn.receiver("test-create-queue")
+ self.drain(rcv, expected=[content])
+
+ def testCreateExchange(self):
+ snd = self.ssn.sender("test-create-exchange; {create: always, "
+ "delete: always, node-properties: {type: topic, "
+ "durable: False, x-properties: {auto_delete: true}}}")
+ snd.send("ping")
+ rcv1 = self.ssn.receiver("test-create-exchange/first")
+ rcv2 = self.ssn.receiver("test-create-exchange/second")
+ rcv3 = self.ssn.receiver("test-create-exchange")
+ for r in (rcv1, rcv2, rcv3):
+ try:
+ r.fetch(0)
+ assert False
+ except Empty:
+ pass
+ msg1 = Message(self.content("testCreateExchange", 1), subject="first")
+ msg2 = Message(self.content("testCreateExchange", 1), subject="second")
+ snd.send(msg1)
+ snd.send(msg2)
+ self.drain(rcv1, expected=[msg1.content])
+ self.drain(rcv2, expected=[msg2.content])
+ self.drain(rcv3, expected=[msg1.content, msg2.content])
+
def testDeleteBySender(self):
snd = self.ssn.sender("test-delete; {create: always}")
snd.send("ping")