diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /python | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- | python/README.txt | 24 | ||||
-rwxr-xr-x | python/amqp-doc | 75 | ||||
-rw-r--r-- | python/cpp_failing.txt | 0 | ||||
-rw-r--r-- | python/doc/test-requirements.txt | 10 | ||||
-rw-r--r-- | python/java_failing.txt | 13 | ||||
-rwxr-xr-x | python/pal2py | 255 | ||||
-rw-r--r-- | python/qpid/__init__.py | 17 | ||||
-rw-r--r-- | python/qpid/client.py | 111 | ||||
-rw-r--r-- | python/qpid/codec.py | 221 | ||||
-rw-r--r-- | python/qpid/connection.py | 265 | ||||
-rw-r--r-- | python/qpid/content.py | 47 | ||||
-rw-r--r-- | python/qpid/delegate.py | 52 | ||||
-rw-r--r-- | python/qpid/message.py | 81 | ||||
-rw-r--r-- | python/qpid/peer.py | 209 | ||||
-rw-r--r-- | python/qpid/queue.py | 42 | ||||
-rw-r--r-- | python/qpid/spec.py | 349 | ||||
-rw-r--r-- | python/qpid/testlib.py | 221 | ||||
-rw-r--r-- | python/qpid/xmlutil.py | 116 | ||||
-rwxr-xr-x | python/rule2test | 89 | ||||
-rwxr-xr-x | python/run-tests | 24 | ||||
-rw-r--r-- | python/tests/__init__.py | 1 | ||||
-rw-r--r-- | python/tests/basic.py | 115 | ||||
-rw-r--r-- | python/tests/broker.py | 84 | ||||
-rw-r--r-- | python/tests/example.py | 91 | ||||
-rw-r--r-- | python/tests/exchange.py | 234 | ||||
-rw-r--r-- | python/tests/queue.py | 254 |
26 files changed, 3000 insertions, 0 deletions
diff --git a/python/README.txt b/python/README.txt new file mode 100644 index 0000000000..0a64f0e2f2 --- /dev/null +++ b/python/README.txt @@ -0,0 +1,24 @@ += RUNNING THE PYTHON TESTS = + +The tests/ directory contains a collection of python unit tests to +exercise functions of a broker. + +Simplest way to run the tests: + + * Run a broker on the default port + + * ./run_tests + +For additional options: ./run_tests --help + + +== Expected failures == + +Until we complete functionality, tests may fail because the tested +functionality is missing in the broker. To skip expected failures +in the C++ or Java brokers: + + ./run_tests -I cpp_failing.txt + ./run_tests -I java_failing.txt + +If you fix a failure, please remove it from the corresponding list. diff --git a/python/amqp-doc b/python/amqp-doc new file mode 100755 index 0000000000..a5b785fd73 --- /dev/null +++ b/python/amqp-doc @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import sys, re +from qpid.spec import load, pythonize +from getopt import gnu_getopt as getopt, GetoptError +from fnmatch import fnmatchcase as fnmatch + +def die(msg): + print >> sys.stderr, msg + sys.exit(1) + +def usage(msg = ""): + return ("""%s + +Usage %s [<options>] [<pattern_1> ... <pattern_n>] + +Options: + -e, --regexp use regex instead of glob when matching + -s, --spec <url> location of amqp.xml +""" % (msg, sys.argv[0])).strip() + +try: + opts, args = getopt(sys.argv[1:], "s:e", ["regexp", "spec="]) +except GetoptError, e: + die(str(e)) + +regexp = False +spec = "../specs/amqp-8.0.xml" +for k, v in opts: + if k == "-e" or k == "--regexp": regexp = True + if k == "-s" or k == "--spec": spec = v + +if regexp: + def match(pattern, value): + try: + return re.match(pattern, value) + except Exception, e: + die("error: '%s': %s" % (pattern, e)) +else: + def match(pattern, value): + return fnmatch(value, pattern) + +spec = load(spec) +methods = {} +patterns = args +for pattern in patterns: + for c in spec.classes: + for m in c.methods: + name = pythonize("%s_%s" % (c.name, m.name)) + if match(pattern, name): + methods[name] = m.define_method(name) + +if patterns: + if methods: + AMQP = type("AMQP[%s]" % ", ".join(patterns), (), methods) + else: + die("no matches") +else: + AMQP = spec.define_class("AMQP") + +help(AMQP) diff --git a/python/cpp_failing.txt b/python/cpp_failing.txt new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/python/cpp_failing.txt diff --git a/python/doc/test-requirements.txt b/python/doc/test-requirements.txt new file mode 100644 index 0000000000..a1ba414eb2 --- /dev/null +++ b/python/doc/test-requirements.txt @@ -0,0 +1,10 @@ + * start and stop server, possibly in different configurations, should + at least be able to specify host and port + + * initiate multiple connections/server + + * initiate multiple channels/connection + + * enable positive and negative tests for any protocol interaction + + * test harness must be as robust as possible to spec changes diff --git a/python/java_failing.txt b/python/java_failing.txt new file mode 100644 index 0000000000..2e61363817 --- /dev/null +++ b/python/java_failing.txt @@ -0,0 +1,13 @@ +tests.basic.BasicTests.test_consume_exclusive +tests.basic.BasicTests.test_consume_no_local +tests.basic.BasicTests.test_consume_queue_errors +tests.basic.BasicTests.test_consume_unique_consumers +tests.exchange.RecommendedTypesRuleTests.testFanout +tests.exchange.RequiredInstancesRuleTests.testAmqFanOut +tests.queue.QueueTests.test_declare_exclusive +tests.queue.QueueTests.test_declare_passive +tests.queue.QueueTests.test_delete_ifempty +tests.queue.QueueTests.test_delete_ifunused +tests.queue.QueueTests.test_delete_simple +tests.queue.QueueTests.test_purge +tests.queue.QueueTests.test_bind diff --git a/python/pal2py b/python/pal2py new file mode 100755 index 0000000000..48fa0111de --- /dev/null +++ b/python/pal2py @@ -0,0 +1,255 @@ +#!/usr/bin/env python +import sys, os, xml + +from qpid.spec import load, pythonize +from textwrap import TextWrapper +from xml.sax.handler import ContentHandler + +class Block: + + def __init__(self, children): + self.children = children + + def emit(self, out): + for child in self.children: + if not hasattr(child, "emit"): + raise ValueError(child) + child.emit(out) + + if not self.children: + out.line("pass") + +class If: + + def __init__(self, expr, cons, alt = None): + self.expr = expr + self.cons = cons + self.alt = alt + + def emit(self, out): + out.line("if ") + self.expr.emit(out) + out.write(":") + out.level += 1 + self.cons.emit(out) + out.level -= 1 + if self.alt: + out.line("else:") + out.level += 1 + self.alt.emit(out) + out.level -= 1 + +class Stmt: + + def __init__(self, code): + self.code = code + + def emit(self, out): + out.line(self.code) + +class Expr: + + def __init__(self, code): + self.code = code + + def emit(self, out): + out.write(self.code) + +class Abort: + + def __init__(self, expr): + self.expr = expr + + def emit(self, out): + out.line("assert False, ") + self.expr.emit(out) + +WRAPPER = TextWrapper() + +def wrap(text): + return WRAPPER.wrap(" ".join(text.split())) + +class Doc: + + def __init__(self, text): + self.text = text + + def emit(self, out): + out.line('"""') + for line in wrap(self.text): + out.line(line) + out.line('"""') + +class Frame: + + def __init__(self, attrs): + self.attrs = attrs + self.children = [] + self.text = None + + def __getattr__(self, attr): + return self.attrs[attr] + +def isunicode(s): + if isinstance(s, str): + return False + for ch in s: + if ord(ch) > 127: + return True + return False + +def string_literal(s): + if s == None: + return None + if isunicode(s): + return "%r" % s + else: + return "%r" % str(s) + +TRUTH = { + "1": True, + "0": False, + "true": True, + "false": False + } + +LITERAL = { + "shortstr": string_literal, + "longstr": string_literal, + "bit": lambda s: TRUTH[s.lower()], + "longlong": lambda s: "%r" % long(s) + } + +def literal(s, field): + return LITERAL[field.type](s) + +def palexpr(s, field): + if s.startswith("$"): + return "msg.%s" % s[1:] + else: + return literal(s, field) + +class Translator(ContentHandler): + + def __init__(self, spec): + self.spec = spec + self.stack = [] + self.content = None + self.root = Frame(None) + self.push(self.root) + + def emit(self, out): + blk = Block(self.root.children) + blk.emit(out) + out.write("\n") + + def peek(self): + return self.stack[-1] + + def pop(self): + return self.stack.pop() + + def push(self, frame): + self.stack.append(frame) + + def startElement(self, name, attrs): + self.push(Frame(attrs)) + + def endElement(self, name): + frame = self.pop() + if hasattr(self, name): + child = getattr(self, name)(frame) + else: + child = self.handle(name, frame) + + if child: + self.peek().children.append(child) + + def characters(self, text): + frame = self.peek() + if frame.text: + frame.text += text + else: + frame.text = text + + def handle(self, name, frame): + for klass in self.spec.classes: + pyklass = pythonize(klass.name) + if name.startswith(pyklass): + name = name[len(pyklass) + 1:] + break + else: + raise ValueError("unknown class: %s" % name) + + for method in klass.methods: + pymethod = pythonize(method.name) + if name == pymethod: + break + else: + raise ValueError("unknown method: %s" % name) + + args = ["%s = %s" % (key, palexpr(val, method.fields.bypyname[key])) + for key, val in frame.attrs.items()] + if method.content and self.content: + args.append("content = %r" % string_literal(self.content)) + code = "ssn.%s_%s(%s)" % (pyklass, pymethod, ", ".join(args)) + if pymethod == "consume": + code = "consumer_tag = %s.consumer_tag" % code + return Stmt(code) + + def pal(self, frame): + return Block([Doc(frame.text)] + frame.children) + + def include(self, frame): + base, ext = os.path.splitext(frame.filename) + return Stmt("from %s import *" % base) + + def session(self, frame): + return Block([Stmt("cli = open()"), Stmt("ssn = cli.channel(0)"), + Stmt("ssn.channel_open()")] + frame.children) + + def empty(self, frame): + return If(Expr("msg == None"), Block(frame.children)) + + def abort(self, frame): + return Abort(Expr(string_literal(frame.text))) + + def wait(self, frame): + return Stmt("msg = ssn.queue(consumer_tag).get(timeout=%r)" % + (int(frame.timeout)/1000)) + + def basic_arrived(self, frame): + if frame.children: + return If(Expr("msg != None"), Block(frame.children)) + + def basic_content(self, frame): + self.content = frame.text + +class Emitter: + + def __init__(self, out): + self.out = out + self.level = 0 + + def write(self, code): + self.out.write(code) + + def line(self, code): + self.write("\n%s%s" % (" "*self.level, code)) + + def flush(self): + self.out.flush() + + def close(self): + self.out.close() + + +for f in sys.argv[2:]: + base, ext = os.path.splitext(f) + spec = load(sys.argv[1]) + t = Translator(spec) + xml.sax.parse(f, t) +# out = Emitter(open("%s.py" % base)) + out = Emitter(sys.stdout) + t.emit(out) + out.close() diff --git a/python/qpid/__init__.py b/python/qpid/__init__.py new file mode 100644 index 0000000000..3f69e88e24 --- /dev/null +++ b/python/qpid/__init__.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import spec, codec, connection, content, peer, delegate, client diff --git a/python/qpid/client.py b/python/qpid/client.py new file mode 100644 index 0000000000..cef10622ac --- /dev/null +++ b/python/qpid/client.py @@ -0,0 +1,111 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +An AQMP client implementation that uses a custom delegate for +interacting with the server. +""" + +import threading +from peer import Peer, Closed +from delegate import Delegate +from connection import Connection, Frame +from spec import load +from queue import Queue + + +class Client: + + def __init__(self, host, port, spec, vhost = None): + self.host = host + self.port = port + self.spec = spec + + self.mechanism = None + self.response = None + self.locale = None + + self.vhost = vhost + if self.vhost == None: + self.vhost = self.host + + self.queues = {} + self.lock = threading.Lock() + + self.closed = False + self.started = threading.Event() + + self.conn = Connection(self.host, self.port, self.spec) + self.peer = Peer(self.conn, ClientDelegate(self)) + + def wait(self): + self.started.wait() + if self.closed: + raise EOFError() + + def queue(self, key): + self.lock.acquire() + try: + try: + q = self.queues[key] + except KeyError: + q = Queue(0) + self.queues[key] = q + finally: + self.lock.release() + return q + + def start(self, response, mechanism="AMQPLAIN", locale="en_US"): + self.mechanism = mechanism + self.response = response + self.locale = locale + + self.conn.connect() + self.conn.init() + self.peer.start() + self.wait() + self.channel(0).connection_open(self.vhost) + + def channel(self, id): + return self.peer.channel(id) + +class ClientDelegate(Delegate): + + def __init__(self, client): + Delegate.__init__(self) + self.client = client + + def connection_start(self, ch, msg): + ch.connection_start_ok(mechanism=self.client.mechanism, + response=self.client.response, + locale=self.client.locale) + + def connection_tune(self, ch, msg): + ch.connection_tune_ok(*msg.fields) + self.client.started.set() + + def basic_deliver(self, ch, msg): + self.client.queue(msg.consumer_tag).put(msg) + + def channel_close(self, ch, msg): + ch.close(msg) + + def connection_close(self, ch, msg): + self.client.peer.close(msg) + + def close(self, reason): + self.client.closed = True + self.client.started.set() diff --git a/python/qpid/codec.py b/python/qpid/codec.py new file mode 100644 index 0000000000..c4bbe91f32 --- /dev/null +++ b/python/qpid/codec.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Utility code to translate between python objects and AMQP encoded data +fields. +""" + +from cStringIO import StringIO +from struct import * + +class EOF(Exception): + pass + +class Codec: + + def __init__(self, stream): + self.stream = stream + self.nwrote = 0 + self.nread = 0 + self.incoming_bits = [] + self.outgoing_bits = [] + + def read(self, n): + data = self.stream.read(n) + if n > 0 and len(data) == 0: + raise EOF() + self.nread += len(data) + return data + + def write(self, s): + self.flushbits() + self.stream.write(s) + self.nwrote += len(s) + + def flush(self): + self.flushbits() + self.stream.flush() + + def flushbits(self): + if len(self.outgoing_bits) > 0: + bytes = [] + index = 0 + for b in self.outgoing_bits: + if index == 0: bytes.append(0) + if b: bytes[-1] |= 1 << index + index = (index + 1) % 8 + del self.outgoing_bits[:] + for byte in bytes: + self.encode_octet(byte) + + def pack(self, fmt, *args): + self.write(pack(fmt, *args)) + + def unpack(self, fmt): + size = calcsize(fmt) + data = self.read(size) + values = unpack(fmt, data) + if len(values) == 1: + return values[0] + else: + return values + + def encode(self, type, value): + getattr(self, "encode_" + type)(value) + + def decode(self, type): + return getattr(self, "decode_" + type)() + + # bit + def encode_bit(self, o): + if o: + self.outgoing_bits.append(True) + else: + self.outgoing_bits.append(False) + + def decode_bit(self): + if len(self.incoming_bits) == 0: + bits = self.decode_octet() + for i in range(8): + self.incoming_bits.append(bits >> i & 1 != 0) + return self.incoming_bits.pop(0) + + # octet + def encode_octet(self, o): + self.pack("!B", o) + + def decode_octet(self): + return self.unpack("!B") + + # short + def encode_short(self, o): + self.pack("!H", o) + + def decode_short(self): + return self.unpack("!H") + + # long + def encode_long(self, o): + self.pack("!L", o) + + def decode_long(self): + return self.unpack("!L") + + # longlong + def encode_longlong(self, o): + self.pack("!Q", o) + + def decode_longlong(self): + return self.unpack("!Q") + + def enc_str(self, fmt, s): + size = len(s) + self.pack(fmt, size) + self.write(s) + + def dec_str(self, fmt): + size = self.unpack(fmt) + return self.read(size) + + # shortstr + def encode_shortstr(self, s): + self.enc_str("!B", s) + + def decode_shortstr(self): + return self.dec_str("!B") + + # longstr + def encode_longstr(self, s): + if isinstance(s, dict): + self.encode_table(s) + else: + self.enc_str("!L", s) + + def decode_longstr(self): + return self.dec_str("!L") + + # table + def encode_table(self, tbl): + enc = StringIO() + codec = Codec(enc) + for key, value in tbl.items(): + codec.encode_shortstr(key) + if isinstance(value, basestring): + codec.write("S") + codec.encode_longstr(value) + else: + codec.write("I") + codec.encode_long(value) + s = enc.getvalue() + self.encode_long(len(s)) + self.write(s) + + def decode_table(self): + size = self.decode_long() + start = self.nread + result = {} + while self.nread - start < size: + key = self.decode_shortstr() + type = self.read(1) + if type == "S": + value = self.decode_longstr() + elif type == "I": + value = self.decode_long() + else: + raise ValueError(repr(type)) + result[key] = value + return result + +def test(type, value): + if isinstance(value, (list, tuple)): + values = value + else: + values = [value] + stream = StringIO() + codec = Codec(stream) + for v in values: + codec.encode(type, v) + codec.flush() + enc = stream.getvalue() + stream.reset() + dup = [] + for i in xrange(len(values)): + dup.append(codec.decode(type)) + if values != dup: + raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) + +if __name__ == "__main__": + def dotest(type, value): + args = (type, value) + test(*args) + + for value in ("1", "0", "110", "011", "11001", "10101", "10011"): + for i in range(10): + dotest("bit", map(lambda x: x == "1", value*i)) + + for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): + dotest("table", value) + + for type in ("octet", "short", "long", "longlong"): + for value in range(0, 256): + dotest(type, value) + + for type in ("shortstr", "longstr"): + for value in ("", "a", "asdf"): + dotest(type, value) diff --git a/python/qpid/connection.py b/python/qpid/connection.py new file mode 100644 index 0000000000..f4d0817e60 --- /dev/null +++ b/python/qpid/connection.py @@ -0,0 +1,265 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A Connection class containing socket code that uses the spec metadata +to read and write Frame objects. This could be used by a client, +server, or even a proxy implementation. +""" + +import socket, codec +from cStringIO import StringIO +from spec import load, pythonize +from codec import EOF + +class SockIO: + + def __init__(self, sock): + self.sock = sock + + def write(self, buf): +# print "OUT: %r" % buf + self.sock.sendall(buf) + + def read(self, n): + data = "" + while len(data) < n: + try: + s = self.sock.recv(n - len(data)) + except socket.error: + break + if len(s) == 0: + break +# print "IN: %r" % s + data += s + return data + + def flush(self): + pass + +class Connection: + + def __init__(self, host, port, spec): + self.host = host + self.port = port + self.spec = spec + self.FRAME_END = self.spec.constants.byname["frame end"].id + + def connect(self): + sock = socket.socket() + sock.connect((self.host, self.port)) + sock.setblocking(1) + self.codec = codec.Codec(SockIO(sock)) + + def flush(self): + self.codec.flush() + + INIT="!4s4B" + + def init(self): + self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, + self.spec.minor) + + def write(self, frame): + c = self.codec + c.encode_octet(self.spec.constants.byname[frame.payload.type].id) + c.encode_short(frame.channel) + frame.payload.encode(c) + c.encode_octet(self.FRAME_END) + + def read(self): + c = self.codec + type = self.spec.constants.byid[c.decode_octet()].name + channel = c.decode_short() + payload = Frame.DECODERS[type].decode(self.spec, c) + end = c.decode_octet() + if end != self.FRAME_END: + raise "frame error: expected %r, got %r" % (self.FRAME_END, end) + frame = Frame(channel, payload) + return frame + +class Frame: + + METHOD = "frame method" + HEADER = "frame header" + BODY = "frame body" + OOB_METHOD = "frame oob method" + OOB_HEADER = "frame oob header" + OOB_BODY = "frame oob body" + TRACE = "frame trace" + HEARTBEAT = "frame heartbeat" + + DECODERS = {} + + def __init__(self, channel, payload): + self.channel = channel + self.payload = payload + + def __str__(self): + return "[%d] %s" % (self.channel, self.payload) + +class Payload: + + class __metaclass__(type): + + def __new__(cls, name, bases, dict): + for req in ("encode", "decode", "type"): + if not dict.has_key(req): + raise TypeError("%s must define %s" % (name, req)) + dict["decode"] = staticmethod(dict["decode"]) + t = type.__new__(cls, name, bases, dict) + if t.type != None: + Frame.DECODERS[t.type] = t + return t + + type = None + + def encode(self, enc): abstract + + def decode(spec, dec): abstract + +class Method(Payload): + + type = Frame.METHOD + + def __init__(self, method, *args): + if len(args) != len(method.fields): + argspec = ["%s: %s" % (pythonize(f.name), f.type) + for f in method.fields] + raise TypeError("%s.%s expecting (%s), got %s" % + (pythonize(method.klass.name), + pythonize(method.name), ", ".join(argspec), args)) + self.method = method + self.args = args + + def encode(self, enc): + buf = StringIO() + c = codec.Codec(buf) + c.encode_short(self.method.klass.id) + c.encode_short(self.method.id) + for field, arg in zip(self.method.fields, self.args): + c.encode(field.type, arg) + c.flush() + enc.encode_longstr(buf.getvalue()) + + def decode(spec, dec): + enc = dec.decode_longstr() + c = codec.Codec(StringIO(enc)) + klass = spec.classes.byid[c.decode_short()] + meth = klass.methods.byid[c.decode_short()] + args = tuple([c.decode(f.type) for f in meth.fields]) + return Method(meth, *args) + + def __str__(self): + return "%s %s" % (self.method, ", ".join([str(a) for a in self.args])) + +class Header(Payload): + + type = Frame.HEADER + + def __init__(self, klass, weight, size, **properties): + self.klass = klass + self.weight = weight + self.size = size + self.properties = properties + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] + + def encode(self, enc): + buf = StringIO() + c = codec.Codec(buf) + c.encode_short(self.klass.id) + c.encode_short(self.weight) + c.encode_longlong(self.size) + + # property flags + nprops = len(self.klass.fields) + flags = 0 + for i in range(nprops): + f = self.klass.fields.items[i] + flags <<= 1 + if self.properties.get(f.name) != None: + flags |= 1 + # the last bit indicates more flags + if i > 0 and (i % 15) == 0: + flags <<= 1 + if nprops > (i + 1): + flags |= 1 + c.encode_short(flags) + flags = 0 + flags <<= ((16 - (nprops % 15)) % 16) + c.encode_short(flags) + + # properties + for f in self.klass.fields: + v = self.properties.get(f.name) + if v != None: + c.encode(f.type, v) + c.flush() + enc.encode_longstr(buf.getvalue()) + + def decode(spec, dec): + c = codec.Codec(StringIO(dec.decode_longstr())) + klass = spec.classes.byid[c.decode_short()] + weight = c.decode_short() + size = c.decode_longlong() + + # property flags + bits = [] + while True: + flags = c.decode_short() + for i in range(15, 0, -1): + if flags >> i & 0x1 != 0: + bits.append(True) + else: + bits.append(False) + if flags & 0x1 == 0: + break + + # properties + properties = {} + for b, f in zip(bits, klass.fields): + if b: + properties[f.name] = c.decode(f.type) + + return Header(klass, weight, size, **properties) + + def __str__(self): + return "%s %s %s %s" % (self.klass, self.weight, self.size, + self.properties) + +class Body(Payload): + + type = Frame.BODY + + def __init__(self, content): + self.content = content + + def encode(self, enc): + enc.encode_longstr(self.content) + + def decode(spec, dec): + return Body(dec.decode_longstr()) + + def __str__(self): + return "Body(%r)" % self.content diff --git a/python/qpid/content.py b/python/qpid/content.py new file mode 100644 index 0000000000..33c9ec35f4 --- /dev/null +++ b/python/qpid/content.py @@ -0,0 +1,47 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A simple python representation for AMQP content. +""" + +def default(val, defval): + if val == None: + return defval + else: + return val + +class Content: + + def __init__(self, body = "", children = None, properties = None): + self.body = body + self.children = default(children, []) + self.properties = default(properties, {}) + + def size(self): + return len(self.body) + + def weight(self): + return len(self.children) + + def __getitem__(self, name): + return self.properties[name] + + def __setitem__(self, name, value): + self.properties[name] = value + + def __delitem__(self, name): + del self.properties[name] diff --git a/python/qpid/delegate.py b/python/qpid/delegate.py new file mode 100644 index 0000000000..0467162498 --- /dev/null +++ b/python/qpid/delegate.py @@ -0,0 +1,52 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Delegate implementation intended for use with the peer module. +""" + +import threading, inspect +from spec import pythonize + +class Delegate: + + def __init__(self): + self.handlers = {} + self.invokers = {} + # initialize all the mixins + self.invoke_all("init") + + def invoke_all(self, meth, *args, **kwargs): + for cls in inspect.getmro(self.__class__): + if hasattr(cls, meth): + getattr(cls, meth)(self, *args, **kwargs) + + def dispatch(self, channel, message): + method = message.method + spec = method.klass.spec + + try: + handler = self.handlers[method] + except KeyError: + name = "%s_%s" % (pythonize(method.klass.name), + pythonize(method.name)) + handler = getattr(self, name) + self.handlers[method] = handler + + return handler(channel, message) + + def close(self, reason): + self.invoke_all("close", reason) diff --git a/python/qpid/message.py b/python/qpid/message.py new file mode 100644 index 0000000000..08b3e70c0b --- /dev/null +++ b/python/qpid/message.py @@ -0,0 +1,81 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from sets import Set + +class Message: + + COMMON_FIELDS = Set(("content", "method", "fields")) + + def __init__(self, method, fields, content = None): + self.method = method + self.fields = fields + self.content = content + + def __len__(self): + l = len(self.fields) + if method.content: + l += 1 + return len(self.fields) + + def _idx(self, idx): + if idx < 0: idx += len(self) + if idx < 0 or idx > len(self): + raise IndexError(idx) + return idx + + def __getitem__(self, idx): + idx = self._idx(idx) + if idx == len(self.fields): + return self.content + else: + return self.fields[idx] + + def __setitem__(self, idx, value): + idx = self._idx(idx) + if idx == len(self.fields): + self.content = value + else: + self.fields[idx] = value + + def _slot(self, attr): + if attr in Message.COMMON_FIELDS: + env = self.__dict__ + key = attr + else: + env = self.fields + try: + field = self.method.fields.bypyname[attr] + key = self.method.fields.index(field) + except KeyError: + raise AttributeError(attr) + return env, key + + def __getattr__(self, attr): + env, key = self._slot(attr) + return env[key] + + def __setattr__(self, attr, value): + env, key = self._slot(attr) + env[attr] = value + + STR = "%s %s content = %s" + REPR = STR.replace("%s", "%r") + + def __str__(self): + return Message.STR % (self.method, self.fields, self.content) + + def __repr__(self): + return Message.REPR % (self.method, self.fields, self.content) diff --git a/python/qpid/peer.py b/python/qpid/peer.py new file mode 100644 index 0000000000..4179a05568 --- /dev/null +++ b/python/qpid/peer.py @@ -0,0 +1,209 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This module contains a skeletal peer implementation useful for +implementing an AMQP server, client, or proxy. The peer implementation +sorts incoming frames to their intended channels, and dispatches +incoming method frames to a delegate. +""" + +import thread, traceback, socket, sys, logging +from connection import Frame, EOF, Method, Header, Body +from message import Message +from queue import Queue, Closed as QueueClosed +from content import Content +from cStringIO import StringIO + +class Peer: + + def __init__(self, conn, delegate): + self.conn = conn + self.delegate = delegate + self.outgoing = Queue(0) + self.work = Queue(0) + self.channels = {} + self.Channel = type("Channel%s" % conn.spec.klass.__name__, + (Channel, conn.spec.klass), {}) + self.lock = thread.allocate_lock() + + def channel(self, id): + self.lock.acquire() + try: + try: + ch = self.channels[id] + except KeyError: + ch = self.Channel(id, self.outgoing) + self.channels[id] = ch + finally: + self.lock.release() + return ch + + def start(self): + thread.start_new_thread(self.writer, ()) + thread.start_new_thread(self.reader, ()) + thread.start_new_thread(self.worker, ()) + + def fatal(message=None): + """Call when an unexpected exception occurs that will kill a thread. + + In this case it's better to crash the process than to continue in + an invalid state with a missing thread.""" + if message: print >> sys.stderr, message + traceback.print_exc() + + def reader(self): + try: + while True: + try: + frame = self.conn.read() + except EOF, e: + self.close(e) + break + ch = self.channel(frame.channel) + ch.dispatch(frame, self.work) + except: + self.fatal() + + def close(self, reason): + for ch in self.channels.values(): + ch.close(reason) + self.delegate.close(reason) + + def writer(self): + try: + while True: + try: + message = self.outgoing.get() + self.conn.write(message) + except socket.error, e: + self.close(e) + break + self.conn.flush() + except: + self.fatal() + + def worker(self): + try: + while True: + self.dispatch(self.work.get()) + except: + self.fatal() + + def dispatch(self, queue): + frame = queue.get() + channel = self.channel(frame.channel) + payload = frame.payload + if payload.method.content: + content = read_content(queue) + else: + content = None + # Let the caller deal with exceptions thrown here. + message = Message(payload.method, payload.args, content) + self.delegate.dispatch(channel, message) + +class Closed(Exception): pass + +class Channel: + + def __init__(self, id, outgoing): + self.id = id + self.outgoing = outgoing + self.incoming = Queue(0) + self.responses = Queue(0) + self.queue = None + self.closed = False + self.reason = None + + def close(self, reason): + if self.closed: + return + self.closed = True + self.reason = reason + self.incoming.close() + self.responses.close() + + def dispatch(self, frame, work): + payload = frame.payload + if isinstance(payload, Method): + if payload.method.response: + self.queue = self.responses + else: + self.queue = self.incoming + work.put(self.incoming) + self.queue.put(frame) + + def invoke(self, method, args, content = None): + if self.closed: + raise Closed(self.reason) + + frame = Frame(self.id, Method(method, *args)) + self.outgoing.put(frame) + + if method.content: + if content == None: + content = Content() + self.write_content(method.klass, content, self.outgoing) + + try: + # here we depend on all nowait fields being named nowait + f = method.fields.byname["nowait"] + nowait = args[method.fields.index(f)] + except KeyError: + nowait = False + + try: + if not nowait and method.responses: + resp = self.responses.get().payload + if resp.method.content: + content = read_content(self.responses) + else: + content = None + if resp.method in method.responses: + return Message(resp.method, resp.args, content) + else: + raise ValueError(resp) + except QueueClosed, e: + if self.closed: + raise Closed(self.reason) + else: + raise e + + def write_content(self, klass, content, queue): + size = content.size() + header = Frame(self.id, Header(klass, content.weight(), size)) + queue.put(header) + for child in content.children: + self.write_content(klass, child, queue) + # should split up if content.body exceeds max frame size + if size > 0: + queue.put(Frame(self.id, Body(content.body))) + +def read_content(queue): + frame = queue.get() + header = frame.payload + children = [] + for i in range(header.weight): + children.append(read_content(queue)) + size = header.size + read = 0 + buf = StringIO() + while read < size: + body = queue.get() + content = body.payload.content + buf.write(content) + read += len(content) + return Content(buf.getvalue(), children, header.properties.copy()) diff --git a/python/qpid/queue.py b/python/qpid/queue.py new file mode 100644 index 0000000000..491cc3947d --- /dev/null +++ b/python/qpid/queue.py @@ -0,0 +1,42 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This module augments the standard python multithreaded Queue +implementation to add a close() method so that threads blocking on the +content of a queue can be notified if the queue is no longer in use. +""" + +from Queue import Queue as BaseQueue, Empty, Full + +class Closed(Exception): pass + +class Queue(BaseQueue): + + END = object() + + def close(self): + self.put(Queue.END) + + def get(self, block = True, timeout = None): + result = BaseQueue.get(self, block, timeout) + if result == Queue.END: + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Closed exception + self.put(Queue.END) + raise Closed() + else: + return result diff --git a/python/qpid/spec.py b/python/qpid/spec.py new file mode 100644 index 0000000000..70e09aa1e9 --- /dev/null +++ b/python/qpid/spec.py @@ -0,0 +1,349 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This module loads protocol metadata into python objects. It provides +access to spec metadata via a python object model, and can also +dynamically creating python methods, classes, and modules based on the +spec metadata. All the generated methods have proper signatures and +doc strings based on the spec metadata so the python help system can +be used to browse the spec documentation. The generated methods all +dispatch to the self.invoke(meth, args) callback of the containing +class so that the generated code can be reused in a variety of +situations. +""" + +import re, textwrap, new, xmlutil + +class SpecContainer: + + def __init__(self): + self.items = [] + self.byname = {} + self.byid = {} + self.indexes = {} + self.bypyname = {} + + def add(self, item): + if self.byname.has_key(item.name): + raise ValueError("duplicate name: %s" % item) + if self.byid.has_key(item.id): + raise ValueError("duplicate id: %s" % item) + pyname = pythonize(item.name) + if self.bypyname.has_key(pyname): + raise ValueError("duplicate pyname: %s" % item) + self.indexes[item] = len(self.items) + self.items.append(item) + self.byname[item.name] = item + self.byid[item.id] = item + self.bypyname[pyname] = item + + def index(self, item): + try: + return self.indexes[item] + except KeyError: + raise ValueError(item) + + def __iter__(self): + return iter(self.items) + + def __len__(self): + return len(self.items) + +class Metadata: + + PRINT = [] + + def __init__(self): + pass + + def __str__(self): + args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT) + return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) + + def __repr__(self): + return str(self) + +class Spec(Metadata): + + PRINT=["major", "minor", "file"] + + def __init__(self, major, minor, file): + Metadata.__init__(self) + self.major = major + self.minor = minor + self.file = file + self.constants = SpecContainer() + self.classes = SpecContainer() + + def post_load(self): + self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) + self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) + + def parse_method(self, name): + parts = re.split(r"\s*\.\s*", name) + if len(parts) != 2: + raise ValueError(name) + klass, meth = parts + return self.classes.byname[klass].methods.byname[meth] + + def define_module(self, name, doc = None): + module = new.module(name, doc) + module.__file__ = self.file + for c in self.classes: + classname = pythonize(c.name) + cls = c.define_class(classname) + cls.__module__ = module.__name__ + setattr(module, classname, cls) + return module + + def define_class(self, name): + methods = {} + for c in self.classes: + for m in c.methods: + meth = pythonize(m.klass.name + "_" + m.name) + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Constant(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, klass, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.klass = klass + self.docs = docs + +class Class(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, handler, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.handler = handler + self.fields = SpecContainer() + self.methods = SpecContainer() + self.docs = docs + + def define_class(self, name): + methods = {} + for m in self.methods: + meth = pythonize(m.name) + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Method(Metadata): + + PRINT=["name", "id"] + + def __init__(self, klass, name, id, content, responses, synchronous, + description, docs): + Metadata.__init__(self) + self.klass = klass + self.name = name + self.id = id + self.content = content + self.responses = responses + self.synchronous = synchronous + self.fields = SpecContainer() + self.description = description + self.docs = docs + self.response = False + + def docstring(self): + s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) + for f in self.fields: + if f.docs: + s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] + + [fill(d, 4) for d in f.docs[1:]]) + return s + + METHOD = "__method__" + DEFAULTS = {"bit": False, + "shortstr": "", + "longstr": "", + "table": {}, + "octet": 0, + "short": 0, + "long": 0, + "longlong": 0} + + def define_method(self, name): + g = {Method.METHOD: self} + l = {} + args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields] + if self.content: + args += [("content", None)] + code = "def %s(self, %s):\n" % \ + (name, ", ".join(["%s = %r" % a for a in args])) + code += " %r\n" % self.docstring() + if self.content: + methargs = args[:-1] + else: + methargs = args + argnames = ", ".join([a[0] for a in methargs]) + code += " return self.invoke(%s" % Method.METHOD + if argnames: + code += ", (%s,)" % argnames + if self.content: + code += ", content" + code += ")" + exec code in g, l + return l[name] + +class Field(Metadata): + + PRINT=["name", "id", "type"] + + def __init__(self, name, id, type, docs): + Metadata.__init__(self) + self.name = name + self.id = id + self.type = type + self.docs = docs + +def get_docs(nd): + return [n.text for n in nd["doc"]] + +def load_fields(nd, l, domains): + for f_nd in nd["field"]: + try: + type = f_nd["@type"] + except KeyError: + type = domains[f_nd["@domain"]] + l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd))) + +def load(specfile): + doc = xmlutil.parse(specfile) + root = doc["amqp"][0] + spec = Spec(int(root["@major"]), int(root["@minor"]), specfile) + + # constants + for nd in root["constant"]: + const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"), + get_docs(nd)) + spec.constants.add(const) + + # domains are typedefs + domains = {} + for nd in root["domain"]: + domains[nd["@name"]] = nd["@type"] + + # classes + for c_nd in root["class"]: + klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"], + get_docs(c_nd)) + load_fields(c_nd, klass.fields, domains) + for m_nd in c_nd["method"]: + meth = Method(klass, m_nd["@name"], + int(m_nd["@index"]), + m_nd.get_bool("@content", False), + [nd["@name"] for nd in m_nd["response"]], + m_nd.get_bool("@synchronous", False), + m_nd.text, + get_docs(m_nd)) + load_fields(m_nd, meth.fields, domains) + klass.methods.add(meth) + # resolve the responses + for m in klass.methods: + m.responses = [klass.methods.byname[r] for r in m.responses] + for resp in m.responses: + resp.response = True + spec.classes.add(klass) + spec.post_load() + return spec + +REPLACE = {" ": "_", "-": "_"} +KEYWORDS = {"global": "global_", + "return": "return_"} + +def pythonize(name): + name = str(name) + for key, val in REPLACE.items(): + name = name.replace(key, val) + try: + name = KEYWORDS[name] + except KeyError: + pass + return name + +def fill(text, indent, heading = None): + sub = indent * " " + if heading: + init = (indent - 2) * " " + heading + " -- " + else: + init = sub + w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub) + return w.fill(" ".join(text.split())) + +class Rule(Metadata): + + PRINT = ["text", "implement", "tests"] + + def __init__(self, text, implement, tests, path): + self.text = text + self.implement = implement + self.tests = tests + self.path = path + +def find_rules(node, rules): + if node.name == "rule": + rules.append(Rule(node.text, node.get("@implement"), + [ch.text for ch in node if ch.name == "test"], + node.path())) + if node.name == "doc" and node.get("@name") == "rule": + tests = [] + if node.has("@test"): + tests.append(node["@test"]) + rules.append(Rule(node.text, None, tests, node.path())) + for child in node: + find_rules(child, rules) + +def load_rules(specfile): + rules = [] + find_rules(xmlutil.parse(specfile), rules) + return rules + +def test_summary(): + template = """ + <html><head><title>AMQP Tests</title></head> + <body> + <table width="80%%" align="center"> + %s + </table> + </body> + </html> + """ + rows = [] + for rule in load_rules("amqp.org/specs/amqp7.xml"): + if rule.tests: + tests = ", ".join(rule.tests) + else: + tests = " " + rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>' + '<td><b>Implement:</b> %s</td>' + '<td><b>Tests:</b> %s</td></tr>' % + (rule.path[len("/root/amqp"):], rule.implement, tests)) + rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text) + rows.append('<tr><td colspan="3"> </td></tr>') + + print template % "\n".join(rows) diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py new file mode 100644 index 0000000000..ff9ecbee8a --- /dev/null +++ b/python/qpid/testlib.py @@ -0,0 +1,221 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Support library for qpid python tests. +# + +import sys, re, unittest, os, random, logging +import qpid.client, qpid.spec +from getopt import getopt, GetoptError + + +def findmodules(root): + """Find potential python modules under directory root""" + found = [] + for dirpath, subdirs, files in os.walk(root): + modpath = dirpath.replace(os.sep, '.') + if not re.match(r'\.svn$', dirpath): # Avoid SVN directories + for f in files: + match = re.match(r'(.+)\.py$', f) + if match and f != '__init__.py': + found.append('.'.join([modpath, match.group(1)])) + return found + +def default(value, default): + if (value == None): return default + else: return value + +class TestRunner: + """Runs unit tests. + + Parses command line arguments, provides utility functions for tests, + runs the selected test suite. + """ + + def _die(self, message = None): + if message: print message + print """ +run-tests [options] [test*] +The name of a test is package.module.ClassName.testMethod +Options: + -?/-h/--help : this message + -s/--spec <spec.xml> : file containing amqp XML spec + -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to + -v/--verbose : verbose - lists tests as they are run. + -d/--debug : enable debug logging. + -i/--ignore <test> : ignore the named test. + -I/--ignore-file : file containing patterns to ignore. + """ + sys.exit(1) + + def setBroker(self, broker): + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(broker) + if not match: self._die("'%s' is not a valid broker" % (broker)) + self.user, self.password, self.host, self.port = match.groups() + self.port = int(default(self.port, 5672)) + self.user = default(self.user, "guest") + self.password = default(self.password, "guest") + + def __init__(self): + # Defaults + self.setBroker("localhost") + self.spec = "../specs/amqp-8.0.xml" + self.verbose = 1 + self.ignore = [] + + def ignoreFile(self, filename): + f = file(filename) + for line in f.readlines(): self.ignore.append(line.strip()) + f.close() + + def _parseargs(self, args): + try: + opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"]) + except GetoptError, e: + self._die(str(e)) + for opt, value in opts: + if opt in ("-?", "-h", "--help"): self._die() + if opt in ("-s", "--spec"): self.spec = value + if opt in ("-b", "--broker"): self.setBroker(value) + if opt in ("-v", "--verbose"): self.verbose = 2 + if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) + if opt in ("-i", "--ignore"): self.ignore.append(value) + if opt in ("-I", "--ignore-file"): self.ignoreFile(value) + + if len(self.tests) == 0: self.tests=findmodules("tests") + + def testSuite(self): + class IgnoringTestSuite(unittest.TestSuite): + def addTest(self, test): + if isinstance(test, unittest.TestCase) and test.id() in testrunner.ignore: + return + unittest.TestSuite.addTest(self, test) + + # Use our IgnoringTestSuite in the test loader. + unittest.TestLoader.suiteClass = IgnoringTestSuite + return unittest.defaultTestLoader.loadTestsFromNames(self.tests) + + def run(self, args=sys.argv[1:]): + self._parseargs(args) + runner = unittest.TextTestRunner(descriptions=False, + verbosity=self.verbose) + result = runner.run(self.testSuite()) + if (self.ignore): + print "=======================================" + print "NOTE: the following tests were ignored:" + for t in self.ignore: print t + print "=======================================" + return result.wasSuccessful() + + def connect(self, host=None, port=None, spec=None, user=None, password=None): + """Connect to the broker, returns a qpid.client.Client""" + host = host or self.host + port = port or self.port + spec = spec or self.spec + user = user or self.user + password = password or self.password + client = qpid.client.Client(host, port, qpid.spec.load(spec)) + client.start({"LOGIN": user, "PASSWORD": password}) + return client + + +# Global instance for tests to call connect. +testrunner = TestRunner() + + +class TestBase(unittest.TestCase): + """Base class for Qpid test cases. + + self.client is automatically connected with channel 1 open before + the test methods are run. + + Deletes queues and exchanges after. Tests call + self.queue_declare(channel, ...) and self.exchange_declare(chanel, + ...) which are wrappers for the Channel functions that note + resources to clean up later. + """ + + def setUp(self): + self.queues = [] + self.exchanges = [] + self.client = self.connect() + self.channel = self.client.channel(1) + self.channel.channel_open() + + def tearDown(self): + # TODO aconway 2006-09-05: Wrong behaviour here, we should + # close all open channels (checking for exceptions on the + # channesl) then open a channel to clean up qs and exs, + # finally close that channel. + for ch, q in self.queues: + ch.queue_delete(queue=q) + for ch, ex in self.exchanges: + ch.exchange_delete(exchange=ex) + + def connect(self, *args, **keys): + """Create a new connction, return the Client object""" + return testrunner.connect(*args, **keys) + + def queue_declare(self, channel=None, *args, **keys): + channel = channel or self.channel + reply = channel.queue_declare(*args, **keys) + self.queues.append((channel, reply.queue)) + return reply + + def exchange_declare(self, channel=None, ticket=0, exchange='', + type='', passive=False, durable=False, + auto_delete=False, internal=False, nowait=False, + arguments={}): + channel = channel or self.channel + reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments) + # TODO aconway 2006-09-14: Don't add exchange on failure. + self.exchanges.append((channel,exchange)) + return reply + + def assertPublishConsume(self, queue="", exchange="", routing_key=""): + """ + Publish a message and consume it, assert it comes back intact. + + queue can be a single queue name or a list of queue names. + For a list assert the message appears on all queues. + Crude attempt to make unique messages so we can't consume + a message not really meant for us. + """ + body = "TestMessage("+str(random.randint(999999, 1000000))+")" + self.channel.basic_publish(exchange=exchange, + content=qpid.content.Content(body), + routing_key=routing_key) + if not isinstance(queue, list): queue = [queue] + for q in queue: + reply = self.channel.basic_consume(queue=q, no_ack=True) + msg = self.client.queue(reply.consumer_tag).get(timeout=2) + self.assertEqual(body, msg.content.body) + + + def assertChannelException(self, expectedCode, message): + self.assertEqual(message.method.klass.name, "channel") + self.assertEqual(message.method.name, "close") + self.assertEqual(message.reply_code, expectedCode) + + + def assertConnectionException(self, expectedCode, message): + self.assertEqual(message.method.klass.name, "connection") + self.assertEqual(message.method.name, "close") + self.assertEqual(message.reply_code, expectedCode) + diff --git a/python/qpid/xmlutil.py b/python/qpid/xmlutil.py new file mode 100644 index 0000000000..8f8a7116f5 --- /dev/null +++ b/python/qpid/xmlutil.py @@ -0,0 +1,116 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +XML utilities used by spec.py +""" + +import xml.sax +from xml.sax.handler import ContentHandler + +def parse(file): + doc = Node("root") + xml.sax.parse(file, Builder(doc)) + return doc + +class Node: + + def __init__(self, name, attrs = None, text = None, parent = None): + self.name = name + self.attrs = attrs + self.text = text + self.parent = parent + self.children = [] + if parent != None: + parent.children.append(self) + + def get_bool(self, key, default = False): + v = self.get(key) + if v == None: + return default + else: + return bool(int(v)) + + def index(self): + if self.parent: + return self.parent.children.index(self) + else: + return 0 + + def has(self, key): + try: + result = self[key] + return True + except KeyError: + return False + except IndexError: + return False + + def get(self, key, default = None): + if self.has(key): + return self[key] + else: + return default + + def __getitem__(self, key): + if callable(key): + return filter(key, self.children) + else: + t = key.__class__ + meth = "__get%s__" % t.__name__ + if hasattr(self, meth): + return getattr(self, meth)(key) + else: + raise KeyError(key) + + def __getstr__(self, name): + if name[:1] == "@": + return self.attrs[name[1:]] + else: + return self[lambda nd: nd.name == name] + + def __getint__(self, index): + return self.children[index] + + def __iter__(self): + return iter(self.children) + + def path(self): + if self.parent == None: + return "/%s" % self.name + else: + return "%s/%s" % (self.parent.path(), self.name) + +class Builder(ContentHandler): + + def __init__(self, start = None): + self.node = start + + def __setitem__(self, element, type): + self.types[element] = type + + def startElement(self, name, attrs): + self.node = Node(name, attrs, None, self.node) + + def endElement(self, name): + self.node = self.node.parent + + def characters(self, content): + if self.node.text == None: + self.node.text = content + else: + self.node.text += content + diff --git a/python/rule2test b/python/rule2test new file mode 100755 index 0000000000..b57ea9e24e --- /dev/null +++ b/python/rule2test @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +# +# Convert rules to tests +# +import sys, re, os.path +from getopt import getopt, GetoptError +from string import capitalize +from xml import dom +from xml.dom.minidom import parse + +def camelcase(s): + """Convert 'string like this' to 'StringLikeThis'""" + return "".join([capitalize(w) for w in re.split(re.compile("\W*"), s)]) + +def uncapitalize(s): return s[0].lower()+s[1:] + +def ancestors(node): + "Return iterator of ancestors from top-level element to node" + def generator(node): + while node and node.parentNode: + yield node + node = node.parentNode + return reversed(list(generator(node))) + +def tagAndName(element): + nameAttr = element.getAttribute("name"); + if (nameAttr) : return camelcase(nameAttr) + camelcase(element.tagName) + else: return camelcase(element.tagName) + +def nodeText(n): + """Recursively collect text from all text nodes under n""" + if n.nodeType == dom.Node.TEXT_NODE: + return n.data + if n.childNodes: + return reduce(lambda t, c: t + nodeText(c), n.childNodes, "") + return "" + +def cleanup(docString, level=8): + unindent = re.sub("\n[ \t]*", "\n", docString.strip()) + emptyLines = re.sub("\n\n\n", "\n\n", unindent) + indented = re.sub("\n", "\n"+level*" ", emptyLines) + return level*" " + indented + +def printTest(test, docstring): + print "class %s(TestBase):" % test + print ' """' + print docstring + print ' """' + print + print + +def printTests(doc, module): + """Returns dictionary { classname : [ (methodname, docstring)* ] * }""" + tests = {} + rules = doc.getElementsByTagName("rule") + for r in rules: + path = list(ancestors(r)) + if module == path[1].getAttribute("name").lower(): + test = "".join(map(tagAndName, path[2:])) + "Tests" + docstring = cleanup(nodeText(r), 4) + printTest(test, docstring) + +def usage(message=None): + if message: print >>sys.stderr, message + print >>sys.stderr, """ +rule2test [options] <amqpclass> + +Print test classes for each rule for the amqpclass in amqp.xml. + +Options: + -?/-h/--help : this message + -s/--spec <spec.xml> : file containing amqp XML spec +""" + return 1 + +def main(argv): + try: opts, args = getopt(argv[1:], "h?s:", ["help", "spec="]) + except GetoptError, e: return usage(e) + spec = "../specs/amqp.xml" # Default + for opt, val in opts: + if (opt in ("-h", "-?", "--help")): return usage() + if (opt in ("-s", "--spec")): spec = val + doc = parse(spec) + if len(args) == 0: return usage() + printTests(doc, args[0]) + return 0 + +if (__name__ == "__main__"): sys.exit(main(sys.argv)) diff --git a/python/run-tests b/python/run-tests new file mode 100755 index 0000000000..c49bd32a96 --- /dev/null +++ b/python/run-tests @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +from qpid.testlib import testrunner + +if not testrunner.run(): sys.exit(1) + + + diff --git a/python/tests/__init__.py b/python/tests/__init__.py new file mode 100644 index 0000000000..d55ff3fd85 --- /dev/null +++ b/python/tests/__init__.py @@ -0,0 +1 @@ +# Do not delete - marks this directory as a python package. diff --git a/python/tests/basic.py b/python/tests/basic.py new file mode 100644 index 0000000000..b912fc40be --- /dev/null +++ b/python/tests/basic.py @@ -0,0 +1,115 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class BasicTests(TestBase): + """Tests for 'methods' on the amqp basic 'class'""" + + def test_consume_no_local(self): + """ + Test that the no_local flag is honoured in the consume method + """ + channel = self.channel + #setup, declare two queues: + channel.queue_declare(queue="test-queue-1a", exclusive=True) + channel.queue_declare(queue="test-queue-1b", exclusive=True) + #establish two consumers one of which excludes delivery of locally sent messages + channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") + channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) + + #send a message + channel.basic_publish(exchange="amq.direct", routing_key="test-queue-1a", content=Content("consume_no_local")) + channel.basic_publish(exchange="amq.direct", routing_key="test-queue-1b", content=Content("consume_no_local")) + + #check the queues of the two consumers + excluded = self.client.queue("local_excluded") + included = self.client.queue("local_included") + msg = included.get(timeout=1) + self.assertEqual("consume_no_local", msg.content.body) + try: + excluded.get(timeout=1) + self.fail("Received locally published message though no_local=true") + except Empty: None + + + def test_consume_exclusive(self): + """ + Test that the exclusive flag is honoured in the consume method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-2", exclusive=True) + + #check that an exclusive consumer prevents other consumer being created: + channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) + try: + channel.basic_consume(consumer_tag="second", queue="test-queue-2") + self.fail("Expected consume request to fail due to previous exclusive consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + #open new channel and cleanup last consumer: + channel = self.client.channel(2) + channel.channel_open() + + #check that an exclusive consumer cannot be created if a consumer already exists: + channel.basic_consume(consumer_tag="first", queue="test-queue-2") + try: + channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) + self.fail("Expected exclusive consume request to fail due to previous consumer") + except Closed, e: + self.assertChannelException(403, e.args[0]) + + def test_consume_queue_errors(self): + """ + Test error conditions associated with the queue field of the consume method: + """ + channel = self.channel + try: + #queue specified but doesn't exist: + channel.basic_consume(queue="invalid-queue") + self.fail("Expected failure when consuming from non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(2) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.basic_consume(queue="") + self.fail("Expected failure when consuming from unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + def test_consume_unique_consumers(self): + """ + Ensure unique consumer tags are enforced + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-3", exclusive=True) + + #check that attempts to use duplicate tags are detected and prevented: + channel.basic_consume(consumer_tag="first", queue="test-queue-3") + try: + channel.basic_consume(consumer_tag="first", queue="test-queue-3") + self.fail("Expected consume request to fail due to non-unique tag") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + diff --git a/python/tests/broker.py b/python/tests/broker.py new file mode 100644 index 0000000000..1345076604 --- /dev/null +++ b/python/tests/broker.py @@ -0,0 +1,84 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class BrokerTests(TestBase): + """Tests for basic Broker functionality""" + + def test_amqp_basic_13(self): + """ + First, this test tries to receive a message with a no-ack + consumer. Second, this test tries to explicitely receive and + acknowledge a message with an acknowledging consumer. + """ + ch = self.channel + self.queue_declare(ch, queue = "myqueue") + + # No ack consumer + ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag + body = "test no-ack" + ch.basic_publish(routing_key = "myqueue", content = Content(body)) + msg = self.client.queue(ctag).get(timeout = 5) + self.assert_(msg.content.body == body) + + # Acknowleding consumer + self.queue_declare(ch, queue = "otherqueue") + ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag + body = "test ack" + ch.basic_publish(routing_key = "otherqueue", content = Content(body)) + msg = self.client.queue(ctag).get(timeout = 5) + ch.basic_ack(delivery_tag = msg.delivery_tag) + self.assert_(msg.content.body == body) + + # TODO: Ensure we get a failure if an ack consumer doesn't ack. + + def test_basic_delivery_immediate(self): + """ + Test basic message delivery where consume is issued before publish + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + + body = "Immediate Delivery" + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True) + msg = queue.get(timeout=5) + self.assert_(msg.content.body == body) + + # TODO: Ensure we fail if immediate=True and there's no consumer. + + + def test_basic_delivery_queued(self): + """ + Test basic message delivery where publish is issued before consume + (i.e. requires queueing of the message) + """ + channel = self.channel + self.exchange_declare(channel, exchange="test-exchange", type="direct") + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + body = "Queued Delivery" + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body)) + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=5) + self.assert_(msg.content.body == body) + diff --git a/python/tests/example.py b/python/tests/example.py new file mode 100644 index 0000000000..47d07fe9c5 --- /dev/null +++ b/python/tests/example.py @@ -0,0 +1,91 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class ExampleTest (TestBase): + """ + An example Qpid test, illustrating the unittest frameowkr and the + python Qpid client. The test class must inherit TestCase. The + test code uses the Qpid client to interact with a qpid broker and + verify it behaves as expected. + """ + + def test_example(self): + """ + An example test. Note that test functions must start with 'test_' + to be recognized by the test framework. + """ + + # By inheriting TestBase, self.client is automatically connected + # and self.channel is automatically opened as channel(1) + # Other channel methods mimic the protocol. + channel = self.channel + + # Now we can send regular commands. If you want to see what the method + # arguments mean or what other commands are available, you can use the + # python builtin help() method. For example: + #help(chan) + #help(chan.exchange_declare) + + # If you want browse the available protocol methods without being + # connected to a live server you can use the amqp-doc utility: + # + # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] + # + # Options: + # -e, --regexp use regex instead of glob when matching + + # Now that we know what commands are available we can use them to + # interact with the server. + + # Here we use ordinal arguments. + self.exchange_declare(channel, 0, "test", "direct") + + # Here we use keyword arguments. + self.queue_declare(channel, queue="test-queue") + channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") + + # Call Channel.basic_consume to register as a consumer. + # All the protocol methods return a message object. The message object + # has fields corresponding to the reply method fields, plus a content + # field that is filled if the reply includes content. In this case the + # interesting field is the consumer_tag. + reply = channel.basic_consume(queue="test-queue") + + # We can use the Client.queue(...) method to access the queue + # corresponding to our consumer_tag. + queue = self.client.queue(reply.consumer_tag) + + # Now lets publish a message and see if our consumer gets it. To do + # this we need to import the Content class. + body = "Hello World!" + channel.basic_publish(exchange="test", + routing_key="key", + content=Content(body)) + + # Now we'll wait for the message to arrive. We can use the timeout + # argument in case the server hangs. By default queue.get() will wait + # until a message arrives or the connection to the server dies. + msg = queue.get(timeout=10) + + # And check that we got the right response with assertEqual + self.assertEqual(body, msg.content.body) + + # Now acknowledge the message. + channel.basic_ack(msg.delivery_tag, True) + diff --git a/python/tests/exchange.py b/python/tests/exchange.py new file mode 100644 index 0000000000..b9b16bad78 --- /dev/null +++ b/python/tests/exchange.py @@ -0,0 +1,234 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for exchange behaviour. + +Test classes ending in 'RuleTests' are derived from rules in amqp.xml. +""" + +import logging, Queue +from qpid.testlib import TestBase +from qpid.content import Content + + +# TODO aconway 2006-09-01: Investigate and add tests as appropriate. +# Observered on C++: +# +# No exception raised for basic_consume on non-existent queue name. +# No exception for basic_publish with bad routing key. +# No exception for binding to non-existent exchange? +# queue_bind hangs with invalid exchange name +# +# Do server exceptions get propagated properly? +# Do Java exceptions propagate with any data (or just Closed()) + +class StandardExchangeVerifier: + """Verifies standard exchange behavior. + + Used as base class for classes that test standard exchanges.""" + + def verifyDirectExchange(self, ex): + """Verify that ex behaves like a direct exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") + self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") + try: + self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") + self.fail("Expected Empty exception") + except Queue.Empty: None # Expected + + def verifyFanOutExchange(self, ex): + """Verify that ex behaves like a fanout exchange.""" + self.queue_declare(queue="q") + self.channel.queue_bind(queue="q", exchange=ex) + self.queue_declare(queue="p") + self.channel.queue_bind(queue="p", exchange=ex) + self.assertPublishConsume(exchange=ex, queue=["q","p"]) + + +class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server SHOULD implement these standard exchange types: topic, headers. + + Client attempts to declare an exchange with each of these standard types. + """ + + def testDirect(self): + """Declare and test a direct exchange""" + self.exchange_declare(0, exchange="d", type="direct") + self.verifyDirectExchange("d") + + def testFanout(self): + """Declare and test a fanout exchange""" + self.exchange_declare(0, exchange="f", type="fanout") + self.verifyFanOutExchange("f") + + +class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): + """ + The server MUST, in each virtual host, pre-declare an exchange instance + for each standard exchange type that it implements, where the name of the + exchange instance is amq. followed by the exchange type name. + + Client creates a temporary queue and attempts to bind to each required + exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if + those types are defined). + """ + # TODO aconway 2006-09-01: Add tests for 3.1.3.1: + # - Test auto binding by q name + # - Test the nameless "default publish" exchange. + # - Auto created amq.fanout exchange + + def testAmqDirect(self): self.verifyDirectExchange("amq.direct") + + def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") + + def testAmqTopic(self): + self.exchange_declare(0, exchange="amq.topic", passive="true") + # TODO aconway 2006-09-14: verify topic behavior + + def testAmqHeaders(self): + self.exchange_declare(0, exchange="amq.headers", passive="true") + # TODO aconway 2006-09-14: verify headers behavior + +class DefaultExchangeRuleTests(TestBase): + """ + The server MUST predeclare a direct exchange to act as the default exchange + for content Publish methods and for default queue bindings. + + Client checks that the default exchange is active by specifying a queue + binding with no exchange name, and publishing a message with a suitable + routing key but without specifying the exchange name, then ensuring that + the message arrives in the queue correctly. + """ + + +class DefaultAccessRuleTests(TestBase): + """ + The server MUST NOT allow clients to access the default exchange except + by specifying an empty exchange name in the Queue.Bind and content Publish + methods. + """ + + +class ExtensionsRuleTests(TestBase): + """ + The server MAY implement other exchange types as wanted. + """ + + +class DeclareMethodMinimumRuleTests(TestBase): + """ + The server SHOULD support a minimum of 16 exchanges per virtual host and + ideally, impose no limit except as defined by available resources. + + The client creates as many exchanges as it can until the server reports + an error; the number of exchanges successfuly created must be at least + sixteen. + """ + + +class DeclareMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access to + the realm in which the exchange exists or will be created, or "passive" + access if the if-exists flag is set. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeclareMethodExchangeFieldReservedRuleTests(TestBase): + """ + Exchange names starting with "amq." are reserved for predeclared and + standardised exchanges. The client MUST NOT attempt to create an exchange + starting with "amq.". + + + """ + + +class DeclareMethodTypeFieldTypedRuleTests(TestBase): + """ + Exchanges cannot be redeclared with different types. The client MUST not + attempt to redeclare an existing exchange with a different type than used + in the original Exchange.Declare method. + + + """ + + +class DeclareMethodTypeFieldSupportRuleTests(TestBase): + """ + The client MUST NOT attempt to create an exchange with a type that the + server does not support. + + + """ + + +class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): + """ + If set, and the exchange does not already exist, the server MUST raise a + channel exception with reply code 404 (not found). + + + """ + + +class DeclareMethodDurableFieldSupportRuleTests(TestBase): + """ + The server MUST support both durable and transient exchanges. + + + """ + + +class DeclareMethodDurableFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the durable field if the exchange already exists. + + + """ + + +class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): + """ + The server MUST ignore the auto-delete field if the exchange already + exists. + + + """ + + +class DeleteMethodTicketFieldValidityRuleTests(TestBase): + """ + The client MUST provide a valid access ticket giving "active" access + rights to the exchange's access realm. + + Client creates access ticket with wrong access rights and attempts to use + in this method. + """ + + +class DeleteMethodExchangeFieldExistsRuleTests(TestBase): + """ + The client MUST NOT attempt to delete an exchange that does not exist. + """ + + diff --git a/python/tests/queue.py b/python/tests/queue.py new file mode 100644 index 0000000000..92260a7d64 --- /dev/null +++ b/python/tests/queue.py @@ -0,0 +1,254 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase + +class QueueTests(TestBase): + """Tests for 'methods' on the amqp queue 'class'""" + + def test_purge(self): + """ + Test that the purge method removes messages from the queue + """ + channel = self.channel + #setup, declare a queue and add some messages to it: + channel.exchange_declare(exchange="test-exchange", type="direct") + channel.queue_declare(queue="test-queue", exclusive=True) + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one")) + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two")) + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three")) + + #check that the queue now reports 3 messages: + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(3, reply.message_count) + + #now do the purge, then test that three messages are purged and the count drops to 0 + reply = channel.queue_purge(queue="test-queue"); + self.assertEqual(3, reply.message_count) + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(0, reply.message_count) + + #send a further message and consume it, ensuring that the other messages are really gone + channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four")) + reply = channel.basic_consume(queue="test-queue", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=1) + self.assertEqual("four", msg.content.body) + + #check error conditions (use new channels): + channel = self.client.channel(2) + channel.channel_open() + try: + #queue specified but doesn't exist: + channel.queue_purge(queue="invalid-queue") + self.fail("Expected failure when purging non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(3) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.queue_purge() + self.fail("Expected failure when purging unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + #cleanup + channel = self.client.channel(4) + channel.channel_open() + channel.exchange_delete(exchange="test-exchange") + + def test_declare_exclusive(self): + """ + Test that the exclusive field is honoured in queue.declare + """ + # TestBase.setUp has already opened channel(1) + c1 = self.channel + # Here we open a second separate connection: + other = self.connect() + c2 = other.channel(1) + c2.channel_open() + + #declare an exclusive queue: + c1.queue_declare(queue="exclusive-queue", exclusive="True") + try: + #other connection should not be allowed to declare this: + c2.queue_declare(queue="exclusive-queue", exclusive="True") + self.fail("Expected second exclusive queue_declare to raise a channel exception") + except Closed, e: + self.assertChannelException(405, e.args[0]) + + + def test_declare_passive(self): + """ + Test that the passive field is honoured in queue.declare + """ + channel = self.channel + #declare an exclusive queue: + channel.queue_declare(queue="passive-queue-1", exclusive="True") + channel.queue_declare(queue="passive-queue-1", passive="True") + try: + #other connection should not be allowed to declare this: + channel.queue_declare(queue="passive-queue-2", passive="True") + self.fail("Expected passive declaration of non-existant queue to raise a channel exception") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_bind(self): + """ + Test various permutations of the queue.bind method + """ + channel = self.channel + channel.queue_declare(queue="queue-1", exclusive="True") + + #straightforward case, both exchange & queue exist so no errors expected: + channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + + #bind the default queue for the channel (i.e. last one declared): + channel.queue_bind(exchange="amq.direct", routing_key="key2") + + #use the queue name where neither routing key nor queue are specified: + channel.queue_bind(exchange="amq.direct") + + #try and bind to non-existant exchange + try: + channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + self.fail("Expected bind to non-existant exchange to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #need to reopen a channel: + channel = self.client.channel(2) + channel.channel_open() + + #try and bind non-existant queue: + try: + channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + self.fail("Expected bind of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_delete_simple(self): + """ + Test basic queue deletion + """ + channel = self.client.channel(1) + channel.channel_open() + + #straight-forward case: + channel.queue_declare(queue="delete-me") + channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("a")) + channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("b")) + channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("c")) + reply = channel.queue_delete(queue="delete-me") + self.assertEqual(3, reply.message_count) + #check that it has gone be declaring passively + try: + channel.queue_declare(queue="delete-me", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #check attempted deletion of non-existant queue is handled correctly: + channel = self.client.channel(2) + channel.channel_open() + try: + channel.queue_delete(queue="i-dont-exist", if_empty="True") + self.fail("Expected delete of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + + def test_delete_ifempty(self): + """ + Test that if_empty field of queue_delete is honoured + """ + channel = self.client.channel(1) + channel.channel_open() + + #create a queue and add a message to it (use default binding): + channel.queue_declare(queue="delete-me-2") + channel.queue_declare(queue="delete-me-2", passive="True") + channel.basic_publish(exchange="amq.direct", routing_key="delete-me-2", content=Content("message")) + + #try to delete, but only if empty: + try: + channel.queue_delete(queue="delete-me-2", if_empty="True") + self.fail("Expected delete if_empty to fail for non-empty queue") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + #need new channel now: + channel = self.client.channel(2) + channel.channel_open() + + #empty queue: + reply = channel.basic_consume(queue="delete-me-2", no_ack=True) + queue = self.client.queue(reply.consumer_tag) + msg = queue.get(timeout=1) + self.assertEqual("message", msg.content.body) + channel.basic_cancel(consumer_tag=reply.consumer_tag) + + #retry deletion on empty queue: + channel.queue_delete(queue="delete-me-2", if_empty="True") + + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-2", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + def test_delete_ifunused(self): + """ + Test that if_unused field of queue_delete is honoured + """ + channel = self.client.channel(1) + channel.channel_open() + + #create a queue and register a consumer: + channel.queue_declare(queue="delete-me-3") + channel.queue_declare(queue="delete-me-3", passive="True") + reply = channel.basic_consume(queue="delete-me-3", no_ack=True) + + #need new channel now: + channel2 = self.client.channel(2) + channel2.channel_open() + #try to delete, but only if empty: + try: + channel2.queue_delete(queue="delete-me-3", if_unused="True") + self.fail("Expected delete if_unused to fail for queue with existing consumer") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + + channel.basic_cancel(consumer_tag=reply.consumer_tag) + channel.queue_delete(queue="delete-me-3", if_unused="True") + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-3", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + |