summaryrefslogtreecommitdiff
path: root/Final/python
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-11-19 16:54:44 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-11-19 16:54:44 +0000
commitd66aca6eb24b89052e7e2ab05ade9fcd5398bb95 (patch)
treeb576cec8a33157707585a5ca63b730765276ef69 /Final/python
parent96420dfa6bb12a4b9fce082d50e49910e3dc8779 (diff)
downloadqpid-python-d66aca6eb24b89052e7e2ab05ade9fcd5398bb95.tar.gz
Undoing the accidental move instead of a copy
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/tags/M2@596363 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'Final/python')
-rwxr-xr-xFinal/python/LICENSE.txt203
-rw-r--r--Final/python/NOTICE.txt20
-rw-r--r--Final/python/README.txt24
-rw-r--r--Final/python/RELEASE_NOTES25
-rwxr-xr-xFinal/python/amqp-doc78
-rw-r--r--Final/python/cpp_failing.txt0
-rw-r--r--Final/python/doc/test-requirements.txt10
-rw-r--r--Final/python/java_failing.txt0
-rwxr-xr-xFinal/python/pal2py274
-rw-r--r--Final/python/qpid/__init__.py20
-rw-r--r--Final/python/qpid/client.py114
-rw-r--r--Final/python/qpid/codec.py224
-rw-r--r--Final/python/qpid/connection.py270
-rw-r--r--Final/python/qpid/content.py50
-rw-r--r--Final/python/qpid/delegate.py54
-rw-r--r--Final/python/qpid/message.py84
-rw-r--r--Final/python/qpid/peer.py210
-rw-r--r--Final/python/qpid/queue.py45
-rw-r--r--Final/python/qpid/spec.py358
-rw-r--r--Final/python/qpid/testlib.py237
-rw-r--r--Final/python/qpid/xmlutil.py119
-rwxr-xr-xFinal/python/rule2test108
-rwxr-xr-xFinal/python/run-tests27
-rw-r--r--Final/python/setup.py25
-rw-r--r--Final/python/tests/__init__.py20
-rw-r--r--Final/python/tests/basic.py431
-rw-r--r--Final/python/tests/broker.py122
-rw-r--r--Final/python/tests/example.py94
-rw-r--r--Final/python/tests/exchange.py327
-rw-r--r--Final/python/tests/queue.py255
-rw-r--r--Final/python/tests/testlib.py66
-rw-r--r--Final/python/tests/tx.py209
32 files changed, 0 insertions, 4103 deletions
diff --git a/Final/python/LICENSE.txt b/Final/python/LICENSE.txt
deleted file mode 100755
index 6b0b1270ff..0000000000
--- a/Final/python/LICENSE.txt
+++ /dev/null
@@ -1,203 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
diff --git a/Final/python/NOTICE.txt b/Final/python/NOTICE.txt
deleted file mode 100644
index 32ccdb70c4..0000000000
--- a/Final/python/NOTICE.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-=========================================================================
-== NOTICE file corresponding to the section 4 d of ==
-== the Apache License, Version 2.0, ==
-== in this case for the Apache Qpid distribution. ==
-=========================================================================
-
-This product includes software developed by the Apache Software Foundation
-(http://www.apache.org/).
-
-Please read the LICENSE.txt file present in the root directory of this
-distribution.
-
-
-Aside from contributions to the Apache Qpid project, this software also
-includes (binary only):
-
- - None at this time
-
-
-
diff --git a/Final/python/README.txt b/Final/python/README.txt
deleted file mode 100644
index 0a64f0e2f2..0000000000
--- a/Final/python/README.txt
+++ /dev/null
@@ -1,24 +0,0 @@
-= RUNNING THE PYTHON TESTS =
-
-The tests/ directory contains a collection of python unit tests to
-exercise functions of a broker.
-
-Simplest way to run the tests:
-
- * Run a broker on the default port
-
- * ./run_tests
-
-For additional options: ./run_tests --help
-
-
-== Expected failures ==
-
-Until we complete functionality, tests may fail because the tested
-functionality is missing in the broker. To skip expected failures
-in the C++ or Java brokers:
-
- ./run_tests -I cpp_failing.txt
- ./run_tests -I java_failing.txt
-
-If you fix a failure, please remove it from the corresponding list.
diff --git a/Final/python/RELEASE_NOTES b/Final/python/RELEASE_NOTES
deleted file mode 100644
index 7005aa83cb..0000000000
--- a/Final/python/RELEASE_NOTES
+++ /dev/null
@@ -1,25 +0,0 @@
-Apache Incubator Qpid Python M2 Release Notes
--------------------------------------------
-
-The Qpid M2 release contains support the for AMQP 0-8 specification.
-You can access the 0-8 specification using the following link.
-http://www.amqp.org/tikiwiki/tiki-index.php?page=Download
-
-For full details of Qpid capabilities, as they currently stand, see our
-detailed project documentation at:
-
-http://cwiki.apache.org/confluence/pages/viewpage.action?pageId=28284
-
-Please take time to go through the README file provided with the distro.
-
-
-Known Issues/Outstanding Work
------------------------------
-
-There are no known issues for the Phyton client.
-
-
-M2 Tasks Completed
--------------------
-
-Bug QPID-467 Complete Interop Testing
diff --git a/Final/python/amqp-doc b/Final/python/amqp-doc
deleted file mode 100755
index 0e7f9e862a..0000000000
--- a/Final/python/amqp-doc
+++ /dev/null
@@ -1,78 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import sys, re
-from qpid.spec import load, pythonize
-from getopt import gnu_getopt as getopt, GetoptError
-from fnmatch import fnmatchcase as fnmatch
-
-def die(msg):
- print >> sys.stderr, msg
- sys.exit(1)
-
-def usage(msg = ""):
- return ("""%s
-
-Usage %s [<options>] [<pattern_1> ... <pattern_n>]
-
-Options:
- -e, --regexp use regex instead of glob when matching
- -s, --spec <url> location of amqp.xml
-""" % (msg, sys.argv[0])).strip()
-
-try:
- opts, args = getopt(sys.argv[1:], "s:e", ["regexp", "spec="])
-except GetoptError, e:
- die(str(e))
-
-regexp = False
-spec = "../specs/amqp.0-8.xml"
-for k, v in opts:
- if k == "-e" or k == "--regexp": regexp = True
- if k == "-s" or k == "--spec": spec = v
-
-if regexp:
- def match(pattern, value):
- try:
- return re.match(pattern, value)
- except Exception, e:
- die("error: '%s': %s" % (pattern, e))
-else:
- def match(pattern, value):
- return fnmatch(value, pattern)
-
-spec = load(spec)
-methods = {}
-patterns = args
-for pattern in patterns:
- for c in spec.classes:
- for m in c.methods:
- name = pythonize("%s_%s" % (c.name, m.name))
- if match(pattern, name):
- methods[name] = m.define_method(name)
-
-if patterns:
- if methods:
- AMQP = type("AMQP[%s]" % ", ".join(patterns), (), methods)
- else:
- die("no matches")
-else:
- AMQP = spec.define_class("AMQP")
-
-help(AMQP)
diff --git a/Final/python/cpp_failing.txt b/Final/python/cpp_failing.txt
deleted file mode 100644
index e69de29bb2..0000000000
--- a/Final/python/cpp_failing.txt
+++ /dev/null
diff --git a/Final/python/doc/test-requirements.txt b/Final/python/doc/test-requirements.txt
deleted file mode 100644
index a1ba414eb2..0000000000
--- a/Final/python/doc/test-requirements.txt
+++ /dev/null
@@ -1,10 +0,0 @@
- * start and stop server, possibly in different configurations, should
- at least be able to specify host and port
-
- * initiate multiple connections/server
-
- * initiate multiple channels/connection
-
- * enable positive and negative tests for any protocol interaction
-
- * test harness must be as robust as possible to spec changes
diff --git a/Final/python/java_failing.txt b/Final/python/java_failing.txt
deleted file mode 100644
index e69de29bb2..0000000000
--- a/Final/python/java_failing.txt
+++ /dev/null
diff --git a/Final/python/pal2py b/Final/python/pal2py
deleted file mode 100755
index 544151bf76..0000000000
--- a/Final/python/pal2py
+++ /dev/null
@@ -1,274 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import sys, os, xml
-
-from qpid.spec import load, pythonize
-from textwrap import TextWrapper
-from xml.sax.handler import ContentHandler
-
-class Block:
-
- def __init__(self, children):
- self.children = children
-
- def emit(self, out):
- for child in self.children:
- if not hasattr(child, "emit"):
- raise ValueError(child)
- child.emit(out)
-
- if not self.children:
- out.line("pass")
-
-class If:
-
- def __init__(self, expr, cons, alt = None):
- self.expr = expr
- self.cons = cons
- self.alt = alt
-
- def emit(self, out):
- out.line("if ")
- self.expr.emit(out)
- out.write(":")
- out.level += 1
- self.cons.emit(out)
- out.level -= 1
- if self.alt:
- out.line("else:")
- out.level += 1
- self.alt.emit(out)
- out.level -= 1
-
-class Stmt:
-
- def __init__(self, code):
- self.code = code
-
- def emit(self, out):
- out.line(self.code)
-
-class Expr:
-
- def __init__(self, code):
- self.code = code
-
- def emit(self, out):
- out.write(self.code)
-
-class Abort:
-
- def __init__(self, expr):
- self.expr = expr
-
- def emit(self, out):
- out.line("assert False, ")
- self.expr.emit(out)
-
-WRAPPER = TextWrapper()
-
-def wrap(text):
- return WRAPPER.wrap(" ".join(text.split()))
-
-class Doc:
-
- def __init__(self, text):
- self.text = text
-
- def emit(self, out):
- out.line('"""')
- for line in wrap(self.text):
- out.line(line)
- out.line('"""')
-
-class Frame:
-
- def __init__(self, attrs):
- self.attrs = attrs
- self.children = []
- self.text = None
-
- def __getattr__(self, attr):
- return self.attrs[attr]
-
-def isunicode(s):
- if isinstance(s, str):
- return False
- for ch in s:
- if ord(ch) > 127:
- return True
- return False
-
-def string_literal(s):
- if s == None:
- return None
- if isunicode(s):
- return "%r" % s
- else:
- return "%r" % str(s)
-
-TRUTH = {
- "1": True,
- "0": False,
- "true": True,
- "false": False
- }
-
-LITERAL = {
- "shortstr": string_literal,
- "longstr": string_literal,
- "bit": lambda s: TRUTH[s.lower()],
- "longlong": lambda s: "%r" % long(s)
- }
-
-def literal(s, field):
- return LITERAL[field.type](s)
-
-def palexpr(s, field):
- if s.startswith("$"):
- return "msg.%s" % s[1:]
- else:
- return literal(s, field)
-
-class Translator(ContentHandler):
-
- def __init__(self, spec):
- self.spec = spec
- self.stack = []
- self.content = None
- self.root = Frame(None)
- self.push(self.root)
-
- def emit(self, out):
- blk = Block(self.root.children)
- blk.emit(out)
- out.write("\n")
-
- def peek(self):
- return self.stack[-1]
-
- def pop(self):
- return self.stack.pop()
-
- def push(self, frame):
- self.stack.append(frame)
-
- def startElement(self, name, attrs):
- self.push(Frame(attrs))
-
- def endElement(self, name):
- frame = self.pop()
- if hasattr(self, name):
- child = getattr(self, name)(frame)
- else:
- child = self.handle(name, frame)
-
- if child:
- self.peek().children.append(child)
-
- def characters(self, text):
- frame = self.peek()
- if frame.text:
- frame.text += text
- else:
- frame.text = text
-
- def handle(self, name, frame):
- for klass in self.spec.classes:
- pyklass = pythonize(klass.name)
- if name.startswith(pyklass):
- name = name[len(pyklass) + 1:]
- break
- else:
- raise ValueError("unknown class: %s" % name)
-
- for method in klass.methods:
- pymethod = pythonize(method.name)
- if name == pymethod:
- break
- else:
- raise ValueError("unknown method: %s" % name)
-
- args = ["%s = %s" % (key, palexpr(val, method.fields.bypyname[key]))
- for key, val in frame.attrs.items()]
- if method.content and self.content:
- args.append("content = %r" % string_literal(self.content))
- code = "ssn.%s_%s(%s)" % (pyklass, pymethod, ", ".join(args))
- if pymethod == "consume":
- code = "consumer_tag = %s.consumer_tag" % code
- return Stmt(code)
-
- def pal(self, frame):
- return Block([Doc(frame.text)] + frame.children)
-
- def include(self, frame):
- base, ext = os.path.splitext(frame.filename)
- return Stmt("from %s import *" % base)
-
- def session(self, frame):
- return Block([Stmt("cli = open()"), Stmt("ssn = cli.channel(0)"),
- Stmt("ssn.channel_open()")] + frame.children)
-
- def empty(self, frame):
- return If(Expr("msg == None"), Block(frame.children))
-
- def abort(self, frame):
- return Abort(Expr(string_literal(frame.text)))
-
- def wait(self, frame):
- return Stmt("msg = ssn.queue(consumer_tag).get(timeout=%r)" %
- (int(frame.timeout)/1000))
-
- def basic_arrived(self, frame):
- if frame.children:
- return If(Expr("msg != None"), Block(frame.children))
-
- def basic_content(self, frame):
- self.content = frame.text
-
-class Emitter:
-
- def __init__(self, out):
- self.out = out
- self.level = 0
-
- def write(self, code):
- self.out.write(code)
-
- def line(self, code):
- self.write("\n%s%s" % (" "*self.level, code))
-
- def flush(self):
- self.out.flush()
-
- def close(self):
- self.out.close()
-
-
-for f in sys.argv[2:]:
- base, ext = os.path.splitext(f)
- spec = load(sys.argv[1])
- t = Translator(spec)
- xml.sax.parse(f, t)
-# out = Emitter(open("%s.py" % base))
- out = Emitter(sys.stdout)
- t.emit(out)
- out.close()
diff --git a/Final/python/qpid/__init__.py b/Final/python/qpid/__init__.py
deleted file mode 100644
index 4363f175fb..0000000000
--- a/Final/python/qpid/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import spec, codec, connection, content, peer, delegate, client
diff --git a/Final/python/qpid/client.py b/Final/python/qpid/client.py
deleted file mode 100644
index b4a282f251..0000000000
--- a/Final/python/qpid/client.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-An AQMP client implementation that uses a custom delegate for
-interacting with the server.
-"""
-
-import threading
-from peer import Peer, Closed
-from delegate import Delegate
-from connection import Connection, Frame
-from spec import load
-from queue import Queue
-
-
-class Client:
-
- def __init__(self, host, port, spec, vhost = None):
- self.host = host
- self.port = port
- self.spec = spec
-
- self.mechanism = None
- self.response = None
- self.locale = None
-
- self.vhost = vhost
- if self.vhost == None:
- self.vhost = self.host
-
- self.queues = {}
- self.lock = threading.Lock()
-
- self.closed = False
- self.started = threading.Event()
-
- self.conn = Connection(self.host, self.port, self.spec)
- self.peer = Peer(self.conn, ClientDelegate(self))
-
- def wait(self):
- self.started.wait()
- if self.closed:
- raise EOFError()
-
- def queue(self, key):
- self.lock.acquire()
- try:
- try:
- q = self.queues[key]
- except KeyError:
- q = Queue(0)
- self.queues[key] = q
- finally:
- self.lock.release()
- return q
-
- def start(self, response, mechanism="AMQPLAIN", locale="en_US"):
- self.mechanism = mechanism
- self.response = response
- self.locale = locale
-
- self.conn.connect()
- self.conn.init()
- self.peer.start()
- self.wait()
- self.channel(0).connection_open(self.vhost)
-
- def channel(self, id):
- return self.peer.channel(id)
-
-class ClientDelegate(Delegate):
-
- def __init__(self, client):
- Delegate.__init__(self)
- self.client = client
-
- def connection_start(self, ch, msg):
- ch.connection_start_ok(mechanism=self.client.mechanism,
- response=self.client.response,
- locale=self.client.locale)
-
- def connection_tune(self, ch, msg):
- ch.connection_tune_ok(*msg.fields)
- self.client.started.set()
-
- def basic_deliver(self, ch, msg):
- self.client.queue(msg.consumer_tag).put(msg)
-
- def channel_close(self, ch, msg):
- ch.close(msg)
-
- def connection_close(self, ch, msg):
- self.client.peer.close(msg)
-
- def close(self, reason):
- self.client.closed = True
- self.client.started.set()
diff --git a/Final/python/qpid/codec.py b/Final/python/qpid/codec.py
deleted file mode 100644
index 69c7ca8afa..0000000000
--- a/Final/python/qpid/codec.py
+++ /dev/null
@@ -1,224 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Utility code to translate between python objects and AMQP encoded data
-fields.
-"""
-
-from cStringIO import StringIO
-from struct import *
-
-class EOF(Exception):
- pass
-
-class Codec:
-
- def __init__(self, stream):
- self.stream = stream
- self.nwrote = 0
- self.nread = 0
- self.incoming_bits = []
- self.outgoing_bits = []
-
- def read(self, n):
- data = self.stream.read(n)
- if n > 0 and len(data) == 0:
- raise EOF()
- self.nread += len(data)
- return data
-
- def write(self, s):
- self.flushbits()
- self.stream.write(s)
- self.nwrote += len(s)
-
- def flush(self):
- self.flushbits()
- self.stream.flush()
-
- def flushbits(self):
- if len(self.outgoing_bits) > 0:
- bytes = []
- index = 0
- for b in self.outgoing_bits:
- if index == 0: bytes.append(0)
- if b: bytes[-1] |= 1 << index
- index = (index + 1) % 8
- del self.outgoing_bits[:]
- for byte in bytes:
- self.encode_octet(byte)
-
- def pack(self, fmt, *args):
- self.write(pack(fmt, *args))
-
- def unpack(self, fmt):
- size = calcsize(fmt)
- data = self.read(size)
- values = unpack(fmt, data)
- if len(values) == 1:
- return values[0]
- else:
- return values
-
- def encode(self, type, value):
- getattr(self, "encode_" + type)(value)
-
- def decode(self, type):
- return getattr(self, "decode_" + type)()
-
- # bit
- def encode_bit(self, o):
- if o:
- self.outgoing_bits.append(True)
- else:
- self.outgoing_bits.append(False)
-
- def decode_bit(self):
- if len(self.incoming_bits) == 0:
- bits = self.decode_octet()
- for i in range(8):
- self.incoming_bits.append(bits >> i & 1 != 0)
- return self.incoming_bits.pop(0)
-
- # octet
- def encode_octet(self, o):
- self.pack("!B", o)
-
- def decode_octet(self):
- return self.unpack("!B")
-
- # short
- def encode_short(self, o):
- self.pack("!H", o)
-
- def decode_short(self):
- return self.unpack("!H")
-
- # long
- def encode_long(self, o):
- self.pack("!L", o)
-
- def decode_long(self):
- return self.unpack("!L")
-
- # longlong
- def encode_longlong(self, o):
- self.pack("!Q", o)
-
- def decode_longlong(self):
- return self.unpack("!Q")
-
- def enc_str(self, fmt, s):
- size = len(s)
- self.pack(fmt, size)
- self.write(s)
-
- def dec_str(self, fmt):
- size = self.unpack(fmt)
- return self.read(size)
-
- # shortstr
- def encode_shortstr(self, s):
- self.enc_str("!B", s)
-
- def decode_shortstr(self):
- return self.dec_str("!B")
-
- # longstr
- def encode_longstr(self, s):
- if isinstance(s, dict):
- self.encode_table(s)
- else:
- self.enc_str("!L", s)
-
- def decode_longstr(self):
- return self.dec_str("!L")
-
- # table
- def encode_table(self, tbl):
- enc = StringIO()
- codec = Codec(enc)
- for key, value in tbl.items():
- codec.encode_shortstr(key)
- if isinstance(value, basestring):
- codec.write("S")
- codec.encode_longstr(value)
- else:
- codec.write("I")
- codec.encode_long(value)
- s = enc.getvalue()
- self.encode_long(len(s))
- self.write(s)
-
- def decode_table(self):
- size = self.decode_long()
- start = self.nread
- result = {}
- while self.nread - start < size:
- key = self.decode_shortstr()
- type = self.read(1)
- if type == "S":
- value = self.decode_longstr()
- elif type == "I":
- value = self.decode_long()
- else:
- raise ValueError(repr(type))
- result[key] = value
- return result
-
-def test(type, value):
- if isinstance(value, (list, tuple)):
- values = value
- else:
- values = [value]
- stream = StringIO()
- codec = Codec(stream)
- for v in values:
- codec.encode(type, v)
- codec.flush()
- enc = stream.getvalue()
- stream.reset()
- dup = []
- for i in xrange(len(values)):
- dup.append(codec.decode(type))
- if values != dup:
- raise AssertionError("%r --> %r --> %r" % (values, enc, dup))
-
-if __name__ == "__main__":
- def dotest(type, value):
- args = (type, value)
- test(*args)
-
- for value in ("1", "0", "110", "011", "11001", "10101", "10011"):
- for i in range(10):
- dotest("bit", map(lambda x: x == "1", value*i))
-
- for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}):
- dotest("table", value)
-
- for type in ("octet", "short", "long", "longlong"):
- for value in range(0, 256):
- dotest(type, value)
-
- for type in ("shortstr", "longstr"):
- for value in ("", "a", "asdf"):
- dotest(type, value)
diff --git a/Final/python/qpid/connection.py b/Final/python/qpid/connection.py
deleted file mode 100644
index 0b788e091b..0000000000
--- a/Final/python/qpid/connection.py
+++ /dev/null
@@ -1,270 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-A Connection class containing socket code that uses the spec metadata
-to read and write Frame objects. This could be used by a client,
-server, or even a proxy implementation.
-"""
-
-import socket, codec,logging
-from cStringIO import StringIO
-from spec import load, pythonize
-from codec import EOF
-
-class SockIO:
-
- def __init__(self, sock):
- self.sock = sock
-
- def write(self, buf):
-# print "OUT: %r" % buf
- self.sock.sendall(buf)
-
- def read(self, n):
- data = ""
- while len(data) < n:
- try:
- s = self.sock.recv(n - len(data))
- except socket.error:
- break
- if len(s) == 0:
- break
-# print "IN: %r" % s
- data += s
- return data
-
- def flush(self):
- pass
-
-class Connection:
-
- def __init__(self, host, port, spec):
- self.host = host
- self.port = port
- self.spec = spec
- self.FRAME_END = self.spec.constants.bypyname["frame_end"].id
-
- def connect(self):
- sock = socket.socket()
- sock.connect((self.host, self.port))
- sock.setblocking(1)
- self.codec = codec.Codec(SockIO(sock))
-
- def flush(self):
- self.codec.flush()
-
- INIT="!4s4B"
-
- def init(self):
- self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
- self.spec.minor)
-
- def write(self, frame):
- c = self.codec
- c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id)
- c.encode_short(frame.channel)
- frame.payload.encode(c)
- c.encode_octet(self.FRAME_END)
-
- def read(self):
- c = self.codec
- type = pythonize(self.spec.constants.byid[c.decode_octet()].name)
- channel = c.decode_short()
- payload = Frame.DECODERS[type].decode(self.spec, c)
- end = c.decode_octet()
- if end != self.FRAME_END:
- raise "frame error: expected %r, got %r" % (self.FRAME_END, end)
- frame = Frame(channel, payload)
- return frame
-
-class Frame:
-
- METHOD = "frame_method"
- HEADER = "frame_header"
- BODY = "frame_body"
- OOB_METHOD = "frame_oob_method"
- OOB_HEADER = "frame_oob_header"
- OOB_BODY = "frame_oob_body"
- TRACE = "frame_trace"
- HEARTBEAT = "frame_heartbeat"
-
- DECODERS = {}
-
- def __init__(self, channel, payload):
- self.channel = channel
- self.payload = payload
-
- def __str__(self):
- return "[%d] %s" % (self.channel, self.payload)
-
-class Payload:
-
- class __metaclass__(type):
-
- def __new__(cls, name, bases, dict):
- for req in ("encode", "decode", "type"):
- if not dict.has_key(req):
- raise TypeError("%s must define %s" % (name, req))
- dict["decode"] = staticmethod(dict["decode"])
- t = type.__new__(cls, name, bases, dict)
- if t.type != None:
- Frame.DECODERS[t.type] = t
- return t
-
- type = None
-
- def encode(self, enc): abstract
-
- def decode(spec, dec): abstract
-
-class Method(Payload):
-
- type = Frame.METHOD
-
- def __init__(self, method, *args):
- if len(args) != len(method.fields):
- argspec = ["%s: %s" % (pythonize(f.name), f.type)
- for f in method.fields]
- raise TypeError("%s.%s expecting (%s), got %s" %
- (pythonize(method.klass.name),
- pythonize(method.name), ", ".join(argspec), args))
- self.method = method
- self.args = args
-
- def encode(self, enc):
- buf = StringIO()
- c = codec.Codec(buf)
- c.encode_short(self.method.klass.id)
- c.encode_short(self.method.id)
- for field, arg in zip(self.method.fields, self.args):
- c.encode(field.type, arg)
- c.flush()
- enc.encode_longstr(buf.getvalue())
-
- def decode(spec, dec):
- enc = dec.decode_longstr()
- c = codec.Codec(StringIO(enc))
- klass = spec.classes.byid[c.decode_short()]
- meth = klass.methods.byid[c.decode_short()]
- args = tuple([c.decode(f.type) for f in meth.fields])
- return Method(meth, *args)
-
- def __str__(self):
- return "%s %s" % (self.method, ", ".join([str(a) for a in self.args]))
-
-class Header(Payload):
-
- type = Frame.HEADER
-
- def __init__(self, klass, weight, size, **properties):
- self.klass = klass
- self.weight = weight
- self.size = size
- self.properties = properties
-
- def __getitem__(self, name):
- return self.properties[name]
-
- def __setitem__(self, name, value):
- self.properties[name] = value
-
- def __delitem__(self, name):
- del self.properties[name]
-
- def encode(self, enc):
- buf = StringIO()
- c = codec.Codec(buf)
- c.encode_short(self.klass.id)
- c.encode_short(self.weight)
- c.encode_longlong(self.size)
-
- # property flags
- nprops = len(self.klass.fields)
- flags = 0
- for i in range(nprops):
- f = self.klass.fields.items[i]
- flags <<= 1
- if self.properties.get(f.name) != None:
- flags |= 1
- # the last bit indicates more flags
- if i > 0 and (i % 15) == 0:
- flags <<= 1
- if nprops > (i + 1):
- flags |= 1
- c.encode_short(flags)
- flags = 0
- flags <<= ((16 - (nprops % 15)) % 16)
- c.encode_short(flags)
-
- # properties
- for f in self.klass.fields:
- v = self.properties.get(f.name)
- if v != None:
- c.encode(f.type, v)
- c.flush()
- enc.encode_longstr(buf.getvalue())
-
- def decode(spec, dec):
- c = codec.Codec(StringIO(dec.decode_longstr()))
- klass = spec.classes.byid[c.decode_short()]
- weight = c.decode_short()
- size = c.decode_longlong()
-
- # property flags
- bits = []
- while True:
- flags = c.decode_short()
- for i in range(15, 0, -1):
- if flags >> i & 0x1 != 0:
- bits.append(True)
- else:
- bits.append(False)
- if flags & 0x1 == 0:
- break
-
- # properties
- properties = {}
- for b, f in zip(bits, klass.fields):
- if b:
- # Note: decode returns a unicode u'' string but only
- # plain '' strings can be used as keywords so we need to
- # stringify the names.
- properties[str(f.name)] = c.decode(f.type)
- return Header(klass, weight, size, **properties)
-
- def __str__(self):
- return "%s %s %s %s" % (self.klass, self.weight, self.size,
- self.properties)
-
-class Body(Payload):
-
- type = Frame.BODY
-
- def __init__(self, content):
- self.content = content
-
- def encode(self, enc):
- enc.encode_longstr(self.content)
-
- def decode(spec, dec):
- return Body(dec.decode_longstr())
-
- def __str__(self):
- return "Body(%r)" % self.content
diff --git a/Final/python/qpid/content.py b/Final/python/qpid/content.py
deleted file mode 100644
index bcbea1697c..0000000000
--- a/Final/python/qpid/content.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-A simple python representation for AMQP content.
-"""
-
-def default(val, defval):
- if val == None:
- return defval
- else:
- return val
-
-class Content:
-
- def __init__(self, body = "", children = None, properties = None):
- self.body = body
- self.children = default(children, [])
- self.properties = default(properties, {})
-
- def size(self):
- return len(self.body)
-
- def weight(self):
- return len(self.children)
-
- def __getitem__(self, name):
- return self.properties[name]
-
- def __setitem__(self, name, value):
- self.properties[name] = value
-
- def __delitem__(self, name):
- del self.properties[name]
diff --git a/Final/python/qpid/delegate.py b/Final/python/qpid/delegate.py
deleted file mode 100644
index 035bb3c476..0000000000
--- a/Final/python/qpid/delegate.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Delegate implementation intended for use with the peer module.
-"""
-
-import threading, inspect
-from spec import pythonize
-
-class Delegate:
-
- def __init__(self):
- self.handlers = {}
- self.invokers = {}
- # initialize all the mixins
- self.invoke_all("init")
-
- def invoke_all(self, meth, *args, **kwargs):
- for cls in inspect.getmro(self.__class__):
- if hasattr(cls, meth):
- getattr(cls, meth)(self, *args, **kwargs)
-
- def dispatch(self, channel, message):
- method = message.method
-
- try:
- handler = self.handlers[method]
- except KeyError:
- name = "%s_%s" % (pythonize(method.klass.name),
- pythonize(method.name))
- handler = getattr(self, name)
- self.handlers[method] = handler
-
- return handler(channel, message)
-
- def close(self, reason):
- self.invoke_all("close", reason)
diff --git a/Final/python/qpid/message.py b/Final/python/qpid/message.py
deleted file mode 100644
index 914b878147..0000000000
--- a/Final/python/qpid/message.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from sets import Set
-
-class Message:
-
- COMMON_FIELDS = Set(("content", "method", "fields"))
-
- def __init__(self, method, fields, content = None):
- self.method = method
- self.fields = fields
- self.content = content
-
- def __len__(self):
- l = len(self.fields)
- if self.method.content:
- l += 1
- return len(self.fields)
-
- def _idx(self, idx):
- if idx < 0: idx += len(self)
- if idx < 0 or idx > len(self):
- raise IndexError(idx)
- return idx
-
- def __getitem__(self, idx):
- idx = self._idx(idx)
- if idx == len(self.fields):
- return self.content
- else:
- return self.fields[idx]
-
- def __setitem__(self, idx, value):
- idx = self._idx(idx)
- if idx == len(self.fields):
- self.content = value
- else:
- self.fields[idx] = value
-
- def _slot(self, attr):
- if attr in Message.COMMON_FIELDS:
- env = self.__dict__
- key = attr
- else:
- env = self.fields
- try:
- field = self.method.fields.bypyname[attr]
- key = self.method.fields.index(field)
- except KeyError:
- raise AttributeError(attr)
- return env, key
-
- def __getattr__(self, attr):
- env, key = self._slot(attr)
- return env[key]
-
- def __setattr__(self, attr, value):
- env, key = self._slot(attr)
- env[attr] = value
-
- STR = "%s %s content = %s"
- REPR = STR.replace("%s", "%r")
-
- def __str__(self):
- return Message.STR % (self.method, self.fields, self.content)
-
- def __repr__(self):
- return Message.REPR % (self.method, self.fields, self.content)
diff --git a/Final/python/qpid/peer.py b/Final/python/qpid/peer.py
deleted file mode 100644
index 7c6cf91dea..0000000000
--- a/Final/python/qpid/peer.py
+++ /dev/null
@@ -1,210 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-This module contains a skeletal peer implementation useful for
-implementing an AMQP server, client, or proxy. The peer implementation
-sorts incoming frames to their intended channels, and dispatches
-incoming method frames to a delegate.
-"""
-
-import thread, traceback, socket, sys, logging
-from connection import Frame, EOF, Method, Header, Body
-from message import Message
-from queue import Queue, Closed as QueueClosed
-from content import Content
-from cStringIO import StringIO
-
-class Peer:
-
- def __init__(self, conn, delegate):
- self.conn = conn
- self.delegate = delegate
- self.outgoing = Queue(0)
- self.work = Queue(0)
- self.channels = {}
- self.Channel = type("Channel%s" % conn.spec.klass.__name__,
- (Channel, conn.spec.klass), {})
- self.lock = thread.allocate_lock()
-
- def channel(self, id):
- self.lock.acquire()
- try:
- try:
- ch = self.channels[id]
- except KeyError:
- ch = self.Channel(id, self.outgoing)
- self.channels[id] = ch
- finally:
- self.lock.release()
- return ch
-
- def start(self):
- thread.start_new_thread(self.writer, ())
- thread.start_new_thread(self.reader, ())
- thread.start_new_thread(self.worker, ())
-
- def fatal(self, message=None):
- """Call when an unexpected exception occurs that will kill a thread."""
- if message: print >> sys.stderr, message
- self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
-
- def reader(self):
- try:
- while True:
- try:
- frame = self.conn.read()
- except EOF, e:
- self.work.close()
- break
- ch = self.channel(frame.channel)
- ch.dispatch(frame, self.work)
- except:
- self.fatal()
-
- def close(self, reason):
- for ch in self.channels.values():
- ch.close(reason)
- self.delegate.close(reason)
-
- def writer(self):
- try:
- while True:
- try:
- message = self.outgoing.get()
- self.conn.write(message)
- except socket.error, e:
- self.close(e)
- break
- self.conn.flush()
- except:
- self.fatal()
-
- def worker(self):
- try:
- while True:
- self.dispatch(self.work.get())
- except QueueClosed, e:
- self.close(e)
- except:
- self.fatal()
-
- def dispatch(self, queue):
- frame = queue.get()
- channel = self.channel(frame.channel)
- payload = frame.payload
- if payload.method.content:
- content = read_content(queue)
- else:
- content = None
- # Let the caller deal with exceptions thrown here.
- message = Message(payload.method, payload.args, content)
- self.delegate.dispatch(channel, message)
-
-class Closed(Exception): pass
-
-class Channel:
-
- def __init__(self, id, outgoing):
- self.id = id
- self.outgoing = outgoing
- self.incoming = Queue(0)
- self.responses = Queue(0)
- self.queue = None
- self.closed = False
- self.reason = None
-
- def close(self, reason):
- if self.closed:
- return
- self.closed = True
- self.reason = reason
- self.incoming.close()
- self.responses.close()
-
- def dispatch(self, frame, work):
- payload = frame.payload
- if isinstance(payload, Method):
- if payload.method.response:
- self.queue = self.responses
- else:
- self.queue = self.incoming
- work.put(self.incoming)
- self.queue.put(frame)
-
- def invoke(self, method, args, content = None):
- if self.closed:
- raise Closed(self.reason)
- frame = Frame(self.id, Method(method, *args))
- self.outgoing.put(frame)
-
- if method.content:
- if content == None:
- content = Content()
- self.write_content(method.klass, content, self.outgoing)
-
- try:
- # here we depend on all nowait fields being named nowait
- f = method.fields.byname["nowait"]
- nowait = args[method.fields.index(f)]
- except KeyError:
- nowait = False
-
- try:
- if not nowait and method.responses:
- resp = self.responses.get().payload
- if resp.method.content:
- content = read_content(self.responses)
- else:
- content = None
- if resp.method in method.responses:
- return Message(resp.method, resp.args, content)
- else:
- raise ValueError(resp)
- except QueueClosed, e:
- if self.closed:
- raise Closed(self.reason)
- else:
- raise e
-
- def write_content(self, klass, content, queue):
- size = content.size()
- header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
- queue.put(header)
- for child in content.children:
- self.write_content(klass, child, queue)
- # should split up if content.body exceeds max frame size
- if size > 0:
- queue.put(Frame(self.id, Body(content.body)))
-
-def read_content(queue):
- frame = queue.get()
- header = frame.payload
- children = []
- for i in range(header.weight):
- children.append(read_content(queue))
- size = header.size
- read = 0
- buf = StringIO()
- while read < size:
- body = queue.get()
- content = body.payload.content
- buf.write(content)
- read += len(content)
- return Content(buf.getvalue(), children, header.properties.copy())
diff --git a/Final/python/qpid/queue.py b/Final/python/qpid/queue.py
deleted file mode 100644
index 5438b328ab..0000000000
--- a/Final/python/qpid/queue.py
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-This module augments the standard python multithreaded Queue
-implementation to add a close() method so that threads blocking on the
-content of a queue can be notified if the queue is no longer in use.
-"""
-
-from Queue import Queue as BaseQueue, Empty, Full
-
-class Closed(Exception): pass
-
-class Queue(BaseQueue):
-
- END = object()
-
- def close(self):
- self.put(Queue.END)
-
- def get(self, block = True, timeout = None):
- result = BaseQueue.get(self, block, timeout)
- if result == Queue.END:
- # this guarantees that any other waiting threads or any future
- # calls to get will also result in a Closed exception
- self.put(Queue.END)
- raise Closed()
- else:
- return result
diff --git a/Final/python/qpid/spec.py b/Final/python/qpid/spec.py
deleted file mode 100644
index 0e3a477066..0000000000
--- a/Final/python/qpid/spec.py
+++ /dev/null
@@ -1,358 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-This module loads protocol metadata into python objects. It provides
-access to spec metadata via a python object model, and can also
-dynamically creating python methods, classes, and modules based on the
-spec metadata. All the generated methods have proper signatures and
-doc strings based on the spec metadata so the python help system can
-be used to browse the spec documentation. The generated methods all
-dispatch to the self.invoke(meth, args) callback of the containing
-class so that the generated code can be reused in a variety of
-situations.
-"""
-
-import re, textwrap, new, xmlutil
-
-class SpecContainer:
-
- def __init__(self):
- self.items = []
- self.byname = {}
- self.byid = {}
- self.indexes = {}
- self.bypyname = {}
-
- def add(self, item):
- if self.byname.has_key(item.name):
- raise ValueError("duplicate name: %s" % item)
- if self.byid.has_key(item.id):
- raise ValueError("duplicate id: %s" % item)
- pyname = pythonize(item.name)
- if self.bypyname.has_key(pyname):
- raise ValueError("duplicate pyname: %s" % item)
- self.indexes[item] = len(self.items)
- self.items.append(item)
- self.byname[item.name] = item
- self.byid[item.id] = item
- self.bypyname[pyname] = item
-
- def index(self, item):
- try:
- return self.indexes[item]
- except KeyError:
- raise ValueError(item)
-
- def __iter__(self):
- return iter(self.items)
-
- def __len__(self):
- return len(self.items)
-
-class Metadata:
-
- PRINT = []
-
- def __init__(self):
- pass
-
- def __str__(self):
- args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT)
- return "%s(%s)" % (self.__class__.__name__, ", ".join(args))
-
- def __repr__(self):
- return str(self)
-
-class Spec(Metadata):
-
- PRINT=["major", "minor", "file"]
-
- def __init__(self, major, minor, file):
- Metadata.__init__(self)
- self.major = major
- self.minor = minor
- self.file = file
- self.constants = SpecContainer()
- self.classes = SpecContainer()
-
- def post_load(self):
- self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
- self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor))
-
- def parse_method(self, name):
- parts = re.split(r"\s*\.\s*", name)
- if len(parts) != 2:
- raise ValueError(name)
- klass, meth = parts
- return self.classes.byname[klass].methods.byname[meth]
-
- def define_module(self, name, doc = None):
- module = new.module(name, doc)
- module.__file__ = self.file
- for c in self.classes:
- classname = pythonize(c.name)
- cls = c.define_class(classname)
- cls.__module__ = module.__name__
- setattr(module, classname, cls)
- return module
-
- def define_class(self, name):
- methods = {}
- for c in self.classes:
- for m in c.methods:
- meth = pythonize(m.klass.name + "_" + m.name)
- methods[meth] = m.define_method(meth)
- return type(name, (), methods)
-
-class Constant(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, spec, name, id, klass, docs):
- Metadata.__init__(self)
- self.spec = spec
- self.name = name
- self.id = id
- self.klass = klass
- self.docs = docs
-
-class Class(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, spec, name, id, handler, docs):
- Metadata.__init__(self)
- self.spec = spec
- self.name = name
- self.id = id
- self.handler = handler
- self.fields = SpecContainer()
- self.methods = SpecContainer()
- self.docs = docs
-
- def define_class(self, name):
- methods = {}
- for m in self.methods:
- meth = pythonize(m.name)
- methods[meth] = m.define_method(meth)
- return type(name, (), methods)
-
-class Method(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, klass, name, id, content, responses, synchronous,
- description, docs):
- Metadata.__init__(self)
- self.klass = klass
- self.name = name
- self.id = id
- self.content = content
- self.responses = responses
- self.synchronous = synchronous
- self.fields = SpecContainer()
- self.description = description
- self.docs = docs
- self.response = False
-
- def docstring(self):
- s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs])
- for f in self.fields:
- if f.docs:
- s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] +
- [fill(d, 4) for d in f.docs[1:]])
- return s
-
- METHOD = "__method__"
- DEFAULTS = {"bit": False,
- "shortstr": "",
- "longstr": "",
- "table": {},
- "octet": 0,
- "short": 0,
- "long": 0,
- "longlong": 0,
- "timestamp": 0,
- "content": None}
-
- def define_method(self, name):
- g = {Method.METHOD: self}
- l = {}
- args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields]
- if self.content:
- args += [("content", None)]
- code = "def %s(self, %s):\n" % \
- (name, ", ".join(["%s = %r" % a for a in args]))
- code += " %r\n" % self.docstring()
- if self.content:
- methargs = args[:-1]
- else:
- methargs = args
- argnames = ", ".join([a[0] for a in methargs])
- code += " return self.invoke(%s" % Method.METHOD
- if argnames:
- code += ", (%s,)" % argnames
- else:
- code += ", ()"
- if self.content:
- code += ", content"
- code += ")"
- exec code in g, l
- return l[name]
-
-class Field(Metadata):
-
- PRINT=["name", "id", "type"]
-
- def __init__(self, name, id, type, docs):
- Metadata.__init__(self)
- self.name = name
- self.id = id
- self.type = type
- self.docs = docs
-
-def get_docs(nd):
- return [n.text for n in nd["doc"]]
-
-def load_fields(nd, l, domains):
- for f_nd in nd["field"]:
- try:
- type = f_nd["@domain"]
- except KeyError:
- type = f_nd["@type"]
- while domains.has_key(type) and domains[type] != type:
- type = domains[type]
- l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd)))
-
-def load(specfile):
- doc = xmlutil.parse(specfile)
- root = doc["amqp"][0]
- spec = Spec(int(root["@major"]), int(root["@minor"]), specfile)
-
- # constants
- for nd in root["constant"]:
- const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"),
- get_docs(nd))
- spec.constants.add(const)
-
- # domains are typedefs
- domains = {}
- for nd in root["domain"]:
- domains[nd["@name"]] = nd["@type"]
-
- # classes
- for c_nd in root["class"]:
- klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"],
- get_docs(c_nd))
- load_fields(c_nd, klass.fields, domains)
- for m_nd in c_nd["method"]:
- meth = Method(klass, m_nd["@name"],
- int(m_nd["@index"]),
- m_nd.get_bool("@content", False),
- [nd["@name"] for nd in m_nd["response"]],
- m_nd.get_bool("@synchronous", False),
- m_nd.text,
- get_docs(m_nd))
- load_fields(m_nd, meth.fields, domains)
- klass.methods.add(meth)
- # resolve the responses
- for m in klass.methods:
- m.responses = [klass.methods.byname[r] for r in m.responses]
- for resp in m.responses:
- resp.response = True
- spec.classes.add(klass)
- spec.post_load()
- return spec
-
-REPLACE = {" ": "_", "-": "_"}
-KEYWORDS = {"global": "global_",
- "return": "return_"}
-
-def pythonize(name):
- name = str(name)
- for key, val in REPLACE.items():
- name = name.replace(key, val)
- try:
- name = KEYWORDS[name]
- except KeyError:
- pass
- return name
-
-def fill(text, indent, heading = None):
- sub = indent * " "
- if heading:
- init = (indent - 2) * " " + heading + " -- "
- else:
- init = sub
- w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub)
- return w.fill(" ".join(text.split()))
-
-class Rule(Metadata):
-
- PRINT = ["text", "implement", "tests"]
-
- def __init__(self, text, implement, tests, path):
- self.text = text
- self.implement = implement
- self.tests = tests
- self.path = path
-
-def find_rules(node, rules):
- if node.name == "rule":
- rules.append(Rule(node.text, node.get("@implement"),
- [ch.text for ch in node if ch.name == "test"],
- node.path()))
- if node.name == "doc" and node.get("@name") == "rule":
- tests = []
- if node.has("@test"):
- tests.append(node["@test"])
- rules.append(Rule(node.text, None, tests, node.path()))
- for child in node:
- find_rules(child, rules)
-
-def load_rules(specfile):
- rules = []
- find_rules(xmlutil.parse(specfile), rules)
- return rules
-
-def test_summary():
- template = """
- <html><head><title>AMQP Tests</title></head>
- <body>
- <table width="80%%" align="center">
- %s
- </table>
- </body>
- </html>
- """
- rows = []
- for rule in load_rules("amqp.org/specs/amqp7.xml"):
- if rule.tests:
- tests = ", ".join(rule.tests)
- else:
- tests = "&nbsp;"
- rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>'
- '<td><b>Implement:</b> %s</td>'
- '<td><b>Tests:</b> %s</td></tr>' %
- (rule.path[len("/root/amqp"):], rule.implement, tests))
- rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text)
- rows.append('<tr><td colspan="3">&nbsp;</td></tr>')
-
- print template % "\n".join(rows)
diff --git a/Final/python/qpid/testlib.py b/Final/python/qpid/testlib.py
deleted file mode 100644
index 39bad75b86..0000000000
--- a/Final/python/qpid/testlib.py
+++ /dev/null
@@ -1,237 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-#
-# Support library for qpid python tests.
-#
-
-import sys, re, unittest, os, random, logging
-import qpid.client, qpid.spec
-import Queue
-from getopt import getopt, GetoptError
-from qpid.content import Content
-
-def findmodules(root):
- """Find potential python modules under directory root"""
- found = []
- for dirpath, subdirs, files in os.walk(root):
- modpath = dirpath.replace(os.sep, '.')
- if not re.match(r'\.svn$', dirpath): # Avoid SVN directories
- for f in files:
- match = re.match(r'(.+)\.py$', f)
- if match and f != '__init__.py':
- found.append('.'.join([modpath, match.group(1)]))
- return found
-
-def default(value, default):
- if (value == None): return default
- else: return value
-
-class TestRunner:
- """Runs unit tests.
-
- Parses command line arguments, provides utility functions for tests,
- runs the selected test suite.
- """
-
- def _die(self, message = None):
- if message: print message
- print """
-run-tests [options] [test*]
-The name of a test is package.module.ClassName.testMethod
-Options:
- -?/-h/--help : this message
- -s/--spec <spec.xml> : file containing amqp XML spec
- -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
- """
- sys.exit(1)
-
- def setBroker(self, broker):
- rex = re.compile(r"""
- # [ <user> [ / <password> ] @] <host> [ :<port> ]
- ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
- match = rex.match(broker)
- if not match: self._die("'%s' is not a valid broker" % (broker))
- self.user, self.password, self.host, self.port = match.groups()
- self.port = int(default(self.port, 5672))
- self.user = default(self.user, "guest")
- self.password = default(self.password, "guest")
-
- def __init__(self):
- # Defaults
- self.setBroker("localhost")
- self.spec = "../specs/amqp.0-8.xml"
- self.verbose = 1
- self.ignore = []
-
- def ignoreFile(self, filename):
- f = file(filename)
- for line in f.readlines(): self.ignore.append(line.strip())
- f.close()
-
- def _parseargs(self, args):
- try:
- opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"])
- except GetoptError, e:
- self._die(str(e))
- for opt, value in opts:
- if opt in ("-?", "-h", "--help"): self._die()
- if opt in ("-s", "--spec"): self.spec = value
- if opt in ("-b", "--broker"): self.setBroker(value)
- if opt in ("-v", "--verbose"): self.verbose = 2
- if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
- if opt in ("-i", "--ignore"): self.ignore.append(value)
- if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
-
- if len(self.tests) == 0: self.tests=findmodules("tests")
-
- def testSuite(self):
- class IgnoringTestSuite(unittest.TestSuite):
- def addTest(self, test):
- if isinstance(test, unittest.TestCase) and test.id() in testrunner.ignore:
- return
- unittest.TestSuite.addTest(self, test)
-
- # Use our IgnoringTestSuite in the test loader.
- unittest.TestLoader.suiteClass = IgnoringTestSuite
- return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
-
- def run(self, args=sys.argv[1:]):
- self._parseargs(args)
- runner = unittest.TextTestRunner(descriptions=False,
- verbosity=self.verbose)
- result = runner.run(self.testSuite())
- if (self.ignore):
- print "======================================="
- print "NOTE: the following tests were ignored:"
- for t in self.ignore: print t
- print "======================================="
- return result.wasSuccessful()
-
- def connect(self, host=None, port=None, spec=None, user=None, password=None):
- """Connect to the broker, returns a qpid.client.Client"""
- host = host or self.host
- port = port or self.port
- spec = spec or self.spec
- user = user or self.user
- password = password or self.password
- client = qpid.client.Client(host, port, qpid.spec.load(spec))
- client.start({"LOGIN": user, "PASSWORD": password})
- return client
-
-
-# Global instance for tests to call connect.
-testrunner = TestRunner()
-
-
-class TestBase(unittest.TestCase):
- """Base class for Qpid test cases.
-
- self.client is automatically connected with channel 1 open before
- the test methods are run.
-
- Deletes queues and exchanges after. Tests call
- self.queue_declare(channel, ...) and self.exchange_declare(chanel,
- ...) which are wrappers for the Channel functions that note
- resources to clean up later.
- """
-
- def setUp(self):
- self.queues = []
- self.exchanges = []
- self.client = self.connect()
- self.channel = self.client.channel(1)
- self.channel.channel_open()
-
- def tearDown(self):
- for ch, q in self.queues:
- ch.queue_delete(queue=q)
- for ch, ex in self.exchanges:
- ch.exchange_delete(exchange=ex)
-
- def connect(self, *args, **keys):
- """Create a new connction, return the Client object"""
- return testrunner.connect(*args, **keys)
-
- def queue_declare(self, channel=None, *args, **keys):
- channel = channel or self.channel
- reply = channel.queue_declare(*args, **keys)
- self.queues.append((channel, reply.queue))
- return reply
-
- def exchange_declare(self, channel=None, ticket=0, exchange='',
- type='', passive=False, durable=False,
- auto_delete=False, internal=False, nowait=False,
- arguments={}):
- channel = channel or self.channel
- reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
- self.exchanges.append((channel,exchange))
- return reply
-
- def uniqueString(self):
- """Generate a unique string, unique for this TestBase instance"""
- if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
- return "Test Message " + str(self.uniqueCounter)
-
- def consume(self, queueName):
- """Consume from named queue returns the Queue object."""
- reply = self.channel.basic_consume(queue=queueName, no_ack=True)
- return self.client.queue(reply.consumer_tag)
-
- def assertEmpty(self, queue):
- """Assert that the queue is empty"""
- try:
- queue.get(timeout=1)
- self.fail("Queue is not empty.")
- except Queue.Empty: None # Ignore
-
- def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
- """
- Publish to exchange and assert queue.get() returns the same message.
- """
- body = self.uniqueString()
- self.channel.basic_publish(exchange=exchange,
- content=Content(body, properties=properties),
- routing_key=routing_key)
- msg = queue.get(timeout=1)
- self.assertEqual(body, msg.content.body)
- if (properties): self.assertEqual(properties, msg.content.properties)
-
- def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
- """
- Publish a message and consume it, assert it comes back intact.
- Return the Queue object used to consume.
- """
- self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
-
- def assertChannelException(self, expectedCode, message):
- self.assertEqual("channel", message.method.klass.name)
- self.assertEqual("close", message.method.name)
- self.assertEqual(expectedCode, message.reply_code)
-
-
- def assertConnectionException(self, expectedCode, message):
- self.assertEqual("connection", message.method.klass.name)
- self.assertEqual("close", message.method.name)
- self.assertEqual(expectedCode, message.reply_code)
-
diff --git a/Final/python/qpid/xmlutil.py b/Final/python/qpid/xmlutil.py
deleted file mode 100644
index 585516b44f..0000000000
--- a/Final/python/qpid/xmlutil.py
+++ /dev/null
@@ -1,119 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-XML utilities used by spec.py
-"""
-
-import xml.sax
-from xml.sax.handler import ContentHandler
-
-def parse(file):
- doc = Node("root")
- xml.sax.parse(file, Builder(doc))
- return doc
-
-class Node:
-
- def __init__(self, name, attrs = None, text = None, parent = None):
- self.name = name
- self.attrs = attrs
- self.text = text
- self.parent = parent
- self.children = []
- if parent != None:
- parent.children.append(self)
-
- def get_bool(self, key, default = False):
- v = self.get(key)
- if v == None:
- return default
- else:
- return bool(int(v))
-
- def index(self):
- if self.parent:
- return self.parent.children.index(self)
- else:
- return 0
-
- def has(self, key):
- try:
- result = self[key]
- return True
- except KeyError:
- return False
- except IndexError:
- return False
-
- def get(self, key, default = None):
- if self.has(key):
- return self[key]
- else:
- return default
-
- def __getitem__(self, key):
- if callable(key):
- return filter(key, self.children)
- else:
- t = key.__class__
- meth = "__get%s__" % t.__name__
- if hasattr(self, meth):
- return getattr(self, meth)(key)
- else:
- raise KeyError(key)
-
- def __getstr__(self, name):
- if name[:1] == "@":
- return self.attrs[name[1:]]
- else:
- return self[lambda nd: nd.name == name]
-
- def __getint__(self, index):
- return self.children[index]
-
- def __iter__(self):
- return iter(self.children)
-
- def path(self):
- if self.parent == None:
- return "/%s" % self.name
- else:
- return "%s/%s" % (self.parent.path(), self.name)
-
-class Builder(ContentHandler):
-
- def __init__(self, start = None):
- self.node = start
-
- def __setitem__(self, element, type):
- self.types[element] = type
-
- def startElement(self, name, attrs):
- self.node = Node(name, attrs, None, self.node)
-
- def endElement(self, name):
- self.node = self.node.parent
-
- def characters(self, content):
- if self.node.text == None:
- self.node.text = content
- else:
- self.node.text += content
-
diff --git a/Final/python/rule2test b/Final/python/rule2test
deleted file mode 100755
index 10f151366e..0000000000
--- a/Final/python/rule2test
+++ /dev/null
@@ -1,108 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-#
-# Convert rules to tests
-#
-import sys, re, os.path
-from getopt import getopt, GetoptError
-from string import capitalize
-from xml import dom
-from xml.dom.minidom import parse
-
-def camelcase(s):
- """Convert 'string like this' to 'StringLikeThis'"""
- return "".join([capitalize(w) for w in re.split(re.compile("\W*"), s)])
-
-def uncapitalize(s): return s[0].lower()+s[1:]
-
-def ancestors(node):
- "Return iterator of ancestors from top-level element to node"
- def generator(node):
- while node and node.parentNode:
- yield node
- node = node.parentNode
- return reversed(list(generator(node)))
-
-def tagAndName(element):
- nameAttr = element.getAttribute("name");
- if (nameAttr) : return camelcase(nameAttr) + camelcase(element.tagName)
- else: return camelcase(element.tagName)
-
-def nodeText(n):
- """Recursively collect text from all text nodes under n"""
- if n.nodeType == dom.Node.TEXT_NODE:
- return n.data
- if n.childNodes:
- return reduce(lambda t, c: t + nodeText(c), n.childNodes, "")
- return ""
-
-def cleanup(docString, level=8):
- unindent = re.sub("\n[ \t]*", "\n", docString.strip())
- emptyLines = re.sub("\n\n\n", "\n\n", unindent)
- indented = re.sub("\n", "\n"+level*" ", emptyLines)
- return level*" " + indented
-
-def printTest(test, docstring):
- print "class %s(TestBase):" % test
- print ' """'
- print docstring
- print ' """'
- print
- print
-
-def printTests(doc, module):
- """Returns dictionary { classname : [ (methodname, docstring)* ] * }"""
- tests = {}
- rules = doc.getElementsByTagName("rule")
- for r in rules:
- path = list(ancestors(r))
- if module == path[1].getAttribute("name").lower():
- test = "".join(map(tagAndName, path[2:])) + "Tests"
- docstring = cleanup(nodeText(r), 4)
- printTest(test, docstring)
-
-def usage(message=None):
- if message: print >>sys.stderr, message
- print >>sys.stderr, """
-rule2test [options] <amqpclass>
-
-Print test classes for each rule for the amqpclass in amqp.xml.
-
-Options:
- -?/-h/--help : this message
- -s/--spec <spec.xml> : file containing amqp XML spec
-"""
- return 1
-
-def main(argv):
- try: opts, args = getopt(argv[1:], "h?s:", ["help", "spec="])
- except GetoptError, e: return usage(e)
- spec = "../specs/amqp.xml" # Default
- for opt, val in opts:
- if (opt in ("-h", "-?", "--help")): return usage()
- if (opt in ("-s", "--spec")): spec = val
- doc = parse(spec)
- if len(args) == 0: return usage()
- printTests(doc, args[0])
- return 0
-
-if (__name__ == "__main__"): sys.exit(main(sys.argv))
diff --git a/Final/python/run-tests b/Final/python/run-tests
deleted file mode 100755
index 90c0200d01..0000000000
--- a/Final/python/run-tests
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import sys
-from qpid.testlib import testrunner
-
-if not testrunner.run(): sys.exit(1)
-
-
-
diff --git a/Final/python/setup.py b/Final/python/setup.py
deleted file mode 100644
index a49fa6ca51..0000000000
--- a/Final/python/setup.py
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from distutils.core import setup
-
-setup(name="qpid", version="0.1", packages=["qpid"], scripts=["amqp-doc"],
- url="http://incubator.apache.org/qpid",
- license="Apache Software License",
- description="Python language client implementation for Apache Qpid")
diff --git a/Final/python/tests/__init__.py b/Final/python/tests/__init__.py
deleted file mode 100644
index 9a09d2d04f..0000000000
--- a/Final/python/tests/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
diff --git a/Final/python/tests/basic.py b/Final/python/tests/basic.py
deleted file mode 100644
index bbbfa8ebf9..0000000000
--- a/Final/python/tests/basic.py
+++ /dev/null
@@ -1,431 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class BasicTests(TestBase):
- """Tests for 'methods' on the amqp basic 'class'"""
-
- def test_consume_no_local(self):
- """
- Test that the no_local flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True)
- #establish two consumers one of which excludes delivery of locally sent messages
- channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
- channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
-
- #send a message
- channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local"))
- channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local"))
-
- #check the queues of the two consumers
- excluded = self.client.queue("local_excluded")
- included = self.client.queue("local_included")
- msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.content.body)
- try:
- excluded.get(timeout=1)
- self.fail("Received locally published message though no_local=true")
- except Empty: None
-
-
- def test_consume_exclusive(self):
- """
- Test that the exclusive flag is honoured in the consume method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True)
-
- #check that an exclusive consumer prevents other consumer being created:
- channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
- try:
- channel.basic_consume(consumer_tag="second", queue="test-queue-2")
- self.fail("Expected consume request to fail due to previous exclusive consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- #open new channel and cleanup last consumer:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #check that an exclusive consumer cannot be created if a consumer already exists:
- channel.basic_consume(consumer_tag="first", queue="test-queue-2")
- try:
- channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
- self.fail("Expected exclusive consume request to fail due to previous consumer")
- except Closed, e:
- self.assertChannelException(403, e.args[0])
-
- def test_consume_queue_errors(self):
- """
- Test error conditions associated with the queue field of the consume method:
- """
- channel = self.channel
- try:
- #queue specified but doesn't exist:
- channel.basic_consume(queue="invalid-queue")
- self.fail("Expected failure when consuming from non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.basic_consume(queue="")
- self.fail("Expected failure when consuming from unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_consume_unique_consumers(self):
- """
- Ensure unique consumer tags are enforced
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True)
-
- #check that attempts to use duplicate tags are detected and prevented:
- channel.basic_consume(consumer_tag="first", queue="test-queue-3")
- try:
- channel.basic_consume(consumer_tag="first", queue="test-queue-3")
- self.fail("Expected consume request to fail due to non-unique tag")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- def test_cancel(self):
- """
- Test compliance of the basic.cancel method
- """
- channel = self.channel
- #setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
- channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
-
- #cancel should stop messages being delivered
- channel.basic_cancel(consumer_tag="my-consumer")
- channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
- myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.content.body)
- try:
- msg = myqueue.get(timeout=1)
- self.fail("Got message after cancellation: " + msg)
- except Empty: None
-
- #cancellation of non-existant consumers should be handled without error
- channel.basic_cancel(consumer_tag="my-consumer")
- channel.basic_cancel(consumer_tag="this-never-existed")
-
-
- def test_ack(self):
- """
- Test basic ack/recover behaviour
- """
- channel = self.channel
- channel.queue_declare(queue="test-ack-queue", exclusive=True)
-
- reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
- queue = self.client.queue(reply.consumer_tag)
-
- channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_recover(requeue=False)
-
- msg3b = queue.get(timeout=1)
- msg5b = queue.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- def test_recover_requeue(self):
- """
- Test requeing on recovery
- """
- channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True)
-
- subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- channel.basic_publish(routing_key="test-requeue", content=Content("One"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
- channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_cancel(consumer_tag=subscription.consumer_tag)
- subscription2 = channel.basic_consume(queue="test-requeue")
- queue2 = self.client.queue(subscription2.consumer_tag)
-
- channel.basic_recover(requeue=True)
-
- msg3b = queue2.get(timeout=1)
- msg5b = queue2.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- self.assertEqual(True, msg3b.redelivered)
- self.assertEqual(True, msg5b.redelivered)
-
- try:
- extra = queue2.get(timeout=1)
- self.fail("Got unexpected message in second queue: " + extra.content.body)
- except Empty: None
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message in original queue: " + extra.content.body)
- except Empty: None
-
-
- def test_qos_prefetch_count(self):
- """
- Test that the prefetch count specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- #set prefetch to 5:
- channel.basic_qos(prefetch_count=5)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
-
- #only 5 messages should have been delivered:
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
-
-
- def test_qos_prefetch_size(self):
- """
- Test that the prefetch size specified is honoured
- """
- #setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
- queue = self.client.queue(subscription.consumer_tag)
-
- #set prefetch to 50 bytes (each message is 9 or 10 bytes):
- channel.basic_qos(prefetch_size=50)
-
- #publish 10 messages:
- for i in range(1, 11):
- channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
-
- #only 5 messages should have been delivered (i.e. 45 bytes worth):
- for i in range(1, 6):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
- except Empty: None
-
- #ack messages and check that the next set arrive ok:
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- for i in range(6, 11):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
- except Empty: None
-
- #make sure that a single oversized message still gets delivered
- large = "abcdefghijklmnopqrstuvwxyz"
- large = large + "-" + large;
- channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
- msg = queue.get(timeout=1)
- self.assertEqual(large, msg.content.body)
-
- def test_get(self):
- """
- Test basic_get method
- """
- channel = self.channel
- channel.queue_declare(queue="test-get", exclusive=True)
-
- #publish some messages (no_ack=True) with persistent messaging
- for i in range(1, 11):
- msg=Content("Message %d" % i)
- msg["delivery mode"] = 2
- channel.basic_publish(routing_key="test-get",content=msg )
-
- #use basic_get to read back the messages, and check that we get an empty at the end
- for i in range(1, 11):
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-ok")
- self.assertEqual("Message %d" % i, reply.content.body)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
-
-
- #publish some messages (no_ack=True) transient messaging
- for i in range(11, 21):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
-
- #use basic_get to read back the messages, and check that we get an empty at the end
- for i in range(11, 21):
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-ok")
- self.assertEqual("Message %d" % i, reply.content.body)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
-
- #repeat for no_ack=False
-
- #publish some messages (no_ack=False) with persistent messaging
- for i in range(21, 31):
- msg=Content("Message %d" % i)
- msg["delivery mode"] = 2
- channel.basic_publish(routing_key="test-get",content=msg )
-
- #use basic_get to read back the messages, and check that we get an empty at the end
- for i in range(21, 31):
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-ok")
- self.assertEqual("Message %d" % i, reply.content.body)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
-
- #public some messages (no_ack=False) with transient messaging
- for i in range(31, 41):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
-
- for i in range(31, 41):
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-ok")
- self.assertEqual("Message %d" % i, reply.content.body)
- if(i == 33):
- channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
- if(i in [35, 37, 39]):
- channel.basic_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
-
- #recover(requeue=True)
- channel.basic_recover(requeue=True)
-
- #get the unacked messages again (34, 36, 38, 40)
- for i in [34, 36, 38, 40]:
- reply = channel.basic_get(no_ack=False)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-ok")
- self.assertEqual("Message %d" % i, reply.content.body)
- channel.basic_ack(delivery_tag=reply.delivery_tag)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
-
- channel.basic_recover(requeue=True)
-
- reply = channel.basic_get(no_ack=True)
- self.assertEqual(reply.method.klass.name, "basic")
- self.assertEqual(reply.method.name, "get-empty")
diff --git a/Final/python/tests/broker.py b/Final/python/tests/broker.py
deleted file mode 100644
index 90009b6847..0000000000
--- a/Final/python/tests/broker.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class BrokerTests(TestBase):
- """Tests for basic Broker functionality"""
-
- def test_amqp_basic_13(self):
- """
- First, this test tries to receive a message with a no-ack
- consumer. Second, this test tries to explicitely receive and
- acknowledge a message with an acknowledging consumer.
- """
- ch = self.channel
- self.queue_declare(ch, queue = "myqueue")
-
- # No ack consumer
- ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag
- body = "test no-ack"
- ch.basic_publish(routing_key = "myqueue", content = Content(body))
- msg = self.client.queue(ctag).get(timeout = 5)
- self.assert_(msg.content.body == body)
-
- # Acknowleding consumer
- self.queue_declare(ch, queue = "otherqueue")
- ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag
- body = "test ack"
- ch.basic_publish(routing_key = "otherqueue", content = Content(body))
- msg = self.client.queue(ctag).get(timeout = 5)
- ch.basic_ack(delivery_tag = msg.delivery_tag)
- self.assert_(msg.content.body == body)
-
- def test_basic_delivery_immediate(self):
- """
- Test basic message delivery where consume is issued before publish
- """
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- reply = channel.basic_consume(queue="test-queue", no_ack=True)
- queue = self.client.queue(reply.consumer_tag)
-
- body = "Immediate Delivery"
- channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True)
- msg = queue.get(timeout=5)
- self.assert_(msg.content.body == body)
-
- # TODO: Ensure we fail if immediate=True and there's no consumer.
-
-
- def test_basic_delivery_queued(self):
- """
- Test basic message delivery where publish is issued before consume
- (i.e. requires queueing of the message)
- """
- channel = self.channel
- self.exchange_declare(channel, exchange="test-exchange", type="direct")
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- body = "Queued Delivery"
- channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body))
- reply = channel.basic_consume(queue="test-queue", no_ack=True)
- queue = self.client.queue(reply.consumer_tag)
- msg = queue.get(timeout=5)
- self.assert_(msg.content.body == body)
-
- def test_invalid_channel(self):
- channel = self.client.channel(200)
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for invalid channel")
- except Closed, e:
- self.assertConnectionException(504, e.args[0])
-
- def test_closed_channel(self):
- channel = self.client.channel(200)
- channel.channel_open()
- channel.channel_close()
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for closed channel")
- except Closed, e:
- self.assertConnectionException(504, e.args[0])
-
- def test_channel_flow(self):
- channel = self.channel
- channel.queue_declare(queue="flow_test_queue", exclusive=True)
- channel.basic_consume(consumer_tag="my-tag", queue="flow_test_queue")
- incoming = self.client.queue("my-tag")
-
- channel.channel_flow(active=False)
- channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz"))
- try:
- incoming.get(timeout=1)
- self.fail("Received message when flow turned off.")
- except Empty: None
-
- channel.channel_flow(active=True)
- msg = incoming.get(timeout=1)
- self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)
-
-
diff --git a/Final/python/tests/example.py b/Final/python/tests/example.py
deleted file mode 100644
index bc84f002e0..0000000000
--- a/Final/python/tests/example.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class ExampleTest (TestBase):
- """
- An example Qpid test, illustrating the unittest frameowkr and the
- python Qpid client. The test class must inherit TestCase. The
- test code uses the Qpid client to interact with a qpid broker and
- verify it behaves as expected.
- """
-
- def test_example(self):
- """
- An example test. Note that test functions must start with 'test_'
- to be recognized by the test framework.
- """
-
- # By inheriting TestBase, self.client is automatically connected
- # and self.channel is automatically opened as channel(1)
- # Other channel methods mimic the protocol.
- channel = self.channel
-
- # Now we can send regular commands. If you want to see what the method
- # arguments mean or what other commands are available, you can use the
- # python builtin help() method. For example:
- #help(chan)
- #help(chan.exchange_declare)
-
- # If you want browse the available protocol methods without being
- # connected to a live server you can use the amqp-doc utility:
- #
- # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
- #
- # Options:
- # -e, --regexp use regex instead of glob when matching
-
- # Now that we know what commands are available we can use them to
- # interact with the server.
-
- # Here we use ordinal arguments.
- self.exchange_declare(channel, 0, "test", "direct")
-
- # Here we use keyword arguments.
- self.queue_declare(channel, queue="test-queue")
- channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
-
- # Call Channel.basic_consume to register as a consumer.
- # All the protocol methods return a message object. The message object
- # has fields corresponding to the reply method fields, plus a content
- # field that is filled if the reply includes content. In this case the
- # interesting field is the consumer_tag.
- reply = channel.basic_consume(queue="test-queue")
-
- # We can use the Client.queue(...) method to access the queue
- # corresponding to our consumer_tag.
- queue = self.client.queue(reply.consumer_tag)
-
- # Now lets publish a message and see if our consumer gets it. To do
- # this we need to import the Content class.
- body = "Hello World!"
- channel.basic_publish(exchange="test",
- routing_key="key",
- content=Content(body))
-
- # Now we'll wait for the message to arrive. We can use the timeout
- # argument in case the server hangs. By default queue.get() will wait
- # until a message arrives or the connection to the server dies.
- msg = queue.get(timeout=10)
-
- # And check that we got the right response with assertEqual
- self.assertEqual(body, msg.content.body)
-
- # Now acknowledge the message.
- channel.basic_ack(msg.delivery_tag, True)
-
diff --git a/Final/python/tests/exchange.py b/Final/python/tests/exchange.py
deleted file mode 100644
index 56d6fa82e4..0000000000
--- a/Final/python/tests/exchange.py
+++ /dev/null
@@ -1,327 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Tests for exchange behaviour.
-
-Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
-"""
-
-import Queue, logging
-from qpid.testlib import TestBase
-from qpid.content import Content
-from qpid.client import Closed
-
-
-class StandardExchangeVerifier:
- """Verifies standard exchange behavior.
-
- Used as base class for classes that test standard exchanges."""
-
- def verifyDirectExchange(self, ex):
- """Verify that ex behaves like a direct exchange."""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
- try:
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
- self.fail("Expected Empty exception")
- except Queue.Empty: None # Expected
-
- def verifyFanOutExchange(self, ex):
- """Verify that ex behaves like a fanout exchange."""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex)
- self.queue_declare(queue="p")
- self.channel.queue_bind(queue="p", exchange=ex)
- for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
-
- def verifyTopicExchange(self, ex):
- """Verify that ex behaves like a topic exchange"""
- self.queue_declare(queue="a")
- self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
- q = self.consume("a")
- self.assertPublishGet(q, ex, "a.b.x")
- self.assertPublishGet(q, ex, "a.x.b.x")
- self.assertPublishGet(q, ex, "a.x.x.b.x")
- # Shouldn't match
- self.channel.basic_publish(exchange=ex, routing_key="a.b")
- self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")
- self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x")
- self.channel.basic_publish(exchange=ex, routing_key="a.b")
- self.assert_(q.empty())
-
- def verifyHeadersExchange(self, ex):
- """Verify that ex is a headers exchange"""
- self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
- q = self.consume("q")
- headers = {"name":"fred", "age":3}
- self.assertPublishGet(q, exchange=ex, properties={'headers':headers})
- self.channel.basic_publish(exchange=ex) # No headers, won't deliver
- self.assertEmpty(q);
-
-
-class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server SHOULD implement these standard exchange types: topic, headers.
-
- Client attempts to declare an exchange with each of these standard types.
- """
-
- def testDirect(self):
- """Declare and test a direct exchange"""
- self.exchange_declare(0, exchange="d", type="direct")
- self.verifyDirectExchange("d")
-
- def testFanout(self):
- """Declare and test a fanout exchange"""
- self.exchange_declare(0, exchange="f", type="fanout")
- self.verifyFanOutExchange("f")
-
- def testTopic(self):
- """Declare and test a topic exchange"""
- self.exchange_declare(0, exchange="t", type="topic")
- self.verifyTopicExchange("t")
-
- def testHeaders(self):
- """Declare and test a headers exchange"""
- self.exchange_declare(0, exchange="h", type="headers")
- self.verifyHeadersExchange("h")
-
-
-class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server MUST, in each virtual host, pre-declare an exchange instance
- for each standard exchange type that it implements, where the name of the
- exchange instance is amq. followed by the exchange type name.
-
- Client creates a temporary queue and attempts to bind to each required
- exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
- those types are defined).
- """
- def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
-
- def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
-
- def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
-
- def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
-
-class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
- """
- The server MUST predeclare a direct exchange to act as the default exchange
- for content Publish methods and for default queue bindings.
-
- Client checks that the default exchange is active by specifying a queue
- binding with no exchange name, and publishing a message with a suitable
- routing key but without specifying the exchange name, then ensuring that
- the message arrives in the queue correctly.
- """
- def testDefaultExchange(self):
- # Test automatic binding by queue name.
- self.queue_declare(queue="d")
- self.assertPublishConsume(queue="d", routing_key="d")
- # Test explicit bind to default queue
- self.verifyDirectExchange("")
-
-
-# TODO aconway 2006-09-27: Fill in empty tests:
-
-class DefaultAccessRuleTests(TestBase):
- """
- The server MUST NOT allow clients to access the default exchange except
- by specifying an empty exchange name in the Queue.Bind and content Publish
- methods.
- """
-
-class ExtensionsRuleTests(TestBase):
- """
- The server MAY implement other exchange types as wanted.
- """
-
-
-class DeclareMethodMinimumRuleTests(TestBase):
- """
- The server SHOULD support a minimum of 16 exchanges per virtual host and
- ideally, impose no limit except as defined by available resources.
-
- The client creates as many exchanges as it can until the server reports
- an error; the number of exchanges successfuly created must be at least
- sixteen.
- """
-
-
-class DeclareMethodTicketFieldValidityRuleTests(TestBase):
- """
- The client MUST provide a valid access ticket giving "active" access to
- the realm in which the exchange exists or will be created, or "passive"
- access if the if-exists flag is set.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
- """
- Exchange names starting with "amq." are reserved for predeclared and
- standardised exchanges. The client MUST NOT attempt to create an exchange
- starting with "amq.".
-
-
- """
-
-
-class DeclareMethodTypeFieldTypedRuleTests(TestBase):
- """
- Exchanges cannot be redeclared with different types. The client MUST not
- attempt to redeclare an existing exchange with a different type than used
- in the original Exchange.Declare method.
-
-
- """
-
-
-class DeclareMethodTypeFieldSupportRuleTests(TestBase):
- """
- The client MUST NOT attempt to create an exchange with a type that the
- server does not support.
-
-
- """
-
-
-class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
- """
- If set, and the exchange does not already exist, the server MUST raise a
- channel exception with reply code 404 (not found).
- """
- def test(self):
- try:
- self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
- self.fail("Expected 404 for passive declaration of unknown exchange.")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
-class DeclareMethodDurableFieldSupportRuleTests(TestBase):
- """
- The server MUST support both durable and transient exchanges.
-
-
- """
-
-
-class DeclareMethodDurableFieldStickyRuleTests(TestBase):
- """
- The server MUST ignore the durable field if the exchange already exists.
-
-
- """
-
-
-class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
- """
- The server MUST ignore the auto-delete field if the exchange already
- exists.
-
-
- """
-
-
-class DeleteMethodTicketFieldValidityRuleTests(TestBase):
- """
- The client MUST provide a valid access ticket giving "active" access
- rights to the exchange's access realm.
-
- Client creates access ticket with wrong access rights and attempts to use
- in this method.
- """
-
-
-class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
- """
- The client MUST NOT attempt to delete an exchange that does not exist.
- """
-
-
-class HeadersExchangeTests(TestBase):
- """
- Tests for headers exchange functionality.
- """
- def setUp(self):
- TestBase.setUp(self)
- self.queue_declare(queue="q")
- self.q = self.consume("q")
-
- def myAssertPublishGet(self, headers):
- self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers})
-
- def myBasicPublish(self, headers):
- self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers}))
-
- def testMatchAll(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
-
- # None of these should match
- self.myBasicPublish({})
- self.myBasicPublish({"name":"barney"})
- self.myBasicPublish({"name":10})
- self.myBasicPublish({"name":"fred", "age":2})
- self.assertEmpty(self.q)
-
- def testMatchAny(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
- self.myAssertPublishGet({"name":"fred"})
- self.myAssertPublishGet({"name":"fred", "ignoreme":10})
- self.myAssertPublishGet({"ignoreme":10, "age":3})
-
- # Wont match
- self.myBasicPublish({})
- self.myBasicPublish({"irrelevant":0})
- self.assertEmpty(self.q)
-
-
-class MiscellaneousErrorsTests(TestBase):
- """
- Test some miscellaneous error conditions
- """
- def testTypeNotKnown(self):
- try:
- self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
- self.fail("Expected 503 for declaration of unknown exchange type.")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def testDifferentDeclaredType(self):
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
- try:
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
- self.fail("Expected 530 for redeclaration of exchange with different type.")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
- #cleanup
- other = self.connect()
- c2 = other.channel(1)
- c2.channel_open()
- c2.exchange_delete(exchange="test_different_declared_type_exchange")
-
diff --git a/Final/python/tests/queue.py b/Final/python/tests/queue.py
deleted file mode 100644
index 60ac4c3dfb..0000000000
--- a/Final/python/tests/queue.py
+++ /dev/null
@@ -1,255 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class QueueTests(TestBase):
- """Tests for 'methods' on the amqp queue 'class'"""
-
- def test_purge(self):
- """
- Test that the purge method removes messages from the queue
- """
- channel = self.channel
- #setup, declare a queue and add some messages to it:
- channel.exchange_declare(exchange="test-exchange", type="direct")
- channel.queue_declare(queue="test-queue", exclusive=True)
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one"))
- channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two"))
- channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three"))
-
- #check that the queue now reports 3 messages:
- reply = channel.queue_declare(queue="test-queue")
- self.assertEqual(3, reply.message_count)
-
- #now do the purge, then test that three messages are purged and the count drops to 0
- reply = channel.queue_purge(queue="test-queue");
- self.assertEqual(3, reply.message_count)
- reply = channel.queue_declare(queue="test-queue")
- self.assertEqual(0, reply.message_count)
-
- #send a further message and consume it, ensuring that the other messages are really gone
- channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four"))
- reply = channel.basic_consume(queue="test-queue", no_ack=True)
- queue = self.client.queue(reply.consumer_tag)
- msg = queue.get(timeout=1)
- self.assertEqual("four", msg.content.body)
-
- #check error conditions (use new channels):
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- #queue specified but doesn't exist:
- channel.queue_purge(queue="invalid-queue")
- self.fail("Expected failure when purging non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- channel = self.client.channel(3)
- channel.channel_open()
- try:
- #queue not specified and none previously declared for channel:
- channel.queue_purge()
- self.fail("Expected failure when purging unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
-
- #cleanup
- other = self.connect()
- channel = other.channel(1)
- channel.channel_open()
- channel.exchange_delete(exchange="test-exchange")
-
- def test_declare_exclusive(self):
- """
- Test that the exclusive field is honoured in queue.declare
- """
- # TestBase.setUp has already opened channel(1)
- c1 = self.channel
- # Here we open a second separate connection:
- other = self.connect()
- c2 = other.channel(1)
- c2.channel_open()
-
- #declare an exclusive queue:
- c1.queue_declare(queue="exclusive-queue", exclusive="True")
- try:
- #other connection should not be allowed to declare this:
- c2.queue_declare(queue="exclusive-queue", exclusive="True")
- self.fail("Expected second exclusive queue_declare to raise a channel exception")
- except Closed, e:
- self.assertChannelException(405, e.args[0])
-
-
- def test_declare_passive(self):
- """
- Test that the passive field is honoured in queue.declare
- """
- channel = self.channel
- #declare an exclusive queue:
- channel.queue_declare(queue="passive-queue-1", exclusive="True")
- channel.queue_declare(queue="passive-queue-1", passive="True")
- try:
- #other connection should not be allowed to declare this:
- channel.queue_declare(queue="passive-queue-2", passive="True")
- self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
- def test_bind(self):
- """
- Test various permutations of the queue.bind method
- """
- channel = self.channel
- channel.queue_declare(queue="queue-1", exclusive="True")
-
- #straightforward case, both exchange & queue exist so no errors expected:
- channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
-
- #bind the default queue for the channel (i.e. last one declared):
- channel.queue_bind(exchange="amq.direct", routing_key="key2")
-
- #use the queue name where neither routing key nor queue are specified:
- channel.queue_bind(exchange="amq.direct")
-
- #try and bind to non-existant exchange
- try:
- channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
- self.fail("Expected bind to non-existant exchange to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- #need to reopen a channel:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #try and bind non-existant queue:
- try:
- channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
- self.fail("Expected bind of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
- def test_delete_simple(self):
- """
- Test basic queue deletion
- """
- channel = self.channel
-
- #straight-forward case:
- channel.queue_declare(queue="delete-me")
- channel.basic_publish(routing_key="delete-me", content=Content("a"))
- channel.basic_publish(routing_key="delete-me", content=Content("b"))
- channel.basic_publish(routing_key="delete-me", content=Content("c"))
- reply = channel.queue_delete(queue="delete-me")
- self.assertEqual(3, reply.message_count)
- #check that it has gone be declaring passively
- try:
- channel.queue_declare(queue="delete-me", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- #check attempted deletion of non-existant queue is handled correctly:
- channel = self.client.channel(2)
- channel.channel_open()
- try:
- channel.queue_delete(queue="i-dont-exist", if_empty="True")
- self.fail("Expected delete of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
-
- def test_delete_ifempty(self):
- """
- Test that if_empty field of queue_delete is honoured
- """
- channel = self.channel
-
- #create a queue and add a message to it (use default binding):
- channel.queue_declare(queue="delete-me-2")
- channel.queue_declare(queue="delete-me-2", passive="True")
- channel.basic_publish(routing_key="delete-me-2", content=Content("message"))
-
- #try to delete, but only if empty:
- try:
- channel.queue_delete(queue="delete-me-2", if_empty="True")
- self.fail("Expected delete if_empty to fail for non-empty queue")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
- #need new channel now:
- channel = self.client.channel(2)
- channel.channel_open()
-
- #empty queue:
- reply = channel.basic_consume(queue="delete-me-2", no_ack=True)
- queue = self.client.queue(reply.consumer_tag)
- msg = queue.get(timeout=1)
- self.assertEqual("message", msg.content.body)
- channel.basic_cancel(consumer_tag=reply.consumer_tag)
-
- #retry deletion on empty queue:
- channel.queue_delete(queue="delete-me-2", if_empty="True")
-
- #check that it has gone by declaring passively:
- try:
- channel.queue_declare(queue="delete-me-2", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
- def test_delete_ifunused(self):
- """
- Test that if_unused field of queue_delete is honoured
- """
- channel = self.channel
-
- #create a queue and register a consumer:
- channel.queue_declare(queue="delete-me-3")
- channel.queue_declare(queue="delete-me-3", passive="True")
- reply = channel.basic_consume(queue="delete-me-3", no_ack=True)
-
- #need new channel now:
- channel2 = self.client.channel(2)
- channel2.channel_open()
- #try to delete, but only if empty:
- try:
- channel2.queue_delete(queue="delete-me-3", if_unused="True")
- self.fail("Expected delete if_unused to fail for queue with existing consumer")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
-
- channel.basic_cancel(consumer_tag=reply.consumer_tag)
- channel.queue_delete(queue="delete-me-3", if_unused="True")
- #check that it has gone by declaring passively:
- try:
- channel.queue_declare(queue="delete-me-3", passive="True")
- self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
-
-
diff --git a/Final/python/tests/testlib.py b/Final/python/tests/testlib.py
deleted file mode 100644
index cab07cc4ac..0000000000
--- a/Final/python/tests/testlib.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-#
-# Tests for the testlib itself.
-#
-
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from Queue import Empty
-
-import sys
-from traceback import *
-
-def mytrace(frame, event, arg):
- print_stack(frame);
- print "===="
- return mytrace
-
-class TestBaseTest(TestBase):
- """Verify TestBase functions work as expected"""
-
- def testAssertEmptyPass(self):
- """Test assert empty works"""
- self.queue_declare(queue="empty")
- q = self.consume("empty")
- self.assertEmpty(q)
- try:
- q.get(timeout=1)
- self.fail("Queue is not empty.")
- except Empty: None # Ignore
-
- def testAssertEmptyFail(self):
- self.queue_declare(queue="full")
- q = self.consume("full")
- self.channel.basic_publish(routing_key="full")
- try:
- self.assertEmpty(q);
- self.fail("assertEmpty did not assert on non-empty queue")
- except AssertionError: None # Ignore
-
- def testMessageProperties(self):
- """Verify properties are passed with message"""
- props={"headers":{"x":1, "y":2}}
- self.queue_declare(queue="q")
- q = self.consume("q")
- self.assertPublishGet(q, routing_key="q", properties=props)
-
-
-
diff --git a/Final/python/tests/tx.py b/Final/python/tests/tx.py
deleted file mode 100644
index 054fb8d8b7..0000000000
--- a/Final/python/tests/tx.py
+++ /dev/null
@@ -1,209 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-
-class TxTests(TestBase):
- """
- Tests for 'methods' on the amqp tx 'class'
- """
-
- def test_commit(self):
- """
- Test that commited publishes are delivered and commited acks are not re-delivered
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c")
- channel.tx_commit()
-
- #check results
- for i in range(1, 5):
- msg = queue_c.get(timeout=1)
- self.assertEqual("TxMessage %d" % i, msg.content.body)
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("TxMessage 6", msg.content.body)
-
- msg = queue_a.get(timeout=1)
- self.assertEqual("TxMessage 7", msg.content.body)
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- #cleanup
- channel.basic_ack(delivery_tag=0, multiple=True)
- channel.tx_commit()
-
- def test_auto_rollback(self):
- """
- Test that a channel closed with an open transaction is effectively rolled back
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- channel.tx_rollback()
-
- #check results
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
-
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- #cleanup
- channel.basic_ack(delivery_tag=0, multiple=True)
- channel.tx_commit()
-
- def test_rollback(self):
- """
- Test that rolled back publishes are not delivered and rolled back acks are re-delivered
- """
- channel = self.channel
- queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- channel.tx_rollback()
-
- #check results
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
-
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
-
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
-
- for q in [queue_a, queue_b, queue_c]:
- try:
- extra = q.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None
-
- #cleanup
- channel.basic_ack(delivery_tag=0, multiple=True)
- channel.tx_commit()
-
- def perform_txn_work(self, channel, name_a, name_b, name_c):
- """
- Utility method that does some setup and some work under a transaction. Used for testing both
- commit and rollback
- """
- #setup:
- channel.queue_declare(queue=name_a, exclusive=True)
- channel.queue_declare(queue=name_b, exclusive=True)
- channel.queue_declare(queue=name_c, exclusive=True)
-
- key = "my_key_" + name_b
- topic = "my_topic_" + name_c
-
- channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
- channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
-
- for i in range(1, 5):
- channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i))
-
- channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6"))
- channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7"))
-
- channel.tx_select()
-
- #consume and ack messages
- sub_a = channel.basic_consume(queue=name_a, no_ack=False)
- queue_a = self.client.queue(sub_a.consumer_tag)
- for i in range(1, 5):
- msg = queue_a.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
-
- sub_b = channel.basic_consume(queue=name_b, no_ack=False)
- queue_b = self.client.queue(sub_b.consumer_tag)
- msg = queue_b.get(timeout=1)
- self.assertEqual("Message 6", msg.content.body)
- channel.basic_ack(delivery_tag=msg.delivery_tag)
-
- sub_c = channel.basic_consume(queue=name_c, no_ack=False)
- queue_c = self.client.queue(sub_c.consumer_tag)
- msg = queue_c.get(timeout=1)
- self.assertEqual("Message 7", msg.content.body)
- channel.basic_ack(delivery_tag=msg.delivery_tag)
-
- #publish messages
- for i in range(1, 5):
- channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i))
-
- channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6"))
- channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7"))
-
- return queue_a, queue_b, queue_c
-
- def test_commit_overlapping_acks(self):
- """
- Test that logically 'overlapping' acks do not cause errors on commit
- """
- channel = self.channel
- channel.queue_declare(queue="commit-overlapping", exclusive=True)
- for i in range(1, 10):
- channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i))
-
-
- channel.tx_select()
-
- sub = channel.basic_consume(queue="commit-overlapping", no_ack=False)
- queue = self.client.queue(sub.consumer_tag)
- for i in range(1, 10):
- msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
- if i in [3, 6, 10]:
- channel.basic_ack(delivery_tag=msg.delivery_tag)
-
- channel.tx_commit()
-
- #check all have been acked:
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None