diff options
Diffstat (limited to 'qpid/python')
66 files changed, 1213 insertions, 5111 deletions
diff --git a/qpid/python/README.txt b/qpid/python/README.txt index bae9f6ab0b..772271cffe 100644 --- a/qpid/python/README.txt +++ b/qpid/python/README.txt @@ -27,30 +27,24 @@ python client. The "tests_0-10", "tests_0-9", and "tests_0-8" directories contain protocol level conformance tests for AMQP brokers of the specified version. -Simplest way to run the tests: +The qpid-python-test script may be used to run these tests. It will by +default run the python unit tests and the 0-10 conformance tests: 1. Run a broker on the default port - 2. ./run-tests -s <version> + 2. ./qpid-python-test -Where <version> is one of "0-8", "0-9", or "0-10-errata". +If you wish to run the 0-8 or 0-9 conformence tests, they may be +selected as follows: -See the run-tests usage for for additional options: - - ./run-tests -h + 1. Run a broker on the default port -== Expected failures == + 2. ./qpid-python-test tests_0-8.* -Certain tests are expected to fail due to incomplete functionality or -unresolved interop issues. To skip expected failures for the C++ or -Java brokers: + -- or -- - ./run-tests -I <file-name> + ./qpid-python-test tests_0-9.* -Where <file-name> is one of the following files: +See the qpid-python-test usage for for additional options: - * cpp_failing_0-10.txt - * cpp_failing_0-9.txt - * cpp_failing_0-8.txt - * java_failing_0-9.txt - * java_failing_0-8.txt + ./qpid-python-test -h diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/python/cpp_failing_0-10.txt +++ /dev/null diff --git a/qpid/python/cpp_failing_0-8.txt b/qpid/python/cpp_failing_0-8.txt deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/python/cpp_failing_0-8.txt +++ /dev/null diff --git a/qpid/python/cpp_failing_0-9.txt b/qpid/python/cpp_failing_0-9.txt deleted file mode 100644 index 06c31080fb..0000000000 --- a/qpid/python/cpp_failing_0-9.txt +++ /dev/null @@ -1,4 +0,0 @@ -tests_0-9.message.MessageTests.test_checkpoint -tests_0-9.message.MessageTests.test_reject -tests_0-9.basic.BasicTests.test_get - diff --git a/qpid/python/java_failing_0-8.txt b/qpid/python/java_failing_0-8.txt deleted file mode 100644 index c13b40a42c..0000000000 --- a/qpid/python/java_failing_0-8.txt +++ /dev/null @@ -1,2 +0,0 @@ -tests_0-8.exchange.RecommendedTypesRuleTests.testTopic -tests_0-8.exchange.RequiredInstancesRuleTests.testAmqTopic diff --git a/qpid/python/java_failing_0-9.txt b/qpid/python/java_failing_0-9.txt deleted file mode 100644 index 7252d0f496..0000000000 --- a/qpid/python/java_failing_0-9.txt +++ /dev/null @@ -1,18 +0,0 @@ -ntests.basic.BasicTests.test_qos_prefetch_count -tests.basic.BasicTests.test_ack -tests.basic.BasicTests.test_cancel -tests.basic.BasicTests.test_consume_exclusive -tests.basic.BasicTests.test_consume_no_local -tests.basic.BasicTests.test_consume_queue_errors -tests.basic.BasicTests.test_consume_unique_consumers -tests.basic.BasicTests.test_get -tests.basic.BasicTests.test_qos_prefetch_size -tests.basic.BasicTests.test_recover_requeue - -tests.exchange.RecommendedTypesRuleTests.testTopic -tests.exchange.RequiredInstancesRuleTests.testAmqTopic - -tests.message.MessageTests.test_checkpoint -tests.message.MessageTests.test_reject - -tests.broker.BrokerTests.test_ping_pong diff --git a/qpid/python/mllib/dom.py b/qpid/python/mllib/dom.py index df2b88322a..486f7082e1 100644 --- a/qpid/python/mllib/dom.py +++ b/qpid/python/mllib/dom.py @@ -148,6 +148,21 @@ class Tag(Node): if name == k: return v + def _idx(self, attr): + idx = 0 + for k, v in self.attrs: + if k == attr: + return idx + idx += 1 + return None + + def set_attr(self, name, value): + idx = self._idx(name) + if idx is None: + self.attrs.append((name, value)) + else: + self.attrs[idx] = (name, value) + def dispatch(self, f): try: attr = "do_" + self.name diff --git a/qpid/python/pal2py b/qpid/python/pal2py deleted file mode 100755 index 544151bf76..0000000000 --- a/qpid/python/pal2py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import sys, os, xml - -from qpid.spec import load, pythonize -from textwrap import TextWrapper -from xml.sax.handler import ContentHandler - -class Block: - - def __init__(self, children): - self.children = children - - def emit(self, out): - for child in self.children: - if not hasattr(child, "emit"): - raise ValueError(child) - child.emit(out) - - if not self.children: - out.line("pass") - -class If: - - def __init__(self, expr, cons, alt = None): - self.expr = expr - self.cons = cons - self.alt = alt - - def emit(self, out): - out.line("if ") - self.expr.emit(out) - out.write(":") - out.level += 1 - self.cons.emit(out) - out.level -= 1 - if self.alt: - out.line("else:") - out.level += 1 - self.alt.emit(out) - out.level -= 1 - -class Stmt: - - def __init__(self, code): - self.code = code - - def emit(self, out): - out.line(self.code) - -class Expr: - - def __init__(self, code): - self.code = code - - def emit(self, out): - out.write(self.code) - -class Abort: - - def __init__(self, expr): - self.expr = expr - - def emit(self, out): - out.line("assert False, ") - self.expr.emit(out) - -WRAPPER = TextWrapper() - -def wrap(text): - return WRAPPER.wrap(" ".join(text.split())) - -class Doc: - - def __init__(self, text): - self.text = text - - def emit(self, out): - out.line('"""') - for line in wrap(self.text): - out.line(line) - out.line('"""') - -class Frame: - - def __init__(self, attrs): - self.attrs = attrs - self.children = [] - self.text = None - - def __getattr__(self, attr): - return self.attrs[attr] - -def isunicode(s): - if isinstance(s, str): - return False - for ch in s: - if ord(ch) > 127: - return True - return False - -def string_literal(s): - if s == None: - return None - if isunicode(s): - return "%r" % s - else: - return "%r" % str(s) - -TRUTH = { - "1": True, - "0": False, - "true": True, - "false": False - } - -LITERAL = { - "shortstr": string_literal, - "longstr": string_literal, - "bit": lambda s: TRUTH[s.lower()], - "longlong": lambda s: "%r" % long(s) - } - -def literal(s, field): - return LITERAL[field.type](s) - -def palexpr(s, field): - if s.startswith("$"): - return "msg.%s" % s[1:] - else: - return literal(s, field) - -class Translator(ContentHandler): - - def __init__(self, spec): - self.spec = spec - self.stack = [] - self.content = None - self.root = Frame(None) - self.push(self.root) - - def emit(self, out): - blk = Block(self.root.children) - blk.emit(out) - out.write("\n") - - def peek(self): - return self.stack[-1] - - def pop(self): - return self.stack.pop() - - def push(self, frame): - self.stack.append(frame) - - def startElement(self, name, attrs): - self.push(Frame(attrs)) - - def endElement(self, name): - frame = self.pop() - if hasattr(self, name): - child = getattr(self, name)(frame) - else: - child = self.handle(name, frame) - - if child: - self.peek().children.append(child) - - def characters(self, text): - frame = self.peek() - if frame.text: - frame.text += text - else: - frame.text = text - - def handle(self, name, frame): - for klass in self.spec.classes: - pyklass = pythonize(klass.name) - if name.startswith(pyklass): - name = name[len(pyklass) + 1:] - break - else: - raise ValueError("unknown class: %s" % name) - - for method in klass.methods: - pymethod = pythonize(method.name) - if name == pymethod: - break - else: - raise ValueError("unknown method: %s" % name) - - args = ["%s = %s" % (key, palexpr(val, method.fields.bypyname[key])) - for key, val in frame.attrs.items()] - if method.content and self.content: - args.append("content = %r" % string_literal(self.content)) - code = "ssn.%s_%s(%s)" % (pyklass, pymethod, ", ".join(args)) - if pymethod == "consume": - code = "consumer_tag = %s.consumer_tag" % code - return Stmt(code) - - def pal(self, frame): - return Block([Doc(frame.text)] + frame.children) - - def include(self, frame): - base, ext = os.path.splitext(frame.filename) - return Stmt("from %s import *" % base) - - def session(self, frame): - return Block([Stmt("cli = open()"), Stmt("ssn = cli.channel(0)"), - Stmt("ssn.channel_open()")] + frame.children) - - def empty(self, frame): - return If(Expr("msg == None"), Block(frame.children)) - - def abort(self, frame): - return Abort(Expr(string_literal(frame.text))) - - def wait(self, frame): - return Stmt("msg = ssn.queue(consumer_tag).get(timeout=%r)" % - (int(frame.timeout)/1000)) - - def basic_arrived(self, frame): - if frame.children: - return If(Expr("msg != None"), Block(frame.children)) - - def basic_content(self, frame): - self.content = frame.text - -class Emitter: - - def __init__(self, out): - self.out = out - self.level = 0 - - def write(self, code): - self.out.write(code) - - def line(self, code): - self.write("\n%s%s" % (" "*self.level, code)) - - def flush(self): - self.out.flush() - - def close(self): - self.out.close() - - -for f in sys.argv[2:]: - base, ext = os.path.splitext(f) - spec = load(sys.argv[1]) - t = Translator(spec) - xml.sax.parse(f, t) -# out = Emitter(open("%s.py" % base)) - out = Emitter(sys.stdout) - t.emit(out) - out.close() diff --git a/qpid/python/perftest b/qpid/python/perftest deleted file mode 100755 index f867566fd0..0000000000 --- a/qpid/python/perftest +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -def publisher(n): - import qpid - import sys - from qpid.client import Client - from qpid.content import Content - if len(sys.argv) >= 3: - n = int(sys.argv[2]) - client = Client("127.0.0.1", 5672) - client.start({"LOGIN": "guest", "PASSWORD": "guest"}) - channel = client.channel(1) - channel.session_open() - message = Content("message") - message["routing_key"] = "message_queue" - print "producing ", n, " messages" - for i in range(n): - channel.message_transfer(destination="amq.direct", content=message) - - print "producing final message" - message = Content("That's done") - message["routing_key"] = "message_queue" - channel.message_transfer(destination="amq.direct", content=message) - - print "consuming sync message" - consumer = "consumer" - queue = client.queue(consumer) - channel.message_subscribe(queue="sync_queue", destination=consumer) - channel.message_flow(consumer, 0, 0xFFFFFFFFL) - channel.message_flow(consumer, 1, 0xFFFFFFFFL) - queue.get(block = True) - print "done" - channel.session_close() - -def consumer(): - import sys - import qpid - from qpid.client import Client - from qpid.content import Content - client = Client("127.0.0.1", 5672) - client.start({"LOGIN": "guest", "PASSWORD": "guest"}) - channel = client.channel(1) - channel.session_open() - consumer = "consumer" - queue = client.queue(consumer) - channel.message_subscribe(queue="message_queue", destination=consumer) - channel.message_flow(consumer, 0, 0xFFFFFFFFL) - channel.message_flow(consumer, 1, 0xFFFFFFFFL) - final = "That's done" - content = "" - message = None - print "getting messages" - while content != final: - message = queue.get(block = True) - content = message.content.body - message.complete(cumulative=True) - - print "consumed all messages" - message = Content("message") - message["routing_key"] = "sync_queue" - channel.message_transfer(destination="amq.direct", content=message) - print "done" - channel.session_close() - -if __name__=='__main__': - import sys - import qpid - from timeit import Timer - from qpid.client import Client - from qpid.content import Content - client = Client("127.0.0.1", 5672) - client.start({"LOGIN": "guest", "PASSWORD": "guest"}) - channel = client.channel(1) - channel.session_open() - channel.queue_declare(queue="message_queue") - channel.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="message_queue") - channel.queue_declare(queue="sync_queue") - channel.queue_bind(exchange="amq.direct", queue="sync_queue", routing_key="sync_queue") - channel.session_close() - - numMess = 100 - if len(sys.argv) >= 3: - numMess = int(sys.argv[2]) - if len(sys.argv) == 1: - print "error: please specify prod or cons" - elif sys.argv[1] == 'prod': - tprod = Timer("publisher(100)", "from __main__ import publisher") - tp = tprod.timeit(1) - print "produced and consumed" , numMess + 2 ,"messages in: ", tp - elif sys.argv[1] == 'cons': - tcons = Timer("consumer()", "from __main__ import consumer") - tc = tcons.timeit(1) - print "consumed " , numMess ," in: ", tc - else: - print "please specify prod or cons" diff --git a/qpid/python/qmf/console.py b/qpid/python/qmf/console.py index 315d581fc6..8674736982 100644 --- a/qpid/python/qmf/console.py +++ b/qpid/python/qmf/console.py @@ -275,7 +275,7 @@ class Object(object): for method in self._schema.getMethods(): if name == method.name: aIdx = 0 - sendCodec = Codec(self._broker.conn.spec) + sendCodec = Codec() seq = self._session.seqMgr._reserve((method, synchronous)) self._broker._setHeader(sendCodec, 'M', seq) self._objectId.encode(sendCodec) @@ -671,7 +671,7 @@ class Session: self.getResult = [] for agent in agentList: broker = agent.broker - sendCodec = Codec(broker.conn.spec) + sendCodec = Codec() try: self.cv.acquire() seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) @@ -749,7 +749,7 @@ class Session: # Send a package request # (effectively inc and dec outstanding by not doing anything) - sendCodec = Codec(broker.conn.spec) + sendCodec = Codec() seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'P', seq) smsg = broker._message(sendCodec.encoded) @@ -770,7 +770,7 @@ class Session: # Send a class request broker._incOutstanding() - sendCodec = Codec(broker.conn.spec) + sendCodec = Codec() seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'Q', seq) sendCodec.write_str8(pname) @@ -815,7 +815,7 @@ class Session: if unknown: # Send a schema request for the unknown class broker._incOutstanding() - sendCodec = Codec(broker.conn.spec) + sendCodec = Codec() seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'S', seq) classKey.encode(sendCodec) @@ -955,7 +955,7 @@ class Session: elif typecode == 19: data = codec.read_int64() # S63 elif typecode == 15: # FTABLE data = {} - sc = Codec(codec.spec, codec.read_vbin32()) + sc = Codec(codec.read_vbin32()) if sc.encoded: count = sc.read_uint32() while count > 0: @@ -986,7 +986,7 @@ class Session: data = self._decodeValue(codec, inner_type_code, broker) elif typecode == 21: # List #taken from codec10.read_list - sc = Codec(codec.spec, codec.read_vbin32()) + sc = Codec(codec.read_vbin32()) count = sc.read_uint32() data = [] while count > 0: @@ -995,7 +995,7 @@ class Session: count -= 1 elif typecode == 22: #Array #taken from codec10.read_array - sc = Codec(codec.spec, codec.read_vbin32()) + sc = Codec(codec.read_vbin32()) count = sc.read_uint32() type = sc.read_uint8() data = [] @@ -1027,7 +1027,7 @@ class Session: elif typecode == 19: codec.write_int64 (int(value)) # S64 elif typecode == 20: value._encodeUnmanaged(codec) # OBJECT elif typecode == 15: # FTABLE - sc = Codec(codec.spec) + sc = Codec() if value is not None: sc.write_uint32(len(value)) for k, v in value.items(): @@ -1039,7 +1039,7 @@ class Session: sc.write_uint32(0) codec.write_vbin32(sc.encoded) elif typecode == 21: # List - sc = Codec(codec.spec) + sc = Codec() self._encodeValue(sc, len(value), 3) for o in value: ltype=self.encoding(o) @@ -1047,7 +1047,7 @@ class Session: self._encodeValue(sc, o, ltype) codec.write_vbin32(sc.encoded) elif typecode == 22: # Array - sc = Codec(codec.spec) + sc = Codec() self._encodeValue(sc, len(value), 3) if len(value) > 0: ltype = self.encoding(value[0]) @@ -1159,7 +1159,7 @@ class Session: for method in schema.getMethods(): if name == method.name: aIdx = 0 - sendCodec = Codec(broker.conn.spec) + sendCodec = Codec() seq = self.seqMgr._reserve((method, False)) broker._setHeader(sendCodec, 'M', seq) objectId.encode(sendCodec) @@ -1690,7 +1690,7 @@ class Broker: self.connected = True self.session._handleBrokerConnect(self) - codec = Codec(self.conn.spec) + codec = Codec() self._setHeader(codec, 'B') msg = self._message(codec.encoded) self._send(msg) @@ -1809,7 +1809,7 @@ class Broker: self.cv.release() def _replyCb(self, msg): - codec = Codec(self.conn.spec, msg.body) + codec = Codec(msg.body) while True: opcode, seq = self._checkHeader(codec) if opcode == None: return diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test index 3bf0e6ccce..528acaa124 100755 --- a/qpid/python/qpid-python-test +++ b/qpid/python/qpid-python-test @@ -25,6 +25,7 @@ from fnmatch import fnmatchcase as match from getopt import GetoptError from logging import getLogger, StreamHandler, Formatter, Filter, \ WARN, DEBUG, ERROR +from qpid.harness import Skipped from qpid.util import URL levels = { @@ -50,6 +51,8 @@ parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN", parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append", dest="log_categories", default=[], help="log only categories matching CATEGORY pattern") +parser.add_option("-m", "--module", action="append", default=[], + dest="modules", help="add module to test search path") parser.add_option("-i", "--ignore", action="append", default=[], help="ignore tests matching IGNORE pattern") parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append", @@ -101,7 +104,10 @@ for a in args: includes.append(a.strip()) if not includes: - includes.append("*") + if opts.modules: + includes.append("*") + else: + includes.extend(["qpid.tests.*", "tests.*", "tests_0-10.*"]) def is_ignored(path): for p in excludes: @@ -117,15 +123,21 @@ def is_included(path): return True return False +def is_smart(): + return sys.stdout.isatty() and os.environ.get("TERM", "dumb") != "dumb" + def width(): - if sys.stdout.isatty(): + if is_smart(): s = struct.pack("HHHH", 0, 0, 0, 0) fd_stdout = sys.stdout.fileno() x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s) rows, cols, xpx, ypx = struct.unpack("HHHH", x) return cols else: - return 80 + try: + return int(os.environ.get("COLUMNS", "80")) + except ValueError: + return 80 WIDTH = width() @@ -142,13 +154,14 @@ def vt100_attrs(*attrs): vt100_reset = vt100_attrs(0) KEYWORDS = {"pass": (32,), + "skip": (33,), "fail": (31,), "start": (34,), "total": (34,), "ignored": (33,), "selected": (34,)} -COLORIZE = sys.stdout.isatty() +COLORIZE = is_smart() def colorize_word(word, text=None): if text is None: @@ -165,9 +178,6 @@ def indent(text): lines = text.split("\n") return " %s" % "\n ".join(lines) -from qpid.testlib import testrunner -testrunner.setBroker(str(config.broker)) - class Interceptor: def __init__(self): @@ -264,16 +274,27 @@ root.setLevel(WARN) log = getLogger("qpid.test") +PASS = "pass" +SKIP = "skip" +FAIL = "fail" + class Runner: def __init__(self): self.exceptions = [] + self.skip = False def passed(self): return not self.exceptions + def skipped(self): + return self.skip + def failed(self): - return self.exceptions + return self.exceptions and not self.skip + + def halt(self): + return self.exceptions or self.skip def run(self, name, phase): try: @@ -281,18 +302,28 @@ class Runner: except KeyboardInterrupt: raise except: - self.exceptions.append((name, sys.exc_info())) + exi = sys.exc_info() + if issubclass(exi[0], Skipped): + self.skip = True + self.exceptions.append((name, exi)) def status(self): if self.passed(): - return "pass" + return PASS + elif self.skipped(): + return SKIP + elif self.failed(): + return FAIL else: - return "fail" + return None def print_exceptions(self): for name, info in self.exceptions: - print "Error during %s:" % name - print indent("".join(traceback.format_exception(*info))).rstrip() + if issubclass(info[0], Skipped): + print indent("".join(traceback.format_exception_only(*info[:2]))).rstrip() + else: + print "Error during %s:" % name + print indent("".join(traceback.format_exception(*info))).rstrip() ST_WIDTH = 8 @@ -329,11 +360,11 @@ def run_test(name, test, config): sys.stdout.write("\n") sys.stdout.write(output) print " %s" % colorize_word(runner.status()) - if runner.failed(): + if runner.failed() or runner.skipped(): runner.print_exceptions() root.setLevel(level) filter.patterns = patterns - return runner.passed() + return runner.status() class FunctionTest: @@ -373,13 +404,13 @@ class MethodTest: if hasattr(inst, "configure"): runner.run("configure", lambda: inst.configure(config)) - if runner.failed(): return runner + if runner.halt(): return runner if hasattr(inst, "setUp"): runner.run("setup", inst.setUp) - if runner.failed(): return runner + if runner.halt(): return runner elif hasattr(inst, "setup"): runner.run("setup", inst.setup) - if runner.failed(): return runner + if runner.halt(): return runner runner.run("test", test) @@ -473,7 +504,9 @@ class Harness: objects.append(child) self.scanned.append(obj) -modules = "qpid.tests", "tests", "tests_0-10" +modules = opts.modules +if not modules: + modules.extend(["qpid.tests", "tests", "tests_0-8", "tests_0-9", "tests_0-10"]) h = Harness() for name in modules: m = __import__(name, None, None, ["dummy"]) @@ -485,13 +518,17 @@ total = len(filtered) + len(ignored) passed = 0 failed = 0 +skipped = 0 for t in filtered: if list_only: print t.name() else: - if t.run(): + st = t.run() + if st == PASS: passed += 1 - else: + elif st == SKIP: + skipped += 1 + elif st == FAIL: failed += 1 if opts.hoe: break @@ -499,6 +536,10 @@ for t in filtered: run = passed + failed if not list_only: + if passed: + _pass = "pass" + else: + _pass = "fail" if failed: outcome = "fail" else: @@ -507,12 +548,22 @@ if not list_only: ign = "ignored" else: ign = "pass" + if skipped: + skip = "skip" + else: + skip = "pass" print colorize("Totals:", 1), \ colorize_word("total", "%s tests" % total) + ",", \ - colorize_word("pass", "%s passed" % passed) + ",", \ + colorize_word(_pass, "%s passed" % passed) + ",", \ + colorize_word(skip, "%s skipped" % skipped) + ",", \ colorize_word(ign, "%s ignored" % len(ignored)) + ",", \ colorize_word(outcome, "%s failed" % failed), if opts.hoe and failed > 0: print " -- (halted after %s)" % run else: print + +if failed or skipped: + sys.exit(1) +else: + sys.exit(0) diff --git a/qpid/python/qpid/assembler.py b/qpid/python/qpid/assembler.py deleted file mode 100644 index 92bb0aa0f8..0000000000 --- a/qpid/python/qpid/assembler.py +++ /dev/null @@ -1,118 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from codec010 import StringCodec -from framer import * -from logging import getLogger - -log = getLogger("qpid.io.seg") - -class Segment: - - def __init__(self, first, last, type, track, channel, payload): - self.id = None - self.offset = None - self.first = first - self.last = last - self.type = type - self.track = track - self.channel = channel - self.payload = payload - - def decode(self, spec): - segs = spec["segment_type"] - choice = segs.choices[self.type] - return getattr(self, "decode_%s" % choice.name)(spec) - - def decode_control(self, spec): - sc = StringCodec(spec, self.payload) - return sc.read_control() - - def decode_command(self, spec): - sc = StringCodec(spec, self.payload) - hdr, cmd = sc.read_command() - cmd.id = self.id - return hdr, cmd - - def decode_header(self, spec): - sc = StringCodec(spec, self.payload) - values = [] - while len(sc.encoded) > 0: - values.append(sc.read_struct32()) - return values - - def decode_body(self, spec): - return self.payload - - def __str__(self): - return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type, - self.track, self.channel, self.payload) - - def __repr__(self): - return str(self) - -class Assembler(Framer): - - def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD): - Framer.__init__(self, sock) - self.max_payload = max_payload - self.fragments = {} - - def read_segment(self): - while True: - frame = self.read_frame() - - key = (frame.channel, frame.track) - seg = self.fragments.get(key) - if seg == None: - seg = Segment(frame.isFirstSegment(), frame.isLastSegment(), - frame.type, frame.track, frame.channel, "") - self.fragments[key] = seg - - seg.payload += frame.payload - - if frame.isLastFrame(): - self.fragments.pop(key) - log.debug("RECV %s", seg) - return seg - - def write_segment(self, segment): - remaining = segment.payload - - first = True - while first or remaining: - payload = remaining[:self.max_payload] - remaining = remaining[self.max_payload:] - - flags = 0 - if first: - flags |= FIRST_FRM - first = False - if not remaining: - flags |= LAST_FRM - if segment.first: - flags |= FIRST_SEG - if segment.last: - flags |= LAST_SEG - - frame = Frame(flags, segment.type, segment.track, segment.channel, - payload) - self.write_frame(frame) - - log.debug("SENT %s", segment) diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py index 4605710de8..6107a4bc35 100644 --- a/qpid/python/qpid/client.py +++ b/qpid/python/qpid/client.py @@ -39,11 +39,8 @@ class Client: if spec: self.spec = spec else: - try: - name = os.environ["AMQP_SPEC"] - except KeyError: - raise EnvironmentError("environment variable AMQP_SPEC must be set") - self.spec = load(name) + from qpid_config import amqp_spec_0_9 + self.spec = load(amqp_spec_0_9) self.structs = StructFactory(self.spec) self.sessions = {} diff --git a/qpid/python/qpid/codec010.py b/qpid/python/qpid/codec010.py index f07362c38d..682743df19 100644 --- a/qpid/python/qpid/codec010.py +++ b/qpid/python/qpid/codec010.py @@ -20,23 +20,66 @@ import datetime from packer import Packer from datatypes import serial, timestamp, RangedSet, Struct, UUID +from ops import Compound, PRIMITIVE, COMPOUND class CodecException(Exception): pass +def direct(t): + return lambda x: t + +def map_str(s): + for c in s: + if ord(c) >= 0x80: + return "vbin16" + return "str16" + class Codec(Packer): - def __init__(self, spec): - self.spec = spec + ENCODINGS = { + unicode: direct("str16"), + str: map_str, + buffer: direct("vbin32"), + int: direct("int64"), + long: direct("int64"), + float: direct("double"), + None.__class__: direct("void"), + list: direct("list"), + tuple: direct("list"), + dict: direct("map"), + timestamp: direct("datetime"), + datetime.datetime: direct("datetime"), + UUID: direct("uuid"), + Compound: direct("struct32") + } + + def encoding(self, obj): + enc = self._encoding(obj.__class__, obj) + if enc is None: + raise CodecException("no encoding for %r" % obj) + return PRIMITIVE[enc] + + def _encoding(self, klass, obj): + if self.ENCODINGS.has_key(klass): + return self.ENCODINGS[klass](obj) + for base in klass.__bases__: + result = self._encoding(base, obj) + if result != None: + return result + + def read_primitive(self, type): + return getattr(self, "read_%s" % type.NAME)() + def write_primitive(self, type, v): + getattr(self, "write_%s" % type.NAME)(v) - def write_void(self, v): - assert v == None def read_void(self): return None + def write_void(self, v): + assert v == None - def write_bit(self, b): - if not b: raise ValueError(b) def read_bit(self): return True + def write_bit(self, b): + if not b: raise ValueError(b) def read_uint8(self): return self.unpack("!B") @@ -172,20 +215,8 @@ class Codec(Packer): self.write_uint32(len(b)) self.write(b) - def write_map(self, m): - sc = StringCodec(self.spec) - if m is not None: - sc.write_uint32(len(m)) - for k, v in m.items(): - type = self.spec.encoding(v) - if type == None: - raise CodecException("no encoding for %s" % v.__class__) - sc.write_str8(k) - sc.write_uint8(type.code) - type.encode(sc, v) - self.write_vbin32(sc.encoded) def read_map(self): - sc = StringCodec(self.spec, self.read_vbin32()) + sc = StringCodec(self.read_vbin32()) if not sc.encoded: return None count = sc.read_uint32() @@ -193,91 +224,132 @@ class Codec(Packer): while sc.encoded: k = sc.read_str8() code = sc.read_uint8() - type = self.spec.types[code] - v = type.decode(sc) + type = PRIMITIVE[code] + v = sc.read_primitive(type) result[k] = v return result + def write_map(self, m): + sc = StringCodec() + if m is not None: + sc.write_uint32(len(m)) + for k, v in m.items(): + type = self.encoding(v) + sc.write_str8(k) + sc.write_uint8(type.CODE) + sc.write_primitive(type, v) + self.write_vbin32(sc.encoded) + def read_array(self): + sc = StringCodec(self.read_vbin32()) + if not sc.encoded: + return None + type = PRIMITIVE[sc.read_uint8()] + count = sc.read_uint32() + result = [] + while count > 0: + result.append(sc.read_primitive(type)) + count -= 1 + return result def write_array(self, a): - sc = StringCodec(self.spec) + sc = StringCodec() if a is not None: if len(a) > 0: - type = self.spec.encoding(a[0]) + type = self.encoding(a[0]) else: - type = self.spec.encoding(None) - sc.write_uint8(type.code) + type = self.encoding(None) + sc.write_uint8(type.CODE) sc.write_uint32(len(a)) for o in a: - type.encode(sc, o) + sc.write_primitive(type, o) self.write_vbin32(sc.encoded) - def read_array(self): - sc = StringCodec(self.spec, self.read_vbin32()) + + def read_list(self): + sc = StringCodec(self.read_vbin32()) if not sc.encoded: return None - type = self.spec.types[sc.read_uint8()] count = sc.read_uint32() result = [] while count > 0: - result.append(type.decode(sc)) + type = PRIMITIVE[sc.read_uint8()] + result.append(sc.read_primitive(type)) count -= 1 return result - def write_list(self, l): - sc = StringCodec(self.spec) + sc = StringCodec() if l is not None: sc.write_uint32(len(l)) for o in l: - type = self.spec.encoding(o) - sc.write_uint8(type.code) - type.encode(sc, o) + type = self.encoding(o) + sc.write_uint8(type.CODE) + sc.write_primitive(type, o) self.write_vbin32(sc.encoded) - def read_list(self): - sc = StringCodec(self.spec, self.read_vbin32()) - if not sc.encoded: - return None - count = sc.read_uint32() - result = [] - while count > 0: - type = self.spec.types[sc.read_uint8()] - result.append(type.decode(sc)) - count -= 1 - return result def read_struct32(self): size = self.read_uint32() code = self.read_uint16() - type = self.spec.structs[code] - fields = type.decode_fields(self) - return Struct(type, **fields) + cls = COMPOUND[code] + op = cls() + self.read_fields(op) + return op def write_struct32(self, value): - sc = StringCodec(self.spec) - sc.write_uint16(value._type.code) - value._type.encode_fields(sc, value) - self.write_vbin32(sc.encoded) - - def read_control(self): - cntrl = self.spec.controls[self.read_uint16()] - return Struct(cntrl, **cntrl.decode_fields(self)) - def write_control(self, ctrl): - type = ctrl._type - self.write_uint16(type.code) - type.encode_fields(self, ctrl) - - def read_command(self): - type = self.spec.commands[self.read_uint16()] - hdr = self.spec["session.header"].decode(self) - cmd = Struct(type, **type.decode_fields(self)) - return hdr, cmd - def write_command(self, hdr, cmd): - self.write_uint16(cmd._type.code) - hdr._type.encode(self, hdr) - cmd._type.encode_fields(self, cmd) + self.write_compound(value) + + def read_compound(self, cls): + size = self.read_size(cls.SIZE) + if cls.CODE is not None: + code = self.read_uint16() + assert code == cls.CODE + op = cls() + self.read_fields(op) + return op + def write_compound(self, op): + sc = StringCodec() + if op.CODE is not None: + sc.write_uint16(op.CODE) + sc.write_fields(op) + self.write_size(op.SIZE, len(sc.encoded)) + self.write(sc.encoded) + + def read_fields(self, op): + flags = 0 + for i in range(op.PACK): + flags |= (self.read_uint8() << 8*i) + + for i in range(len(op.FIELDS)): + f = op.FIELDS[i] + if flags & (0x1 << i): + if COMPOUND.has_key(f.type): + value = self.read_compound(COMPOUND[f.type]) + else: + value = getattr(self, "read_%s" % f.type)() + setattr(op, f.name, value) + def write_fields(self, op): + flags = 0 + for i in range(len(op.FIELDS)): + f = op.FIELDS[i] + value = getattr(op, f.name) + if f.type == "bit": + present = value + else: + present = value != None + if present: + flags |= (0x1 << i) + for i in range(op.PACK): + self.write_uint8((flags >> 8*i) & 0xFF) + for i in range(len(op.FIELDS)): + f = op.FIELDS[i] + if flags & (0x1 << i): + if COMPOUND.has_key(f.type): + enc = self.write_compound + else: + enc = getattr(self, "write_%s" % f.type) + value = getattr(op, f.name) + enc(value) def read_size(self, width): if width > 0: attr = "read_uint%d" % (width*8) return getattr(self, attr)() - def write_size(self, width, n): if width > 0: attr = "write_uint%d" % (width*8) @@ -285,7 +357,6 @@ class Codec(Packer): def read_uuid(self): return UUID(self.unpack("16s")) - def write_uuid(self, s): if isinstance(s, UUID): s = s.bytes @@ -293,7 +364,6 @@ class Codec(Packer): def read_bin128(self): return self.unpack("16s") - def write_bin128(self, b): self.pack("16s", b) @@ -301,14 +371,13 @@ class Codec(Packer): class StringCodec(Codec): - def __init__(self, spec, encoded = ""): - Codec.__init__(self, spec) + def __init__(self, encoded = ""): self.encoded = encoded - def write(self, s): - self.encoded += s - def read(self, n): result = self.encoded[:n] self.encoded = self.encoded[n:] return result + + def write(self, s): + self.encoded += s diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index 5abab3802c..680f8f62e3 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -20,14 +20,14 @@ import datatypes, session from threading import Thread, Condition, RLock from util import wait, notify -from assembler import Assembler, Segment from codec010 import StringCodec +from framing import * from session import Session from generator import control_invoker from spec import SPEC from exceptions import * from logging import getLogger -import delegates +import delegates, socket class ChannelBusy(Exception): pass @@ -43,12 +43,12 @@ def client(*args, **kwargs): def server(*args, **kwargs): return delegates.Server(*args, **kwargs) -class Connection(Assembler): +from framer import Framer - def __init__(self, sock, spec=SPEC, delegate=client, **args): - Assembler.__init__(self, sock) - self.spec = spec +class Connection(Framer): + def __init__(self, sock, delegate=client, **args): + Framer.__init__(self, sock) self.lock = RLock() self.attached = {} self.sessions = {} @@ -66,6 +66,10 @@ class Connection(Assembler): self.channel_max = 65535 + self.op_enc = OpEncoder() + self.seg_enc = SegmentEncoder() + self.frame_enc = FrameEncoder() + self.delegate = delegate(self, **args) def attach(self, name, ch, delegate, force=False): @@ -145,15 +149,44 @@ class Connection(Assembler): raise ConnectionFailed(*self.close_code) def run(self): + frame_dec = FrameDecoder() + seg_dec = SegmentDecoder() + op_dec = OpDecoder() + while not self.closed: try: - seg = self.read_segment() - except Closed: + data = self.sock.recv(64*1024) + if not data: + self.detach_all() + break + except socket.timeout: + if self.aborted(): + self.detach_all() + raise Closed("connection timed out") + else: + continue + except socket.error, e: self.detach_all() - break - self.delegate.received(seg) + raise Closed(e) + frame_dec.write(data) + seg_dec.write(*frame_dec.read()) + op_dec.write(*seg_dec.read()) + for op in op_dec.read(): + self.delegate.received(op) self.sock.close() + def write_op(self, op): + self.sock_lock.acquire() + try: + self.op_enc.write(op) + self.seg_enc.write(*self.op_enc.read()) + self.frame_enc.write(*self.seg_enc.read()) + bytes = self.frame_enc.read() + self.write(bytes) + self.flush() + finally: + self.sock_lock.release() + def close(self, timeout=None): if not self.opened: return Channel(self, 0).connection_close(200) @@ -169,19 +202,17 @@ class Connection(Assembler): log = getLogger("qpid.io.ctl") -class Channel(control_invoker(SPEC)): +class Channel(control_invoker()): def __init__(self, connection, id): self.connection = connection self.id = id self.session = None - def invoke(self, type, args, kwargs): - ctl = type.new(args, kwargs) - sc = StringCodec(self.spec) - sc.write_control(ctl) - self.connection.write_segment(Segment(True, True, type.segment_type, - type.track, self.id, sc.encoded)) + def invoke(self, op, args, kwargs): + ctl = op(*args, **kwargs) + ctl.channel = self.id + self.connection.write_op(ctl) log.debug("SENT %s", ctl) def __str__(self): diff --git a/qpid/python/qpid/connection08.py b/qpid/python/qpid/connection08.py index be94a792cb..d34cfe2847 100644 --- a/qpid/python/qpid/connection08.py +++ b/qpid/python/qpid/connection08.py @@ -28,6 +28,7 @@ from cStringIO import StringIO from spec import load from codec import EOF from compat import SHUT_RDWR +from exceptions import VersionError class SockIO: @@ -73,6 +74,9 @@ def listen(host, port, predicate = lambda: True): s, a = sock.accept() yield SockIO(s) +class FramingError(Exception): + pass + class Connection: def __init__(self, io, spec): @@ -107,7 +111,16 @@ class Connection: def read_8_0(self): c = self.codec - type = self.spec.constants.byid[c.decode_octet()].name + tid = c.decode_octet() + try: + type = self.spec.constants.byid[tid].name + except KeyError: + if tid == ord('A') and c.unpack("!3s") == "MQP": + _, _, major, minor = c.unpack("4B") + raise VersionError("client: %s-%s, server: %s-%s" % + (self.spec.major, self.spec.minor, major, minor)) + else: + raise FramingError("unknown frame type: %s" % tid) channel = c.decode_short() body = c.decode_longstr() dec = codec.Codec(StringIO(body), self.spec) diff --git a/qpid/python/qpid/datatypes.py b/qpid/python/qpid/datatypes.py index 4cd3fade2c..bba3f5b9ab 100644 --- a/qpid/python/qpid/datatypes.py +++ b/qpid/python/qpid/datatypes.py @@ -84,7 +84,7 @@ class Message: def get(self, name): if self.headers: for h in self.headers: - if h._type.name == name: + if h.NAME == name: return h return None @@ -93,7 +93,7 @@ class Message: self.headers = [] idx = 0 while idx < len(self.headers): - if self.headers[idx]._type == header._type: + if self.headers[idx].NAME == header.NAME: self.headers[idx] = header return idx += 1 @@ -102,7 +102,7 @@ class Message: def clear(self, name): idx = 0 while idx < len(self.headers): - if self.headers[idx]._type.name == name: + if self.headers[idx].NAME == name: del self.headers[idx] return idx += 1 diff --git a/qpid/python/qpid/delegates.py b/qpid/python/qpid/delegates.py index 82bbe67ede..c74cc5a945 100644 --- a/qpid/python/qpid/delegates.py +++ b/qpid/python/qpid/delegates.py @@ -20,7 +20,9 @@ import os, connection, session from util import notify from datatypes import RangedSet +from exceptions import VersionError from logging import getLogger +from ops import Control import sys log = getLogger("qpid.io.ctl") @@ -29,26 +31,22 @@ class Delegate: def __init__(self, connection, delegate=session.client): self.connection = connection - self.spec = connection.spec self.delegate = delegate - self.control = self.spec["track.control"].value - def received(self, seg): - ssn = self.connection.attached.get(seg.channel) + def received(self, op): + ssn = self.connection.attached.get(op.channel) if ssn is None: - ch = connection.Channel(self.connection, seg.channel) + ch = connection.Channel(self.connection, op.channel) else: ch = ssn.channel - if seg.track == self.control: - ctl = seg.decode(self.spec) - log.debug("RECV %s", ctl) - attr = ctl._type.qname.replace(".", "_") - getattr(self, attr)(ch, ctl) + if isinstance(op, Control): + log.debug("RECV %s", op) + getattr(self, op.NAME)(ch, op) elif ssn is None: ch.session_detached() else: - ssn.received(seg) + ssn.received(op) def connection_close(self, ch, close): self.connection.close_code = (close.reply_code, close.reply_text) @@ -124,7 +122,8 @@ class Server(Delegate): def start(self): self.connection.read_header() - self.connection.write_header(self.spec.major, self.spec.minor) + # XXX + self.connection.write_header(0, 10) connection.Channel(self.connection, 0).connection_start(mechanisms=["ANONYMOUS"]) def connection_start_ok(self, ch, start_ok): @@ -156,8 +155,14 @@ class Client(Delegate): self.heartbeat = heartbeat def start(self): - self.connection.write_header(self.spec.major, self.spec.minor) - self.connection.read_header() + # XXX + cli_major = 0 + cli_minor = 10 + self.connection.write_header(cli_major, cli_minor) + magic, _, _, major, minor = self.connection.read_header() + if not (magic == "AMQP" and major == cli_major and minor == cli_minor): + raise VersionError("client: %s-%s, server: %s-%s" % + (cli_major, cli_minor, major, minor)) def connection_start(self, ch, start): r = "\0%s\0%s" % (self.username, self.password) diff --git a/qpid/python/qpid/exceptions.py b/qpid/python/qpid/exceptions.py index 7eaaf81ed4..2bd80b7ffe 100644 --- a/qpid/python/qpid/exceptions.py +++ b/qpid/python/qpid/exceptions.py @@ -19,3 +19,4 @@ class Closed(Exception): pass class Timeout(Exception): pass +class VersionError(Exception): pass diff --git a/qpid/python/qpid/framer.py b/qpid/python/qpid/framer.py index 0d82e4378b..4cd0ae6f26 100644 --- a/qpid/python/qpid/framer.py +++ b/qpid/python/qpid/framer.py @@ -26,48 +26,6 @@ from logging import getLogger raw = getLogger("qpid.io.raw") frm = getLogger("qpid.io.frm") -FIRST_SEG = 0x08 -LAST_SEG = 0x04 -FIRST_FRM = 0x02 -LAST_FRM = 0x01 - -class Frame: - - HEADER = "!2BHxBH4x" - HEADER_SIZE = struct.calcsize(HEADER) - MAX_PAYLOAD = 65535 - struct.calcsize(HEADER) - - def __init__(self, flags, type, track, channel, payload): - if len(payload) > Frame.MAX_PAYLOAD: - raise ValueError("max payload size exceeded: %s" % len(payload)) - self.flags = flags - self.type = type - self.track = track - self.channel = channel - self.payload = payload - - def isFirstSegment(self): - return bool(FIRST_SEG & self.flags) - - def isLastSegment(self): - return bool(LAST_SEG & self.flags) - - def isFirstFrame(self): - return bool(FIRST_FRM & self.flags) - - def isLastFrame(self): - return bool(LAST_FRM & self.flags) - - def __repr__(self): - return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()), - int(self.isLastSegment()), - int(self.isFirstFrame()), - int(self.isLastFrame()), - self.type, - self.track, - self.channel, - self.payload) - class FramingError(Exception): pass class Framer(Packer): @@ -137,24 +95,3 @@ class Framer(Packer): self.flush() finally: self.sock_lock.release() - - def write_frame(self, frame): - self.sock_lock.acquire() - try: - size = len(frame.payload) + struct.calcsize(Frame.HEADER) - track = frame.track & 0x0F - self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel) - self.write(frame.payload) - if frame.isLastSegment() and frame.isLastFrame(): - self.flush() - frm.debug("SENT %s", frame) - finally: - self.sock_lock.release() - - def read_frame(self): - flags, type, size, track, channel = self.unpack(Frame.HEADER) - if flags & 0xF0: raise FramingError() - payload = self.read(size - struct.calcsize(Frame.HEADER)) - frame = Frame(flags, type, track, channel, payload) - frm.debug("RECV %s", frame) - return frame diff --git a/qpid/python/qpid/framing.py b/qpid/python/qpid/framing.py index 7c5f68fbcc..0a8f26272c 100644 --- a/qpid/python/qpid/framing.py +++ b/qpid/python/qpid/framing.py @@ -18,8 +18,64 @@ # import struct -from qpid.framer import Frame, FIRST_SEG, LAST_SEG, FIRST_FRM, LAST_FRM -from qpid.assembler import Segment + +FIRST_SEG = 0x08 +LAST_SEG = 0x04 +FIRST_FRM = 0x02 +LAST_FRM = 0x01 + +class Frame: + + HEADER = "!2BHxBH4x" + HEADER_SIZE = struct.calcsize(HEADER) + MAX_PAYLOAD = 65535 - struct.calcsize(HEADER) + + def __init__(self, flags, type, track, channel, payload): + if len(payload) > Frame.MAX_PAYLOAD: + raise ValueError("max payload size exceeded: %s" % len(payload)) + self.flags = flags + self.type = type + self.track = track + self.channel = channel + self.payload = payload + + def isFirstSegment(self): + return bool(FIRST_SEG & self.flags) + + def isLastSegment(self): + return bool(LAST_SEG & self.flags) + + def isFirstFrame(self): + return bool(FIRST_FRM & self.flags) + + def isLastFrame(self): + return bool(LAST_FRM & self.flags) + + def __repr__(self): + return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()), + int(self.isLastSegment()), + int(self.isFirstFrame()), + int(self.isLastFrame()), + self.type, + self.track, + self.channel, + self.payload) + +class Segment: + + def __init__(self, first, last, type, track, channel, payload): + self.id = None + self.offset = None + self.first = first + self.last = last + self.type = type + self.track = track + self.channel = channel + self.payload = payload + + def __repr__(self): + return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type, + self.track, self.channel, self.payload) class FrameDecoder: @@ -140,3 +196,115 @@ class SegmentEncoder: result = self.frames self.frames = [] return result + +from ops import COMMANDS, CONTROLS, COMPOUND, Header, segment_type, track +from spec import SPEC + +from codec010 import StringCodec + +class OpEncoder: + + def __init__(self): + self.segments = [] + + def write(self, *ops): + for op in ops: + if COMMANDS.has_key(op.NAME): + seg_type = segment_type.command + seg_track = track.command + enc = self.encode_command(op) + elif CONTROLS.has_key(op.NAME): + seg_type = segment_type.control + seg_track = track.control + enc = self.encode_compound(op) + else: + raise ValueError(op) + seg = Segment(True, False, seg_type, seg_track, op.channel, enc) + self.segments.append(seg) + if hasattr(op, "headers") and op.headers is not None: + hdrs = "" + for h in op.headers: + hdrs += self.encode_compound(h) + seg = Segment(False, False, segment_type.header, seg_track, op.channel, + hdrs) + self.segments.append(seg) + if hasattr(op, "payload") and op.payload is not None: + self.segments.append(Segment(False, False, segment_type.body, seg_track, + op.channel, op.payload)) + self.segments[-1].last = True + + def encode_command(self, cmd): + sc = StringCodec() + sc.write_uint16(cmd.CODE) + sc.write_compound(Header(sync=cmd.sync)) + sc.write_fields(cmd) + return sc.encoded + + def encode_compound(self, op): + sc = StringCodec() + sc.write_compound(op) + return sc.encoded + + def read(self): + result = self.segments + self.segments = [] + return result + +class OpDecoder: + + def __init__(self): + self.op = None + self.ops = [] + + def write(self, *segments): + for seg in segments: + if seg.first: + if seg.type == segment_type.command: + self.op = self.decode_command(seg.payload) + elif seg.type == segment_type.control: + self.op = self.decode_control(seg.payload) + else: + raise ValueError(seg) + self.op.channel = seg.channel + elif seg.type == segment_type.header: + if self.op.headers is None: + self.op.headers = [] + self.op.headers.extend(self.decode_headers(seg.payload)) + elif seg.type == segment_type.body: + if self.op.payload is None: + self.op.payload = seg.payload + else: + self.op.payload += seg.payload + if seg.last: + self.ops.append(self.op) + self.op = None + + def decode_command(self, encoded): + sc = StringCodec(encoded) + code = sc.read_uint16() + cls = COMMANDS[code] + hdr = sc.read_compound(Header) + cmd = cls() + sc.read_fields(cmd) + cmd.sync = hdr.sync + return cmd + + def decode_control(self, encoded): + sc = StringCodec(encoded) + code = sc.read_uint16() + cls = CONTROLS[code] + ctl = cls() + sc.read_fields(ctl) + return ctl + + def decode_headers(self, encoded): + sc = StringCodec(encoded) + result = [] + while sc.encoded: + result.append(sc.read_struct32()) + return result + + def read(self): + result = self.ops + self.ops = [] + return result diff --git a/qpid/python/qpid/generator.py b/qpid/python/qpid/generator.py index 729425d6a3..02d11e5005 100644 --- a/qpid/python/qpid/generator.py +++ b/qpid/python/qpid/generator.py @@ -19,42 +19,38 @@ import sys -from spec010 import Control +from ops import * -def METHOD(module, inst): - method = lambda self, *args, **kwargs: self.invoke(inst, args, kwargs) +def METHOD(module, op): + method = lambda self, *args, **kwargs: self.invoke(op, args, kwargs) if sys.version_info[:2] > (2, 3): - method.__name__ = str(inst.pyname) - method.__doc__ = str(inst.pydoc) + method.__name__ = op.__name__ + method.__doc__ = op.__doc__ method.__module__ = module return method -def generate(spec, module, predicate=lambda x: True): - dict = {"spec": spec} +def generate(module, operations): + dict = {} - for name, enum in spec.enums.items(): - dict[name] = enum + for name, enum in ENUMS.items(): + if isinstance(name, basestring): + dict[name] = enum - for name, st in spec.structs_by_name.items(): - dict[name] = METHOD(module, st) + for name, op in COMPOUND.items(): + if isinstance(name, basestring): + dict[name] = METHOD(module, op) - for st in spec.structs.values(): - dict[st.name] = METHOD(module, st) - - for name, inst in spec.instructions.items(): - if predicate(inst): - dict[name] = METHOD(module, inst) + for name, op in operations.items(): + if isinstance(name, basestring): + dict[name] = METHOD(module, op) return dict -def invoker(name, spec, predicate=lambda x: True): - return type("%s_%s_%s" % (name, spec.major, spec.minor), - (), generate(spec, invoker.__module__, predicate)) +def invoker(name, operations): + return type(name, (), generate(invoker.__module__, operations)) -def command_invoker(spec): - is_command = lambda cmd: cmd.track == spec["track.command"].value - return invoker("CommandInvoker", spec, is_command) +def command_invoker(): + return invoker("CommandInvoker", COMMANDS) -def control_invoker(spec): - is_control = lambda inst: isinstance(inst, Control) - return invoker("ControlInvoker", spec, is_control) +def control_invoker(): + return invoker("ControlInvoker", CONTROLS) diff --git a/qpid/python/run-tests b/qpid/python/qpid/harness.py index 84b76ebfc1..ce48481612 100755..100644 --- a/qpid/python/run-tests +++ b/qpid/python/qpid/harness.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,18 +17,4 @@ # under the License. # -import sys, logging -from qpid.testlib import testrunner -from qpid.log import enable, WARN, DEBUG - -if "-vv" in sys.argv: - level = DEBUG -else: - level = WARN - -enable("qpid", level) - -if not testrunner.run(): sys.exit(1) - - - +class Skipped(Exception): pass diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index f06ef87709..9b3fecbf9b 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -34,8 +34,8 @@ import connection, time, socket, sys, traceback from codec010 import StringCodec from datatypes import timestamp, uuid4, RangedSet, Message as Message010 from logging import getLogger +from ops import PRIMITIVE from session import Client, INCOMPLETE -from spec import SPEC from threading import Thread, RLock, Condition from util import connect @@ -191,9 +191,12 @@ class Connection(Lockable): try: self._socket = connect(self.host, self.port) except socket.error, e: - raise ConnectError(*e.args) + raise ConnectError(e) self._conn = connection.Connection(self._socket) - self._conn.start() + try: + self._conn.start() + except connection.VersionError, e: + raise ConnectError(e) for ssn in self.sessions.values(): ssn._attach() @@ -263,8 +266,8 @@ FILTER_DEFAULTS = { def delegate(session): class Delegate(Client): - def message_transfer(self, cmd, headers, body): - session._message_transfer(cmd, headers, body) + def message_transfer(self, cmd): + session._message_transfer(cmd) return Delegate class Session(Lockable): @@ -314,9 +317,9 @@ class Session(Lockable): link._disconnected() @synchronized - def _message_transfer(self, cmd, headers, body): - m = Message010(body) - m.headers = headers + def _message_transfer(self, cmd): + m = Message010(cmd.payload) + m.headers = cmd.headers m.id = cmd.id msg = self._decode(m) rcv = self.receivers[int(cmd.destination)] @@ -812,16 +815,16 @@ class Receiver(Lockable): def codec(name): - type = SPEC.named[name] + type = PRIMITIVE[name] def encode(x): - sc = StringCodec(SPEC) - type.encode(sc, x) + sc = StringCodec() + sc.write_primitive(type, x) return sc.encoded def decode(x): - sc = StringCodec(SPEC, x) - return type.decode(sc) + sc = StringCodec(x) + return sc.read_primitive(type) return encode, decode diff --git a/qpid/python/qpid/ops.py b/qpid/python/qpid/ops.py new file mode 100644 index 0000000000..447f9953df --- /dev/null +++ b/qpid/python/qpid/ops.py @@ -0,0 +1,283 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os, mllib, cPickle as pickle +from util import fill + +class Primitive(object): + pass + +class Enum(object): + pass + +class Field: + + def __init__(self, name, type, default=None): + self.name = name + self.type = type + self.default = default + + def __repr__(self): + return "%s: %s" % (self.name, self.type) + +class Compound(object): + + UNENCODED=[] + + def __init__(self, *args, **kwargs): + args = list(args) + for f in self.ARGS: + if args: + a = args.pop(0) + else: + a = kwargs.pop(f.name, f.default) + setattr(self, f.name, a) + if args: + raise TypeError("%s takes at most %s arguments (%s given))" % + (self.__class__.__name__, len(self.ARGS), + len(self.ARGS) + len(args))) + if kwargs: + raise TypeError("got unexpected keyword argument '%s'" % kwargs.keys()[0]) + + def fields(self): + result = {} + for f in self.FIELDS: + result[f.name] = getattr(self, f.name) + return result + + def args(self): + result = {} + for f in self.ARGS: + result[f.name] = getattr(self, f.name) + return result + + def __getitem__(self, attr): + return getattr(self, attr) + + def __setitem__(self, attr, value): + setattr(self, attr, value) + + def dispatch(self, target, *args): + handler = "do_%s" % self.NAME + if hasattr(target, handler): + getattr(target, handler)(self, *args) + else: + print "UNHANDLED:", target, args + + def __repr__(self, extras=()): + return "%s(%s)" % (self.__class__.__name__, + ", ".join(["%s=%r" % (f.name, getattr(self, f.name)) + for f in self.ARGS + if getattr(self, f.name) is not f.default])) + +class Command(Compound): + UNENCODED=[Field("channel", "uint16", 0), + Field("id", "sequence-no", None), + Field("sync", "bit", False), + Field("headers", None, None), + Field("payload", None, None)] + +class Control(Compound): + UNENCODED=[Field("channel", "uint16", 0)] + +def pythonize(st): + if st is None: + return None + else: + return str(st.replace("-", "_")) + +def pydoc(op, children=()): + doc = "\n\n".join([fill(p.text(), 0) for p in op.query["doc"]]) + for ch in children: + doc += "\n\n " + pythonize(ch["@name"]) + " -- " + str(ch["@label"]) + ch_descs ="\n\n".join([fill(p.text(), 4) for p in ch.query["doc"]]) + if ch_descs: + doc += "\n\n" + ch_descs + return doc + +def studly(st): + return "".join([p.capitalize() for p in st.split("-")]) + +def klass(nd): + while nd.parent is not None: + if hasattr(nd.parent, "name") and nd.parent.name == "class": + return nd.parent + else: + nd = nd.parent + +def included(nd): + cls = klass(nd) + if cls is None: + return True + else: + return cls["@name"] not in ("file", "stream") + +def num(s): + if s: return int(s, 0) + +def code(nd): + c = num(nd["@code"]) + if c is None: + return None + else: + cls = klass(nd) + if cls is None: + return c + else: + return c | (num(cls["@code"]) << 8) + +def default(f): + if f["@type"] == "bit": + return False + else: + return None + +def make_compound(decl, base): + dict = {} + fields = decl.query["field"] + dict["__doc__"] = pydoc(decl, fields) + dict["NAME"] = pythonize(decl["@name"]) + dict["SIZE"] = num(decl["@size"]) + dict["CODE"] = code(decl) + dict["PACK"] = num(decl["@pack"]) + dict["FIELDS"] = [Field(pythonize(f["@name"]), resolve(f), default(f)) for f in fields] + dict["ARGS"] = dict["FIELDS"] + base.UNENCODED + return str(studly(decl["@name"])), (base,), dict + +def make_restricted(decl): + name = pythonize(decl["@name"]) + dict = {} + choices = decl.query["choice"] + dict["__doc__"] = pydoc(decl, choices) + dict["NAME"] = name + dict["TYPE"] = str(decl.parent["@type"]) + values = [] + for ch in choices: + val = int(ch["@value"], 0) + dict[pythonize(ch["@name"])] = val + values.append(val) + dict["VALUES"] = values + return name, (Enum,), dict + +def make_type(decl): + name = pythonize(decl["@name"]) + dict = {} + dict["__doc__"] = pydoc(decl) + dict["NAME"] = name + dict["CODE"] = code(decl) + return str(studly(decl["@name"])), (Primitive,), dict + +def make_command(decl): + decl.set_attr("name", "%s-%s" % (decl.parent["@name"], decl["@name"])) + decl.set_attr("size", "0") + decl.set_attr("pack", "2") + name, bases, dict = make_compound(decl, Command) + dict["RESULT"] = pythonize(decl["result/@type"]) or pythonize(decl["result/struct/@name"]) + return name, bases, dict + +def make_control(decl): + decl.set_attr("name", "%s-%s" % (decl.parent["@name"], decl["@name"])) + decl.set_attr("size", "0") + decl.set_attr("pack", "2") + return make_compound(decl, Control) + +def make_struct(decl): + return make_compound(decl, Compound) + +def make_enum(decl): + decl.set_attr("name", decl.parent["@name"]) + return make_restricted(decl) + + +vars = globals() + +def make(nd): + return vars["make_%s" % nd.name](nd) + +from qpid_config import amqp_spec as file +pclfile = "%s.ops.pcl" % file + +if False and (os.path.exists(pclfile) and + os.path.getmtime(pclfile) > os.path.getmtime(file)): + f = open(pclfile, "read") + types = pickle.load(f) + f.close() +else: + spec = mllib.xml_parse(file) + + def qualify(nd, field="@name"): + cls = klass(nd) + if cls is None: + return pythonize(nd[field]) + else: + return pythonize("%s.%s" % (cls["@name"], nd[field])) + + domains = dict([(qualify(d), pythonize(d["@type"])) + for d in spec.query["amqp/domain", included] + \ + spec.query["amqp/class/domain", included]]) + + def resolve(nd): + candidates = qualify(nd, "@type"), pythonize(nd["@type"]) + for c in candidates: + if domains.has_key(c): + while domains.has_key(c): + c = domains[c] + return c + else: + return c + + type_decls = \ + spec.query["amqp/class/command", included] + \ + spec.query["amqp/class/control", included] + \ + spec.query["amqp/class/command/result/struct", included] + \ + spec.query["amqp/class/struct", included] + \ + spec.query["amqp/class/domain/enum", included] + \ + spec.query["amqp/domain/enum", included] + \ + spec.query["amqp/type"] + types = [make(nd) for nd in type_decls] + + if os.access(os.path.dirname(os.path.abspath(pclfile)), os.W_OK): + f = open(pclfile, "write") + pickle.dump(types, f) + f.close() + +ENUMS = {} +PRIMITIVE = {} +COMPOUND = {} +COMMANDS = {} +CONTROLS = {} + +for name, bases, dict in types: + t = type(name, bases, dict) + vars[name] = t + + if issubclass(t, Command): + COMMANDS[t.NAME] = t + COMMANDS[t.CODE] = t + elif issubclass(t, Control): + CONTROLS[t.NAME] = t + CONTROLS[t.CODE] = t + elif issubclass(t, Compound): + COMPOUND[t.NAME] = t + if t.CODE is not None: + COMPOUND[t.CODE] = t + elif issubclass(t, Primitive): + PRIMITIVE[t.NAME] = t + PRIMITIVE[t.CODE] = t + elif issubclass(t, Enum): + ENUMS[t.NAME] = t diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index 18d7848b8d..2bc9844351 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -25,7 +25,7 @@ incoming method frames to a delegate. """ import thread, threading, traceback, socket, sys, logging -from connection08 import EOF, Method, Header, Body, Request, Response +from connection08 import EOF, Method, Header, Body, Request, Response, VersionError from message import Message from queue import Queue, Closed as QueueClosed from content import Content @@ -95,6 +95,8 @@ class Peer: break ch = self.channel(frame.channel) ch.receive(frame, self.work) + except VersionError, e: + self.closed(e) except: self.fatal() diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py index 3b8bd18469..4413a22899 100644 --- a/qpid/python/qpid/session.py +++ b/qpid/python/qpid/session.py @@ -22,9 +22,9 @@ from spec import SPEC from generator import command_invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec -from assembler import Segment from queue import Queue from datatypes import Message, serial +from ops import Command, MessageTransfer from util import wait, notify from exceptions import * from logging import getLogger @@ -44,7 +44,7 @@ def server(*args): INCOMPLETE = object() -class Session(command_invoker(SPEC)): +class Session(command_invoker()): def __init__(self, name, auto_sync=True, timeout=10, delegate=client): self.name = name @@ -67,8 +67,6 @@ class Session(command_invoker(SPEC)): self.results = {} self.exceptions = [] - self.assembly = None - self.delegate = delegate(self) def incoming(self, destination): @@ -134,68 +132,51 @@ class Session(command_invoker(SPEC)): finally: self.lock.release() - def invoke(self, type, args, kwargs): - # XXX - if not hasattr(type, "track"): - return type.new(args, kwargs) - - self.invoke_lock.acquire() - try: - return self.do_invoke(type, args, kwargs) - finally: - self.invoke_lock.release() + def invoke(self, op, args, kwargs): + if issubclass(op, Command): + self.invoke_lock.acquire() + try: + return self.do_invoke(op, args, kwargs) + finally: + self.invoke_lock.release() + else: + return op(*args, **kwargs) - def do_invoke(self, type, args, kwargs): + def do_invoke(self, op, args, kwargs): if self._closing: raise SessionClosed() if self.channel == None: raise SessionDetached() - if type.segments: - if len(args) == len(type.fields) + 1: + if op == MessageTransfer: + if len(args) == len(op.FIELDS) + 1: message = args[-1] args = args[:-1] else: message = kwargs.pop("message", None) - else: - message = None - - hdr = Struct(self.spec["session.header"]) - hdr.sync = self.auto_sync or kwargs.pop("sync", False) - self.need_sync = not hdr.sync + if message is not None: + kwargs["headers"] = message.headers + kwargs["payload"] = message.body - cmd = type.new(args, kwargs) - sc = StringCodec(self.spec) - sc.write_command(hdr, cmd) + cmd = op(*args, **kwargs) + cmd.sync = self.auto_sync or cmd.sync + self.need_sync = not cmd.sync + cmd.channel = self.channel.id - seg = Segment(True, (message == None or - (message.headers == None and message.body == None)), - type.segment_type, type.track, self.channel.id, sc.encoded) - - if type.result: + if op.RESULT: result = Future(exception=SessionException) self.results[self.sender.next_id] = result - self.send(seg) - - log.debug("SENT %s %s %s", seg.id, hdr, cmd) - - if message != None: - if message.headers != None: - sc = StringCodec(self.spec) - for st in message.headers: - sc.write_struct32(st) - seg = Segment(False, message.body == None, self.spec["segment_type.header"].value, - type.track, self.channel.id, sc.encoded) - self.send(seg) - if message.body != None: - seg = Segment(False, True, self.spec["segment_type.body"].value, - type.track, self.channel.id, message.body) - self.send(seg) - msg.debug("SENT %s", message) - - if type.result: + log.debug("SENDING %s", cmd) + + self.send(cmd) + + log.debug("SENT %s", cmd) + if op == MessageTransfer: + msg.debug("SENT %s", cmd) + + if op.RESULT: if self.auto_sync: return result.get(self.timeout) else: @@ -203,81 +184,47 @@ class Session(command_invoker(SPEC)): elif self.auto_sync: self.sync(self.timeout) - def received(self, seg): - self.receiver.received(seg) - if seg.first: - assert self.assembly == None - self.assembly = [] - self.assembly.append(seg) - if seg.last: - self.dispatch(self.assembly) - self.assembly = None - - def dispatch(self, assembly): - segments = assembly[:] - - hdr, cmd = assembly.pop(0).decode(self.spec) - log.debug("RECV %s %s %s", cmd.id, hdr, cmd) - - args = [] - - for st in cmd._type.segments: - if assembly: - seg = assembly[0] - if seg.type == st.segment_type: - args.append(seg.decode(self.spec)) - assembly.pop(0) - continue - args.append(None) - - assert len(assembly) == 0 + def received(self, cmd): + self.receiver.received(cmd) + self.dispatch(cmd) - attr = cmd._type.qname.replace(".", "_") - result = getattr(self.delegate, attr)(cmd, *args) + def dispatch(self, cmd): + log.debug("RECV %s", cmd) - if cmd._type.result: + result = getattr(self.delegate, cmd.NAME)(cmd) + if result is INCOMPLETE: + return + elif result is not None: self.execution_result(cmd.id, result) - if result is not INCOMPLETE: - for seg in segments: - self.receiver.completed(seg) - # XXX: don't forget to obey sync for manual completion as well - if hdr.sync: - self.channel.session_completed(self.receiver._completed) + self.receiver.completed(cmd) + # XXX: don't forget to obey sync for manual completion as well + if cmd.sync: + self.channel.session_completed(self.receiver._completed) - def send(self, seg): - self.sender.send(seg) - - def __str__(self): - return '<Session: %s, %s>' % (self.name, self.channel) + def send(self, cmd): + self.sender.send(cmd) def __repr__(self): - return str(self) + return '<Session: %s, %s>' % (self.name, self.channel) class Receiver: def __init__(self, session): self.session = session self.next_id = None - self.next_offset = None self._completed = RangedSet() - def received(self, seg): - if self.next_id == None or self.next_offset == None: + def received(self, cmd): + if self.next_id == None: raise Exception("todo") - seg.id = self.next_id - seg.offset = self.next_offset - if seg.last: - self.next_id += 1 - self.next_offset = 0 - else: - self.next_offset += len(seg.payload) + cmd.id = self.next_id + self.next_id += 1 - def completed(self, seg): - if seg.id == None: - raise ValueError("cannot complete unidentified segment") - if seg.last: - self._completed.add(seg.id) + def completed(self, cmd): + if cmd.id == None: + raise ValueError("cannot complete unidentified command") + self._completed.add(cmd.id) def known_completed(self, commands): completed = RangedSet() @@ -294,30 +241,24 @@ class Sender: def __init__(self, session): self.session = session self.next_id = serial(0) - self.next_offset = 0 - self.segments = [] + self.commands = [] self._completed = RangedSet() - def send(self, seg): - seg.id = self.next_id - seg.offset = self.next_offset - if seg.last: - self.next_id += 1 - self.next_offset = 0 - else: - self.next_offset += len(seg.payload) - self.segments.append(seg) + def send(self, cmd): + cmd.id = self.next_id + self.next_id += 1 if self.session.send_id: self.session.send_id = False - self.session.channel.session_command_point(seg.id, seg.offset) - self.session.channel.connection.write_segment(seg) + self.session.channel.session_command_point(cmd.id, 0) + self.commands.append(cmd) + self.session.channel.connection.write_op(cmd) def completed(self, commands): idx = 0 - while idx < len(self.segments): - seg = self.segments[idx] - if seg.id in commands: - del self.segments[idx] + while idx < len(self.commands): + cmd = self.commands[idx] + if cmd.id in commands: + del self.commands[idx] else: idx += 1 for range in commands.ranges: @@ -332,7 +273,7 @@ class Incoming(Queue): def start(self): self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit) - for unit in self.session.credit_unit.values(): + for unit in self.session.credit_unit.VALUES: self.session.message_flow(self.destination, unit, 0xFFFFFFFFL) def stop(self): @@ -356,9 +297,9 @@ class Delegate: class Client(Delegate): - def message_transfer(self, cmd, headers, body): - m = Message(body) - m.headers = headers + def message_transfer(self, cmd): + m = Message(cmd.payload) + m.headers = cmd.headers m.id = cmd.id messages = self.session.incoming(cmd.destination) messages.put(m) diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py index cd76c70c5c..e9bfef1fa6 100644 --- a/qpid/python/qpid/spec.py +++ b/qpid/python/qpid/spec.py @@ -29,7 +29,7 @@ class so that the generated code can be reused in a variety of situations. """ -import os, mllib, spec08, spec010 +import os, mllib, spec08 def default(): try: @@ -54,7 +54,7 @@ def load(specfile, *errata): minor = doc["amqp/@minor"] if major == "0" and minor == "10": - return spec010.load(specfile, *errata) + return None else: return spec08.load(specfile, *errata) diff --git a/qpid/python/qpid/spec010.py b/qpid/python/qpid/spec010.py deleted file mode 100644 index eabc8e2983..0000000000 --- a/qpid/python/qpid/spec010.py +++ /dev/null @@ -1,708 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, cPickle, datatypes, datetime -from codec010 import StringCodec -from util import mtime, fill - -class Node: - - def __init__(self, children): - self.children = children - self.named = {} - self.docs = [] - self.rules = [] - - def register(self): - for ch in self.children: - ch.register(self) - - def resolve(self): - for ch in self.children: - ch.resolve() - - def __getitem__(self, name): - path = name.split(".", 1) - nd = self.named - for step in path: - nd = nd[step] - return nd - - def __iter__(self): - return iter(self.children) - -class Anonymous: - - def __init__(self, children): - self.children = children - - def register(self, node): - for ch in self.children: - ch.register(node) - - def resolve(self): - for ch in self.children: - ch.resolve() - -class Named: - - def __init__(self, name): - self.name = name - self.qname = None - - def register(self, node): - self.spec = node.spec - self.klass = node.klass - node.named[self.name] = self - if node.qname: - self.qname = "%s.%s" % (node.qname, self.name) - else: - self.qname = self.name - - def __str__(self): - return self.qname - - def __repr__(self): - return str(self) - -class Lookup: - - def lookup(self, name): - value = None - if self.klass: - try: - value = self.klass[name] - except KeyError: - pass - if not value: - value = self.spec[name] - return value - -class Coded: - - def __init__(self, code): - self.code = code - -class Constant(Named, Node): - - def __init__(self, name, value, children): - Named.__init__(self, name) - Node.__init__(self, children) - self.value = value - - def register(self, node): - Named.register(self, node) - node.constants.append(self) - Node.register(self) - -class Type(Named, Node): - - def __init__(self, name, children): - Named.__init__(self, name) - Node.__init__(self, children) - - def is_present(self, value): - return value != None - - def register(self, node): - Named.register(self, node) - Node.register(self) - -class Primitive(Coded, Type): - - def __init__(self, name, code, fixed, variable, children): - Coded.__init__(self, code) - Type.__init__(self, name, children) - self.fixed = fixed - self.variable = variable - - def register(self, node): - Type.register(self, node) - if self.code is not None: - self.spec.types[self.code] = self - - def is_present(self, value): - if self.fixed == 0: - return value - else: - return Type.is_present(self, value) - - def encode(self, codec, value): - getattr(codec, "write_%s" % self.name)(value) - - def decode(self, codec): - return getattr(codec, "read_%s" % self.name)() - -class Domain(Type, Lookup): - - def __init__(self, name, type, children): - Type.__init__(self, name, children) - self.type = type - self.choices = {} - - def resolve(self): - self.type = self.lookup(self.type) - Node.resolve(self) - - def encode(self, codec, value): - self.type.encode(codec, value) - - def decode(self, codec): - return self.type.decode(codec) - -class Enum: - - def __init__(self, name): - self.name = name - self._names = () - self._values = () - - def values(self): - return self._values - - def __repr__(self): - return "%s(%s)" % (self.name, ", ".join(self._names)) - -class Choice(Named, Node): - - def __init__(self, name, value, children): - Named.__init__(self, name) - Node.__init__(self, children) - self.value = value - - def register(self, node): - Named.register(self, node) - node.choices[self.value] = self - Node.register(self) - try: - enum = node.spec.enums[node.name] - except KeyError: - enum = Enum(node.name) - node.spec.enums[node.name] = enum - setattr(enum, self.name, self.value) - enum._names += (self.name,) - enum._values += (self.value,) - -class Composite(Type, Coded): - - def __init__(self, name, label, code, size, pack, children): - Coded.__init__(self, code) - Type.__init__(self, name, children) - self.label = label - self.fields = [] - self.size = size - self.pack = pack - - def new(self, args, kwargs): - return datatypes.Struct(self, *args, **kwargs) - - def decode(self, codec): - codec.read_size(self.size) - if self.code is not None: - code = codec.read_uint16() - assert self.code == code - return datatypes.Struct(self, **self.decode_fields(codec)) - - def decode_fields(self, codec): - flags = 0 - for i in range(self.pack): - flags |= (codec.read_uint8() << 8*i) - - result = {} - - for i in range(len(self.fields)): - f = self.fields[i] - if flags & (0x1 << i): - result[f.name] = f.type.decode(codec) - else: - result[f.name] = None - return result - - def encode(self, codec, value): - sc = StringCodec(self.spec) - if self.code is not None: - sc.write_uint16(self.code) - self.encode_fields(sc, value) - codec.write_size(self.size, len(sc.encoded)) - codec.write(sc.encoded) - - def encode_fields(self, codec, values): - flags = 0 - for i in range(len(self.fields)): - f = self.fields[i] - if f.type.is_present(values[f.name]): - flags |= (0x1 << i) - for i in range(self.pack): - codec.write_uint8((flags >> 8*i) & 0xFF) - for i in range(len(self.fields)): - f = self.fields[i] - if flags & (0x1 << i): - f.type.encode(codec, values[f.name]) - - def docstring(self): - docs = [] - if self.label: - docs.append(self.label) - docs += [d.text for d in self.docs] - s = "\n\n".join([fill(t, 2) for t in docs]) - for f in self.fields: - fdocs = [] - if f.label: - fdocs.append(f.label) - else: - fdocs.append("") - fdocs += [d.text for d in f.docs] - s += "\n\n" + "\n\n".join([fill(fdocs[0], 4, f.name)] + - [fill(t, 4) for t in fdocs[1:]]) - return s - - -class Field(Named, Node, Lookup): - - def __init__(self, name, label, type, children): - Named.__init__(self, name) - Node.__init__(self, children) - self.label = label - self.type = type - self.exceptions = [] - - def default(self): - return None - - def register(self, node): - Named.register(self, node) - node.fields.append(self) - Node.register(self) - - def resolve(self): - self.type = self.lookup(self.type) - Node.resolve(self) - - def __str__(self): - return "%s: %s" % (self.qname, self.type.qname) - -class Struct(Composite): - - def register(self, node): - Composite.register(self, node) - if self.code is not None: - self.spec.structs[self.code] = self - self.spec.structs_by_name[self.name] = self - self.pyname = self.name - self.pydoc = self.docstring() - - def __str__(self): - fields = ",\n ".join(["%s: %s" % (f.name, f.type.qname) - for f in self.fields]) - return "%s {\n %s\n}" % (self.qname, fields) - -class Segment: - - def __init__(self): - self.segment_type = None - - def register(self, node): - self.spec = node.spec - self.klass = node.klass - node.segments.append(self) - Node.register(self) - -class Instruction(Composite, Segment): - - def __init__(self, name, label, code, children): - Composite.__init__(self, name, label, code, 0, 2, children) - Segment.__init__(self) - self.track = None - self.handlers = [] - - def __str__(self): - return "%s(%s)" % (self.qname, ", ".join(["%s: %s" % (f.name, f.type.qname) - for f in self.fields])) - - def register(self, node): - Composite.register(self, node) - self.pyname = self.qname.replace(".", "_") - self.pydoc = self.docstring() - self.spec.instructions[self.pyname] = self - -class Control(Instruction): - - def __init__(self, name, code, label, children): - Instruction.__init__(self, name, code, label, children) - self.response = None - - def register(self, node): - Instruction.register(self, node) - node.controls.append(self) - self.spec.controls[self.code] = self - self.segment_type = self.spec["segment_type.control"].value - self.track = self.spec["track.control"].value - -class Command(Instruction): - - def __init__(self, name, label, code, children): - Instruction.__init__(self, name, label, code, children) - self.result = None - self.exceptions = [] - self.segments = [] - - def register(self, node): - Instruction.register(self, node) - node.commands.append(self) - self.spec.commands[self.code] = self - self.segment_type = self.spec["segment_type.command"].value - self.track = self.spec["track.command"].value - -class Header(Segment, Node): - - def __init__(self, children): - Segment.__init__(self) - Node.__init__(self, children) - self.entries = [] - - def register(self, node): - Segment.register(self, node) - self.segment_type = self.spec["segment_type.header"].value - Node.register(self) - -class Entry(Lookup): - - def __init__(self, type): - self.type = type - - def register(self, node): - self.spec = node.spec - self.klass = node.klass - node.entries.append(self) - - def resolve(self): - self.type = self.lookup(self.type) - -class Body(Segment, Node): - - def __init__(self, children): - Segment.__init__(self) - Node.__init__(self, children) - - def register(self, node): - Segment.register(self, node) - self.segment_type = self.spec["segment_type.body"].value - Node.register(self) - - def resolve(self): pass - -class Class(Named, Coded, Node): - - def __init__(self, name, code, children): - Named.__init__(self, name) - Coded.__init__(self, code) - Node.__init__(self, children) - self.controls = [] - self.commands = [] - - def register(self, node): - Named.register(self, node) - self.klass = self - node.classes.append(self) - Node.register(self) - -class Doc: - - def __init__(self, type, title, text): - self.type = type - self.title = title - self.text = text - - def register(self, node): - node.docs.append(self) - - def resolve(self): pass - -class Role(Named, Node): - - def __init__(self, name, children): - Named.__init__(self, name) - Node.__init__(self, children) - - def register(self, node): - Named.register(self, node) - Node.register(self) - -class Rule(Named, Node): - - def __init__(self, name, children): - Named.__init__(self, name) - Node.__init__(self, children) - - def register(self, node): - Named.register(self, node) - node.rules.append(self) - Node.register(self) - -class Exception(Named, Node): - - def __init__(self, name, error_code, children): - Named.__init__(self, name) - Node.__init__(self, children) - self.error_code = error_code - - def register(self, node): - Named.register(self, node) - node.exceptions.append(self) - Node.register(self) - -def direct(t): - return lambda x: t - -def map_str(s): - for c in s: - if ord(c) >= 0x80: - return "vbin16" - return "str16" - -class Spec(Node): - - ENCODINGS = { - unicode: direct("str16"), - str: map_str, - buffer: direct("vbin32"), - int: direct("int64"), - long: direct("int64"), - float: direct("double"), - None.__class__: direct("void"), - list: direct("list"), - tuple: direct("list"), - dict: direct("map"), - datatypes.timestamp: direct("datetime"), - datetime.datetime: direct("datetime"), - datatypes.UUID: direct("uuid") - } - - def __init__(self, major, minor, port, children): - Node.__init__(self, children) - self.major = major - self.minor = minor - self.port = port - self.constants = [] - self.classes = [] - self.types = {} - self.qname = None - self.spec = self - self.klass = None - self.instructions = {} - self.controls = {} - self.commands = {} - self.structs = {} - self.structs_by_name = {} - self.enums = {} - - def encoding(self, obj): - return self._encoding(obj.__class__, obj) - - def _encoding(self, klass, obj): - if Spec.ENCODINGS.has_key(klass): - return self.named[Spec.ENCODINGS[klass](obj)] - for base in klass.__bases__: - result = self._encoding(base, obj) - if result != None: - return result - -class Implement: - - def __init__(self, handle): - self.handle = handle - - def register(self, node): - node.handlers.append(self.handle) - - def resolve(self): pass - -class Response(Node): - - def __init__(self, name, children): - Node.__init__(self, children) - self.name = name - - def register(self, node): - Node.register(self) - -class Result(Node, Lookup): - - def __init__(self, type, children): - self.type = type - Node.__init__(self, children) - - def register(self, node): - node.result = self - self.qname = node.qname - self.klass = node.klass - self.spec = node.spec - Node.register(self) - - def resolve(self): - self.type = self.lookup(self.type) - Node.resolve(self) - -import mllib - -def num(s): - if s: return int(s, 0) - -REPLACE = {" ": "_", "-": "_"} -KEYWORDS = {"global": "global_", - "return": "return_"} - -def id(name): - name = str(name) - for key, val in REPLACE.items(): - name = name.replace(key, val) - try: - name = KEYWORDS[name] - except KeyError: - pass - return name - -class Loader: - - def __init__(self): - self.class_code = 0 - - def code(self, nd): - c = num(nd["@code"]) - if c is None: - return None - else: - return c | (self.class_code << 8) - - def list(self, q): - result = [] - for nd in q: - result.append(nd.dispatch(self)) - return result - - def children(self, n): - return self.list(n.query["#tag"]) - - def data(self, d): - return d.data - - def do_amqp(self, a): - return Spec(num(a["@major"]), num(a["@minor"]), num(a["@port"]), - self.children(a)) - - def do_type(self, t): - return Primitive(id(t["@name"]), self.code(t), num(t["@fixed-width"]), - num(t["@variable-width"]), self.children(t)) - - def do_constant(self, c): - return Constant(id(c["@name"]), num(c["@value"]), self.children(c)) - - def do_domain(self, d): - return Domain(id(d["@name"]), id(d["@type"]), self.children(d)) - - def do_enum(self, e): - return Anonymous(self.children(e)) - - def do_choice(self, c): - return Choice(id(c["@name"]), num(c["@value"]), self.children(c)) - - def do_class(self, c): - code = num(c["@code"]) - self.class_code = code - children = self.children(c) - children += self.list(c.query["command/result/struct"]) - self.class_code = 0 - return Class(id(c["@name"]), code, children) - - def do_doc(self, doc): - text = reduce(lambda x, y: x + y, self.list(doc.children)) - return Doc(doc["@type"], doc["@title"], text) - - def do_xref(self, x): - return x["@ref"] - - def do_role(self, r): - return Role(id(r["@name"]), self.children(r)) - - def do_control(self, c): - return Control(id(c["@name"]), c["@label"], self.code(c), self.children(c)) - - def do_rule(self, r): - return Rule(id(r["@name"]), self.children(r)) - - def do_implement(self, i): - return Implement(id(i["@handle"])) - - def do_response(self, r): - return Response(id(r["@name"]), self.children(r)) - - def do_field(self, f): - return Field(id(f["@name"]), f["@label"], id(f["@type"]), self.children(f)) - - def do_struct(self, s): - return Struct(id(s["@name"]), s["@label"], self.code(s), num(s["@size"]), - num(s["@pack"]), self.children(s)) - - def do_command(self, c): - return Command(id(c["@name"]), c["@label"], self.code(c), self.children(c)) - - def do_segments(self, s): - return Anonymous(self.children(s)) - - def do_header(self, h): - return Header(self.children(h)) - - def do_entry(self, e): - return Entry(id(e["@type"])) - - def do_body(self, b): - return Body(self.children(b)) - - def do_result(self, r): - type = r["@type"] - if not type: - type = r["struct/@name"] - return Result(id(type), self.list(r.query["#tag", lambda x: x.name != "struct"])) - - def do_exception(self, e): - return Exception(id(e["@name"]), id(e["@error-code"]), self.children(e)) - -def load(xml): - fname = xml + ".pcl" - - if os.path.exists(fname) and mtime(fname) > mtime(__file__): - file = open(fname, "r") - s = cPickle.load(file) - file.close() - else: - doc = mllib.xml_parse(xml) - s = doc["amqp"].dispatch(Loader()) - s.register() - s.resolve() - - try: - file = open(fname, "w") - except IOError: - file = None - - if file: - cPickle.dump(s, file) - file.close() - - return s diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 12b2781561..1439b892ea 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -21,239 +21,13 @@ # Support library for qpid python tests. # -import sys, re, unittest, os, random, logging, traceback -import qpid.client, qpid.spec, qmf.console +import unittest, traceback, socket +import qpid.client, qmf.console import Queue -from fnmatch import fnmatch -from getopt import getopt, GetoptError from qpid.content import Content from qpid.message import Message - -#0-10 support -from qpid.connection import Connection -from qpid.spec010 import load -from qpid.util import connect, ssl, URL - -def findmodules(root): - """Find potential python modules under directory root""" - found = [] - for dirpath, subdirs, files in os.walk(root): - modpath = dirpath.replace(os.sep, '.') - if not re.match(r'\.svn$', dirpath): # Avoid SVN directories - for f in files: - match = re.match(r'(.+)\.py$', f) - if match and f != '__init__.py': - found.append('.'.join([modpath, match.group(1)])) - return found - -def default(value, default): - if (value == None): return default - else: return value - -class TestRunner: - - SPEC_FOLDER = "../specs" - qpidd = os.getenv("QPIDD") - - """Runs unit tests. - - Parses command line arguments, provides utility functions for tests, - runs the selected test suite. - """ - - def _die(self, message = None): - if message: print message - print """ -run-tests [options] [test*] -The name of a test is package.module.ClassName.testMethod -Options: - -?/-h/--help : this message - -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations: - 0-8 - use the default 0-8 specification. - 0-9 - use the default 0-9 specification. - 0-10-errata - use the 0-10 specification with qpid errata. - -e/--errata <errata.xml> : file containing amqp XML errata - -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to - -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD - env to point to broker executable. broker-args will be - prepended with "--daemon --port=0" - -v/--verbose : verbose - lists tests as they are run. - -d/--debug : enable debug logging. - -i/--ignore <test> : ignore the named test. - -I/--ignore-file : file containing patterns to ignore. - -S/--skip-self-test : skips the client self tests in the 'tests folder' - -F/--spec-folder : folder that contains the specs to be loaded - """ - sys.exit(1) - - def startBroker(self, brokerArgs): - """Start a single broker daemon""" - if TestRunner.qpidd == None: - self._die("QPIDD environment var must point to qpidd when using -B/--start-broker") - cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs) - portStr = os.popen(cmd).read() - if len(portStr) == 0: - self._die("%s failed to start" % TestRunner.qpidd) - port = int(portStr) - pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read()) - print "Started broker: pid=%d, port=%d" % (pid, port) - self.brokerTuple = (pid, port) - self.setBroker("localhost:%d" % port) - - def stopBroker(self): - """Stop the broker using qpidd -q""" - if self.brokerTuple: - ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q") - if ret != 0: - self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret)) - print "Stopped broker: pid=%d, port=%d" % self.brokerTuple - - def killBroker(self): - """Kill the broker using kill -9 (SIGTERM)""" - if self.brokerTuple: - os.kill(self.brokerTuple[0], signal.SIGTERM) - print "Killed broker: pid=%d, port=%d" % self.brokerTuple - - def setBroker(self, broker): - try: - self.url = URL(broker) - except ValueError: - self._die("'%s' is not a valid broker" % (broker)) - self.user = default(self.url.user, "guest") - self.password = default(self.url.password, "guest") - self.host = self.url.host - if self.url.scheme == URL.AMQPS: - self.ssl = True - default_port = 5671 - else: - self.ssl = False - default_port = 5672 - self.port = default(self.url.port, default_port) - - def ignoreFile(self, filename): - f = file(filename) - for line in f.readlines(): self.ignore.append(line.strip()) - f.close() - - def use08spec(self): - "True if we are running with the old 0-8 spec." - # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons. - return self.spec.major==8 and self.spec.minor==0 - - def use09spec(self): - "True if we are running with the 0-9 (non-wip) spec." - return self.spec.major==0 and self.spec.minor==9 - - def _parseargs(self, args): - # Defaults - self.setBroker("localhost") - self.verbose = 1 - self.ignore = [] - self.specfile = "0-8" - self.errata = [] - self.skip_self_test = False - - try: - opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:", - ["help", "spec", "errata=", "broker=", - "start-broker=", "verbose", "skip-self-test", "ignore", - "ignore-file", "spec-folder"]) - except GetoptError, e: - self._die(str(e)) - # check for mutually exclusive options - if "-B" in opts or "--start-broker" in opts: - if "-b" in opts or "--broker" in opts: - self._die("Cannot use -B/--start-broker and -b/broker options together") - for opt, value in opts: - if opt in ("-?", "-h", "--help"): self._die() - if opt in ("-s", "--spec"): self.specfile = value - if opt in ("-e", "--errata"): self.errata.append(value) - if opt in ("-b", "--broker"): self.setBroker(value) - if opt in ("-B", "--start-broker"): self.startBroker(value) - if opt in ("-v", "--verbose"): self.verbose = 2 - if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) - if opt in ("-i", "--ignore"): self.ignore.append(value) - if opt in ("-I", "--ignore-file"): self.ignoreFile(value) - if opt in ("-S", "--skip-self-test"): self.skip_self_test = True - if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value - - # Abbreviations for default settings. - if (self.specfile == "0-10"): - self.spec = load(self.get_spec_file("amqp.0-10.xml")) - elif (self.specfile == "0-10-errata"): - self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml")) - else: - if (self.specfile == "0-8"): - self.specfile = self.get_spec_file("amqp.0-8.xml") - elif (self.specfile == "0-9"): - self.specfile = self.get_spec_file("amqp.0-9.xml") - self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) - - if (self.specfile == None): - self._die("No XML specification provided") - print "Using specification from:", self.specfile - - self.spec = qpid.spec.load(self.specfile, *self.errata) - - if len(self.tests) == 0: - if not self.skip_self_test: - self.tests=findmodules("tests") - if self.use08spec() or self.use09spec(): - self.tests+=findmodules("tests_0-8") - elif (self.spec.major == 99 and self.spec.minor == 0): - self.tests+=findmodules("tests_0-10_preview") - elif (self.spec.major == 0 and self.spec.minor == 10): - self.tests+=findmodules("tests_0-10") - - def testSuite(self): - class IgnoringTestSuite(unittest.TestSuite): - def addTest(self, test): - if isinstance(test, unittest.TestCase): - for pattern in testrunner.ignore: - if fnmatch(test.id(), pattern): - return - unittest.TestSuite.addTest(self, test) - - # Use our IgnoringTestSuite in the test loader. - unittest.TestLoader.suiteClass = IgnoringTestSuite - return unittest.defaultTestLoader.loadTestsFromNames(self.tests) - - def run(self, args=sys.argv[1:]): - self.brokerTuple = None - self._parseargs(args) - runner = unittest.TextTestRunner(descriptions=False, - verbosity=self.verbose) - result = runner.run(self.testSuite()) - - if (self.ignore): - print "=======================================" - print "NOTE: the following tests were ignored:" - for t in self.ignore: print t - print "=======================================" - - self.stopBroker() - return result.wasSuccessful() - - def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): - """Connect to the broker, returns a qpid.client.Client""" - host = host or self.host - port = port or self.port - spec = spec or self.spec - user = user or self.user - password = password or self.password - client = qpid.client.Client(host, port, spec) - if self.use08spec(): - client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) - else: - client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) - return client - - def get_spec_file(self, fname): - return TestRunner.SPEC_FOLDER + os.sep + fname - -# Global instance for tests to call connect. -testrunner = TestRunner() - +from qpid.harness import Skipped +from qpid.exceptions import VersionError class TestBase(unittest.TestCase): """Base class for Qpid test cases. @@ -267,6 +41,9 @@ class TestBase(unittest.TestCase): resources to clean up later. """ + def configure(self, config): + self.config = config + def setUp(self): self.queues = [] self.exchanges = [] @@ -293,9 +70,26 @@ class TestBase(unittest.TestCase): else: self.client.close() - def connect(self, *args, **keys): + def connect(self, host=None, port=None, user=None, password=None, tune_params=None): """Create a new connction, return the Client object""" - return testrunner.connect(*args, **keys) + host = host or self.config.broker.host + port = port or self.config.broker.port or 5672 + user = user or "guest" + password = password or "guest" + client = qpid.client.Client(host, port) + try: + if client.spec.major == 8 and client.spec.minor == 0: + client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) + else: + client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) + except qpid.client.Closed, e: + if isinstance(e.args[0], VersionError): + raise Skipped(e.args[0]) + else: + raise e + except socket.error, e: + raise Skipped(e) + return client def queue_declare(self, channel=None, *args, **keys): channel = channel or self.channel @@ -319,17 +113,8 @@ class TestBase(unittest.TestCase): def consume(self, queueName): """Consume from named queue returns the Queue object.""" - if testrunner.use08spec() or testrunner.use09spec(): - reply = self.channel.basic_consume(queue=queueName, no_ack=True) - return self.client.queue(reply.consumer_tag) - else: - if not "uniqueTag" in dir(self): self.uniqueTag = 1 - else: self.uniqueTag += 1 - consumer_tag = "tag" + str(self.uniqueTag) - self.channel.message_subscribe(queue=queueName, destination=consumer_tag) - self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) - self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) - return self.client.queue(consumer_tag) + reply = self.channel.basic_consume(queue=queueName, no_ack=True) + return self.client.queue(reply.consumer_tag) def subscribe(self, channel=None, **keys): channel = channel or self.channel @@ -350,24 +135,14 @@ class TestBase(unittest.TestCase): Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() - if testrunner.use08spec() or testrunner.use09spec(): - self.channel.basic_publish( - exchange=exchange, - content=Content(body, properties=properties), - routing_key=routing_key) - else: - self.channel.message_transfer( - destination=exchange, - content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) + self.channel.basic_publish( + exchange=exchange, + content=Content(body, properties=properties), + routing_key=routing_key) msg = queue.get(timeout=1) - if testrunner.use08spec() or testrunner.use09spec(): - self.assertEqual(body, msg.content.body) - if (properties): - self.assertEqual(properties, msg.content.properties) - else: - self.assertEqual(body, msg.content.body) - if (properties): - self.assertEqual(properties, msg.content['application_headers']) + self.assertEqual(body, msg.content.body) + if (properties): + self.assertEqual(properties, msg.content.properties) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ @@ -394,11 +169,20 @@ class TestBase(unittest.TestCase): self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) +#0-10 support +from qpid.connection import Connection +from qpid.util import connect, ssl, URL + class TestBase010(unittest.TestCase): """ Base class for Qpid test cases. using the final 0-10 spec """ + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + def setUp(self): self.conn = self.connect() self.session = self.conn.session("test-session", timeout=10) @@ -406,15 +190,26 @@ class TestBase010(unittest.TestCase): def startQmf(self, handler=None): self.qmf = qmf.console.Session(handler) - self.qmf_broker = self.qmf.addBroker(str(testrunner.url)) + self.qmf_broker = self.qmf.addBroker(str(self.broker)) def connect(self, host=None, port=None): - sock = connect(host or testrunner.host, port or testrunner.port) - if testrunner.url.scheme == URL.AMQPS: + url = self.broker + if url.scheme == URL.AMQPS: + default_port = 5671 + else: + default_port = 5672 + try: + sock = connect(host or url.host, port or url.port or default_port) + except socket.error, e: + raise Skipped(e) + if url.scheme == URL.AMQPS: sock = ssl(sock) - conn = Connection(sock, username=testrunner.user, - password=testrunner.password) - conn.start(timeout=10) + conn = Connection(sock, username=url.user or "guest", + password=url.password or "guest") + try: + conn.start(timeout=10) + except VersionError, e: + raise Skipped(e) return conn def tearDown(self): diff --git a/qpid/python/qpid/tests/framing.py b/qpid/python/qpid/tests/framing.py index 4cd596b583..0b33df8b9a 100644 --- a/qpid/python/qpid/tests/framing.py +++ b/qpid/python/qpid/tests/framing.py @@ -40,6 +40,50 @@ class Base(Test): assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2) assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2) + def cmp_list(self, l1, l2): + if l1 is None: + assert l2 is None + return + + assert len(l1) == len(l2) + for v1, v2 in zip(l1, l2): + if isinstance(v1, Compound): + self.cmp_ops(v1, v2) + else: + assert v1 == v2 + + def cmp_ops(self, op1, op2): + if op1 is None: + assert op2 is None + return + + assert op1.__class__ == op2.__class__ + cls = op1.__class__ + assert op1.NAME == op2.NAME + assert op1.CODE == op2.CODE + assert op1.FIELDS == op2.FIELDS + for f in cls.FIELDS: + v1 = getattr(op1, f.name) + v2 = getattr(op2, f.name) + if COMPOUND.has_key(f.type) or f.type == "struct32": + self.cmp_ops(v1, v2) + elif f.type in ("list", "array"): + self.cmp_list(v1, v2) + else: + assert v1 == v2, "expected: %r, got %r" % (v1, v2) + + if issubclass(cls, Command) or issubclass(cls, Control): + assert op1.channel == op2.channel + + if issubclass(cls, Command): + assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync) + assert (op1.headers is None and op2.headers is None) or \ + (op1.headers is not None and op2.headers is not None) + if op1.headers is not None: + assert len(op1.headers) == len(op2.headers) + for h1, h2 in zip(op1.headers, op2.headers): + self.cmp_ops(h1, h2) + class FrameTest(Base): def enc_dec(self, frames, encoded=None): @@ -171,3 +215,75 @@ class SegmentTest(Base): for i in range(0, 8, 2)] self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2) + +from qpid.ops import * + +class OpTest(Base): + + def enc_dec(self, ops): + enc = OpEncoder() + dec = OpDecoder() + enc.write(*ops) + segs = enc.read() + dec.write(*segs) + dops = dec.read() + assert len(ops) == len(dops) + for op1, op2 in zip(ops, dops): + self.cmp_ops(op1, op2) + + def testEmtpyMT(self): + self.enc_dec([MessageTransfer()]) + + def testEmptyMTSync(self): + self.enc_dec([MessageTransfer(sync=True)]) + + def testMT(self): + self.enc_dec([MessageTransfer(destination="asdf")]) + + def testSyncMT(self): + self.enc_dec([MessageTransfer(destination="asdf", sync=True)]) + + def testEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(payload="")]) + + def testPayloadMT(self): + self.enc_dec([MessageTransfer(payload="test payload")]) + + def testHeadersEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])]) + + def testHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")]) + + def testMultiHeadersEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])]) + + def testMultiHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")]) + + def testContentTypeHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")]) + + def testMulti(self): + self.enc_dec([MessageTransfer(), + MessageTransfer(sync=True), + MessageTransfer(destination="one"), + MessageTransfer(destination="two", sync=True), + MessageTransfer(destination="three", payload="test payload")]) + + def testControl(self): + self.enc_dec([SessionAttach(name="asdf")]) + + def testMixed(self): + self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")]) + + def testChannel(self): + self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)]) + + def testCompound(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])]) + + def testListCompound(self): + self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"), + Xid(global_id="two"), + Xid(global_id="three")]))]) diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 8a142d6c96..7706ebbabe 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -22,6 +22,7 @@ import time from qpid.tests import Test +from qpid.harness import Skipped from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty @@ -42,7 +43,10 @@ class Base(Test): def setup(self): self.test_id = uuid4() self.broker = self.config.broker - self.conn = self.setup_connection() + try: + self.conn = self.setup_connection() + except ConnectError, e: + raise Skipped(e) self.ssn = self.setup_session() self.snd = self.setup_sender() self.rcv = self.setup_receiver() @@ -65,7 +69,7 @@ class Base(Test): receiver = ssn.receiver("ping-queue") msg = receiver.fetch(0) ssn.acknowledge() - assert msg.content == content + assert msg.content == content, "expected %r, got %r" % (content, msg.content) def drain(self, rcv, limit=None): contents = [] diff --git a/qpid/python/qpid_config.py b/qpid/python/qpid_config.py index 8f987e9962..3cf6b69b7e 100644 --- a/qpid/python/qpid_config.py +++ b/qpid/python/qpid_config.py @@ -21,3 +21,5 @@ import os qpid_home = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) amqp_spec = os.path.join(qpid_home, "specs", "amqp.0-10-qpid-errata.xml") +amqp_spec_0_8 = os.path.join(qpid_home, "specs", "amqp.0-8.xml") +amqp_spec_0_9 = os.path.join(qpid_home, "specs", "amqp.0-9.xml") diff --git a/qpid/python/rule2test b/qpid/python/rule2test deleted file mode 100755 index 10f151366e..0000000000 --- a/qpid/python/rule2test +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Convert rules to tests -# -import sys, re, os.path -from getopt import getopt, GetoptError -from string import capitalize -from xml import dom -from xml.dom.minidom import parse - -def camelcase(s): - """Convert 'string like this' to 'StringLikeThis'""" - return "".join([capitalize(w) for w in re.split(re.compile("\W*"), s)]) - -def uncapitalize(s): return s[0].lower()+s[1:] - -def ancestors(node): - "Return iterator of ancestors from top-level element to node" - def generator(node): - while node and node.parentNode: - yield node - node = node.parentNode - return reversed(list(generator(node))) - -def tagAndName(element): - nameAttr = element.getAttribute("name"); - if (nameAttr) : return camelcase(nameAttr) + camelcase(element.tagName) - else: return camelcase(element.tagName) - -def nodeText(n): - """Recursively collect text from all text nodes under n""" - if n.nodeType == dom.Node.TEXT_NODE: - return n.data - if n.childNodes: - return reduce(lambda t, c: t + nodeText(c), n.childNodes, "") - return "" - -def cleanup(docString, level=8): - unindent = re.sub("\n[ \t]*", "\n", docString.strip()) - emptyLines = re.sub("\n\n\n", "\n\n", unindent) - indented = re.sub("\n", "\n"+level*" ", emptyLines) - return level*" " + indented - -def printTest(test, docstring): - print "class %s(TestBase):" % test - print ' """' - print docstring - print ' """' - print - print - -def printTests(doc, module): - """Returns dictionary { classname : [ (methodname, docstring)* ] * }""" - tests = {} - rules = doc.getElementsByTagName("rule") - for r in rules: - path = list(ancestors(r)) - if module == path[1].getAttribute("name").lower(): - test = "".join(map(tagAndName, path[2:])) + "Tests" - docstring = cleanup(nodeText(r), 4) - printTest(test, docstring) - -def usage(message=None): - if message: print >>sys.stderr, message - print >>sys.stderr, """ -rule2test [options] <amqpclass> - -Print test classes for each rule for the amqpclass in amqp.xml. - -Options: - -?/-h/--help : this message - -s/--spec <spec.xml> : file containing amqp XML spec -""" - return 1 - -def main(argv): - try: opts, args = getopt(argv[1:], "h?s:", ["help", "spec="]) - except GetoptError, e: return usage(e) - spec = "../specs/amqp.xml" # Default - for opt, val in opts: - if (opt in ("-h", "-?", "--help")): return usage() - if (opt in ("-s", "--spec")): spec = val - doc = parse(spec) - if len(args) == 0: return usage() - printTests(doc, args[0]) - return 0 - -if (__name__ == "__main__"): sys.exit(main(sys.argv)) diff --git a/qpid/python/tests/__init__.py b/qpid/python/tests/__init__.py index 8ad514fc2f..1e495f3af3 100644 --- a/qpid/python/tests/__init__.py +++ b/qpid/python/tests/__init__.py @@ -19,12 +19,4 @@ # under the License. # -from codec import * -from queue import * -from spec import * -from framer import * -from assembler import * -from datatypes import * -from connection import * -from spec010 import * -from codec010 import * +import codec, queue, datatypes, connection, spec010, codec010 diff --git a/qpid/python/tests/assembler.py b/qpid/python/tests/assembler.py deleted file mode 100644 index f4e37084b6..0000000000 --- a/qpid/python/tests/assembler.py +++ /dev/null @@ -1,78 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from threading import * -from unittest import TestCase -from qpid.util import connect, listen -from qpid.assembler import * - -PORT = 1234 - -class AssemblerTest(TestCase): - - def setUp(self): - started = Event() - self.running = True - - def run(): - running = True - for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): - asm = Assembler(s) - try: - asm.write_header(*asm.read_header()[-2:]) - while True: - seg = asm.read_segment() - asm.write_segment(seg) - except Closed: - pass - - self.server = Thread(target=run) - self.server.setDaemon(True) - self.server.start() - - started.wait(3) - assert started.isSet() - - def tearDown(self): - self.running = False - self.server.join() - - def test(self): - asm = Assembler(connect("0.0.0.0", PORT), max_payload = 1) - asm.write_header(0, 10) - asm.write_segment(Segment(True, False, 1, 2, 3, "TEST")) - asm.write_segment(Segment(False, True, 1, 2, 3, "ING")) - - assert asm.read_header() == ("AMQP", 1, 1, 0, 10) - - seg = asm.read_segment() - assert seg.first == True - assert seg.last == False - assert seg.type == 1 - assert seg.track == 2 - assert seg.channel == 3 - assert seg.payload == "TEST" - - seg = asm.read_segment() - assert seg.first == False - assert seg.last == True - assert seg.type == 1 - assert seg.track == 2 - assert seg.channel == 3 - assert seg.payload == "ING" diff --git a/qpid/python/tests/codec.py b/qpid/python/tests/codec.py index 4bd3675af8..9b51b4713c 100644 --- a/qpid/python/tests/codec.py +++ b/qpid/python/tests/codec.py @@ -23,7 +23,6 @@ from qpid.codec import Codec from qpid.spec import load from cStringIO import StringIO from qpid.reference import ReferenceId -from qpid.testlib import testrunner __doc__ = """ @@ -54,13 +53,8 @@ __doc__ = """ """ -SPEC = None - -def spec(): - global SPEC - if SPEC == None: - SPEC = load(testrunner.get_spec_file("amqp.0-8.xml")) - return SPEC +from qpid_config import amqp_spec_0_8 +SPEC = load(amqp_spec_0_8) # -------------------------------------- # -------------------------------------- @@ -76,7 +70,7 @@ class BaseDataTypes(unittest.TestCase): """ standard setUp for unitetest (refer unittest documentation for details) """ - self.codec = Codec(StringIO(), spec()) + self.codec = Codec(StringIO(), SPEC) # ------------------ def tearDown(self): @@ -507,7 +501,7 @@ def test(type, value): else: values = [value] stream = StringIO() - codec = Codec(stream, spec()) + codec = Codec(stream, SPEC) for v in values: codec.encode(type, v) codec.flush() diff --git a/qpid/python/tests/codec010.py b/qpid/python/tests/codec010.py index a1f89dc3f4..787ebc146f 100644 --- a/qpid/python/tests/codec010.py +++ b/qpid/python/tests/codec010.py @@ -20,21 +20,17 @@ import time from unittest import TestCase -from qpid.spec010 import load from qpid.codec010 import StringCodec -from qpid.testlib import testrunner from qpid.datatypes import timestamp, uuid4 +from qpid.ops import PRIMITIVE class CodecTest(TestCase): - def setUp(self): - self.spec = load(testrunner.get_spec_file("amqp.0-10.xml")) - def check(self, type, value, compare=True): - t = self.spec[type] - sc = StringCodec(self.spec) - t.encode(sc, value) - decoded = t.decode(sc) + t = PRIMITIVE[type] + sc = StringCodec() + sc.write_primitive(t, value) + decoded = sc.read_primitive(t) if compare: assert decoded == value, "%s, %s" % (decoded, value) return decoded diff --git a/qpid/python/tests/connection.py b/qpid/python/tests/connection.py index 19cdad9f97..d340e4e9c1 100644 --- a/qpid/python/tests/connection.py +++ b/qpid/python/tests/connection.py @@ -22,10 +22,10 @@ from unittest import TestCase from qpid.util import connect, listen from qpid.connection import * from qpid.datatypes import Message -from qpid.testlib import testrunner from qpid.delegates import Server from qpid.queue import Queue from qpid.session import Delegate +from qpid.ops import QueueQueryResult PORT = 1234 @@ -51,12 +51,12 @@ class TestSession(Delegate): pass def queue_query(self, qq): - return qq._type.result.type.new((qq.queue,), {}) + return QueueQueryResult(qq.queue) - def message_transfer(self, cmd, headers, body): + def message_transfer(self, cmd): if cmd.destination == "echo": - m = Message(body) - m.headers = headers + m = Message(cmd.payload) + m.headers = cmd.headers self.session.message_transfer(cmd.destination, cmd.accept_mode, cmd.acquire_mode, m) elif cmd.destination == "abort": @@ -64,7 +64,7 @@ class TestSession(Delegate): elif cmd.destination == "heartbeat": self.session.channel.connection_heartbeat() else: - self.queue.put((cmd, headers, body)) + self.queue.put(cmd) class ConnectionTest(TestCase): @@ -134,17 +134,17 @@ class ConnectionTest(TestCase): ssn.message_transfer(d) for d in destinations: - cmd, header, body = self.queue.get(10) + cmd = self.queue.get(10) assert cmd.destination == d - assert header == None - assert body == None + assert cmd.headers == None + assert cmd.payload == None msg = Message("this is a test") ssn.message_transfer("four", message=msg) - cmd, header, body = self.queue.get(10) + cmd = self.queue.get(10) assert cmd.destination == "four" - assert header == None - assert body == msg.body + assert cmd.headers == None + assert cmd.payload == msg.body qq = ssn.queue_query("asdf") assert qq.queue == "asdf" diff --git a/qpid/python/tests/datatypes.py b/qpid/python/tests/datatypes.py index e9e09094fa..1a60bb4107 100644 --- a/qpid/python/tests/datatypes.py +++ b/qpid/python/tests/datatypes.py @@ -18,9 +18,8 @@ # from unittest import TestCase -from qpid.testlib import testrunner -from qpid.spec010 import load from qpid.datatypes import * +from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties class SerialTest(TestCase): @@ -176,10 +175,9 @@ class UUIDTest(TestCase): class MessageTest(TestCase): def setUp(self): - self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml")) - self.mp = Struct(self.spec["message.message_properties"]) - self.dp = Struct(self.spec["message.delivery_properties"]) - self.fp = Struct(self.spec["message.fragment_properties"]) + self.mp = MessageProperties() + self.dp = DeliveryProperties() + self.fp = FragmentProperties() def testHas(self): m = Message(self.mp, self.dp, self.fp, "body") @@ -207,7 +205,7 @@ class MessageTest(TestCase): def testSetReplace(self): m = Message(self.mp, self.dp, self.fp, "body") - dp = Struct(self.spec["message.delivery_properties"]) + dp = DeliveryProperties() assert m.get("delivery_properties") == self.dp assert m.get("delivery_properties") != dp m.set(dp) diff --git a/qpid/python/tests/framer.py b/qpid/python/tests/framer.py deleted file mode 100644 index e99166721c..0000000000 --- a/qpid/python/tests/framer.py +++ /dev/null @@ -1,95 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from threading import * -from unittest import TestCase -from qpid.util import connect, listen -from qpid.framer import * - -PORT = 1234 - -class FramerTest(TestCase): - - def setUp(self): - self.running = True - started = Event() - def run(): - for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): - conn = Framer(s) - try: - conn.write_header(*conn.read_header()[-2:]) - while True: - frame = conn.read_frame() - conn.write_frame(frame) - conn.flush() - except Closed: - pass - - self.server = Thread(target=run) - self.server.setDaemon(True) - self.server.start() - - started.wait(3) - assert started.isSet() - - def tearDown(self): - self.running = False - self.server.join(3) - - def test(self): - c = Framer(connect("0.0.0.0", PORT)) - - c.write_header(0, 10) - assert c.read_header() == ("AMQP", 1, 1, 0, 10) - - c.write_frame(Frame(FIRST_FRM, 1, 2, 3, "THIS")) - c.write_frame(Frame(0, 1, 2, 3, "IS")) - c.write_frame(Frame(0, 1, 2, 3, "A")) - c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST")) - c.flush() - - f = c.read_frame() - assert f.flags & FIRST_FRM - assert not (f.flags & LAST_FRM) - assert f.type == 1 - assert f.track == 2 - assert f.channel == 3 - assert f.payload == "THIS" - - f = c.read_frame() - assert f.flags == 0 - assert f.type == 1 - assert f.track == 2 - assert f.channel == 3 - assert f.payload == "IS" - - f = c.read_frame() - assert f.flags == 0 - assert f.type == 1 - assert f.track == 2 - assert f.channel == 3 - assert f.payload == "A" - - f = c.read_frame() - assert f.flags & LAST_FRM - assert not (f.flags & FIRST_FRM) - assert f.type == 1 - assert f.track == 2 - assert f.channel == 3 - assert f.payload == "TEST" diff --git a/qpid/python/tests/spec.py b/qpid/python/tests/spec.py deleted file mode 100644 index d5ea1d682a..0000000000 --- a/qpid/python/tests/spec.py +++ /dev/null @@ -1,74 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from unittest import TestCase -from qpid.spec import load -from qpid.testlib import testrunner - -class SpecTest(TestCase): - - def check_load(self, *urls): - spec = load(*map(testrunner.get_spec_file, urls)) - qdecl = spec.method("queue_declare") - assert qdecl != None - assert not qdecl.content - - queue = qdecl.fields.byname["queue"] - assert queue != None - assert queue.domain.name == "queue_name" - assert queue.type == "shortstr" - - qdecl_ok = spec.method("queue_declare_ok") - - # 0-8 is actually 8-0 - if (spec.major == 8 and spec.minor == 0 or - spec.major == 0 and spec.minor == 9): - assert qdecl_ok != None - - assert len(qdecl.responses) == 1 - assert qdecl_ok in qdecl.responses - - publish = spec.method("basic_publish") - assert publish != None - assert publish.content - - if (spec.major == 0 and spec.minor == 10): - assert qdecl_ok == None - reply_to = spec.domains.byname["reply_to"] - assert reply_to.type.size == 2 - assert reply_to.type.pack == 2 - assert len(reply_to.type.fields) == 2 - - qq = spec.method("queue_query") - assert qq != None - assert qq.result.size == 4 - assert qq.result.type != None - args = qq.result.fields.byname["arguments"] - assert args.type == "table" - - def test_load_0_8(self): - self.check_load("amqp.0-8.xml") - - def test_load_0_9(self): - self.check_load("amqp.0-9.xml") - - def test_load_0_9_errata(self): - self.check_load("amqp.0-9.xml", "amqp-errata.0-9.xml") - - def test_load_0_10(self): - self.check_load("amqp.0-10-preview.xml") diff --git a/qpid/python/tests/spec010.py b/qpid/python/tests/spec010.py index df9cb9590a..ac04e1ee02 100644 --- a/qpid/python/tests/spec010.py +++ b/qpid/python/tests/spec010.py @@ -19,66 +19,56 @@ import os, tempfile, shutil, stat from unittest import TestCase -from qpid.spec010 import load from qpid.codec010 import Codec, StringCodec -from qpid.testlib import testrunner -from qpid.datatypes import Struct +from qpid.ops import * class SpecTest(TestCase): - def setUp(self): - self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml")) - def testSessionHeader(self): - hdr = self.spec["session.header"] - sc = StringCodec(self.spec) - hdr.encode(sc, Struct(hdr, sync=True)) + sc = StringCodec() + sc.write_compound(Header(sync=True)) assert sc.encoded == "\x01\x01" - sc = StringCodec(self.spec) - hdr.encode(sc, Struct(hdr, sync=False)) + sc = StringCodec() + sc.write_compound(Header(sync=False)) assert sc.encoded == "\x01\x00" - def encdec(self, type, value): - sc = StringCodec(self.spec) - type.encode(sc, value) - decoded = type.decode(sc) + def encdec(self, value): + sc = StringCodec() + sc.write_compound(value) + decoded = sc.read_compound(value.__class__) return decoded def testMessageProperties(self): - mp = self.spec["message.message_properties"] - rt = self.spec["message.reply_to"] - - props = Struct(mp, content_length=3735928559L, - reply_to=Struct(rt, exchange="the exchange name", - routing_key="the routing key")) - dec = self.encdec(mp, props) + props = MessageProperties(content_length=3735928559L, + reply_to=ReplyTo(exchange="the exchange name", + routing_key="the routing key")) + dec = self.encdec(props) assert props.content_length == dec.content_length assert props.reply_to.exchange == dec.reply_to.exchange assert props.reply_to.routing_key == dec.reply_to.routing_key def testMessageSubscribe(self): - ms = self.spec["message.subscribe"] - cmd = Struct(ms, exclusive=True, destination="this is a test") - dec = self.encdec(self.spec["message.subscribe"], cmd) + cmd = MessageSubscribe(exclusive=True, destination="this is a test") + dec = self.encdec(cmd) assert cmd.exclusive == dec.exclusive assert cmd.destination == dec.destination def testXid(self): - xid = self.spec["dtx.xid"] - sc = StringCodec(self.spec) - st = Struct(xid, format=0, global_id="gid", branch_id="bid") - xid.encode(sc, st) + sc = StringCodec() + xid = Xid(format=0, global_id="gid", branch_id="bid") + sc.write_compound(xid) assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid' - assert xid.decode(sc).__dict__ == st.__dict__ + dec = sc.read_compound(Xid) + assert xid.__dict__ == dec.__dict__ - def testLoadReadOnly(self): - spec = "amqp.0-10-qpid-errata.xml" - f = testrunner.get_spec_file(spec) - dest = tempfile.mkdtemp() - shutil.copy(f, dest) - shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) - os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) - fname = os.path.join(dest, spec) - load(fname) - assert not os.path.exists("%s.pcl" % fname) +# def testLoadReadOnly(self): +# spec = "amqp.0-10-qpid-errata.xml" +# f = testrunner.get_spec_file(spec) +# dest = tempfile.mkdtemp() +# shutil.copy(f, dest) +# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) +# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) +# fname = os.path.join(dest, spec) +# load(fname) +# assert not os.path.exists("%s.pcl" % fname) diff --git a/qpid/python/tests_0-10/__init__.py b/qpid/python/tests_0-10/__init__.py index 1fd7f72357..f9315a6f90 100644 --- a/qpid/python/tests_0-10/__init__.py +++ b/qpid/python/tests_0-10/__init__.py @@ -24,6 +24,7 @@ from broker import * from dtx import * from example import * from exchange import * +from management import * from message import * from query import * from queue import * diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py index 29394c6a7d..5cd0caba40 100644 --- a/qpid/python/tests_0-10/management.py +++ b/qpid/python/tests_0-10/management.py @@ -29,7 +29,7 @@ class ManagementTest (TestBase010): Tests for the management hooks """ - def test_broker_connectivity_oldAPI (self): + def disabled_test_broker_connectivity_oldAPI (self): """ Call the "echo" method on the broker to verify it is alive and talking. """ diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index f80eca6363..d40d5e811e 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -477,7 +477,7 @@ class MessageTests(TestBase010): #send message A ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A")) - for unit in ssn.credit_unit.values(): + for unit in ssn.credit_unit.VALUES: ssn.message_flow("c", unit, 0xFFFFFFFFL) q = ssn.incoming("c") @@ -490,7 +490,7 @@ class MessageTests(TestBase010): ssn.channel.session_completed(ssn.receiver._completed) ssn.message_accept(RangedSet(msgA.id)) - for unit in ssn.credit_unit.values(): + for unit in ssn.credit_unit.VALUES: ssn.message_flow("c", unit, 0xFFFFFFFFL) #send message B diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py index 463fbcb888..8cdc539a08 100644 --- a/qpid/python/tests_0-10/tx.py +++ b/qpid/python/tests_0-10/tx.py @@ -19,7 +19,7 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.datatypes import Message, RangedSet -from qpid.testlib import testrunner, TestBase010 +from qpid.testlib import TestBase010 class TxTests(TestBase010): """ diff --git a/qpid/python/tests_0-8/__init__.py b/qpid/python/tests_0-8/__init__.py index 9a09d2d04f..526f2452f8 100644 --- a/qpid/python/tests_0-8/__init__.py +++ b/qpid/python/tests_0-8/__init__.py @@ -18,3 +18,5 @@ # specific language governing permissions and limitations # under the License. # + +import basic, broker, example, exchange, queue, testlib, tx diff --git a/qpid/python/tests_0-8/basic.py b/qpid/python/tests_0-8/basic.py index 95ca0d7287..d5837fc19c 100644 --- a/qpid/python/tests_0-8/basic.py +++ b/qpid/python/tests_0-8/basic.py @@ -19,7 +19,7 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class BasicTests(TestBase): """Tests for 'methods' on the amqp basic 'class'""" @@ -219,10 +219,11 @@ class BasicTests(TestBase): channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four channel.basic_cancel(consumer_tag=subscription.consumer_tag) - subscription2 = channel.basic_consume(queue="test-requeue") - queue2 = self.client.queue(subscription2.consumer_tag) channel.basic_recover(requeue=True) + + subscription2 = channel.basic_consume(queue="test-requeue") + queue2 = self.client.queue(subscription2.consumer_tag) msg3b = queue2.get(timeout=1) msg5b = queue2.get(timeout=1) diff --git a/qpid/python/tests_0-8/broker.py b/qpid/python/tests_0-8/broker.py index d9ac69c5e3..7f3fe7530e 100644 --- a/qpid/python/tests_0-8/broker.py +++ b/qpid/python/tests_0-8/broker.py @@ -19,15 +19,15 @@ from qpid.client import Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class BrokerTests(TestBase): """Tests for basic Broker functionality""" - def test_amqp_basic_13(self): + def test_ack_and_no_ack(self): """ First, this test tries to receive a message with a no-ack - consumer. Second, this test tries to explicitely receive and + consumer. Second, this test tries to explicitly receive and acknowledge a message with an acknowledging consumer. """ ch = self.channel @@ -40,7 +40,7 @@ class BrokerTests(TestBase): msg = self.client.queue(ctag).get(timeout = 5) self.assert_(msg.content.body == body) - # Acknowleding consumer + # Acknowledging consumer self.queue_declare(ch, queue = "otherqueue") ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag body = "test ack" @@ -102,3 +102,19 @@ class BrokerTests(TestBase): except Closed, e: self.assertConnectionException(504, e.args[0]) + def test_channel_flow(self): + channel = self.channel + channel.queue_declare(queue="flow_test_queue", exclusive=True) + ctag = channel.basic_consume(queue="flow_test_queue", no_ack=True).consumer_tag + incoming = self.client.queue(ctag) + + channel.channel_flow(active=False) + channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz")) + try: + incoming.get(timeout=1) + self.fail("Received message when flow turned off.") + except Empty: None + + channel.channel_flow(active=True) + msg = incoming.get(timeout=1) + self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) diff --git a/qpid/python/tests_0-8/example.py b/qpid/python/tests_0-8/example.py index a1949ccb9f..d82bad1f61 100644 --- a/qpid/python/tests_0-8/example.py +++ b/qpid/python/tests_0-8/example.py @@ -18,7 +18,7 @@ # from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class ExampleTest (TestBase): """ diff --git a/qpid/python/tests_0-8/queue.py b/qpid/python/tests_0-8/queue.py index 60ac4c3dfb..b7a41736ab 100644 --- a/qpid/python/tests_0-8/queue.py +++ b/qpid/python/tests_0-8/queue.py @@ -19,7 +19,7 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class QueueTests(TestBase): """Tests for 'methods' on the amqp queue 'class'""" diff --git a/qpid/python/tests_0-8/testlib.py b/qpid/python/tests_0-8/testlib.py index cab07cc4ac..76f7e964a2 100644 --- a/qpid/python/tests_0-8/testlib.py +++ b/qpid/python/tests_0-8/testlib.py @@ -22,7 +22,7 @@ # from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase from Queue import Empty import sys diff --git a/qpid/python/tests_0-8/tx.py b/qpid/python/tests_0-8/tx.py index 054fb8d8b7..9faddb1110 100644 --- a/qpid/python/tests_0-8/tx.py +++ b/qpid/python/tests_0-8/tx.py @@ -19,7 +19,7 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class TxTests(TestBase): """ diff --git a/qpid/python/tests_0-9/__init__.py b/qpid/python/tests_0-9/__init__.py index 9a09d2d04f..d9f2ed7dbb 100644 --- a/qpid/python/tests_0-9/__init__.py +++ b/qpid/python/tests_0-9/__init__.py @@ -18,3 +18,5 @@ # specific language governing permissions and limitations # under the License. # + +import query, queue diff --git a/qpid/python/tests_0-9/basic.py b/qpid/python/tests_0-9/basic.py deleted file mode 100644 index 607ba26343..0000000000 --- a/qpid/python/tests_0-9/basic.py +++ /dev/null @@ -1,396 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class BasicTests(TestBase): - """Tests for 'methods' on the amqp basic 'class'""" - - def test_consume_no_local(self): - """ - Test that the no_local flag is honoured in the consume method - """ - channel = self.channel - #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True) - #establish two consumers one of which excludes delivery of locally sent messages - channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") - channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) - - #send a message - channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) - channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) - - #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") - msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.content.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test that the exclusive flag is honoured in the consume method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True) - - #check that an exclusive consumer prevents other consumer being created: - channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.channel_open() - - #check that an exclusive consumer cannot be created if a consumer already exists: - channel.basic_consume(consumer_tag="first", queue="test-queue-2") - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - def test_consume_queue_errors(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - channel = self.channel - try: - #queue specified but doesn't exist: - channel.basic_consume(queue="invalid-queue") - self.fail("Expected failure when consuming from non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(2) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.basic_consume(queue="") - self.fail("Expected failure when consuming from unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True) - - #check that attempts to use duplicate tags are detected and prevented: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - try: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") - channel.basic_publish(routing_key="test-queue-4", content=Content("One")) - - myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.content.body) - - #cancel should stop messages being delivered - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_cancel(consumer_tag="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True) - - reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) - queue = self.client.queue(reply.consumer_tag) - - channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - def test_recover_requeue(self): - """ - Test requeing on recovery - """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True) - - subscription = channel.basic_consume(queue="test-requeue", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - channel.basic_publish(routing_key="test-requeue", content=Content("One")) - channel.basic_publish(routing_key="test-requeue", content=Content("Two")) - channel.basic_publish(routing_key="test-requeue", content=Content("Three")) - channel.basic_publish(routing_key="test-requeue", content=Content("Four")) - channel.basic_publish(routing_key="test-requeue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_cancel(consumer_tag=subscription.consumer_tag) - - channel.basic_recover(requeue=True) - - subscription2 = channel.basic_consume(queue="test-requeue") - queue2 = self.client.queue(subscription2.consumer_tag) - - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) - - try: - extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.content.body) - except Empty: None - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.content.body) - except Empty: None - - - def test_qos_prefetch_count(self): - """ - Test that the prefetch count specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 5: - channel.basic_qos(prefetch_count=5) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered: - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - - - def test_qos_prefetch_size(self): - """ - Test that the prefetch size specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.basic_qos(prefetch_size=50) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered (i.e. 45 bytes worth): - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - #make sure that a single oversized message still gets delivered - large = "abcdefghijklmnopqrstuvwxyz" - large = large + "-" + large; - channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) - msg = queue.get(timeout=1) - self.assertEqual(large, msg.content.body) - - def test_get(self): - """ - Test basic_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - #use basic_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #repeat for no_ack=False - for i in range(11, 21): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - for i in range(11, 21): - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - if(i == 13): - channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) - if(i in [15, 17, 19]): - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - #recover(requeue=True) - channel.basic_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_ok") - self.assertEqual("Message %d" % i, reply.content.body) - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") - - channel.basic_recover(requeue=True) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get_empty") diff --git a/qpid/python/tests_0-9/broker.py b/qpid/python/tests_0-9/broker.py deleted file mode 100644 index 03b4132d3e..0000000000 --- a/qpid/python/tests_0-9/broker.py +++ /dev/null @@ -1,133 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class BrokerTests(TestBase): - """Tests for basic Broker functionality""" - - def test_ack_and_no_ack(self): - """ - First, this test tries to receive a message with a no-ack - consumer. Second, this test tries to explicitly receive and - acknowledge a message with an acknowledging consumer. - """ - ch = self.channel - self.queue_declare(ch, queue = "myqueue") - - # No ack consumer - ctag = "tag1" - ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True) - body = "test no-ack" - ch.message_transfer(routing_key = "myqueue", body = body) - msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.body == body) - - # Acknowledging consumer - self.queue_declare(ch, queue = "otherqueue") - ctag = "tag2" - ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False) - body = "test ack" - ch.message_transfer(routing_key = "otherqueue", body = body) - msg = self.client.queue(ctag).get(timeout = 5) - msg.ok() - self.assert_(msg.body == body) - - def test_simple_delivery_immediate(self): - """ - Test simple message delivery where consume is issued before publish - """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - consumer_tag = "tag1" - channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) - queue = self.client.queue(consumer_tag) - - body = "Immediate Delivery" - channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True) - msg = queue.get(timeout=5) - self.assert_(msg.body == body) - - # TODO: Ensure we fail if immediate=True and there's no consumer. - - - def test_simple_delivery_queued(self): - """ - Test basic message delivery where publish is issued before consume - (i.e. requires queueing of the message) - """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - body = "Queued Delivery" - channel.message_transfer(destination="test-exchange", routing_key="key", body=body) - - consumer_tag = "tag1" - channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) - queue = self.client.queue(consumer_tag) - msg = queue.get(timeout=5) - self.assert_(msg.body == body) - - def test_invalid_channel(self): - channel = self.client.channel(200) - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for invalid channel") - except Closed, e: - self.assertConnectionException(504, e.args[0]) - - def test_closed_channel(self): - channel = self.client.channel(200) - channel.channel_open() - channel.channel_close() - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for closed channel") - except Closed, e: - if isinstance(e.args[0], str): self.fail(e) - self.assertConnectionException(504, e.args[0]) - - def test_ping_pong(self): - channel = self.channel - reply = channel.channel_ping() - self.assertEqual(reply.method.klass.name, "channel") - self.assertEqual(reply.method.name, "ok") - #todo: provide a way to get notified of incoming pongs... - - def test_channel_flow(self): - channel = self.channel - channel.queue_declare(queue="flow_test_queue", exclusive=True) - channel.message_consume(destination="my-tag", queue="flow_test_queue") - incoming = self.client.queue("my-tag") - - channel.channel_flow(active=False) - channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz") - try: - incoming.get(timeout=1) - self.fail("Received message when flow turned off.") - except Empty: None - - channel.channel_flow(active=True) - msg = incoming.get(timeout=1) - self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body) diff --git a/qpid/python/tests_0-9/dtx.py b/qpid/python/tests_0-9/dtx.py deleted file mode 100644 index bc268f4129..0000000000 --- a/qpid/python/tests_0-9/dtx.py +++ /dev/null @@ -1,587 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from struct import pack, unpack -from time import sleep - -class DtxTests(TestBase): - """ - Tests for the amqp dtx related classes. - - Tests of the form test_simple_xxx test the basic transactional - behaviour. The approach here is to 'swap' a message from one queue - to another by consuming and re-publishing in the same - transaction. That transaction is then completed in different ways - and the appropriate result verified. - - The other tests enforce more specific rules and behaviour on a - per-method or per-field basis. - """ - - XA_RBROLLBACK = 1 - XA_RBTIMEOUT = 2 - XA_OK = 8 - - def test_simple_commit(self): - """ - Test basic one-phase commit behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "commit") - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags) - - #check result - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(1, "queue-b") - self.assertMessageId("commit", "queue-b") - - def test_simple_prepare_commit(self): - """ - Test basic two-phase commit behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "prepare-commit") - - #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags) - - #check result - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(1, "queue-b") - self.assertMessageId("prepare-commit", "queue-b") - - - def test_simple_rollback(self): - """ - Test basic rollback behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "rollback") - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - #check result - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("rollback", "queue-a") - - def test_simple_prepare_rollback(self): - """ - Test basic rollback behaviour after the transaction has been prepared. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "prepare-rollback") - - #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - #check result - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("prepare-rollback", "queue-a") - - def test_select_required(self): - """ - check that an error is flagged if select is not issued before - start or end - """ - channel = self.channel - tx = self.xid("dummy") - try: - channel.dtx_demarcation_start(xid=tx) - - #if we get here we have failed, but need to do some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - self.fail("Channel not selected for use with dtx, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_start_already_known(self): - """ - Verify that an attempt to start an association with a - transaction that is already known is not allowed (unless the - join flag is set). - """ - #create two channels on different connection & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() - - other = self.connect() - channel2 = other.channel(1) - channel2.channel_open() - channel2.dtx_demarcation_select() - - #create a xid - tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) - #then start on the other without the join set - failed = False - try: - channel2.dtx_demarcation_start(xid=tx) - except Closed, e: - failed = True - error = e - - #cleanup: - if not failed: - channel2.dtx_demarcation_end(xid=tx) - other.close() - channel1.dtx_demarcation_end(xid=tx) - channel1.dtx_coordination_rollback(xid=tx) - - #verification: - if failed: self.assertConnectionException(503, e.args[0]) - else: self.fail("Xid already known, expected exception!") - - def test_forget_xid_on_completion(self): - """ - Verify that a xid is 'forgotten' - and can therefore be used - again - once it is completed. - """ - channel = self.channel - #do some transactional work & complete the transaction - self.test_simple_commit() - - #start association for the same xid as the previously completed txn - tx = self.xid("my-xid") - channel.dtx_demarcation_start(xid=tx) - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - - def test_start_join_and_resume(self): - """ - Ensure the correct error is signalled when both the join and - resume flags are set on starting an association between a - channel and a transcation. - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("dummy") - try: - channel.dtx_demarcation_start(xid=tx, join=True, resume=True) - #failed, but need some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - self.fail("Join and resume both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_start_join(self): - """ - Verify 'join' behaviour, where a channel is associated with a - transaction that is already associated with another channel. - """ - #create two channels & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() - - channel2 = self.client.channel(2) - channel2.channel_open() - channel2.dtx_demarcation_select() - - #setup - channel1.queue_declare(queue="one", exclusive=True) - channel1.queue_declare(queue="two", exclusive=True) - channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage") - - #create a xid - tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) - #then start on the other with the join flag set - channel2.dtx_demarcation_start(xid=tx, join=True) - - #do work through each channel - self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' - self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' - - #mark end on both channels - channel1.dtx_demarcation_end(xid=tx) - channel2.dtx_demarcation_end(xid=tx) - - #commit and check - channel1.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - - def test_suspend_resume(self): - """ - Test suspension and resumption of an association - """ - channel = self.channel - channel.dtx_demarcation_select() - - #setup - channel.queue_declare(queue="one", exclusive=True) - channel.queue_declare(queue="two", exclusive=True) - channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage") - - tx = self.xid("dummy") - - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' - channel.dtx_demarcation_end(xid=tx, suspend=True) - - channel.dtx_demarcation_start(xid=tx, resume=True) - self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' - channel.dtx_demarcation_end(xid=tx) - - #commit and check - channel.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - def test_end_suspend_and_fail(self): - """ - Verify that the correct error is signalled if the suspend and - fail flag are both set when disassociating a transaction from - the channel - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("suspend_and_fail") - channel.dtx_demarcation_start(xid=tx) - try: - channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) - self.fail("Suspend and fail both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - #cleanup - other = self.connect() - channel = other.channel(1) - channel.channel_open() - channel.dtx_coordination_rollback(xid=tx) - channel.channel_close() - other.close() - - - def test_end_unknown_xid(self): - """ - Verifies that the correct exception is thrown when an attempt - is made to end the association for a xid not previously - associated with the channel - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("unknown-xid") - try: - channel.dtx_demarcation_end(xid=tx) - self.fail("Attempted to end association with unknown xid, expected exception!") - except Closed, e: - #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming... - self.assertConnectionException(503, e.args[0]) - - def test_end(self): - """ - Verify that the association is terminated by end and subsequent - operations are non-transactional - """ - channel = self.client.channel(2) - channel.channel_open() - channel.queue_declare(queue="tx-queue", exclusive=True) - - #publish a message under a transaction - channel.dtx_demarcation_select() - tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage") - channel.dtx_demarcation_end(xid=tx) - - #now that association with txn is ended, publish another message - channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage") - - #check the second message is available, but not the first - self.assertMessageCount(1, "tx-queue") - channel.message_consume(queue="tx-queue", destination="results", no_ack=False) - msg = self.client.queue("results").get(timeout=1) - self.assertEqual("two", msg.message_id) - channel.message_cancel(destination="results") - #ack the message then close the channel - msg.ok() - channel.channel_close() - - channel = self.channel - #commit the transaction and check that the first message (and - #only the first message) is then delivered - channel.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "tx-queue") - self.assertMessageId("one", "tx-queue") - - def test_invalid_commit_one_phase_true(self): - """ - Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. - """ - other = self.connect() - tester = other.channel(1) - tester.channel_open() - tester.queue_declare(queue="dummy", exclusive=True) - tester.dtx_demarcation_select() - tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") - tester.dtx_demarcation_end(xid=tx) - tester.dtx_coordination_prepare(xid=tx) - failed = False - try: - tester.dtx_coordination_commit(xid=tx, one_phase=True) - except Closed, e: - failed = True - error = e - - if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) - else: - tester.channel_close() - other.close() - self.fail("Invalid use of one_phase=True, expected exception!") - - def test_invalid_commit_one_phase_false(self): - """ - Test that a commit with one_phase = False is rejected if the - transaction in question has not yet been prepared. - """ - """ - Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. - """ - other = self.connect() - tester = other.channel(1) - tester.channel_open() - tester.queue_declare(queue="dummy", exclusive=True) - tester.dtx_demarcation_select() - tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") - tester.dtx_demarcation_end(xid=tx) - failed = False - try: - tester.dtx_coordination_commit(xid=tx, one_phase=False) - except Closed, e: - failed = True - error = e - - if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) - else: - tester.channel_close() - other.close() - self.fail("Invalid use of one_phase=False, expected exception!") - - def test_implicit_end(self): - """ - Test that an association is implicitly ended when the channel - is closed (whether by exception or explicit client request) - and the transaction in question is marked as rollback only. - """ - channel1 = self.channel - channel2 = self.client.channel(2) - channel2.channel_open() - - #setup: - channel2.queue_declare(queue="dummy", exclusive=True) - channel2.message_transfer(routing_key="dummy", body="whatever") - tx = self.xid("dummy") - - channel2.dtx_demarcation_select() - channel2.dtx_demarcation_start(xid=tx) - channel2.message_get(queue="dummy", destination="dummy") - self.client.queue("dummy").get(timeout=1).ok() - channel2.message_transfer(routing_key="dummy", body="whatever") - channel2.channel_close() - - self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags) - channel1.dtx_coordination_rollback(xid=tx) - - def test_get_timeout(self): - """ - Check that get-timeout returns the correct value, (and that a - transaction with a timeout can complete normally) - """ - channel = self.channel - tx = self.xid("dummy") - - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) - channel.dtx_coordination_set_timeout(xid=tx, timeout=60) - self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) - self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags) - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - def test_set_timeout(self): - """ - Test the timeout of a transaction results in the expected - behaviour - """ - #open new channel to allow self.channel to be used in checking te queue - channel = self.client.channel(2) - channel.channel_open() - #setup: - tx = self.xid("dummy") - channel.queue_declare(queue="queue-a", exclusive=True) - channel.queue_declare(queue="queue-b", exclusive=True) - channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage") - - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "queue-a", "queue-b") - channel.dtx_coordination_set_timeout(xid=tx, timeout=2) - sleep(3) - #check that the work has been rolled back already - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("timeout", "queue-a") - #check the correct codes are returned when we try to complete the txn - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags) - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags) - - - - def test_recover(self): - """ - Test basic recover behaviour - """ - channel = self.channel - - channel.dtx_demarcation_select() - channel.queue_declare(queue="dummy", exclusive=True) - - prepared = [] - for i in range(1, 10): - tx = self.xid("tx%s" % (i)) - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="dummy", body="message%s" % (i)) - channel.dtx_demarcation_end(xid=tx) - if i in [2, 5, 6, 8]: - channel.dtx_coordination_prepare(xid=tx) - prepared.append(tx) - else: - channel.dtx_coordination_rollback(xid=tx) - - indoubt = channel.dtx_coordination_recover().xids - #convert indoubt table to a list of xids (note: this will change for 0-10) - data = indoubt["xids"] - xids = [] - pos = 0 - while pos < len(data): - size = unpack("!B", data[pos])[0] - start = pos + 1 - end = start + size - xid = data[start:end] - xids.append(xid) - pos = end - - #rollback the prepared transactions returned by recover - for x in xids: - channel.dtx_coordination_rollback(xid=x) - - #validate against the expected list of prepared transactions - actual = set(xids) - expected = set(prepared) - intersection = actual.intersection(expected) - - if intersection != expected: - missing = expected.difference(actual) - extra = actual.difference(expected) - for x in missing: - channel.dtx_coordination_rollback(xid=x) - self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) - - def xid(self, txid, branchqual = ''): - return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual - - def txswap(self, tx, id): - channel = self.channel - #declare two queues: - channel.queue_declare(queue="queue-a", exclusive=True) - channel.queue_declare(queue="queue-b", exclusive=True) - #put message with specified id on one queue: - channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage") - - #start the transaction: - channel.dtx_demarcation_select() - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).flags) - - #'swap' the message from one queue to the other, under that transaction: - self.swap(self.channel, "queue-a", "queue-b") - - #mark the end of the transactional work: - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).flags) - - def swap(self, channel, src, dest): - #consume from src: - channel.message_get(destination="temp-swap", queue=src) - msg = self.client.queue("temp-swap").get(timeout=1) - msg.ok(); - - #re-publish to dest - channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) - - def assertMessageCount(self, expected, queue): - self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count) - - def assertMessageId(self, expected, queue): - self.channel.message_consume(queue=queue, destination="results", no_ack=True) - self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) - self.channel.message_cancel(destination="results") diff --git a/qpid/python/tests_0-9/example.py b/qpid/python/tests_0-9/example.py deleted file mode 100644 index 7ab4cc7d0a..0000000000 --- a/qpid/python/tests_0-9/example.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class ExampleTest (TestBase): - """ - An example Qpid test, illustrating the unittest frameowkr and the - python Qpid client. The test class must inherit TestCase. The - test code uses the Qpid client to interact with a qpid broker and - verify it behaves as expected. - """ - - def test_example(self): - """ - An example test. Note that test functions must start with 'test_' - to be recognized by the test framework. - """ - - # By inheriting TestBase, self.client is automatically connected - # and self.channel is automatically opened as channel(1) - # Other channel methods mimic the protocol. - channel = self.channel - - # Now we can send regular commands. If you want to see what the method - # arguments mean or what other commands are available, you can use the - # python builtin help() method. For example: - #help(chan) - #help(chan.exchange_declare) - - # If you want browse the available protocol methods without being - # connected to a live server you can use the amqp-doc utility: - # - # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] - # - # Options: - # -e, --regexp use regex instead of glob when matching - - # Now that we know what commands are available we can use them to - # interact with the server. - - # Here we use ordinal arguments. - self.exchange_declare(channel, 0, "test", "direct") - - # Here we use keyword arguments. - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") - - # Call Channel.basic_consume to register as a consumer. - # All the protocol methods return a message object. The message object - # has fields corresponding to the reply method fields, plus a content - # field that is filled if the reply includes content. In this case the - # interesting field is the consumer_tag. - channel.message_consume(queue="test-queue", destination="consumer_tag") - - # We can use the Client.queue(...) method to access the queue - # corresponding to our consumer_tag. - queue = self.client.queue("consumer_tag") - - # Now lets publish a message and see if our consumer gets it. To do - # this we need to import the Content class. - body = "Hello World!" - channel.message_transfer(destination="test", - routing_key="key", - body = body) - - # Now we'll wait for the message to arrive. We can use the timeout - # argument in case the server hangs. By default queue.get() will wait - # until a message arrives or the connection to the server dies. - msg = queue.get(timeout=10) - - # And check that we got the right response with assertEqual - self.assertEqual(body, msg.body) - - # Now acknowledge the message. - msg.ok() - diff --git a/qpid/python/tests_0-9/exchange.py b/qpid/python/tests_0-9/exchange.py deleted file mode 100644 index 3a47ffff8c..0000000000 --- a/qpid/python/tests_0-9/exchange.py +++ /dev/null @@ -1,327 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Tests for exchange behaviour. - -Test classes ending in 'RuleTests' are derived from rules in amqp.xml. -""" - -import Queue, logging -from qpid.testlib import TestBase -from qpid.content import Content -from qpid.client import Closed - - -class StandardExchangeVerifier: - """Verifies standard exchange behavior. - - Used as base class for classes that test standard exchanges.""" - - def verifyDirectExchange(self, ex): - """Verify that ex behaves like a direct exchange.""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") - self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") - try: - self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") - self.fail("Expected Empty exception") - except Queue.Empty: None # Expected - - def verifyFanOutExchange(self, ex): - """Verify that ex behaves like a fanout exchange.""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex) - self.queue_declare(queue="p") - self.channel.queue_bind(queue="p", exchange=ex) - for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) - - def verifyTopicExchange(self, ex): - """Verify that ex behaves like a topic exchange""" - self.queue_declare(queue="a") - self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") - q = self.consume("a") - self.assertPublishGet(q, ex, "a.b.x") - self.assertPublishGet(q, ex, "a.x.b.x") - self.assertPublishGet(q, ex, "a.x.x.b.x") - # Shouldn't match - self.channel.message_transfer(destination=ex, routing_key="a.b", body="") - self.channel.message_transfer(destination=ex, routing_key="a.b.x.y", body="") - self.channel.message_transfer(destination=ex, routing_key="x.a.b.x", body="") - self.channel.message_transfer(destination=ex, routing_key="a.b", body="") - self.assert_(q.empty()) - - def verifyHeadersExchange(self, ex): - """Verify that ex is a headers exchange""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) - q = self.consume("q") - headers = {"name":"fred", "age":3} - self.assertPublishGet(q, exchange=ex, properties=headers) - self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver - self.assertEmpty(q); - - -class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): - """ - The server SHOULD implement these standard exchange types: topic, headers. - - Client attempts to declare an exchange with each of these standard types. - """ - - def testDirect(self): - """Declare and test a direct exchange""" - self.exchange_declare(0, exchange="d", type="direct") - self.verifyDirectExchange("d") - - def testFanout(self): - """Declare and test a fanout exchange""" - self.exchange_declare(0, exchange="f", type="fanout") - self.verifyFanOutExchange("f") - - def testTopic(self): - """Declare and test a topic exchange""" - self.exchange_declare(0, exchange="t", type="topic") - self.verifyTopicExchange("t") - - def testHeaders(self): - """Declare and test a headers exchange""" - self.exchange_declare(0, exchange="h", type="headers") - self.verifyHeadersExchange("h") - - -class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): - """ - The server MUST, in each virtual host, pre-declare an exchange instance - for each standard exchange type that it implements, where the name of the - exchange instance is amq. followed by the exchange type name. - - Client creates a temporary queue and attempts to bind to each required - exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if - those types are defined). - """ - def testAmqDirect(self): self.verifyDirectExchange("amq.direct") - - def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") - - def testAmqTopic(self): self.verifyTopicExchange("amq.topic") - - def testAmqMatch(self): self.verifyHeadersExchange("amq.match") - -class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): - """ - The server MUST predeclare a direct exchange to act as the default exchange - for content Publish methods and for default queue bindings. - - Client checks that the default exchange is active by specifying a queue - binding with no exchange name, and publishing a message with a suitable - routing key but without specifying the exchange name, then ensuring that - the message arrives in the queue correctly. - """ - def testDefaultExchange(self): - # Test automatic binding by queue name. - self.queue_declare(queue="d") - self.assertPublishConsume(queue="d", routing_key="d") - # Test explicit bind to default queue - self.verifyDirectExchange("") - - -# TODO aconway 2006-09-27: Fill in empty tests: - -class DefaultAccessRuleTests(TestBase): - """ - The server MUST NOT allow clients to access the default exchange except - by specifying an empty exchange name in the Queue.Bind and content Publish - methods. - """ - -class ExtensionsRuleTests(TestBase): - """ - The server MAY implement other exchange types as wanted. - """ - - -class DeclareMethodMinimumRuleTests(TestBase): - """ - The server SHOULD support a minimum of 16 exchanges per virtual host and - ideally, impose no limit except as defined by available resources. - - The client creates as many exchanges as it can until the server reports - an error; the number of exchanges successfuly created must be at least - sixteen. - """ - - -class DeclareMethodTicketFieldValidityRuleTests(TestBase): - """ - The client MUST provide a valid access ticket giving "active" access to - the realm in which the exchange exists or will be created, or "passive" - access if the if-exists flag is set. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeclareMethodExchangeFieldReservedRuleTests(TestBase): - """ - Exchange names starting with "amq." are reserved for predeclared and - standardised exchanges. The client MUST NOT attempt to create an exchange - starting with "amq.". - - - """ - - -class DeclareMethodTypeFieldTypedRuleTests(TestBase): - """ - Exchanges cannot be redeclared with different types. The client MUST not - attempt to redeclare an existing exchange with a different type than used - in the original Exchange.Declare method. - - - """ - - -class DeclareMethodTypeFieldSupportRuleTests(TestBase): - """ - The client MUST NOT attempt to create an exchange with a type that the - server does not support. - - - """ - - -class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): - """ - If set, and the exchange does not already exist, the server MUST raise a - channel exception with reply code 404 (not found). - """ - def test(self): - try: - self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) - self.fail("Expected 404 for passive declaration of unknown exchange.") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - -class DeclareMethodDurableFieldSupportRuleTests(TestBase): - """ - The server MUST support both durable and transient exchanges. - - - """ - - -class DeclareMethodDurableFieldStickyRuleTests(TestBase): - """ - The server MUST ignore the durable field if the exchange already exists. - - - """ - - -class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): - """ - The server MUST ignore the auto-delete field if the exchange already - exists. - - - """ - - -class DeleteMethodTicketFieldValidityRuleTests(TestBase): - """ - The client MUST provide a valid access ticket giving "active" access - rights to the exchange's access realm. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeleteMethodExchangeFieldExistsRuleTests(TestBase): - """ - The client MUST NOT attempt to delete an exchange that does not exist. - """ - - -class HeadersExchangeTests(TestBase): - """ - Tests for headers exchange functionality. - """ - def setUp(self): - TestBase.setUp(self) - self.queue_declare(queue="q") - self.q = self.consume("q") - - def myAssertPublishGet(self, headers): - self.assertPublishGet(self.q, exchange="amq.match", properties=headers) - - def myBasicPublish(self, headers): - self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers) - - def testMatchAll(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) - - # None of these should match - self.myBasicPublish({}) - self.myBasicPublish({"name":"barney"}) - self.myBasicPublish({"name":10}) - self.myBasicPublish({"name":"fred", "age":2}) - self.assertEmpty(self.q) - - def testMatchAny(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred"}) - self.myAssertPublishGet({"name":"fred", "ignoreme":10}) - self.myAssertPublishGet({"ignoreme":10, "age":3}) - - # Wont match - self.myBasicPublish({}) - self.myBasicPublish({"irrelevant":0}) - self.assertEmpty(self.q) - - -class MiscellaneousErrorsTests(TestBase): - """ - Test some miscellaneous error conditions - """ - def testTypeNotKnown(self): - try: - self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") - self.fail("Expected 503 for declaration of unknown exchange type.") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def testDifferentDeclaredType(self): - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") - try: - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") - self.fail("Expected 530 for redeclaration of exchange with different type.") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - #cleanup - other = self.connect() - c2 = other.channel(1) - c2.channel_open() - c2.exchange_delete(exchange="test_different_declared_type_exchange") - diff --git a/qpid/python/tests_0-9/execution.py b/qpid/python/tests_0-9/execution.py deleted file mode 100644 index f2facfe42b..0000000000 --- a/qpid/python/tests_0-9/execution.py +++ /dev/null @@ -1,29 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class ExecutionTests (TestBase): - def test_flush(self): - channel = self.channel - for i in [1, 2, 3]: - channel.basic_publish() - channel.execution_flush() - assert(channel.completion.wait(channel.completion.command_id, timeout=1)) diff --git a/qpid/python/tests_0-9/message.py b/qpid/python/tests_0-9/message.py deleted file mode 100644 index b25016e680..0000000000 --- a/qpid/python/tests_0-9/message.py +++ /dev/null @@ -1,657 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from qpid.reference import Reference, ReferenceId - -class MessageTests(TestBase): - """Tests for 'methods' on the amqp message 'class'""" - - def test_consume_no_local(self): - """ - Test that the no_local flag is honoured in the consume method - """ - channel = self.channel - #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True) - #establish two consumers one of which excludes delivery of locally sent messages - channel.message_consume(destination="local_included", queue="test-queue-1a") - channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True) - - #send a message - channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local") - channel.message_transfer(routing_key="test-queue-1b", body="consume_no_local") - - #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") - msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test that the exclusive flag is honoured in the consume method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True) - - #check that an exclusive consumer prevents other consumer being created: - channel.message_consume(destination="first", queue="test-queue-2", exclusive=True) - try: - channel.message_consume(destination="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.channel_open() - - #check that an exclusive consumer cannot be created if a consumer already exists: - channel.message_consume(destination="first", queue="test-queue-2") - try: - channel.message_consume(destination="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - def test_consume_queue_errors(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - channel = self.channel - try: - #queue specified but doesn't exist: - channel.message_consume(queue="invalid-queue") - self.fail("Expected failure when consuming from non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(2) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.message_consume(queue="") - self.fail("Expected failure when consuming from unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True) - - #check that attempts to use duplicate tags are detected and prevented: - channel.message_consume(destination="first", queue="test-queue-3") - try: - channel.message_consume(destination="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.message_consume(destination="my-consumer", queue="test-queue-4") - channel.message_transfer(routing_key="test-queue-4", body="One") - - #cancel should stop messages being delivered - channel.message_cancel(destination="my-consumer") - channel.message_transfer(routing_key="test-queue-4", body="Two") - myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.body) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - channel.message_cancel(destination="my-consumer") - channel.message_cancel(destination="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True) - - channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False) - queue = self.client.queue("consumer_tag") - - channel.message_transfer(routing_key="test-ack-queue", body="One") - channel.message_transfer(routing_key="test-ack-queue", body="Two") - channel.message_transfer(routing_key="test-ack-queue", body="Three") - channel.message_transfer(routing_key="test-ack-queue", body="Four") - channel.message_transfer(routing_key="test-ack-queue", body="Five") - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) - - msg1.ok(batchoffset=1)#One and Two - msg4.ok() - - channel.message_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - def test_recover_requeue(self): - """ - Test requeing on recovery - """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True) - - channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False) - queue = self.client.queue("consumer_tag") - - channel.message_transfer(routing_key="test-requeue", body="One") - channel.message_transfer(routing_key="test-requeue", body="Two") - channel.message_transfer(routing_key="test-requeue", body="Three") - channel.message_transfer(routing_key="test-requeue", body="Four") - channel.message_transfer(routing_key="test-requeue", body="Five") - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.body) - self.assertEqual("Two", msg2.body) - self.assertEqual("Three", msg3.body) - self.assertEqual("Four", msg4.body) - self.assertEqual("Five", msg5.body) - - msg1.ok(batchoffset=1) #One and Two - msg4.ok() #Four - - channel.message_cancel(destination="consumer_tag") - - #publish a new message - channel.message_transfer(routing_key="test-requeue", body="Six") - #requeue unacked messages (Three and Five) - channel.message_recover(requeue=True) - - channel.message_consume(queue="test-requeue", destination="consumer_tag") - queue2 = self.client.queue("consumer_tag") - - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) - - self.assertEqual("Three", msg3b.body) - self.assertEqual("Five", msg5b.body) - - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) - - self.assertEqual("Six", queue2.get(timeout=1).body) - - try: - extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.body) - except Empty: None - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.body) - except Empty: None - - - def test_qos_prefetch_count(self): - """ - Test that the prefetch count specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False) - queue = self.client.queue("consumer_tag") - - #set prefetch to 5: - channel.message_qos(prefetch_count=5) - - #publish 10 messages: - for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i) - - #only 5 messages should have been delivered: - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - #todo: once batching is implmented, send a single response for all messages - msg.ok(batchoffset=-4)#1-5 - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - msg.ok(batchoffset=-4)#6-10 - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) - except Empty: None - - - - def test_qos_prefetch_size(self): - """ - Test that the prefetch size specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False) - queue = self.client.queue("consumer_tag") - - #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.message_qos(prefetch_size=50) - - #publish 10 messages: - for i in range(1, 11): - channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i) - - #only 5 messages should have been delivered (i.e. 45 bytes worth): - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - msg.ok(batchoffset=-4)#1-5 - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - msg.ok(batchoffset=-4)#6-10 - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.body) - except Empty: None - - #make sure that a single oversized message still gets delivered - large = "abcdefghijklmnopqrstuvwxyz" - large = large + "-" + large; - channel.message_transfer(routing_key="test-prefetch-size", body=large) - msg = queue.get(timeout=1) - self.assertEqual(large, msg.body) - - def test_get(self): - """ - Test message_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) - for i in range(1, 11): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - #use message_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - tag = "queue %d" % i - reply = channel.message_get(no_ack=True, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #repeat for no_ack=False - for i in range(11, 21): - channel.message_transfer(routing_key="test-get", body="Message %d" % i) - - for i in range(11, 21): - tag = "queue %d" % i - reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - if (i==13): - msg.ok(batchoffset=-2)#11, 12 & 13 - if(i in [15, 17, 19]): - msg.ok() - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - #recover(requeue=True) - channel.message_recover(requeue=True) - - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: - tag = "queue %d" % i - reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "ok") - msg = self.client.queue(tag).get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.ok() - #channel.message_ack(delivery_tag=reply.delivery_tag) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - channel.message_recover(requeue=True) - - reply = channel.message_get(no_ack=True, queue="test-get") - self.assertEqual(reply.method.klass.name, "message") - self.assertEqual(reply.method.name, "empty") - - def test_reference_simple(self): - """ - Test basic ability to handle references - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - channel.message_append(reference=refId, bytes="efgh") - channel.message_append(reference=refId, bytes="ijkl") - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl") - - - def test_reference_large(self): - """ - Test basic ability to handle references whose content exceeds max frame size - """ - channel = self.channel - self.queue_declare(queue="ref_queue") - - #generate a big data string (> max frame size of consumer): - data = "0123456789" - for i in range(0, 10): - data += data - #send it inline - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=data) - channel.synchronous = True - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - #create a new connection for consumer, with specific max frame size (< data) - other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0}) - ch2 = other.channel(1) - ch2.channel_open() - ch2.message_consume(queue="ref_queue", destination="c1") - queue = other.queue("c1") - - msg = queue.get(timeout=1) - self.assertTrue(isinstance(msg.body, ReferenceId)) - self.assertTrue(msg.reference) - self.assertEquals(data, msg.reference.get_complete()) - - def test_reference_completion(self): - """ - Test that reference transfer are not deemed complete until - closed (therefore are not acked or routed until that point) - """ - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.message_append(reference=refId, bytes="abcd") - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId)) - channel.synchronous = True - - try: - msg = queue.get(timeout=1) - self.fail("Got unexpected message on queue: " + msg) - except Empty: None - - self.assertTrue(not ack.is_complete()) - - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - self.assertDataEquals(channel, queue.get(timeout=1), "abcd") - - def test_reference_multi_transfer(self): - """ - Test that multiple transfer requests for the same reference are - correctly handled. - """ - channel = self.channel - #declare and consume from two queues - channel.queue_declare(queue="q-one", exclusive=True) - channel.queue_declare(queue="q-two", exclusive=True) - channel.message_consume(queue="q-one", destination="q-one") - channel.message_consume(queue="q-two", destination="q-two") - queue1 = self.client.queue("q-one") - queue2 = self.client.queue("q-two") - - #transfer a single ref to both queues (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="my data") - ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - #check that both queues have the message - self.assertDataEquals(channel, queue1.get(timeout=1), "my data") - self.assertDataEquals(channel, queue2.get(timeout=1), "my data") - self.assertEmpty(queue1) - self.assertEmpty(queue2) - - #transfer a single ref to the same queue twice (in separate commands) - channel.message_open(reference="my-ref") - channel.synchronous = False - ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) - channel.message_append(reference="my-ref", bytes="second message") - ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - msg1 = queue1.get(timeout=1) - msg2 = queue1.get(timeout=1) - #order is undefined - if msg1.message_id == "abc": - self.assertEquals(msg2.message_id, "xyz") - else: - self.assertEquals(msg1.message_id, "xyz") - self.assertEquals(msg2.message_id, "abc") - - #would be legal for the incoming messages to be transfered - #inline or by reference in any combination - - if isinstance(msg1.body, ReferenceId): - self.assertEquals("second message", msg1.reference.get_complete()) - if isinstance(msg2.body, ReferenceId): - if msg1.body != msg2.body: - self.assertEquals("second message", msg2.reference.get_complete()) - #else ok, as same ref as msg1 - else: - self.assertEquals("second message", msg1.body) - if isinstance(msg2.body, ReferenceId): - self.assertEquals("second message", msg2.reference.get_complete()) - else: - self.assertEquals("second message", msg2.body) - - self.assertEmpty(queue1) - - def test_reference_unopened_on_append_error(self): - channel = self.channel - try: - channel.message_append(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_close_error(self): - channel = self.channel - try: - channel.message_close(reference="unopened") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_unopened_on_transfer_error(self): - channel = self.channel - try: - channel.message_transfer(body=ReferenceId("unopened")) - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_reference_already_opened_error(self): - channel = self.channel - channel.message_open(reference="a") - try: - channel.message_open(reference="a") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_empty_reference(self): - channel = self.channel - channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") - queue = self.client.queue("c1") - - refId = "myref" - channel.message_open(reference=refId) - channel.synchronous = False - ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) - channel.synchronous = True - channel.message_close(reference=refId) - - #first, wait for the ok for the transfer - ack.get_response(timeout=1) - - msg = queue.get(timeout=1) - self.assertEquals(msg.message_id, "empty-msg") - self.assertDataEquals(channel, msg, "") - - def test_reject(self): - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True) - - channel.message_consume(queue = "q", destination = "consumer") - channel.message_transfer(routing_key = "q", body="blah, blah") - msg = self.client.queue("consumer").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - channel.message_cancel(destination = "consumer") - msg.reject() - - channel.message_consume(queue = "q", destination = "checker") - msg = self.client.queue("checker").get(timeout = 1) - self.assertEquals(msg.body, "blah, blah") - - def test_checkpoint(self): - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True) - - channel.message_open(reference="my-ref") - channel.message_append(reference="my-ref", bytes="abcdefgh") - channel.message_append(reference="my-ref", bytes="ijklmnop") - channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint") - channel.channel_close() - - channel = self.client.channel(2) - channel.channel_open() - channel.message_consume(queue = "q", destination = "consumer") - offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value - self.assertTrue(offset<=16) - channel.message_append(reference="my-ref", bytes="qrstuvwxyz") - channel.synchronous = False - channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref")) - channel.synchronous = True - channel.message_close(reference="my-ref") - - self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") - self.assertEmpty(self.client.queue("consumer")) - - - def assertDataEquals(self, channel, msg, expected): - if isinstance(msg.body, ReferenceId): - data = msg.reference.get_complete() - else: - data = msg.body - self.assertEquals(expected, data) diff --git a/qpid/python/tests_0-9/query.py b/qpid/python/tests_0-9/query.py index c2e08c003c..cb66d079e5 100644 --- a/qpid/python/tests_0-9/query.py +++ b/qpid/python/tests_0-9/query.py @@ -19,7 +19,7 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class QueryTests(TestBase): """Tests for various query methods introduced in 0-10 and available in 0-9 for preview""" diff --git a/qpid/python/tests_0-9/queue.py b/qpid/python/tests_0-9/queue.py index e7fe0b3ed4..de1153307c 100644 --- a/qpid/python/tests_0-9/queue.py +++ b/qpid/python/tests_0-9/queue.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,137 +19,11 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase class QueueTests(TestBase): """Tests for 'methods' on the amqp queue 'class'""" - def test_purge(self): - """ - Test that the purge method removes messages from the queue - """ - channel = self.channel - #setup, declare a queue and add some messages to it: - channel.exchange_declare(exchange="test-exchange", type="direct") - channel.queue_declare(queue="test-queue", exclusive=True) - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.message_transfer(destination="test-exchange", routing_key="key", body="one") - channel.message_transfer(destination="test-exchange", routing_key="key", body="two") - channel.message_transfer(destination="test-exchange", routing_key="key", body="three") - - #check that the queue now reports 3 messages: - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(3, reply.message_count) - - #now do the purge, then test that three messages are purged and the count drops to 0 - reply = channel.queue_purge(queue="test-queue"); - self.assertEqual(3, reply.message_count) - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(0, reply.message_count) - - #send a further message and consume it, ensuring that the other messages are really gone - channel.message_transfer(destination="test-exchange", routing_key="key", body="four") - channel.message_consume(queue="test-queue", destination="tag", no_ack=True) - queue = self.client.queue("tag") - msg = queue.get(timeout=1) - self.assertEqual("four", msg.body) - - #check error conditions (use new channels): - channel = self.client.channel(2) - channel.channel_open() - try: - #queue specified but doesn't exist: - channel.queue_purge(queue="invalid-queue") - self.fail("Expected failure when purging non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(3) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.queue_purge() - self.fail("Expected failure when purging unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - #cleanup - other = self.connect() - channel = other.channel(1) - channel.channel_open() - channel.exchange_delete(exchange="test-exchange") - - def test_declare_exclusive(self): - """ - Test that the exclusive field is honoured in queue.declare - """ - # TestBase.setUp has already opened channel(1) - c1 = self.channel - # Here we open a second separate connection: - other = self.connect() - c2 = other.channel(1) - c2.channel_open() - - #declare an exclusive queue: - c1.queue_declare(queue="exclusive-queue", exclusive="True") - try: - #other connection should not be allowed to declare this: - c2.queue_declare(queue="exclusive-queue", exclusive="True") - self.fail("Expected second exclusive queue_declare to raise a channel exception") - except Closed, e: - self.assertChannelException(405, e.args[0]) - - - def test_declare_passive(self): - """ - Test that the passive field is honoured in queue.declare - """ - channel = self.channel - #declare an exclusive queue: - channel.queue_declare(queue="passive-queue-1", exclusive="True") - channel.queue_declare(queue="passive-queue-1", passive="True") - try: - #other connection should not be allowed to declare this: - channel.queue_declare(queue="passive-queue-2", passive="True") - self.fail("Expected passive declaration of non-existant queue to raise a channel exception") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - def test_bind(self): - """ - Test various permutations of the queue.bind method - """ - channel = self.channel - channel.queue_declare(queue="queue-1", exclusive="True") - - #straightforward case, both exchange & queue exist so no errors expected: - channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") - - #bind the default queue for the channel (i.e. last one declared): - channel.queue_bind(exchange="amq.direct", routing_key="key2") - - #use the queue name where neither routing key nor queue are specified: - channel.queue_bind(exchange="amq.direct") - - #try and bind to non-existant exchange - try: - channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") - self.fail("Expected bind to non-existant exchange to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - #need to reopen a channel: - channel = self.client.channel(2) - channel.channel_open() - - #try and bind non-existant queue: - try: - channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") - self.fail("Expected bind of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - def test_unbind_direct(self): self.unbind_test(exchange="amq.direct", routing_key="key") @@ -165,12 +39,12 @@ class QueueTests(TestBase): def unbind_test(self, exchange, routing_key="", args=None, headers={}): #bind two queues and consume from them channel = self.channel - + channel.queue_declare(queue="queue-1", exclusive="True") channel.queue_declare(queue="queue-2", exclusive="True") - channel.message_consume(queue="queue-1", destination="queue-1", no_ack=True) - channel.message_consume(queue="queue-2", destination="queue-2", no_ack=True) + channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True) + channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True) queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") @@ -179,130 +53,29 @@ class QueueTests(TestBase): channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings - channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="one") - + channel.basic_publish(exchange=exchange, routing_key=routing_key, + content=Content("one", properties={"headers": headers})) + #unbind first queue channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - + #send another message - channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="two") + channel.basic_publish(exchange=exchange, routing_key=routing_key, + content=Content("two", properties={"headers": headers})) #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).body) + self.assertEquals("one", queue1.get(timeout=1).content.body) try: msg = queue1.get(timeout=1) self.fail("Got extra message: %s" % msg.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).body) - self.assertEquals("two", queue2.get(timeout=1).body) + self.assertEquals("one", queue2.get(timeout=1).content.body) + self.assertEquals("two", queue2.get(timeout=1).content.body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) - except Empty: pass - - - def test_delete_simple(self): - """ - Test core queue deletion behaviour - """ - channel = self.channel - - #straight-forward case: - channel.queue_declare(queue="delete-me") - channel.message_transfer(routing_key="delete-me", body="a") - channel.message_transfer(routing_key="delete-me", body="b") - channel.message_transfer(routing_key="delete-me", body="c") - reply = channel.queue_delete(queue="delete-me") - self.assertEqual(3, reply.message_count) - #check that it has gone be declaring passively - try: - channel.queue_declare(queue="delete-me", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - #check attempted deletion of non-existant queue is handled correctly: - channel = self.client.channel(2) - channel.channel_open() - try: - channel.queue_delete(queue="i-dont-exist", if_empty="True") - self.fail("Expected delete of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - - def test_delete_ifempty(self): - """ - Test that if_empty field of queue_delete is honoured - """ - channel = self.channel - - #create a queue and add a message to it (use default binding): - channel.queue_declare(queue="delete-me-2") - channel.queue_declare(queue="delete-me-2", passive="True") - channel.message_transfer(routing_key="delete-me-2", body="message") - - #try to delete, but only if empty: - try: - channel.queue_delete(queue="delete-me-2", if_empty="True") - self.fail("Expected delete if_empty to fail for non-empty queue") - except Closed, e: - self.assertChannelException(406, e.args[0]) - - #need new channel now: - channel = self.client.channel(2) - channel.channel_open() - - #empty queue: - channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True) - queue = self.client.queue("consumer_tag") - msg = queue.get(timeout=1) - self.assertEqual("message", msg.body) - channel.message_cancel(destination="consumer_tag") - - #retry deletion on empty queue: - channel.queue_delete(queue="delete-me-2", if_empty="True") - - #check that it has gone by declaring passively: - try: - channel.queue_declare(queue="delete-me-2", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - def test_delete_ifunused(self): - """ - Test that if_unused field of queue_delete is honoured - """ - channel = self.channel - - #create a queue and register a consumer: - channel.queue_declare(queue="delete-me-3") - channel.queue_declare(queue="delete-me-3", passive="True") - channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True) - - #need new channel now: - channel2 = self.client.channel(2) - channel2.channel_open() - #try to delete, but only if empty: - try: - channel2.queue_delete(queue="delete-me-3", if_unused="True") - self.fail("Expected delete if_unused to fail for queue with existing consumer") - except Closed, e: - self.assertChannelException(406, e.args[0]) - - - channel.message_cancel(destination="consumer_tag") - channel.queue_delete(queue="delete-me-3", if_unused="True") - #check that it has gone by declaring passively: - try: - channel.queue_declare(queue="delete-me-3", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - + except Empty: pass def test_autodelete_shared(self): """ @@ -336,5 +109,3 @@ class QueueTests(TestBase): self.fail("Expected queue to have been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) - - diff --git a/qpid/python/tests_0-9/testlib.py b/qpid/python/tests_0-9/testlib.py deleted file mode 100644 index f345fbbd80..0000000000 --- a/qpid/python/tests_0-9/testlib.py +++ /dev/null @@ -1,66 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Tests for the testlib itself. -# - -from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from Queue import Empty - -import sys -from traceback import * - -def mytrace(frame, event, arg): - print_stack(frame); - print "====" - return mytrace - -class TestBaseTest(TestBase): - """Verify TestBase functions work as expected""" - - def testAssertEmptyPass(self): - """Test assert empty works""" - self.queue_declare(queue="empty") - q = self.consume("empty") - self.assertEmpty(q) - try: - q.get(timeout=1) - self.fail("Queue is not empty.") - except Empty: None # Ignore - - def testAssertEmptyFail(self): - self.queue_declare(queue="full") - q = self.consume("full") - self.channel.message_transfer(routing_key="full", body="") - try: - self.assertEmpty(q); - self.fail("assertEmpty did not assert on non-empty queue") - except AssertionError: None # Ignore - - def testMessageProperties(self): - """Verify properties are passed with message""" - props={"x":1, "y":2} - self.queue_declare(queue="q") - q = self.consume("q") - self.assertPublishGet(q, routing_key="q", properties=props) - - - diff --git a/qpid/python/tests_0-9/tx.py b/qpid/python/tests_0-9/tx.py deleted file mode 100644 index 0f6b4f5cd1..0000000000 --- a/qpid/python/tests_0-9/tx.py +++ /dev/null @@ -1,188 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class TxTests(TestBase): - """ - Tests for 'methods' on the amqp tx 'class' - """ - - def test_commit(self): - """ - Test that commited publishes are delivered and commited acks are not re-delivered - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel.tx_commit() - - #check results - for i in range(1, 5): - msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.body) - msg.ok() - - msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.body) - msg.ok() - - msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.body) - msg.ok() - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - channel.tx_commit() - - def test_auto_rollback(self): - """ - Test that a channel closed with an open transaction is effectively rolled back - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - channel.tx_rollback() - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.ok() - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - msg.ok() - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - msg.ok() - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - channel.tx_commit() - - def test_rollback(self): - """ - Test that rolled back publishes are not delivered and rolled back acks are re-delivered - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - channel.tx_rollback() - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - msg.ok() - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - msg.ok() - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - msg.ok() - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.body) - except Empty: None - - #cleanup - channel.tx_commit() - - def perform_txn_work(self, channel, name_a, name_b, name_c): - """ - Utility method that does some setup and some work under a transaction. Used for testing both - commit and rollback - """ - #setup: - channel.queue_declare(queue=name_a, exclusive=True) - channel.queue_declare(queue=name_b, exclusive=True) - channel.queue_declare(queue=name_c, exclusive=True) - - key = "my_key_" + name_b - topic = "my_topic_" + name_c - - channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) - channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) - - for i in range(1, 5): - channel.message_transfer(routing_key=name_a, body="Message %d" % i) - - channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") - channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") - - channel.tx_select() - - #consume and ack messages - channel.message_consume(queue=name_a, destination="sub_a", no_ack=False) - queue_a = self.client.queue("sub_a") - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.body) - - msg.ok(batchoffset=-3) - - channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) - queue_b = self.client.queue("sub_b") - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.body) - msg.ok() - - sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) - queue_c = self.client.queue("sub_c") - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.body) - msg.ok() - - #publish messages - for i in range(1, 5): - channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) - - channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") - channel.message_transfer(routing_key=name_a, body="TxMessage 7") - - return queue_a, queue_b, queue_c |