diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-11-16 12:05:50 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-11-16 12:05:50 +0000 |
| commit | b08f16897e3f5c65c155d6d63c380003510430c9 (patch) | |
| tree | 56434bda6a4135e0ae209a19424916dd21779232 /python | |
| parent | efc6473096622a01b2a3907093431b49d8ebfb1e (diff) | |
| download | qpid-python-b08f16897e3f5c65c155d6d63c380003510430c9.tar.gz | |
added address support for specifying node type and properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@880720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/address.py | 2 | ||||
| -rw-r--r-- | python/qpid/driver.py | 190 | ||||
| -rw-r--r-- | python/qpid/ops.py | 4 | ||||
| -rw-r--r-- | python/qpid/tests/address.py | 27 | ||||
| -rw-r--r-- | python/qpid/tests/messaging.py | 41 |
5 files changed, 170 insertions, 94 deletions
diff --git a/python/qpid/address.py b/python/qpid/address.py index 93336763f4..909a9e42e1 100644 --- a/python/qpid/address.py +++ b/python/qpid/address.py @@ -38,7 +38,7 @@ SEMI = Type("SEMI", r";") SLASH = Type("SLASH", r"/") COMMA = Type("COMMA", r",") NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+') -ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*') +ID = Type("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?') STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""") ESC = Type("ESC", r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]") SYM = Type("SYM", r"[.#*%@$^!+-]") diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 3e45045bec..21d2b432f4 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -471,43 +471,20 @@ class Driver: if _snd.options is None: _snd.options = {} - def do_link(): - snd.linked = True - - def do_queue_q(result): - if sst.detached: - return - - if result.queue: + def do_link(type, subtype): + if type == "topic": + _snd._exchange = _snd.name + _snd._routing_key = _snd.subject + elif type == "queue": _snd._exchange = "" _snd._routing_key = _snd.name - do_link() - else: - snd.error = ("no such queue: %s" % _snd.name,) - del self._attachments[snd] - snd.closed = True - - def do_exchange_q(result): - if sst.detached: - return - if result.not_found: - if _snd.options.get("create") in ("always", "sender"): - sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT)) - _snd._exchange = "" - _snd._routing_key = _snd.name - else: - sst.write_query(QueueQuery(queue=_snd.name), do_queue_q) - return - else: - _snd._exchange = _snd.name - _snd._routing_key = _snd.subject - do_link() + snd.linked = True - sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q) + self.resolve_declare(sst, _snd, "sender", do_link) self._attachments[snd] = _snd - if snd.closing and not (snd.closed or _snd.closing): + if snd.linked and snd.closing and not (snd.closed or _snd.closing): _snd.closing = True def do_unlink(): del self._attachments[snd] @@ -545,38 +522,13 @@ class Driver: if _rcv.options is None: _rcv.options = {} - def do_link(): - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) - sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) - rcv.linked = True - - def do_queue_q(result): - if sst.detached: - return - if result.queue: - _rcv._queue = _rcv.name - do_link() - else: - rcv.error = ("no such queue: %s" % _rcv.name,) - del self._attachments[rcv] - rcv.closed = True - - def do_exchange_q(result): - if sst.detached: - return - if result.not_found: - if _rcv.options.get("create") in ("always", "receiver"): - _rcv._queue = _rcv.name - sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) - else: - sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q) - return - else: + def do_link(type, subtype): + if type == "topic": _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) filter = _rcv.options.get("filter") if _rcv.subject is None and filter is None: - f = FILTER_DEFAULTS[result.type] + f = FILTER_DEFAULTS[subtype] elif _rcv.subject and filter: # XXX raise Exception("can't supply both subject and filter") @@ -587,41 +539,101 @@ class Driver: else: f = filter f._bind(sst, _rcv.name, _rcv._queue) - do_link() - sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q) + elif type == "queue": + _rcv._queue = _rcv.name + + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) + sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) + rcv.linked = True + + self.resolve_declare(sst, _rcv, "receiver", do_link) self._attachments[rcv] = _rcv - if rcv.closing and not rcv.closed: - if rcv.linked: - if not _rcv.canceled: - def do_unlink(): - del self._attachments[rcv] - rcv.closed = True - if _rcv.options.get("delete") in ("always", "receiver"): - sst.write_cmd(MessageCancel(rcv.destination)) - self.delete(sst, _rcv.name, do_unlink) - else: - sst.write_cmd(MessageCancel(rcv.destination), do_unlink) - _rcv.canceled = True - else: - rcv.closed = True + if rcv.linked and rcv.closing and not rcv.closed: + if not _rcv.canceled: + def do_unlink(): + del self._attachments[rcv] + rcv.closed = True + if _rcv.options.get("delete") in ("always", "receiver"): + sst.write_cmd(MessageCancel(rcv.destination)) + self.delete(sst, _rcv.name, do_unlink) + else: + sst.write_cmd(MessageCancel(rcv.destination), do_unlink) + _rcv.canceled = True + + def resolve_declare(self, sst, lnk, dir, action): + def do_resolved(er, qr): + if er.not_found and not qr.queue: + if lnk.options.get("create") in ("always", dir): + err = self.declare(sst, lnk.name, lnk.options, action) + else: + err = ("no such queue: %s" % lnk.name,) - def delete(self, sst, name, completion): - def do_queue_delq(result): - if sst.detached: - return - if result.queue: - sst.write_cmd(QueueDelete(name), completion) + if err: + tgt = lnk.target + tgt.error = err + del self._attachments[tgt] + tgt.closed = True + return + elif qr.queue: + action("queue", None) else: - completion() - def do_exchange_delq(result): - if sst.detached: - return - if result.not_found: - sst.write_query(QueueQuery(name), do_queue_delq) + action("topic", er.type) + self.resolve(sst, lnk.name, do_resolved) + + def resolve(self, sst, name, action): + args = [] + def do_result(r): + args.append(r) + def do_action(r): + do_result(r) + action(*args) + sst.write_query(ExchangeQuery(name), do_result) + sst.write_query(QueueQuery(name), do_action) + + def declare(self, sst, name, options, action): + opts = dict(options) + props = dict(opts.pop("node-properties", {})) + durable = props.pop("durable", DURABLE_DEFAULT) + type = props.pop("type", "queue") + xprops = dict(props.pop("x-properties", {})) + + if props: + return ("unrecognized option(s): %s" % "".join(props.keys()),) + + if type == "topic": + cmd = ExchangeDeclare(exchange=name, durable=durable) + elif type == "queue": + cmd = QueueDeclare(queue=name, durable=durable) + else: + return ("unrecognized type, must be topic or queue: %s" % type,) + + for f in cmd.FIELDS: + if f.name != "arguments" and xprops.has_key(f.name): + cmd[f.name] = xprops.pop(f.name) + if xprops: + cmd.arguments = xprops + + if type == "topic": + if cmd.type is None: + cmd.type = "topic" + subtype = cmd.type + else: + subtype = None + + def do_action(): + action(type, subtype) + sst.write_cmd(cmd, do_action) + + def delete(self, sst, name, action): + def do_delete(er, qr): + if not er.not_found: + sst.write_cmd(ExchangeDelete(name), action) + elif qr.queue: + sst.write_cmd(QueueDelete(name), action) else: - sst.write_cmd(ExchangeDelete(name), completion) - sst.write_query(ExchangeQuery(name), do_exchange_delq) + action() + self.resolve(sst, name, do_delete) def process(self, ssn): if ssn.closed or ssn.closing: return diff --git a/python/qpid/ops.py b/python/qpid/ops.py index 61853522e9..a8ba826857 100644 --- a/python/qpid/ops.py +++ b/python/qpid/ops.py @@ -259,8 +259,8 @@ COMPOUND = {} COMMANDS = {} CONTROLS = {} -for name, bases, dict in types: - t = type(name, bases, dict) +for name, bases, _dict in types: + t = type(name, bases, _dict) vars[name] = t if issubclass(t, Command): diff --git a/python/qpid/tests/address.py b/python/qpid/tests/address.py index 75ce7b8e92..065e2ca8de 100644 --- a/python/qpid/tests/address.py +++ b/python/qpid/tests/address.py @@ -18,10 +18,35 @@ # from qpid.tests import Test -from qpid.address import parse, ParseError +from qpid.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE class AddressTests(Test): + def lex(self, addr, *types): + toks = [t.type for t in lex(addr) if t.type not in (WSPACE, EOF)] + assert list(types) == toks, "expected %s, got %s" % (types, toks) + + def testDashInId1(self): + self.lex("foo-bar", ID) + + def testDashInId2(self): + self.lex("foo-3", ID) + + def testDashAlone1(self): + self.lex("foo - bar", ID, SYM, ID) + + def testDashAlone2(self): + self.lex("foo - 3", ID, SYM, NUMBER) + + def testLeadingDash(self): + self.lex("-foo", SYM, ID) + + def testTrailingDash(self): + self.lex("foo-", ID, SYM) + + def testNegativeNum(self): + self.lex("-3", NUMBER) + def valid(self, addr, name=None, subject=None, options=None): expected = (name, subject, options) got = parse(addr) diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 474c9ab630..34ff57babe 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -201,7 +201,7 @@ class SessionTests(Base): self.ssn.acknowledge(msg) def testReceiver(self): - rcv = self.ssn.receiver('test-rcv-queue; {create: always, delete: always}') + rcv = self.ssn.receiver('test-rcv-queue; {create: always}') rcv2 = self.ssn.receiver(rcv.source) assert rcv is not rcv2 rcv2.close() @@ -212,6 +212,7 @@ class SessionTests(Base): msg = rcv.fetch(0) assert msg.content == content self.ssn.acknowledge(msg) + snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') def testNextReceiver(self): ADDR = 'test-next-rcv-queue; {create: always, delete: always}' @@ -551,6 +552,44 @@ class AddressTests(Base): def setup_session(self): return self.conn.session() + def testBadOption(self): + snd = self.ssn.sender("test-bad-option; {create: always, node-properties: {this-property-does-not-exist: 3}}") + try: + snd.send("ping") + except SendError, e: + assert "unrecognized option" in str(e) + + def testCreateQueue(self): + snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " + "node-properties: {type: queue, durable: False, " + "x-properties: {auto_delete: true}}}") + content = self.content("testCreateQueue") + snd.send(content) + rcv = self.ssn.receiver("test-create-queue") + self.drain(rcv, expected=[content]) + + def testCreateExchange(self): + snd = self.ssn.sender("test-create-exchange; {create: always, " + "delete: always, node-properties: {type: topic, " + "durable: False, x-properties: {auto_delete: true}}}") + snd.send("ping") + rcv1 = self.ssn.receiver("test-create-exchange/first") + rcv2 = self.ssn.receiver("test-create-exchange/second") + rcv3 = self.ssn.receiver("test-create-exchange") + for r in (rcv1, rcv2, rcv3): + try: + r.fetch(0) + assert False + except Empty: + pass + msg1 = Message(self.content("testCreateExchange", 1), subject="first") + msg2 = Message(self.content("testCreateExchange", 1), subject="second") + snd.send(msg1) + snd.send(msg2) + self.drain(rcv1, expected=[msg1.content]) + self.drain(rcv2, expected=[msg2.content]) + self.drain(rcv3, expected=[msg1.content, msg2.content]) + def testDeleteBySender(self): snd = self.ssn.sender("test-delete; {create: always}") snd.send("ping") |
