diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-19 16:54:44 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-19 16:54:44 +0000 |
commit | d66aca6eb24b89052e7e2ab05ade9fcd5398bb95 (patch) | |
tree | b576cec8a33157707585a5ca63b730765276ef69 /Final/python | |
parent | 96420dfa6bb12a4b9fce082d50e49910e3dc8779 (diff) | |
download | qpid-python-d66aca6eb24b89052e7e2ab05ade9fcd5398bb95.tar.gz |
Undoing the accidental move instead of a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/tags/M2@596363 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'Final/python')
32 files changed, 0 insertions, 4103 deletions
diff --git a/Final/python/LICENSE.txt b/Final/python/LICENSE.txt deleted file mode 100755 index 6b0b1270ff..0000000000 --- a/Final/python/LICENSE.txt +++ /dev/null @@ -1,203 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - diff --git a/Final/python/NOTICE.txt b/Final/python/NOTICE.txt deleted file mode 100644 index 32ccdb70c4..0000000000 --- a/Final/python/NOTICE.txt +++ /dev/null @@ -1,20 +0,0 @@ -========================================================================= -== NOTICE file corresponding to the section 4 d of == -== the Apache License, Version 2.0, == -== in this case for the Apache Qpid distribution. == -========================================================================= - -This product includes software developed by the Apache Software Foundation -(http://www.apache.org/). - -Please read the LICENSE.txt file present in the root directory of this -distribution. - - -Aside from contributions to the Apache Qpid project, this software also -includes (binary only): - - - None at this time - - - diff --git a/Final/python/README.txt b/Final/python/README.txt deleted file mode 100644 index 0a64f0e2f2..0000000000 --- a/Final/python/README.txt +++ /dev/null @@ -1,24 +0,0 @@ -= RUNNING THE PYTHON TESTS = - -The tests/ directory contains a collection of python unit tests to -exercise functions of a broker. - -Simplest way to run the tests: - - * Run a broker on the default port - - * ./run_tests - -For additional options: ./run_tests --help - - -== Expected failures == - -Until we complete functionality, tests may fail because the tested -functionality is missing in the broker. To skip expected failures -in the C++ or Java brokers: - - ./run_tests -I cpp_failing.txt - ./run_tests -I java_failing.txt - -If you fix a failure, please remove it from the corresponding list. diff --git a/Final/python/RELEASE_NOTES b/Final/python/RELEASE_NOTES deleted file mode 100644 index 7005aa83cb..0000000000 --- a/Final/python/RELEASE_NOTES +++ /dev/null @@ -1,25 +0,0 @@ -Apache Incubator Qpid Python M2 Release Notes -------------------------------------------- - -The Qpid M2 release contains support the for AMQP 0-8 specification. -You can access the 0-8 specification using the following link. -http://www.amqp.org/tikiwiki/tiki-index.php?page=Download - -For full details of Qpid capabilities, as they currently stand, see our -detailed project documentation at: - -http://cwiki.apache.org/confluence/pages/viewpage.action?pageId=28284 - -Please take time to go through the README file provided with the distro. - - -Known Issues/Outstanding Work ------------------------------ - -There are no known issues for the Phyton client. - - -M2 Tasks Completed -------------------- - -Bug QPID-467 Complete Interop Testing diff --git a/Final/python/amqp-doc b/Final/python/amqp-doc deleted file mode 100755 index 0e7f9e862a..0000000000 --- a/Final/python/amqp-doc +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import sys, re -from qpid.spec import load, pythonize -from getopt import gnu_getopt as getopt, GetoptError -from fnmatch import fnmatchcase as fnmatch - -def die(msg): - print >> sys.stderr, msg - sys.exit(1) - -def usage(msg = ""): - return ("""%s - -Usage %s [<options>] [<pattern_1> ... <pattern_n>] - -Options: - -e, --regexp use regex instead of glob when matching - -s, --spec <url> location of amqp.xml -""" % (msg, sys.argv[0])).strip() - -try: - opts, args = getopt(sys.argv[1:], "s:e", ["regexp", "spec="]) -except GetoptError, e: - die(str(e)) - -regexp = False -spec = "../specs/amqp.0-8.xml" -for k, v in opts: - if k == "-e" or k == "--regexp": regexp = True - if k == "-s" or k == "--spec": spec = v - -if regexp: - def match(pattern, value): - try: - return re.match(pattern, value) - except Exception, e: - die("error: '%s': %s" % (pattern, e)) -else: - def match(pattern, value): - return fnmatch(value, pattern) - -spec = load(spec) -methods = {} -patterns = args -for pattern in patterns: - for c in spec.classes: - for m in c.methods: - name = pythonize("%s_%s" % (c.name, m.name)) - if match(pattern, name): - methods[name] = m.define_method(name) - -if patterns: - if methods: - AMQP = type("AMQP[%s]" % ", ".join(patterns), (), methods) - else: - die("no matches") -else: - AMQP = spec.define_class("AMQP") - -help(AMQP) diff --git a/Final/python/cpp_failing.txt b/Final/python/cpp_failing.txt deleted file mode 100644 index e69de29bb2..0000000000 --- a/Final/python/cpp_failing.txt +++ /dev/null diff --git a/Final/python/doc/test-requirements.txt b/Final/python/doc/test-requirements.txt deleted file mode 100644 index a1ba414eb2..0000000000 --- a/Final/python/doc/test-requirements.txt +++ /dev/null @@ -1,10 +0,0 @@ - * start and stop server, possibly in different configurations, should - at least be able to specify host and port - - * initiate multiple connections/server - - * initiate multiple channels/connection - - * enable positive and negative tests for any protocol interaction - - * test harness must be as robust as possible to spec changes diff --git a/Final/python/java_failing.txt b/Final/python/java_failing.txt deleted file mode 100644 index e69de29bb2..0000000000 --- a/Final/python/java_failing.txt +++ /dev/null diff --git a/Final/python/pal2py b/Final/python/pal2py deleted file mode 100755 index 544151bf76..0000000000 --- a/Final/python/pal2py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import sys, os, xml - -from qpid.spec import load, pythonize -from textwrap import TextWrapper -from xml.sax.handler import ContentHandler - -class Block: - - def __init__(self, children): - self.children = children - - def emit(self, out): - for child in self.children: - if not hasattr(child, "emit"): - raise ValueError(child) - child.emit(out) - - if not self.children: - out.line("pass") - -class If: - - def __init__(self, expr, cons, alt = None): - self.expr = expr - self.cons = cons - self.alt = alt - - def emit(self, out): - out.line("if ") - self.expr.emit(out) - out.write(":") - out.level += 1 - self.cons.emit(out) - out.level -= 1 - if self.alt: - out.line("else:") - out.level += 1 - self.alt.emit(out) - out.level -= 1 - -class Stmt: - - def __init__(self, code): - self.code = code - - def emit(self, out): - out.line(self.code) - -class Expr: - - def __init__(self, code): - self.code = code - - def emit(self, out): - out.write(self.code) - -class Abort: - - def __init__(self, expr): - self.expr = expr - - def emit(self, out): - out.line("assert False, ") - self.expr.emit(out) - -WRAPPER = TextWrapper() - -def wrap(text): - return WRAPPER.wrap(" ".join(text.split())) - -class Doc: - - def __init__(self, text): - self.text = text - - def emit(self, out): - out.line('"""') - for line in wrap(self.text): - out.line(line) - out.line('"""') - -class Frame: - - def __init__(self, attrs): - self.attrs = attrs - self.children = [] - self.text = None - - def __getattr__(self, attr): - return self.attrs[attr] - -def isunicode(s): - if isinstance(s, str): - return False - for ch in s: - if ord(ch) > 127: - return True - return False - -def string_literal(s): - if s == None: - return None - if isunicode(s): - return "%r" % s - else: - return "%r" % str(s) - -TRUTH = { - "1": True, - "0": False, - "true": True, - "false": False - } - -LITERAL = { - "shortstr": string_literal, - "longstr": string_literal, - "bit": lambda s: TRUTH[s.lower()], - "longlong": lambda s: "%r" % long(s) - } - -def literal(s, field): - return LITERAL[field.type](s) - -def palexpr(s, field): - if s.startswith("$"): - return "msg.%s" % s[1:] - else: - return literal(s, field) - -class Translator(ContentHandler): - - def __init__(self, spec): - self.spec = spec - self.stack = [] - self.content = None - self.root = Frame(None) - self.push(self.root) - - def emit(self, out): - blk = Block(self.root.children) - blk.emit(out) - out.write("\n") - - def peek(self): - return self.stack[-1] - - def pop(self): - return self.stack.pop() - - def push(self, frame): - self.stack.append(frame) - - def startElement(self, name, attrs): - self.push(Frame(attrs)) - - def endElement(self, name): - frame = self.pop() - if hasattr(self, name): - child = getattr(self, name)(frame) - else: - child = self.handle(name, frame) - - if child: - self.peek().children.append(child) - - def characters(self, text): - frame = self.peek() - if frame.text: - frame.text += text - else: - frame.text = text - - def handle(self, name, frame): - for klass in self.spec.classes: - pyklass = pythonize(klass.name) - if name.startswith(pyklass): - name = name[len(pyklass) + 1:] - break - else: - raise ValueError("unknown class: %s" % name) - - for method in klass.methods: - pymethod = pythonize(method.name) - if name == pymethod: - break - else: - raise ValueError("unknown method: %s" % name) - - args = ["%s = %s" % (key, palexpr(val, method.fields.bypyname[key])) - for key, val in frame.attrs.items()] - if method.content and self.content: - args.append("content = %r" % string_literal(self.content)) - code = "ssn.%s_%s(%s)" % (pyklass, pymethod, ", ".join(args)) - if pymethod == "consume": - code = "consumer_tag = %s.consumer_tag" % code - return Stmt(code) - - def pal(self, frame): - return Block([Doc(frame.text)] + frame.children) - - def include(self, frame): - base, ext = os.path.splitext(frame.filename) - return Stmt("from %s import *" % base) - - def session(self, frame): - return Block([Stmt("cli = open()"), Stmt("ssn = cli.channel(0)"), - Stmt("ssn.channel_open()")] + frame.children) - - def empty(self, frame): - return If(Expr("msg == None"), Block(frame.children)) - - def abort(self, frame): - return Abort(Expr(string_literal(frame.text))) - - def wait(self, frame): - return Stmt("msg = ssn.queue(consumer_tag).get(timeout=%r)" % - (int(frame.timeout)/1000)) - - def basic_arrived(self, frame): - if frame.children: - return If(Expr("msg != None"), Block(frame.children)) - - def basic_content(self, frame): - self.content = frame.text - -class Emitter: - - def __init__(self, out): - self.out = out - self.level = 0 - - def write(self, code): - self.out.write(code) - - def line(self, code): - self.write("\n%s%s" % (" "*self.level, code)) - - def flush(self): - self.out.flush() - - def close(self): - self.out.close() - - -for f in sys.argv[2:]: - base, ext = os.path.splitext(f) - spec = load(sys.argv[1]) - t = Translator(spec) - xml.sax.parse(f, t) -# out = Emitter(open("%s.py" % base)) - out = Emitter(sys.stdout) - t.emit(out) - out.close() diff --git a/Final/python/qpid/__init__.py b/Final/python/qpid/__init__.py deleted file mode 100644 index 4363f175fb..0000000000 --- a/Final/python/qpid/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import spec, codec, connection, content, peer, delegate, client diff --git a/Final/python/qpid/client.py b/Final/python/qpid/client.py deleted file mode 100644 index b4a282f251..0000000000 --- a/Final/python/qpid/client.py +++ /dev/null @@ -1,114 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -An AQMP client implementation that uses a custom delegate for -interacting with the server. -""" - -import threading -from peer import Peer, Closed -from delegate import Delegate -from connection import Connection, Frame -from spec import load -from queue import Queue - - -class Client: - - def __init__(self, host, port, spec, vhost = None): - self.host = host - self.port = port - self.spec = spec - - self.mechanism = None - self.response = None - self.locale = None - - self.vhost = vhost - if self.vhost == None: - self.vhost = self.host - - self.queues = {} - self.lock = threading.Lock() - - self.closed = False - self.started = threading.Event() - - self.conn = Connection(self.host, self.port, self.spec) - self.peer = Peer(self.conn, ClientDelegate(self)) - - def wait(self): - self.started.wait() - if self.closed: - raise EOFError() - - def queue(self, key): - self.lock.acquire() - try: - try: - q = self.queues[key] - except KeyError: - q = Queue(0) - self.queues[key] = q - finally: - self.lock.release() - return q - - def start(self, response, mechanism="AMQPLAIN", locale="en_US"): - self.mechanism = mechanism - self.response = response - self.locale = locale - - self.conn.connect() - self.conn.init() - self.peer.start() - self.wait() - self.channel(0).connection_open(self.vhost) - - def channel(self, id): - return self.peer.channel(id) - -class ClientDelegate(Delegate): - - def __init__(self, client): - Delegate.__init__(self) - self.client = client - - def connection_start(self, ch, msg): - ch.connection_start_ok(mechanism=self.client.mechanism, - response=self.client.response, - locale=self.client.locale) - - def connection_tune(self, ch, msg): - ch.connection_tune_ok(*msg.fields) - self.client.started.set() - - def basic_deliver(self, ch, msg): - self.client.queue(msg.consumer_tag).put(msg) - - def channel_close(self, ch, msg): - ch.close(msg) - - def connection_close(self, ch, msg): - self.client.peer.close(msg) - - def close(self, reason): - self.client.closed = True - self.client.started.set() diff --git a/Final/python/qpid/codec.py b/Final/python/qpid/codec.py deleted file mode 100644 index 69c7ca8afa..0000000000 --- a/Final/python/qpid/codec.py +++ /dev/null @@ -1,224 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Utility code to translate between python objects and AMQP encoded data -fields. -""" - -from cStringIO import StringIO -from struct import * - -class EOF(Exception): - pass - -class Codec: - - def __init__(self, stream): - self.stream = stream - self.nwrote = 0 - self.nread = 0 - self.incoming_bits = [] - self.outgoing_bits = [] - - def read(self, n): - data = self.stream.read(n) - if n > 0 and len(data) == 0: - raise EOF() - self.nread += len(data) - return data - - def write(self, s): - self.flushbits() - self.stream.write(s) - self.nwrote += len(s) - - def flush(self): - self.flushbits() - self.stream.flush() - - def flushbits(self): - if len(self.outgoing_bits) > 0: - bytes = [] - index = 0 - for b in self.outgoing_bits: - if index == 0: bytes.append(0) - if b: bytes[-1] |= 1 << index - index = (index + 1) % 8 - del self.outgoing_bits[:] - for byte in bytes: - self.encode_octet(byte) - - def pack(self, fmt, *args): - self.write(pack(fmt, *args)) - - def unpack(self, fmt): - size = calcsize(fmt) - data = self.read(size) - values = unpack(fmt, data) - if len(values) == 1: - return values[0] - else: - return values - - def encode(self, type, value): - getattr(self, "encode_" + type)(value) - - def decode(self, type): - return getattr(self, "decode_" + type)() - - # bit - def encode_bit(self, o): - if o: - self.outgoing_bits.append(True) - else: - self.outgoing_bits.append(False) - - def decode_bit(self): - if len(self.incoming_bits) == 0: - bits = self.decode_octet() - for i in range(8): - self.incoming_bits.append(bits >> i & 1 != 0) - return self.incoming_bits.pop(0) - - # octet - def encode_octet(self, o): - self.pack("!B", o) - - def decode_octet(self): - return self.unpack("!B") - - # short - def encode_short(self, o): - self.pack("!H", o) - - def decode_short(self): - return self.unpack("!H") - - # long - def encode_long(self, o): - self.pack("!L", o) - - def decode_long(self): - return self.unpack("!L") - - # longlong - def encode_longlong(self, o): - self.pack("!Q", o) - - def decode_longlong(self): - return self.unpack("!Q") - - def enc_str(self, fmt, s): - size = len(s) - self.pack(fmt, size) - self.write(s) - - def dec_str(self, fmt): - size = self.unpack(fmt) - return self.read(size) - - # shortstr - def encode_shortstr(self, s): - self.enc_str("!B", s) - - def decode_shortstr(self): - return self.dec_str("!B") - - # longstr - def encode_longstr(self, s): - if isinstance(s, dict): - self.encode_table(s) - else: - self.enc_str("!L", s) - - def decode_longstr(self): - return self.dec_str("!L") - - # table - def encode_table(self, tbl): - enc = StringIO() - codec = Codec(enc) - for key, value in tbl.items(): - codec.encode_shortstr(key) - if isinstance(value, basestring): - codec.write("S") - codec.encode_longstr(value) - else: - codec.write("I") - codec.encode_long(value) - s = enc.getvalue() - self.encode_long(len(s)) - self.write(s) - - def decode_table(self): - size = self.decode_long() - start = self.nread - result = {} - while self.nread - start < size: - key = self.decode_shortstr() - type = self.read(1) - if type == "S": - value = self.decode_longstr() - elif type == "I": - value = self.decode_long() - else: - raise ValueError(repr(type)) - result[key] = value - return result - -def test(type, value): - if isinstance(value, (list, tuple)): - values = value - else: - values = [value] - stream = StringIO() - codec = Codec(stream) - for v in values: - codec.encode(type, v) - codec.flush() - enc = stream.getvalue() - stream.reset() - dup = [] - for i in xrange(len(values)): - dup.append(codec.decode(type)) - if values != dup: - raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) - -if __name__ == "__main__": - def dotest(type, value): - args = (type, value) - test(*args) - - for value in ("1", "0", "110", "011", "11001", "10101", "10011"): - for i in range(10): - dotest("bit", map(lambda x: x == "1", value*i)) - - for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): - dotest("table", value) - - for type in ("octet", "short", "long", "longlong"): - for value in range(0, 256): - dotest(type, value) - - for type in ("shortstr", "longstr"): - for value in ("", "a", "asdf"): - dotest(type, value) diff --git a/Final/python/qpid/connection.py b/Final/python/qpid/connection.py deleted file mode 100644 index 0b788e091b..0000000000 --- a/Final/python/qpid/connection.py +++ /dev/null @@ -1,270 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -A Connection class containing socket code that uses the spec metadata -to read and write Frame objects. This could be used by a client, -server, or even a proxy implementation. -""" - -import socket, codec,logging -from cStringIO import StringIO -from spec import load, pythonize -from codec import EOF - -class SockIO: - - def __init__(self, sock): - self.sock = sock - - def write(self, buf): -# print "OUT: %r" % buf - self.sock.sendall(buf) - - def read(self, n): - data = "" - while len(data) < n: - try: - s = self.sock.recv(n - len(data)) - except socket.error: - break - if len(s) == 0: - break -# print "IN: %r" % s - data += s - return data - - def flush(self): - pass - -class Connection: - - def __init__(self, host, port, spec): - self.host = host - self.port = port - self.spec = spec - self.FRAME_END = self.spec.constants.bypyname["frame_end"].id - - def connect(self): - sock = socket.socket() - sock.connect((self.host, self.port)) - sock.setblocking(1) - self.codec = codec.Codec(SockIO(sock)) - - def flush(self): - self.codec.flush() - - INIT="!4s4B" - - def init(self): - self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, - self.spec.minor) - - def write(self, frame): - c = self.codec - c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id) - c.encode_short(frame.channel) - frame.payload.encode(c) - c.encode_octet(self.FRAME_END) - - def read(self): - c = self.codec - type = pythonize(self.spec.constants.byid[c.decode_octet()].name) - channel = c.decode_short() - payload = Frame.DECODERS[type].decode(self.spec, c) - end = c.decode_octet() - if end != self.FRAME_END: - raise "frame error: expected %r, got %r" % (self.FRAME_END, end) - frame = Frame(channel, payload) - return frame - -class Frame: - - METHOD = "frame_method" - HEADER = "frame_header" - BODY = "frame_body" - OOB_METHOD = "frame_oob_method" - OOB_HEADER = "frame_oob_header" - OOB_BODY = "frame_oob_body" - TRACE = "frame_trace" - HEARTBEAT = "frame_heartbeat" - - DECODERS = {} - - def __init__(self, channel, payload): - self.channel = channel - self.payload = payload - - def __str__(self): - return "[%d] %s" % (self.channel, self.payload) - -class Payload: - - class __metaclass__(type): - - def __new__(cls, name, bases, dict): - for req in ("encode", "decode", "type"): - if not dict.has_key(req): - raise TypeError("%s must define %s" % (name, req)) - dict["decode"] = staticmethod(dict["decode"]) - t = type.__new__(cls, name, bases, dict) - if t.type != None: - Frame.DECODERS[t.type] = t - return t - - type = None - - def encode(self, enc): abstract - - def decode(spec, dec): abstract - -class Method(Payload): - - type = Frame.METHOD - - def __init__(self, method, *args): - if len(args) != len(method.fields): - argspec = ["%s: %s" % (pythonize(f.name), f.type) - for f in method.fields] - raise TypeError("%s.%s expecting (%s), got %s" % - (pythonize(method.klass.name), - pythonize(method.name), ", ".join(argspec), args)) - self.method = method - self.args = args - - def encode(self, enc): - buf = StringIO() - c = codec.Codec(buf) - c.encode_short(self.method.klass.id) - c.encode_short(self.method.id) - for field, arg in zip(self.method.fields, self.args): - c.encode(field.type, arg) - c.flush() - enc.encode_longstr(buf.getvalue()) - - def decode(spec, dec): - enc = dec.decode_longstr() - c = codec.Codec(StringIO(enc)) - klass = spec.classes.byid[c.decode_short()] - meth = klass.methods.byid[c.decode_short()] - args = tuple([c.decode(f.type) for f in meth.fields]) - return Method(meth, *args) - - def __str__(self): - return "%s %s" % (self.method, ", ".join([str(a) for a in self.args])) - -class Header(Payload): - - type = Frame.HEADER - - def __init__(self, klass, weight, size, **properties): - self.klass = klass - self.weight = weight - self.size = size - self.properties = properties - - def __getitem__(self, name): - return self.properties[name] - - def __setitem__(self, name, value): - self.properties[name] = value - - def __delitem__(self, name): - del self.properties[name] - - def encode(self, enc): - buf = StringIO() - c = codec.Codec(buf) - c.encode_short(self.klass.id) - c.encode_short(self.weight) - c.encode_longlong(self.size) - - # property flags - nprops = len(self.klass.fields) - flags = 0 - for i in range(nprops): - f = self.klass.fields.items[i] - flags <<= 1 - if self.properties.get(f.name) != None: - flags |= 1 - # the last bit indicates more flags - if i > 0 and (i % 15) == 0: - flags <<= 1 - if nprops > (i + 1): - flags |= 1 - c.encode_short(flags) - flags = 0 - flags <<= ((16 - (nprops % 15)) % 16) - c.encode_short(flags) - - # properties - for f in self.klass.fields: - v = self.properties.get(f.name) - if v != None: - c.encode(f.type, v) - c.flush() - enc.encode_longstr(buf.getvalue()) - - def decode(spec, dec): - c = codec.Codec(StringIO(dec.decode_longstr())) - klass = spec.classes.byid[c.decode_short()] - weight = c.decode_short() - size = c.decode_longlong() - - # property flags - bits = [] - while True: - flags = c.decode_short() - for i in range(15, 0, -1): - if flags >> i & 0x1 != 0: - bits.append(True) - else: - bits.append(False) - if flags & 0x1 == 0: - break - - # properties - properties = {} - for b, f in zip(bits, klass.fields): - if b: - # Note: decode returns a unicode u'' string but only - # plain '' strings can be used as keywords so we need to - # stringify the names. - properties[str(f.name)] = c.decode(f.type) - return Header(klass, weight, size, **properties) - - def __str__(self): - return "%s %s %s %s" % (self.klass, self.weight, self.size, - self.properties) - -class Body(Payload): - - type = Frame.BODY - - def __init__(self, content): - self.content = content - - def encode(self, enc): - enc.encode_longstr(self.content) - - def decode(spec, dec): - return Body(dec.decode_longstr()) - - def __str__(self): - return "Body(%r)" % self.content diff --git a/Final/python/qpid/content.py b/Final/python/qpid/content.py deleted file mode 100644 index bcbea1697c..0000000000 --- a/Final/python/qpid/content.py +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -A simple python representation for AMQP content. -""" - -def default(val, defval): - if val == None: - return defval - else: - return val - -class Content: - - def __init__(self, body = "", children = None, properties = None): - self.body = body - self.children = default(children, []) - self.properties = default(properties, {}) - - def size(self): - return len(self.body) - - def weight(self): - return len(self.children) - - def __getitem__(self, name): - return self.properties[name] - - def __setitem__(self, name, value): - self.properties[name] = value - - def __delitem__(self, name): - del self.properties[name] diff --git a/Final/python/qpid/delegate.py b/Final/python/qpid/delegate.py deleted file mode 100644 index 035bb3c476..0000000000 --- a/Final/python/qpid/delegate.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Delegate implementation intended for use with the peer module. -""" - -import threading, inspect -from spec import pythonize - -class Delegate: - - def __init__(self): - self.handlers = {} - self.invokers = {} - # initialize all the mixins - self.invoke_all("init") - - def invoke_all(self, meth, *args, **kwargs): - for cls in inspect.getmro(self.__class__): - if hasattr(cls, meth): - getattr(cls, meth)(self, *args, **kwargs) - - def dispatch(self, channel, message): - method = message.method - - try: - handler = self.handlers[method] - except KeyError: - name = "%s_%s" % (pythonize(method.klass.name), - pythonize(method.name)) - handler = getattr(self, name) - self.handlers[method] = handler - - return handler(channel, message) - - def close(self, reason): - self.invoke_all("close", reason) diff --git a/Final/python/qpid/message.py b/Final/python/qpid/message.py deleted file mode 100644 index 914b878147..0000000000 --- a/Final/python/qpid/message.py +++ /dev/null @@ -1,84 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from sets import Set - -class Message: - - COMMON_FIELDS = Set(("content", "method", "fields")) - - def __init__(self, method, fields, content = None): - self.method = method - self.fields = fields - self.content = content - - def __len__(self): - l = len(self.fields) - if self.method.content: - l += 1 - return len(self.fields) - - def _idx(self, idx): - if idx < 0: idx += len(self) - if idx < 0 or idx > len(self): - raise IndexError(idx) - return idx - - def __getitem__(self, idx): - idx = self._idx(idx) - if idx == len(self.fields): - return self.content - else: - return self.fields[idx] - - def __setitem__(self, idx, value): - idx = self._idx(idx) - if idx == len(self.fields): - self.content = value - else: - self.fields[idx] = value - - def _slot(self, attr): - if attr in Message.COMMON_FIELDS: - env = self.__dict__ - key = attr - else: - env = self.fields - try: - field = self.method.fields.bypyname[attr] - key = self.method.fields.index(field) - except KeyError: - raise AttributeError(attr) - return env, key - - def __getattr__(self, attr): - env, key = self._slot(attr) - return env[key] - - def __setattr__(self, attr, value): - env, key = self._slot(attr) - env[attr] = value - - STR = "%s %s content = %s" - REPR = STR.replace("%s", "%r") - - def __str__(self): - return Message.STR % (self.method, self.fields, self.content) - - def __repr__(self): - return Message.REPR % (self.method, self.fields, self.content) diff --git a/Final/python/qpid/peer.py b/Final/python/qpid/peer.py deleted file mode 100644 index 7c6cf91dea..0000000000 --- a/Final/python/qpid/peer.py +++ /dev/null @@ -1,210 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -This module contains a skeletal peer implementation useful for -implementing an AMQP server, client, or proxy. The peer implementation -sorts incoming frames to their intended channels, and dispatches -incoming method frames to a delegate. -""" - -import thread, traceback, socket, sys, logging -from connection import Frame, EOF, Method, Header, Body -from message import Message -from queue import Queue, Closed as QueueClosed -from content import Content -from cStringIO import StringIO - -class Peer: - - def __init__(self, conn, delegate): - self.conn = conn - self.delegate = delegate - self.outgoing = Queue(0) - self.work = Queue(0) - self.channels = {} - self.Channel = type("Channel%s" % conn.spec.klass.__name__, - (Channel, conn.spec.klass), {}) - self.lock = thread.allocate_lock() - - def channel(self, id): - self.lock.acquire() - try: - try: - ch = self.channels[id] - except KeyError: - ch = self.Channel(id, self.outgoing) - self.channels[id] = ch - finally: - self.lock.release() - return ch - - def start(self): - thread.start_new_thread(self.writer, ()) - thread.start_new_thread(self.reader, ()) - thread.start_new_thread(self.worker, ()) - - def fatal(self, message=None): - """Call when an unexpected exception occurs that will kill a thread.""" - if message: print >> sys.stderr, message - self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) - - def reader(self): - try: - while True: - try: - frame = self.conn.read() - except EOF, e: - self.work.close() - break - ch = self.channel(frame.channel) - ch.dispatch(frame, self.work) - except: - self.fatal() - - def close(self, reason): - for ch in self.channels.values(): - ch.close(reason) - self.delegate.close(reason) - - def writer(self): - try: - while True: - try: - message = self.outgoing.get() - self.conn.write(message) - except socket.error, e: - self.close(e) - break - self.conn.flush() - except: - self.fatal() - - def worker(self): - try: - while True: - self.dispatch(self.work.get()) - except QueueClosed, e: - self.close(e) - except: - self.fatal() - - def dispatch(self, queue): - frame = queue.get() - channel = self.channel(frame.channel) - payload = frame.payload - if payload.method.content: - content = read_content(queue) - else: - content = None - # Let the caller deal with exceptions thrown here. - message = Message(payload.method, payload.args, content) - self.delegate.dispatch(channel, message) - -class Closed(Exception): pass - -class Channel: - - def __init__(self, id, outgoing): - self.id = id - self.outgoing = outgoing - self.incoming = Queue(0) - self.responses = Queue(0) - self.queue = None - self.closed = False - self.reason = None - - def close(self, reason): - if self.closed: - return - self.closed = True - self.reason = reason - self.incoming.close() - self.responses.close() - - def dispatch(self, frame, work): - payload = frame.payload - if isinstance(payload, Method): - if payload.method.response: - self.queue = self.responses - else: - self.queue = self.incoming - work.put(self.incoming) - self.queue.put(frame) - - def invoke(self, method, args, content = None): - if self.closed: - raise Closed(self.reason) - frame = Frame(self.id, Method(method, *args)) - self.outgoing.put(frame) - - if method.content: - if content == None: - content = Content() - self.write_content(method.klass, content, self.outgoing) - - try: - # here we depend on all nowait fields being named nowait - f = method.fields.byname["nowait"] - nowait = args[method.fields.index(f)] - except KeyError: - nowait = False - - try: - if not nowait and method.responses: - resp = self.responses.get().payload - if resp.method.content: - content = read_content(self.responses) - else: - content = None - if resp.method in method.responses: - return Message(resp.method, resp.args, content) - else: - raise ValueError(resp) - except QueueClosed, e: - if self.closed: - raise Closed(self.reason) - else: - raise e - - def write_content(self, klass, content, queue): - size = content.size() - header = Frame(self.id, Header(klass, content.weight(), size, **content.properties)) - queue.put(header) - for child in content.children: - self.write_content(klass, child, queue) - # should split up if content.body exceeds max frame size - if size > 0: - queue.put(Frame(self.id, Body(content.body))) - -def read_content(queue): - frame = queue.get() - header = frame.payload - children = [] - for i in range(header.weight): - children.append(read_content(queue)) - size = header.size - read = 0 - buf = StringIO() - while read < size: - body = queue.get() - content = body.payload.content - buf.write(content) - read += len(content) - return Content(buf.getvalue(), children, header.properties.copy()) diff --git a/Final/python/qpid/queue.py b/Final/python/qpid/queue.py deleted file mode 100644 index 5438b328ab..0000000000 --- a/Final/python/qpid/queue.py +++ /dev/null @@ -1,45 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -This module augments the standard python multithreaded Queue -implementation to add a close() method so that threads blocking on the -content of a queue can be notified if the queue is no longer in use. -""" - -from Queue import Queue as BaseQueue, Empty, Full - -class Closed(Exception): pass - -class Queue(BaseQueue): - - END = object() - - def close(self): - self.put(Queue.END) - - def get(self, block = True, timeout = None): - result = BaseQueue.get(self, block, timeout) - if result == Queue.END: - # this guarantees that any other waiting threads or any future - # calls to get will also result in a Closed exception - self.put(Queue.END) - raise Closed() - else: - return result diff --git a/Final/python/qpid/spec.py b/Final/python/qpid/spec.py deleted file mode 100644 index 0e3a477066..0000000000 --- a/Final/python/qpid/spec.py +++ /dev/null @@ -1,358 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -This module loads protocol metadata into python objects. It provides -access to spec metadata via a python object model, and can also -dynamically creating python methods, classes, and modules based on the -spec metadata. All the generated methods have proper signatures and -doc strings based on the spec metadata so the python help system can -be used to browse the spec documentation. The generated methods all -dispatch to the self.invoke(meth, args) callback of the containing -class so that the generated code can be reused in a variety of -situations. -""" - -import re, textwrap, new, xmlutil - -class SpecContainer: - - def __init__(self): - self.items = [] - self.byname = {} - self.byid = {} - self.indexes = {} - self.bypyname = {} - - def add(self, item): - if self.byname.has_key(item.name): - raise ValueError("duplicate name: %s" % item) - if self.byid.has_key(item.id): - raise ValueError("duplicate id: %s" % item) - pyname = pythonize(item.name) - if self.bypyname.has_key(pyname): - raise ValueError("duplicate pyname: %s" % item) - self.indexes[item] = len(self.items) - self.items.append(item) - self.byname[item.name] = item - self.byid[item.id] = item - self.bypyname[pyname] = item - - def index(self, item): - try: - return self.indexes[item] - except KeyError: - raise ValueError(item) - - def __iter__(self): - return iter(self.items) - - def __len__(self): - return len(self.items) - -class Metadata: - - PRINT = [] - - def __init__(self): - pass - - def __str__(self): - args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT) - return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) - - def __repr__(self): - return str(self) - -class Spec(Metadata): - - PRINT=["major", "minor", "file"] - - def __init__(self, major, minor, file): - Metadata.__init__(self) - self.major = major - self.minor = minor - self.file = file - self.constants = SpecContainer() - self.classes = SpecContainer() - - def post_load(self): - self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) - self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) - - def parse_method(self, name): - parts = re.split(r"\s*\.\s*", name) - if len(parts) != 2: - raise ValueError(name) - klass, meth = parts - return self.classes.byname[klass].methods.byname[meth] - - def define_module(self, name, doc = None): - module = new.module(name, doc) - module.__file__ = self.file - for c in self.classes: - classname = pythonize(c.name) - cls = c.define_class(classname) - cls.__module__ = module.__name__ - setattr(module, classname, cls) - return module - - def define_class(self, name): - methods = {} - for c in self.classes: - for m in c.methods: - meth = pythonize(m.klass.name + "_" + m.name) - methods[meth] = m.define_method(meth) - return type(name, (), methods) - -class Constant(Metadata): - - PRINT=["name", "id"] - - def __init__(self, spec, name, id, klass, docs): - Metadata.__init__(self) - self.spec = spec - self.name = name - self.id = id - self.klass = klass - self.docs = docs - -class Class(Metadata): - - PRINT=["name", "id"] - - def __init__(self, spec, name, id, handler, docs): - Metadata.__init__(self) - self.spec = spec - self.name = name - self.id = id - self.handler = handler - self.fields = SpecContainer() - self.methods = SpecContainer() - self.docs = docs - - def define_class(self, name): - methods = {} - for m in self.methods: - meth = pythonize(m.name) - methods[meth] = m.define_method(meth) - return type(name, (), methods) - -class Method(Metadata): - - PRINT=["name", "id"] - - def __init__(self, klass, name, id, content, responses, synchronous, - description, docs): - Metadata.__init__(self) - self.klass = klass - self.name = name - self.id = id - self.content = content - self.responses = responses - self.synchronous = synchronous - self.fields = SpecContainer() - self.description = description - self.docs = docs - self.response = False - - def docstring(self): - s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) - for f in self.fields: - if f.docs: - s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] + - [fill(d, 4) for d in f.docs[1:]]) - return s - - METHOD = "__method__" - DEFAULTS = {"bit": False, - "shortstr": "", - "longstr": "", - "table": {}, - "octet": 0, - "short": 0, - "long": 0, - "longlong": 0, - "timestamp": 0, - "content": None} - - def define_method(self, name): - g = {Method.METHOD: self} - l = {} - args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields] - if self.content: - args += [("content", None)] - code = "def %s(self, %s):\n" % \ - (name, ", ".join(["%s = %r" % a for a in args])) - code += " %r\n" % self.docstring() - if self.content: - methargs = args[:-1] - else: - methargs = args - argnames = ", ".join([a[0] for a in methargs]) - code += " return self.invoke(%s" % Method.METHOD - if argnames: - code += ", (%s,)" % argnames - else: - code += ", ()" - if self.content: - code += ", content" - code += ")" - exec code in g, l - return l[name] - -class Field(Metadata): - - PRINT=["name", "id", "type"] - - def __init__(self, name, id, type, docs): - Metadata.__init__(self) - self.name = name - self.id = id - self.type = type - self.docs = docs - -def get_docs(nd): - return [n.text for n in nd["doc"]] - -def load_fields(nd, l, domains): - for f_nd in nd["field"]: - try: - type = f_nd["@domain"] - except KeyError: - type = f_nd["@type"] - while domains.has_key(type) and domains[type] != type: - type = domains[type] - l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd))) - -def load(specfile): - doc = xmlutil.parse(specfile) - root = doc["amqp"][0] - spec = Spec(int(root["@major"]), int(root["@minor"]), specfile) - - # constants - for nd in root["constant"]: - const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"), - get_docs(nd)) - spec.constants.add(const) - - # domains are typedefs - domains = {} - for nd in root["domain"]: - domains[nd["@name"]] = nd["@type"] - - # classes - for c_nd in root["class"]: - klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"], - get_docs(c_nd)) - load_fields(c_nd, klass.fields, domains) - for m_nd in c_nd["method"]: - meth = Method(klass, m_nd["@name"], - int(m_nd["@index"]), - m_nd.get_bool("@content", False), - [nd["@name"] for nd in m_nd["response"]], - m_nd.get_bool("@synchronous", False), - m_nd.text, - get_docs(m_nd)) - load_fields(m_nd, meth.fields, domains) - klass.methods.add(meth) - # resolve the responses - for m in klass.methods: - m.responses = [klass.methods.byname[r] for r in m.responses] - for resp in m.responses: - resp.response = True - spec.classes.add(klass) - spec.post_load() - return spec - -REPLACE = {" ": "_", "-": "_"} -KEYWORDS = {"global": "global_", - "return": "return_"} - -def pythonize(name): - name = str(name) - for key, val in REPLACE.items(): - name = name.replace(key, val) - try: - name = KEYWORDS[name] - except KeyError: - pass - return name - -def fill(text, indent, heading = None): - sub = indent * " " - if heading: - init = (indent - 2) * " " + heading + " -- " - else: - init = sub - w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub) - return w.fill(" ".join(text.split())) - -class Rule(Metadata): - - PRINT = ["text", "implement", "tests"] - - def __init__(self, text, implement, tests, path): - self.text = text - self.implement = implement - self.tests = tests - self.path = path - -def find_rules(node, rules): - if node.name == "rule": - rules.append(Rule(node.text, node.get("@implement"), - [ch.text for ch in node if ch.name == "test"], - node.path())) - if node.name == "doc" and node.get("@name") == "rule": - tests = [] - if node.has("@test"): - tests.append(node["@test"]) - rules.append(Rule(node.text, None, tests, node.path())) - for child in node: - find_rules(child, rules) - -def load_rules(specfile): - rules = [] - find_rules(xmlutil.parse(specfile), rules) - return rules - -def test_summary(): - template = """ - <html><head><title>AMQP Tests</title></head> - <body> - <table width="80%%" align="center"> - %s - </table> - </body> - </html> - """ - rows = [] - for rule in load_rules("amqp.org/specs/amqp7.xml"): - if rule.tests: - tests = ", ".join(rule.tests) - else: - tests = " " - rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>' - '<td><b>Implement:</b> %s</td>' - '<td><b>Tests:</b> %s</td></tr>' % - (rule.path[len("/root/amqp"):], rule.implement, tests)) - rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text) - rows.append('<tr><td colspan="3"> </td></tr>') - - print template % "\n".join(rows) diff --git a/Final/python/qpid/testlib.py b/Final/python/qpid/testlib.py deleted file mode 100644 index 39bad75b86..0000000000 --- a/Final/python/qpid/testlib.py +++ /dev/null @@ -1,237 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Support library for qpid python tests. -# - -import sys, re, unittest, os, random, logging -import qpid.client, qpid.spec -import Queue -from getopt import getopt, GetoptError -from qpid.content import Content - -def findmodules(root): - """Find potential python modules under directory root""" - found = [] - for dirpath, subdirs, files in os.walk(root): - modpath = dirpath.replace(os.sep, '.') - if not re.match(r'\.svn$', dirpath): # Avoid SVN directories - for f in files: - match = re.match(r'(.+)\.py$', f) - if match and f != '__init__.py': - found.append('.'.join([modpath, match.group(1)])) - return found - -def default(value, default): - if (value == None): return default - else: return value - -class TestRunner: - """Runs unit tests. - - Parses command line arguments, provides utility functions for tests, - runs the selected test suite. - """ - - def _die(self, message = None): - if message: print message - print """ -run-tests [options] [test*] -The name of a test is package.module.ClassName.testMethod -Options: - -?/-h/--help : this message - -s/--spec <spec.xml> : file containing amqp XML spec - -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to - -v/--verbose : verbose - lists tests as they are run. - -d/--debug : enable debug logging. - -i/--ignore <test> : ignore the named test. - -I/--ignore-file : file containing patterns to ignore. - """ - sys.exit(1) - - def setBroker(self, broker): - rex = re.compile(r""" - # [ <user> [ / <password> ] @] <host> [ :<port> ] - ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) - match = rex.match(broker) - if not match: self._die("'%s' is not a valid broker" % (broker)) - self.user, self.password, self.host, self.port = match.groups() - self.port = int(default(self.port, 5672)) - self.user = default(self.user, "guest") - self.password = default(self.password, "guest") - - def __init__(self): - # Defaults - self.setBroker("localhost") - self.spec = "../specs/amqp.0-8.xml" - self.verbose = 1 - self.ignore = [] - - def ignoreFile(self, filename): - f = file(filename) - for line in f.readlines(): self.ignore.append(line.strip()) - f.close() - - def _parseargs(self, args): - try: - opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"]) - except GetoptError, e: - self._die(str(e)) - for opt, value in opts: - if opt in ("-?", "-h", "--help"): self._die() - if opt in ("-s", "--spec"): self.spec = value - if opt in ("-b", "--broker"): self.setBroker(value) - if opt in ("-v", "--verbose"): self.verbose = 2 - if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) - if opt in ("-i", "--ignore"): self.ignore.append(value) - if opt in ("-I", "--ignore-file"): self.ignoreFile(value) - - if len(self.tests) == 0: self.tests=findmodules("tests") - - def testSuite(self): - class IgnoringTestSuite(unittest.TestSuite): - def addTest(self, test): - if isinstance(test, unittest.TestCase) and test.id() in testrunner.ignore: - return - unittest.TestSuite.addTest(self, test) - - # Use our IgnoringTestSuite in the test loader. - unittest.TestLoader.suiteClass = IgnoringTestSuite - return unittest.defaultTestLoader.loadTestsFromNames(self.tests) - - def run(self, args=sys.argv[1:]): - self._parseargs(args) - runner = unittest.TextTestRunner(descriptions=False, - verbosity=self.verbose) - result = runner.run(self.testSuite()) - if (self.ignore): - print "=======================================" - print "NOTE: the following tests were ignored:" - for t in self.ignore: print t - print "=======================================" - return result.wasSuccessful() - - def connect(self, host=None, port=None, spec=None, user=None, password=None): - """Connect to the broker, returns a qpid.client.Client""" - host = host or self.host - port = port or self.port - spec = spec or self.spec - user = user or self.user - password = password or self.password - client = qpid.client.Client(host, port, qpid.spec.load(spec)) - client.start({"LOGIN": user, "PASSWORD": password}) - return client - - -# Global instance for tests to call connect. -testrunner = TestRunner() - - -class TestBase(unittest.TestCase): - """Base class for Qpid test cases. - - self.client is automatically connected with channel 1 open before - the test methods are run. - - Deletes queues and exchanges after. Tests call - self.queue_declare(channel, ...) and self.exchange_declare(chanel, - ...) which are wrappers for the Channel functions that note - resources to clean up later. - """ - - def setUp(self): - self.queues = [] - self.exchanges = [] - self.client = self.connect() - self.channel = self.client.channel(1) - self.channel.channel_open() - - def tearDown(self): - for ch, q in self.queues: - ch.queue_delete(queue=q) - for ch, ex in self.exchanges: - ch.exchange_delete(exchange=ex) - - def connect(self, *args, **keys): - """Create a new connction, return the Client object""" - return testrunner.connect(*args, **keys) - - def queue_declare(self, channel=None, *args, **keys): - channel = channel or self.channel - reply = channel.queue_declare(*args, **keys) - self.queues.append((channel, reply.queue)) - return reply - - def exchange_declare(self, channel=None, ticket=0, exchange='', - type='', passive=False, durable=False, - auto_delete=False, internal=False, nowait=False, - arguments={}): - channel = channel or self.channel - reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments) - self.exchanges.append((channel,exchange)) - return reply - - def uniqueString(self): - """Generate a unique string, unique for this TestBase instance""" - if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; - return "Test Message " + str(self.uniqueCounter) - - def consume(self, queueName): - """Consume from named queue returns the Queue object.""" - reply = self.channel.basic_consume(queue=queueName, no_ack=True) - return self.client.queue(reply.consumer_tag) - - def assertEmpty(self, queue): - """Assert that the queue is empty""" - try: - queue.get(timeout=1) - self.fail("Queue is not empty.") - except Queue.Empty: None # Ignore - - def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): - """ - Publish to exchange and assert queue.get() returns the same message. - """ - body = self.uniqueString() - self.channel.basic_publish(exchange=exchange, - content=Content(body, properties=properties), - routing_key=routing_key) - msg = queue.get(timeout=1) - self.assertEqual(body, msg.content.body) - if (properties): self.assertEqual(properties, msg.content.properties) - - def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): - """ - Publish a message and consume it, assert it comes back intact. - Return the Queue object used to consume. - """ - self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) - - def assertChannelException(self, expectedCode, message): - self.assertEqual("channel", message.method.klass.name) - self.assertEqual("close", message.method.name) - self.assertEqual(expectedCode, message.reply_code) - - - def assertConnectionException(self, expectedCode, message): - self.assertEqual("connection", message.method.klass.name) - self.assertEqual("close", message.method.name) - self.assertEqual(expectedCode, message.reply_code) - diff --git a/Final/python/qpid/xmlutil.py b/Final/python/qpid/xmlutil.py deleted file mode 100644 index 585516b44f..0000000000 --- a/Final/python/qpid/xmlutil.py +++ /dev/null @@ -1,119 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -XML utilities used by spec.py -""" - -import xml.sax -from xml.sax.handler import ContentHandler - -def parse(file): - doc = Node("root") - xml.sax.parse(file, Builder(doc)) - return doc - -class Node: - - def __init__(self, name, attrs = None, text = None, parent = None): - self.name = name - self.attrs = attrs - self.text = text - self.parent = parent - self.children = [] - if parent != None: - parent.children.append(self) - - def get_bool(self, key, default = False): - v = self.get(key) - if v == None: - return default - else: - return bool(int(v)) - - def index(self): - if self.parent: - return self.parent.children.index(self) - else: - return 0 - - def has(self, key): - try: - result = self[key] - return True - except KeyError: - return False - except IndexError: - return False - - def get(self, key, default = None): - if self.has(key): - return self[key] - else: - return default - - def __getitem__(self, key): - if callable(key): - return filter(key, self.children) - else: - t = key.__class__ - meth = "__get%s__" % t.__name__ - if hasattr(self, meth): - return getattr(self, meth)(key) - else: - raise KeyError(key) - - def __getstr__(self, name): - if name[:1] == "@": - return self.attrs[name[1:]] - else: - return self[lambda nd: nd.name == name] - - def __getint__(self, index): - return self.children[index] - - def __iter__(self): - return iter(self.children) - - def path(self): - if self.parent == None: - return "/%s" % self.name - else: - return "%s/%s" % (self.parent.path(), self.name) - -class Builder(ContentHandler): - - def __init__(self, start = None): - self.node = start - - def __setitem__(self, element, type): - self.types[element] = type - - def startElement(self, name, attrs): - self.node = Node(name, attrs, None, self.node) - - def endElement(self, name): - self.node = self.node.parent - - def characters(self, content): - if self.node.text == None: - self.node.text = content - else: - self.node.text += content - diff --git a/Final/python/rule2test b/Final/python/rule2test deleted file mode 100755 index 10f151366e..0000000000 --- a/Final/python/rule2test +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Convert rules to tests -# -import sys, re, os.path -from getopt import getopt, GetoptError -from string import capitalize -from xml import dom -from xml.dom.minidom import parse - -def camelcase(s): - """Convert 'string like this' to 'StringLikeThis'""" - return "".join([capitalize(w) for w in re.split(re.compile("\W*"), s)]) - -def uncapitalize(s): return s[0].lower()+s[1:] - -def ancestors(node): - "Return iterator of ancestors from top-level element to node" - def generator(node): - while node and node.parentNode: - yield node - node = node.parentNode - return reversed(list(generator(node))) - -def tagAndName(element): - nameAttr = element.getAttribute("name"); - if (nameAttr) : return camelcase(nameAttr) + camelcase(element.tagName) - else: return camelcase(element.tagName) - -def nodeText(n): - """Recursively collect text from all text nodes under n""" - if n.nodeType == dom.Node.TEXT_NODE: - return n.data - if n.childNodes: - return reduce(lambda t, c: t + nodeText(c), n.childNodes, "") - return "" - -def cleanup(docString, level=8): - unindent = re.sub("\n[ \t]*", "\n", docString.strip()) - emptyLines = re.sub("\n\n\n", "\n\n", unindent) - indented = re.sub("\n", "\n"+level*" ", emptyLines) - return level*" " + indented - -def printTest(test, docstring): - print "class %s(TestBase):" % test - print ' """' - print docstring - print ' """' - print - print - -def printTests(doc, module): - """Returns dictionary { classname : [ (methodname, docstring)* ] * }""" - tests = {} - rules = doc.getElementsByTagName("rule") - for r in rules: - path = list(ancestors(r)) - if module == path[1].getAttribute("name").lower(): - test = "".join(map(tagAndName, path[2:])) + "Tests" - docstring = cleanup(nodeText(r), 4) - printTest(test, docstring) - -def usage(message=None): - if message: print >>sys.stderr, message - print >>sys.stderr, """ -rule2test [options] <amqpclass> - -Print test classes for each rule for the amqpclass in amqp.xml. - -Options: - -?/-h/--help : this message - -s/--spec <spec.xml> : file containing amqp XML spec -""" - return 1 - -def main(argv): - try: opts, args = getopt(argv[1:], "h?s:", ["help", "spec="]) - except GetoptError, e: return usage(e) - spec = "../specs/amqp.xml" # Default - for opt, val in opts: - if (opt in ("-h", "-?", "--help")): return usage() - if (opt in ("-s", "--spec")): spec = val - doc = parse(spec) - if len(args) == 0: return usage() - printTests(doc, args[0]) - return 0 - -if (__name__ == "__main__"): sys.exit(main(sys.argv)) diff --git a/Final/python/run-tests b/Final/python/run-tests deleted file mode 100755 index 90c0200d01..0000000000 --- a/Final/python/run-tests +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import sys -from qpid.testlib import testrunner - -if not testrunner.run(): sys.exit(1) - - - diff --git a/Final/python/setup.py b/Final/python/setup.py deleted file mode 100644 index a49fa6ca51..0000000000 --- a/Final/python/setup.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from distutils.core import setup - -setup(name="qpid", version="0.1", packages=["qpid"], scripts=["amqp-doc"], - url="http://incubator.apache.org/qpid", - license="Apache Software License", - description="Python language client implementation for Apache Qpid") diff --git a/Final/python/tests/__init__.py b/Final/python/tests/__init__.py deleted file mode 100644 index 9a09d2d04f..0000000000 --- a/Final/python/tests/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# diff --git a/Final/python/tests/basic.py b/Final/python/tests/basic.py deleted file mode 100644 index bbbfa8ebf9..0000000000 --- a/Final/python/tests/basic.py +++ /dev/null @@ -1,431 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class BasicTests(TestBase): - """Tests for 'methods' on the amqp basic 'class'""" - - def test_consume_no_local(self): - """ - Test that the no_local flag is honoured in the consume method - """ - channel = self.channel - #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True) - #establish two consumers one of which excludes delivery of locally sent messages - channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a") - channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True) - - #send a message - channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local")) - channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local")) - - #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") - msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.content.body) - try: - excluded.get(timeout=1) - self.fail("Received locally published message though no_local=true") - except Empty: None - - - def test_consume_exclusive(self): - """ - Test that the exclusive flag is honoured in the consume method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True) - - #check that an exclusive consumer prevents other consumer being created: - channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True) - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2") - self.fail("Expected consume request to fail due to previous exclusive consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.channel_open() - - #check that an exclusive consumer cannot be created if a consumer already exists: - channel.basic_consume(consumer_tag="first", queue="test-queue-2") - try: - channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) - self.fail("Expected exclusive consume request to fail due to previous consumer") - except Closed, e: - self.assertChannelException(403, e.args[0]) - - def test_consume_queue_errors(self): - """ - Test error conditions associated with the queue field of the consume method: - """ - channel = self.channel - try: - #queue specified but doesn't exist: - channel.basic_consume(queue="invalid-queue") - self.fail("Expected failure when consuming from non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(2) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.basic_consume(queue="") - self.fail("Expected failure when consuming from unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_consume_unique_consumers(self): - """ - Ensure unique consumer tags are enforced - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True) - - #check that attempts to use duplicate tags are detected and prevented: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - try: - channel.basic_consume(consumer_tag="first", queue="test-queue-3") - self.fail("Expected consume request to fail due to non-unique tag") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - def test_cancel(self): - """ - Test compliance of the basic.cancel method - """ - channel = self.channel - #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") - channel.basic_publish(routing_key="test-queue-4", content=Content("One")) - - #cancel should stop messages being delivered - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) - myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.content.body) - try: - msg = myqueue.get(timeout=1) - self.fail("Got message after cancellation: " + msg) - except Empty: None - - #cancellation of non-existant consumers should be handled without error - channel.basic_cancel(consumer_tag="my-consumer") - channel.basic_cancel(consumer_tag="this-never-existed") - - - def test_ack(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True) - - reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) - queue = self.client.queue(reply.consumer_tag) - - channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - def test_recover_requeue(self): - """ - Test requeing on recovery - """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True) - - subscription = channel.basic_consume(queue="test-requeue", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - channel.basic_publish(routing_key="test-requeue", content=Content("One")) - channel.basic_publish(routing_key="test-requeue", content=Content("Two")) - channel.basic_publish(routing_key="test-requeue", content=Content("Three")) - channel.basic_publish(routing_key="test-requeue", content=Content("Four")) - channel.basic_publish(routing_key="test-requeue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_cancel(consumer_tag=subscription.consumer_tag) - subscription2 = channel.basic_consume(queue="test-requeue") - queue2 = self.client.queue(subscription2.consumer_tag) - - channel.basic_recover(requeue=True) - - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - self.assertEqual(True, msg3b.redelivered) - self.assertEqual(True, msg5b.redelivered) - - try: - extra = queue2.get(timeout=1) - self.fail("Got unexpected message in second queue: " + extra.content.body) - except Empty: None - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message in original queue: " + extra.content.body) - except Empty: None - - - def test_qos_prefetch_count(self): - """ - Test that the prefetch count specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 5: - channel.basic_qos(prefetch_count=5) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered: - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - - - def test_qos_prefetch_size(self): - """ - Test that the prefetch size specified is honoured - """ - #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) - queue = self.client.queue(subscription.consumer_tag) - - #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.basic_qos(prefetch_size=50) - - #publish 10 messages: - for i in range(1, 11): - channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) - - #only 5 messages should have been delivered (i.e. 45 bytes worth): - for i in range(1, 6): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 6th message in original queue: " + extra.content.body) - except Empty: None - - #ack messages and check that the next set arrive ok: - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - for i in range(6, 11): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected 11th message in original queue: " + extra.content.body) - except Empty: None - - #make sure that a single oversized message still gets delivered - large = "abcdefghijklmnopqrstuvwxyz" - large = large + "-" + large; - channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) - msg = queue.get(timeout=1) - self.assertEqual(large, msg.content.body) - - def test_get(self): - """ - Test basic_get method - """ - channel = self.channel - channel.queue_declare(queue="test-get", exclusive=True) - - #publish some messages (no_ack=True) with persistent messaging - for i in range(1, 11): - msg=Content("Message %d" % i) - msg["delivery mode"] = 2 - channel.basic_publish(routing_key="test-get",content=msg ) - - #use basic_get to read back the messages, and check that we get an empty at the end - for i in range(1, 11): - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-ok") - self.assertEqual("Message %d" % i, reply.content.body) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-empty") - - - #publish some messages (no_ack=True) transient messaging - for i in range(11, 21): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - #use basic_get to read back the messages, and check that we get an empty at the end - for i in range(11, 21): - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-ok") - self.assertEqual("Message %d" % i, reply.content.body) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-empty") - - #repeat for no_ack=False - - #publish some messages (no_ack=False) with persistent messaging - for i in range(21, 31): - msg=Content("Message %d" % i) - msg["delivery mode"] = 2 - channel.basic_publish(routing_key="test-get",content=msg ) - - #use basic_get to read back the messages, and check that we get an empty at the end - for i in range(21, 31): - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-ok") - self.assertEqual("Message %d" % i, reply.content.body) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-empty") - - #public some messages (no_ack=False) with transient messaging - for i in range(31, 41): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) - - for i in range(31, 41): - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-ok") - self.assertEqual("Message %d" % i, reply.content.body) - if(i == 33): - channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) - if(i in [35, 37, 39]): - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-empty") - - #recover(requeue=True) - channel.basic_recover(requeue=True) - - #get the unacked messages again (34, 36, 38, 40) - for i in [34, 36, 38, 40]: - reply = channel.basic_get(no_ack=False) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-ok") - self.assertEqual("Message %d" % i, reply.content.body) - channel.basic_ack(delivery_tag=reply.delivery_tag) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-empty") - - channel.basic_recover(requeue=True) - - reply = channel.basic_get(no_ack=True) - self.assertEqual(reply.method.klass.name, "basic") - self.assertEqual(reply.method.name, "get-empty") diff --git a/Final/python/tests/broker.py b/Final/python/tests/broker.py deleted file mode 100644 index 90009b6847..0000000000 --- a/Final/python/tests/broker.py +++ /dev/null @@ -1,122 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class BrokerTests(TestBase): - """Tests for basic Broker functionality""" - - def test_amqp_basic_13(self): - """ - First, this test tries to receive a message with a no-ack - consumer. Second, this test tries to explicitely receive and - acknowledge a message with an acknowledging consumer. - """ - ch = self.channel - self.queue_declare(ch, queue = "myqueue") - - # No ack consumer - ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag - body = "test no-ack" - ch.basic_publish(routing_key = "myqueue", content = Content(body)) - msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.content.body == body) - - # Acknowleding consumer - self.queue_declare(ch, queue = "otherqueue") - ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag - body = "test ack" - ch.basic_publish(routing_key = "otherqueue", content = Content(body)) - msg = self.client.queue(ctag).get(timeout = 5) - ch.basic_ack(delivery_tag = msg.delivery_tag) - self.assert_(msg.content.body == body) - - def test_basic_delivery_immediate(self): - """ - Test basic message delivery where consume is issued before publish - """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) - - body = "Immediate Delivery" - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True) - msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) - - # TODO: Ensure we fail if immediate=True and there's no consumer. - - - def test_basic_delivery_queued(self): - """ - Test basic message delivery where publish is issued before consume - (i.e. requires queueing of the message) - """ - channel = self.channel - self.exchange_declare(channel, exchange="test-exchange", type="direct") - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - body = "Queued Delivery" - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body)) - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=5) - self.assert_(msg.content.body == body) - - def test_invalid_channel(self): - channel = self.client.channel(200) - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for invalid channel") - except Closed, e: - self.assertConnectionException(504, e.args[0]) - - def test_closed_channel(self): - channel = self.client.channel(200) - channel.channel_open() - channel.channel_close() - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for closed channel") - except Closed, e: - self.assertConnectionException(504, e.args[0]) - - def test_channel_flow(self): - channel = self.channel - channel.queue_declare(queue="flow_test_queue", exclusive=True) - channel.basic_consume(consumer_tag="my-tag", queue="flow_test_queue") - incoming = self.client.queue("my-tag") - - channel.channel_flow(active=False) - channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz")) - try: - incoming.get(timeout=1) - self.fail("Received message when flow turned off.") - except Empty: None - - channel.channel_flow(active=True) - msg = incoming.get(timeout=1) - self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) - - diff --git a/Final/python/tests/example.py b/Final/python/tests/example.py deleted file mode 100644 index bc84f002e0..0000000000 --- a/Final/python/tests/example.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class ExampleTest (TestBase): - """ - An example Qpid test, illustrating the unittest frameowkr and the - python Qpid client. The test class must inherit TestCase. The - test code uses the Qpid client to interact with a qpid broker and - verify it behaves as expected. - """ - - def test_example(self): - """ - An example test. Note that test functions must start with 'test_' - to be recognized by the test framework. - """ - - # By inheriting TestBase, self.client is automatically connected - # and self.channel is automatically opened as channel(1) - # Other channel methods mimic the protocol. - channel = self.channel - - # Now we can send regular commands. If you want to see what the method - # arguments mean or what other commands are available, you can use the - # python builtin help() method. For example: - #help(chan) - #help(chan.exchange_declare) - - # If you want browse the available protocol methods without being - # connected to a live server you can use the amqp-doc utility: - # - # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>] - # - # Options: - # -e, --regexp use regex instead of glob when matching - - # Now that we know what commands are available we can use them to - # interact with the server. - - # Here we use ordinal arguments. - self.exchange_declare(channel, 0, "test", "direct") - - # Here we use keyword arguments. - self.queue_declare(channel, queue="test-queue") - channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") - - # Call Channel.basic_consume to register as a consumer. - # All the protocol methods return a message object. The message object - # has fields corresponding to the reply method fields, plus a content - # field that is filled if the reply includes content. In this case the - # interesting field is the consumer_tag. - reply = channel.basic_consume(queue="test-queue") - - # We can use the Client.queue(...) method to access the queue - # corresponding to our consumer_tag. - queue = self.client.queue(reply.consumer_tag) - - # Now lets publish a message and see if our consumer gets it. To do - # this we need to import the Content class. - body = "Hello World!" - channel.basic_publish(exchange="test", - routing_key="key", - content=Content(body)) - - # Now we'll wait for the message to arrive. We can use the timeout - # argument in case the server hangs. By default queue.get() will wait - # until a message arrives or the connection to the server dies. - msg = queue.get(timeout=10) - - # And check that we got the right response with assertEqual - self.assertEqual(body, msg.content.body) - - # Now acknowledge the message. - channel.basic_ack(msg.delivery_tag, True) - diff --git a/Final/python/tests/exchange.py b/Final/python/tests/exchange.py deleted file mode 100644 index 56d6fa82e4..0000000000 --- a/Final/python/tests/exchange.py +++ /dev/null @@ -1,327 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Tests for exchange behaviour. - -Test classes ending in 'RuleTests' are derived from rules in amqp.xml. -""" - -import Queue, logging -from qpid.testlib import TestBase -from qpid.content import Content -from qpid.client import Closed - - -class StandardExchangeVerifier: - """Verifies standard exchange behavior. - - Used as base class for classes that test standard exchanges.""" - - def verifyDirectExchange(self, ex): - """Verify that ex behaves like a direct exchange.""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") - self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") - try: - self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") - self.fail("Expected Empty exception") - except Queue.Empty: None # Expected - - def verifyFanOutExchange(self, ex): - """Verify that ex behaves like a fanout exchange.""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex) - self.queue_declare(queue="p") - self.channel.queue_bind(queue="p", exchange=ex) - for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) - - def verifyTopicExchange(self, ex): - """Verify that ex behaves like a topic exchange""" - self.queue_declare(queue="a") - self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") - q = self.consume("a") - self.assertPublishGet(q, ex, "a.b.x") - self.assertPublishGet(q, ex, "a.x.b.x") - self.assertPublishGet(q, ex, "a.x.x.b.x") - # Shouldn't match - self.channel.basic_publish(exchange=ex, routing_key="a.b") - self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") - self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x") - self.channel.basic_publish(exchange=ex, routing_key="a.b") - self.assert_(q.empty()) - - def verifyHeadersExchange(self, ex): - """Verify that ex is a headers exchange""" - self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) - q = self.consume("q") - headers = {"name":"fred", "age":3} - self.assertPublishGet(q, exchange=ex, properties={'headers':headers}) - self.channel.basic_publish(exchange=ex) # No headers, won't deliver - self.assertEmpty(q); - - -class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): - """ - The server SHOULD implement these standard exchange types: topic, headers. - - Client attempts to declare an exchange with each of these standard types. - """ - - def testDirect(self): - """Declare and test a direct exchange""" - self.exchange_declare(0, exchange="d", type="direct") - self.verifyDirectExchange("d") - - def testFanout(self): - """Declare and test a fanout exchange""" - self.exchange_declare(0, exchange="f", type="fanout") - self.verifyFanOutExchange("f") - - def testTopic(self): - """Declare and test a topic exchange""" - self.exchange_declare(0, exchange="t", type="topic") - self.verifyTopicExchange("t") - - def testHeaders(self): - """Declare and test a headers exchange""" - self.exchange_declare(0, exchange="h", type="headers") - self.verifyHeadersExchange("h") - - -class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): - """ - The server MUST, in each virtual host, pre-declare an exchange instance - for each standard exchange type that it implements, where the name of the - exchange instance is amq. followed by the exchange type name. - - Client creates a temporary queue and attempts to bind to each required - exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if - those types are defined). - """ - def testAmqDirect(self): self.verifyDirectExchange("amq.direct") - - def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") - - def testAmqTopic(self): self.verifyTopicExchange("amq.topic") - - def testAmqMatch(self): self.verifyHeadersExchange("amq.match") - -class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): - """ - The server MUST predeclare a direct exchange to act as the default exchange - for content Publish methods and for default queue bindings. - - Client checks that the default exchange is active by specifying a queue - binding with no exchange name, and publishing a message with a suitable - routing key but without specifying the exchange name, then ensuring that - the message arrives in the queue correctly. - """ - def testDefaultExchange(self): - # Test automatic binding by queue name. - self.queue_declare(queue="d") - self.assertPublishConsume(queue="d", routing_key="d") - # Test explicit bind to default queue - self.verifyDirectExchange("") - - -# TODO aconway 2006-09-27: Fill in empty tests: - -class DefaultAccessRuleTests(TestBase): - """ - The server MUST NOT allow clients to access the default exchange except - by specifying an empty exchange name in the Queue.Bind and content Publish - methods. - """ - -class ExtensionsRuleTests(TestBase): - """ - The server MAY implement other exchange types as wanted. - """ - - -class DeclareMethodMinimumRuleTests(TestBase): - """ - The server SHOULD support a minimum of 16 exchanges per virtual host and - ideally, impose no limit except as defined by available resources. - - The client creates as many exchanges as it can until the server reports - an error; the number of exchanges successfuly created must be at least - sixteen. - """ - - -class DeclareMethodTicketFieldValidityRuleTests(TestBase): - """ - The client MUST provide a valid access ticket giving "active" access to - the realm in which the exchange exists or will be created, or "passive" - access if the if-exists flag is set. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeclareMethodExchangeFieldReservedRuleTests(TestBase): - """ - Exchange names starting with "amq." are reserved for predeclared and - standardised exchanges. The client MUST NOT attempt to create an exchange - starting with "amq.". - - - """ - - -class DeclareMethodTypeFieldTypedRuleTests(TestBase): - """ - Exchanges cannot be redeclared with different types. The client MUST not - attempt to redeclare an existing exchange with a different type than used - in the original Exchange.Declare method. - - - """ - - -class DeclareMethodTypeFieldSupportRuleTests(TestBase): - """ - The client MUST NOT attempt to create an exchange with a type that the - server does not support. - - - """ - - -class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): - """ - If set, and the exchange does not already exist, the server MUST raise a - channel exception with reply code 404 (not found). - """ - def test(self): - try: - self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) - self.fail("Expected 404 for passive declaration of unknown exchange.") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - -class DeclareMethodDurableFieldSupportRuleTests(TestBase): - """ - The server MUST support both durable and transient exchanges. - - - """ - - -class DeclareMethodDurableFieldStickyRuleTests(TestBase): - """ - The server MUST ignore the durable field if the exchange already exists. - - - """ - - -class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase): - """ - The server MUST ignore the auto-delete field if the exchange already - exists. - - - """ - - -class DeleteMethodTicketFieldValidityRuleTests(TestBase): - """ - The client MUST provide a valid access ticket giving "active" access - rights to the exchange's access realm. - - Client creates access ticket with wrong access rights and attempts to use - in this method. - """ - - -class DeleteMethodExchangeFieldExistsRuleTests(TestBase): - """ - The client MUST NOT attempt to delete an exchange that does not exist. - """ - - -class HeadersExchangeTests(TestBase): - """ - Tests for headers exchange functionality. - """ - def setUp(self): - TestBase.setUp(self) - self.queue_declare(queue="q") - self.q = self.consume("q") - - def myAssertPublishGet(self, headers): - self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers}) - - def myBasicPublish(self, headers): - self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers})) - - def testMatchAll(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) - - # None of these should match - self.myBasicPublish({}) - self.myBasicPublish({"name":"barney"}) - self.myBasicPublish({"name":10}) - self.myBasicPublish({"name":"fred", "age":2}) - self.assertEmpty(self.q) - - def testMatchAny(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) - self.myAssertPublishGet({"name":"fred"}) - self.myAssertPublishGet({"name":"fred", "ignoreme":10}) - self.myAssertPublishGet({"ignoreme":10, "age":3}) - - # Wont match - self.myBasicPublish({}) - self.myBasicPublish({"irrelevant":0}) - self.assertEmpty(self.q) - - -class MiscellaneousErrorsTests(TestBase): - """ - Test some miscellaneous error conditions - """ - def testTypeNotKnown(self): - try: - self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") - self.fail("Expected 503 for declaration of unknown exchange type.") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def testDifferentDeclaredType(self): - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") - try: - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") - self.fail("Expected 530 for redeclaration of exchange with different type.") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - #cleanup - other = self.connect() - c2 = other.channel(1) - c2.channel_open() - c2.exchange_delete(exchange="test_different_declared_type_exchange") - diff --git a/Final/python/tests/queue.py b/Final/python/tests/queue.py deleted file mode 100644 index 60ac4c3dfb..0000000000 --- a/Final/python/tests/queue.py +++ /dev/null @@ -1,255 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class QueueTests(TestBase): - """Tests for 'methods' on the amqp queue 'class'""" - - def test_purge(self): - """ - Test that the purge method removes messages from the queue - """ - channel = self.channel - #setup, declare a queue and add some messages to it: - channel.exchange_declare(exchange="test-exchange", type="direct") - channel.queue_declare(queue="test-queue", exclusive=True) - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one")) - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two")) - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three")) - - #check that the queue now reports 3 messages: - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(3, reply.message_count) - - #now do the purge, then test that three messages are purged and the count drops to 0 - reply = channel.queue_purge(queue="test-queue"); - self.assertEqual(3, reply.message_count) - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(0, reply.message_count) - - #send a further message and consume it, ensuring that the other messages are really gone - channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four")) - reply = channel.basic_consume(queue="test-queue", no_ack=True) - queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=1) - self.assertEqual("four", msg.content.body) - - #check error conditions (use new channels): - channel = self.client.channel(2) - channel.channel_open() - try: - #queue specified but doesn't exist: - channel.queue_purge(queue="invalid-queue") - self.fail("Expected failure when purging non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - channel = self.client.channel(3) - channel.channel_open() - try: - #queue not specified and none previously declared for channel: - channel.queue_purge() - self.fail("Expected failure when purging unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - #cleanup - other = self.connect() - channel = other.channel(1) - channel.channel_open() - channel.exchange_delete(exchange="test-exchange") - - def test_declare_exclusive(self): - """ - Test that the exclusive field is honoured in queue.declare - """ - # TestBase.setUp has already opened channel(1) - c1 = self.channel - # Here we open a second separate connection: - other = self.connect() - c2 = other.channel(1) - c2.channel_open() - - #declare an exclusive queue: - c1.queue_declare(queue="exclusive-queue", exclusive="True") - try: - #other connection should not be allowed to declare this: - c2.queue_declare(queue="exclusive-queue", exclusive="True") - self.fail("Expected second exclusive queue_declare to raise a channel exception") - except Closed, e: - self.assertChannelException(405, e.args[0]) - - - def test_declare_passive(self): - """ - Test that the passive field is honoured in queue.declare - """ - channel = self.channel - #declare an exclusive queue: - channel.queue_declare(queue="passive-queue-1", exclusive="True") - channel.queue_declare(queue="passive-queue-1", passive="True") - try: - #other connection should not be allowed to declare this: - channel.queue_declare(queue="passive-queue-2", passive="True") - self.fail("Expected passive declaration of non-existant queue to raise a channel exception") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - def test_bind(self): - """ - Test various permutations of the queue.bind method - """ - channel = self.channel - channel.queue_declare(queue="queue-1", exclusive="True") - - #straightforward case, both exchange & queue exist so no errors expected: - channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") - - #bind the default queue for the channel (i.e. last one declared): - channel.queue_bind(exchange="amq.direct", routing_key="key2") - - #use the queue name where neither routing key nor queue are specified: - channel.queue_bind(exchange="amq.direct") - - #try and bind to non-existant exchange - try: - channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") - self.fail("Expected bind to non-existant exchange to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - #need to reopen a channel: - channel = self.client.channel(2) - channel.channel_open() - - #try and bind non-existant queue: - try: - channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") - self.fail("Expected bind of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - def test_delete_simple(self): - """ - Test basic queue deletion - """ - channel = self.channel - - #straight-forward case: - channel.queue_declare(queue="delete-me") - channel.basic_publish(routing_key="delete-me", content=Content("a")) - channel.basic_publish(routing_key="delete-me", content=Content("b")) - channel.basic_publish(routing_key="delete-me", content=Content("c")) - reply = channel.queue_delete(queue="delete-me") - self.assertEqual(3, reply.message_count) - #check that it has gone be declaring passively - try: - channel.queue_declare(queue="delete-me", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - #check attempted deletion of non-existant queue is handled correctly: - channel = self.client.channel(2) - channel.channel_open() - try: - channel.queue_delete(queue="i-dont-exist", if_empty="True") - self.fail("Expected delete of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - - - def test_delete_ifempty(self): - """ - Test that if_empty field of queue_delete is honoured - """ - channel = self.channel - - #create a queue and add a message to it (use default binding): - channel.queue_declare(queue="delete-me-2") - channel.queue_declare(queue="delete-me-2", passive="True") - channel.basic_publish(routing_key="delete-me-2", content=Content("message")) - - #try to delete, but only if empty: - try: - channel.queue_delete(queue="delete-me-2", if_empty="True") - self.fail("Expected delete if_empty to fail for non-empty queue") - except Closed, e: - self.assertChannelException(406, e.args[0]) - - #need new channel now: - channel = self.client.channel(2) - channel.channel_open() - - #empty queue: - reply = channel.basic_consume(queue="delete-me-2", no_ack=True) - queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=1) - self.assertEqual("message", msg.content.body) - channel.basic_cancel(consumer_tag=reply.consumer_tag) - - #retry deletion on empty queue: - channel.queue_delete(queue="delete-me-2", if_empty="True") - - #check that it has gone by declaring passively: - try: - channel.queue_declare(queue="delete-me-2", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - def test_delete_ifunused(self): - """ - Test that if_unused field of queue_delete is honoured - """ - channel = self.channel - - #create a queue and register a consumer: - channel.queue_declare(queue="delete-me-3") - channel.queue_declare(queue="delete-me-3", passive="True") - reply = channel.basic_consume(queue="delete-me-3", no_ack=True) - - #need new channel now: - channel2 = self.client.channel(2) - channel2.channel_open() - #try to delete, but only if empty: - try: - channel2.queue_delete(queue="delete-me-3", if_unused="True") - self.fail("Expected delete if_unused to fail for queue with existing consumer") - except Closed, e: - self.assertChannelException(406, e.args[0]) - - - channel.basic_cancel(consumer_tag=reply.consumer_tag) - channel.queue_delete(queue="delete-me-3", if_unused="True") - #check that it has gone by declaring passively: - try: - channel.queue_declare(queue="delete-me-3", passive="True") - self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) - - diff --git a/Final/python/tests/testlib.py b/Final/python/tests/testlib.py deleted file mode 100644 index cab07cc4ac..0000000000 --- a/Final/python/tests/testlib.py +++ /dev/null @@ -1,66 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# -# Tests for the testlib itself. -# - -from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from Queue import Empty - -import sys -from traceback import * - -def mytrace(frame, event, arg): - print_stack(frame); - print "====" - return mytrace - -class TestBaseTest(TestBase): - """Verify TestBase functions work as expected""" - - def testAssertEmptyPass(self): - """Test assert empty works""" - self.queue_declare(queue="empty") - q = self.consume("empty") - self.assertEmpty(q) - try: - q.get(timeout=1) - self.fail("Queue is not empty.") - except Empty: None # Ignore - - def testAssertEmptyFail(self): - self.queue_declare(queue="full") - q = self.consume("full") - self.channel.basic_publish(routing_key="full") - try: - self.assertEmpty(q); - self.fail("assertEmpty did not assert on non-empty queue") - except AssertionError: None # Ignore - - def testMessageProperties(self): - """Verify properties are passed with message""" - props={"headers":{"x":1, "y":2}} - self.queue_declare(queue="q") - q = self.consume("q") - self.assertPublishGet(q, routing_key="q", properties=props) - - - diff --git a/Final/python/tests/tx.py b/Final/python/tests/tx.py deleted file mode 100644 index 054fb8d8b7..0000000000 --- a/Final/python/tests/tx.py +++ /dev/null @@ -1,209 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase - -class TxTests(TestBase): - """ - Tests for 'methods' on the amqp tx 'class' - """ - - def test_commit(self): - """ - Test that commited publishes are delivered and commited acks are not re-delivered - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel.tx_commit() - - #check results - for i in range(1, 5): - msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.content.body) - - msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.content.body) - - msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.content.body) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) - channel.tx_commit() - - def test_auto_rollback(self): - """ - Test that a channel closed with an open transaction is effectively rolled back - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - channel.tx_rollback() - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) - channel.tx_commit() - - def test_rollback(self): - """ - Test that rolled back publishes are not delivered and rolled back acks are re-delivered - """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - channel.tx_rollback() - - #check results - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - - for q in [queue_a, queue_b, queue_c]: - try: - extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None - - #cleanup - channel.basic_ack(delivery_tag=0, multiple=True) - channel.tx_commit() - - def perform_txn_work(self, channel, name_a, name_b, name_c): - """ - Utility method that does some setup and some work under a transaction. Used for testing both - commit and rollback - """ - #setup: - channel.queue_declare(queue=name_a, exclusive=True) - channel.queue_declare(queue=name_b, exclusive=True) - channel.queue_declare(queue=name_c, exclusive=True) - - key = "my_key_" + name_b - topic = "my_topic_" + name_c - - channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) - channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) - - for i in range(1, 5): - channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i)) - - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6")) - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7")) - - channel.tx_select() - - #consume and ack messages - sub_a = channel.basic_consume(queue=name_a, no_ack=False) - queue_a = self.client.queue(sub_a.consumer_tag) - for i in range(1, 5): - msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) - - sub_b = channel.basic_consume(queue=name_b, no_ack=False) - queue_b = self.client.queue(sub_b.consumer_tag) - msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) - - sub_c = channel.basic_consume(queue=name_c, no_ack=False) - queue_c = self.client.queue(sub_c.consumer_tag) - msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - channel.basic_ack(delivery_tag=msg.delivery_tag) - - #publish messages - for i in range(1, 5): - channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i)) - - channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6")) - channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7")) - - return queue_a, queue_b, queue_c - - def test_commit_overlapping_acks(self): - """ - Test that logically 'overlapping' acks do not cause errors on commit - """ - channel = self.channel - channel.queue_declare(queue="commit-overlapping", exclusive=True) - for i in range(1, 10): - channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i)) - - - channel.tx_select() - - sub = channel.basic_consume(queue="commit-overlapping", no_ack=False) - queue = self.client.queue(sub.consumer_tag) - for i in range(1, 10): - msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - if i in [3, 6, 10]: - channel.basic_ack(delivery_tag=msg.delivery_tag) - - channel.tx_commit() - - #check all have been acked: - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None |