summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /python
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/README.txt24
-rwxr-xr-xpython/amqp-doc75
-rw-r--r--python/cpp_failing.txt0
-rw-r--r--python/doc/test-requirements.txt10
-rw-r--r--python/java_failing.txt13
-rwxr-xr-xpython/pal2py255
-rw-r--r--python/qpid/__init__.py17
-rw-r--r--python/qpid/client.py111
-rw-r--r--python/qpid/codec.py221
-rw-r--r--python/qpid/connection.py265
-rw-r--r--python/qpid/content.py47
-rw-r--r--python/qpid/delegate.py52
-rw-r--r--python/qpid/message.py81
-rw-r--r--python/qpid/peer.py209
-rw-r--r--python/qpid/queue.py42
-rw-r--r--python/qpid/spec.py349
-rw-r--r--python/qpid/testlib.py221
-rw-r--r--python/qpid/xmlutil.py116
-rwxr-xr-xpython/rule2test89
-rwxr-xr-xpython/run-tests24
-rw-r--r--python/tests/__init__.py1
-rw-r--r--python/tests/basic.py115
-rw-r--r--python/tests/broker.py84
-rw-r--r--python/tests/example.py91
-rw-r--r--python/tests/exchange.py234
-rw-r--r--python/tests/queue.py254
26 files changed, 3000 insertions, 0 deletions
diff --git a/python/README.txt b/python/README.txt
new file mode 100644
index 0000000000..0a64f0e2f2
--- /dev/null
+++ b/python/README.txt
@@ -0,0 +1,24 @@
+= RUNNING THE PYTHON TESTS =
+
+The tests/ directory contains a collection of python unit tests to
+exercise functions of a broker.
+
+Simplest way to run the tests:
+
+ * Run a broker on the default port
+
+ * ./run_tests
+
+For additional options: ./run_tests --help
+
+
+== Expected failures ==
+
+Until we complete functionality, tests may fail because the tested
+functionality is missing in the broker. To skip expected failures
+in the C++ or Java brokers:
+
+ ./run_tests -I cpp_failing.txt
+ ./run_tests -I java_failing.txt
+
+If you fix a failure, please remove it from the corresponding list.
diff --git a/python/amqp-doc b/python/amqp-doc
new file mode 100755
index 0000000000..a5b785fd73
--- /dev/null
+++ b/python/amqp-doc
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys, re
+from qpid.spec import load, pythonize
+from getopt import gnu_getopt as getopt, GetoptError
+from fnmatch import fnmatchcase as fnmatch
+
+def die(msg):
+ print >> sys.stderr, msg
+ sys.exit(1)
+
+def usage(msg = ""):
+ return ("""%s
+
+Usage %s [<options>] [<pattern_1> ... <pattern_n>]
+
+Options:
+ -e, --regexp use regex instead of glob when matching
+ -s, --spec <url> location of amqp.xml
+""" % (msg, sys.argv[0])).strip()
+
+try:
+ opts, args = getopt(sys.argv[1:], "s:e", ["regexp", "spec="])
+except GetoptError, e:
+ die(str(e))
+
+regexp = False
+spec = "../specs/amqp-8.0.xml"
+for k, v in opts:
+ if k == "-e" or k == "--regexp": regexp = True
+ if k == "-s" or k == "--spec": spec = v
+
+if regexp:
+ def match(pattern, value):
+ try:
+ return re.match(pattern, value)
+ except Exception, e:
+ die("error: '%s': %s" % (pattern, e))
+else:
+ def match(pattern, value):
+ return fnmatch(value, pattern)
+
+spec = load(spec)
+methods = {}
+patterns = args
+for pattern in patterns:
+ for c in spec.classes:
+ for m in c.methods:
+ name = pythonize("%s_%s" % (c.name, m.name))
+ if match(pattern, name):
+ methods[name] = m.define_method(name)
+
+if patterns:
+ if methods:
+ AMQP = type("AMQP[%s]" % ", ".join(patterns), (), methods)
+ else:
+ die("no matches")
+else:
+ AMQP = spec.define_class("AMQP")
+
+help(AMQP)
diff --git a/python/cpp_failing.txt b/python/cpp_failing.txt
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/python/cpp_failing.txt
diff --git a/python/doc/test-requirements.txt b/python/doc/test-requirements.txt
new file mode 100644
index 0000000000..a1ba414eb2
--- /dev/null
+++ b/python/doc/test-requirements.txt
@@ -0,0 +1,10 @@
+ * start and stop server, possibly in different configurations, should
+ at least be able to specify host and port
+
+ * initiate multiple connections/server
+
+ * initiate multiple channels/connection
+
+ * enable positive and negative tests for any protocol interaction
+
+ * test harness must be as robust as possible to spec changes
diff --git a/python/java_failing.txt b/python/java_failing.txt
new file mode 100644
index 0000000000..2e61363817
--- /dev/null
+++ b/python/java_failing.txt
@@ -0,0 +1,13 @@
+tests.basic.BasicTests.test_consume_exclusive
+tests.basic.BasicTests.test_consume_no_local
+tests.basic.BasicTests.test_consume_queue_errors
+tests.basic.BasicTests.test_consume_unique_consumers
+tests.exchange.RecommendedTypesRuleTests.testFanout
+tests.exchange.RequiredInstancesRuleTests.testAmqFanOut
+tests.queue.QueueTests.test_declare_exclusive
+tests.queue.QueueTests.test_declare_passive
+tests.queue.QueueTests.test_delete_ifempty
+tests.queue.QueueTests.test_delete_ifunused
+tests.queue.QueueTests.test_delete_simple
+tests.queue.QueueTests.test_purge
+tests.queue.QueueTests.test_bind
diff --git a/python/pal2py b/python/pal2py
new file mode 100755
index 0000000000..48fa0111de
--- /dev/null
+++ b/python/pal2py
@@ -0,0 +1,255 @@
+#!/usr/bin/env python
+import sys, os, xml
+
+from qpid.spec import load, pythonize
+from textwrap import TextWrapper
+from xml.sax.handler import ContentHandler
+
+class Block:
+
+ def __init__(self, children):
+ self.children = children
+
+ def emit(self, out):
+ for child in self.children:
+ if not hasattr(child, "emit"):
+ raise ValueError(child)
+ child.emit(out)
+
+ if not self.children:
+ out.line("pass")
+
+class If:
+
+ def __init__(self, expr, cons, alt = None):
+ self.expr = expr
+ self.cons = cons
+ self.alt = alt
+
+ def emit(self, out):
+ out.line("if ")
+ self.expr.emit(out)
+ out.write(":")
+ out.level += 1
+ self.cons.emit(out)
+ out.level -= 1
+ if self.alt:
+ out.line("else:")
+ out.level += 1
+ self.alt.emit(out)
+ out.level -= 1
+
+class Stmt:
+
+ def __init__(self, code):
+ self.code = code
+
+ def emit(self, out):
+ out.line(self.code)
+
+class Expr:
+
+ def __init__(self, code):
+ self.code = code
+
+ def emit(self, out):
+ out.write(self.code)
+
+class Abort:
+
+ def __init__(self, expr):
+ self.expr = expr
+
+ def emit(self, out):
+ out.line("assert False, ")
+ self.expr.emit(out)
+
+WRAPPER = TextWrapper()
+
+def wrap(text):
+ return WRAPPER.wrap(" ".join(text.split()))
+
+class Doc:
+
+ def __init__(self, text):
+ self.text = text
+
+ def emit(self, out):
+ out.line('"""')
+ for line in wrap(self.text):
+ out.line(line)
+ out.line('"""')
+
+class Frame:
+
+ def __init__(self, attrs):
+ self.attrs = attrs
+ self.children = []
+ self.text = None
+
+ def __getattr__(self, attr):
+ return self.attrs[attr]
+
+def isunicode(s):
+ if isinstance(s, str):
+ return False
+ for ch in s:
+ if ord(ch) > 127:
+ return True
+ return False
+
+def string_literal(s):
+ if s == None:
+ return None
+ if isunicode(s):
+ return "%r" % s
+ else:
+ return "%r" % str(s)
+
+TRUTH = {
+ "1": True,
+ "0": False,
+ "true": True,
+ "false": False
+ }
+
+LITERAL = {
+ "shortstr": string_literal,
+ "longstr": string_literal,
+ "bit": lambda s: TRUTH[s.lower()],
+ "longlong": lambda s: "%r" % long(s)
+ }
+
+def literal(s, field):
+ return LITERAL[field.type](s)
+
+def palexpr(s, field):
+ if s.startswith("$"):
+ return "msg.%s" % s[1:]
+ else:
+ return literal(s, field)
+
+class Translator(ContentHandler):
+
+ def __init__(self, spec):
+ self.spec = spec
+ self.stack = []
+ self.content = None
+ self.root = Frame(None)
+ self.push(self.root)
+
+ def emit(self, out):
+ blk = Block(self.root.children)
+ blk.emit(out)
+ out.write("\n")
+
+ def peek(self):
+ return self.stack[-1]
+
+ def pop(self):
+ return self.stack.pop()
+
+ def push(self, frame):
+ self.stack.append(frame)
+
+ def startElement(self, name, attrs):
+ self.push(Frame(attrs))
+
+ def endElement(self, name):
+ frame = self.pop()
+ if hasattr(self, name):
+ child = getattr(self, name)(frame)
+ else:
+ child = self.handle(name, frame)
+
+ if child:
+ self.peek().children.append(child)
+
+ def characters(self, text):
+ frame = self.peek()
+ if frame.text:
+ frame.text += text
+ else:
+ frame.text = text
+
+ def handle(self, name, frame):
+ for klass in self.spec.classes:
+ pyklass = pythonize(klass.name)
+ if name.startswith(pyklass):
+ name = name[len(pyklass) + 1:]
+ break
+ else:
+ raise ValueError("unknown class: %s" % name)
+
+ for method in klass.methods:
+ pymethod = pythonize(method.name)
+ if name == pymethod:
+ break
+ else:
+ raise ValueError("unknown method: %s" % name)
+
+ args = ["%s = %s" % (key, palexpr(val, method.fields.bypyname[key]))
+ for key, val in frame.attrs.items()]
+ if method.content and self.content:
+ args.append("content = %r" % string_literal(self.content))
+ code = "ssn.%s_%s(%s)" % (pyklass, pymethod, ", ".join(args))
+ if pymethod == "consume":
+ code = "consumer_tag = %s.consumer_tag" % code
+ return Stmt(code)
+
+ def pal(self, frame):
+ return Block([Doc(frame.text)] + frame.children)
+
+ def include(self, frame):
+ base, ext = os.path.splitext(frame.filename)
+ return Stmt("from %s import *" % base)
+
+ def session(self, frame):
+ return Block([Stmt("cli = open()"), Stmt("ssn = cli.channel(0)"),
+ Stmt("ssn.channel_open()")] + frame.children)
+
+ def empty(self, frame):
+ return If(Expr("msg == None"), Block(frame.children))
+
+ def abort(self, frame):
+ return Abort(Expr(string_literal(frame.text)))
+
+ def wait(self, frame):
+ return Stmt("msg = ssn.queue(consumer_tag).get(timeout=%r)" %
+ (int(frame.timeout)/1000))
+
+ def basic_arrived(self, frame):
+ if frame.children:
+ return If(Expr("msg != None"), Block(frame.children))
+
+ def basic_content(self, frame):
+ self.content = frame.text
+
+class Emitter:
+
+ def __init__(self, out):
+ self.out = out
+ self.level = 0
+
+ def write(self, code):
+ self.out.write(code)
+
+ def line(self, code):
+ self.write("\n%s%s" % (" "*self.level, code))
+
+ def flush(self):
+ self.out.flush()
+
+ def close(self):
+ self.out.close()
+
+
+for f in sys.argv[2:]:
+ base, ext = os.path.splitext(f)
+ spec = load(sys.argv[1])
+ t = Translator(spec)
+ xml.sax.parse(f, t)
+# out = Emitter(open("%s.py" % base))
+ out = Emitter(sys.stdout)
+ t.emit(out)
+ out.close()
diff --git a/python/qpid/__init__.py b/python/qpid/__init__.py
new file mode 100644
index 0000000000..3f69e88e24
--- /dev/null
+++ b/python/qpid/__init__.py
@@ -0,0 +1,17 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import spec, codec, connection, content, peer, delegate, client
diff --git a/python/qpid/client.py b/python/qpid/client.py
new file mode 100644
index 0000000000..cef10622ac
--- /dev/null
+++ b/python/qpid/client.py
@@ -0,0 +1,111 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+An AQMP client implementation that uses a custom delegate for
+interacting with the server.
+"""
+
+import threading
+from peer import Peer, Closed
+from delegate import Delegate
+from connection import Connection, Frame
+from spec import load
+from queue import Queue
+
+
+class Client:
+
+ def __init__(self, host, port, spec, vhost = None):
+ self.host = host
+ self.port = port
+ self.spec = spec
+
+ self.mechanism = None
+ self.response = None
+ self.locale = None
+
+ self.vhost = vhost
+ if self.vhost == None:
+ self.vhost = self.host
+
+ self.queues = {}
+ self.lock = threading.Lock()
+
+ self.closed = False
+ self.started = threading.Event()
+
+ self.conn = Connection(self.host, self.port, self.spec)
+ self.peer = Peer(self.conn, ClientDelegate(self))
+
+ def wait(self):
+ self.started.wait()
+ if self.closed:
+ raise EOFError()
+
+ def queue(self, key):
+ self.lock.acquire()
+ try:
+ try:
+ q = self.queues[key]
+ except KeyError:
+ q = Queue(0)
+ self.queues[key] = q
+ finally:
+ self.lock.release()
+ return q
+
+ def start(self, response, mechanism="AMQPLAIN", locale="en_US"):
+ self.mechanism = mechanism
+ self.response = response
+ self.locale = locale
+
+ self.conn.connect()
+ self.conn.init()
+ self.peer.start()
+ self.wait()
+ self.channel(0).connection_open(self.vhost)
+
+ def channel(self, id):
+ return self.peer.channel(id)
+
+class ClientDelegate(Delegate):
+
+ def __init__(self, client):
+ Delegate.__init__(self)
+ self.client = client
+
+ def connection_start(self, ch, msg):
+ ch.connection_start_ok(mechanism=self.client.mechanism,
+ response=self.client.response,
+ locale=self.client.locale)
+
+ def connection_tune(self, ch, msg):
+ ch.connection_tune_ok(*msg.fields)
+ self.client.started.set()
+
+ def basic_deliver(self, ch, msg):
+ self.client.queue(msg.consumer_tag).put(msg)
+
+ def channel_close(self, ch, msg):
+ ch.close(msg)
+
+ def connection_close(self, ch, msg):
+ self.client.peer.close(msg)
+
+ def close(self, reason):
+ self.client.closed = True
+ self.client.started.set()
diff --git a/python/qpid/codec.py b/python/qpid/codec.py
new file mode 100644
index 0000000000..c4bbe91f32
--- /dev/null
+++ b/python/qpid/codec.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python
+
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utility code to translate between python objects and AMQP encoded data
+fields.
+"""
+
+from cStringIO import StringIO
+from struct import *
+
+class EOF(Exception):
+ pass
+
+class Codec:
+
+ def __init__(self, stream):
+ self.stream = stream
+ self.nwrote = 0
+ self.nread = 0
+ self.incoming_bits = []
+ self.outgoing_bits = []
+
+ def read(self, n):
+ data = self.stream.read(n)
+ if n > 0 and len(data) == 0:
+ raise EOF()
+ self.nread += len(data)
+ return data
+
+ def write(self, s):
+ self.flushbits()
+ self.stream.write(s)
+ self.nwrote += len(s)
+
+ def flush(self):
+ self.flushbits()
+ self.stream.flush()
+
+ def flushbits(self):
+ if len(self.outgoing_bits) > 0:
+ bytes = []
+ index = 0
+ for b in self.outgoing_bits:
+ if index == 0: bytes.append(0)
+ if b: bytes[-1] |= 1 << index
+ index = (index + 1) % 8
+ del self.outgoing_bits[:]
+ for byte in bytes:
+ self.encode_octet(byte)
+
+ def pack(self, fmt, *args):
+ self.write(pack(fmt, *args))
+
+ def unpack(self, fmt):
+ size = calcsize(fmt)
+ data = self.read(size)
+ values = unpack(fmt, data)
+ if len(values) == 1:
+ return values[0]
+ else:
+ return values
+
+ def encode(self, type, value):
+ getattr(self, "encode_" + type)(value)
+
+ def decode(self, type):
+ return getattr(self, "decode_" + type)()
+
+ # bit
+ def encode_bit(self, o):
+ if o:
+ self.outgoing_bits.append(True)
+ else:
+ self.outgoing_bits.append(False)
+
+ def decode_bit(self):
+ if len(self.incoming_bits) == 0:
+ bits = self.decode_octet()
+ for i in range(8):
+ self.incoming_bits.append(bits >> i & 1 != 0)
+ return self.incoming_bits.pop(0)
+
+ # octet
+ def encode_octet(self, o):
+ self.pack("!B", o)
+
+ def decode_octet(self):
+ return self.unpack("!B")
+
+ # short
+ def encode_short(self, o):
+ self.pack("!H", o)
+
+ def decode_short(self):
+ return self.unpack("!H")
+
+ # long
+ def encode_long(self, o):
+ self.pack("!L", o)
+
+ def decode_long(self):
+ return self.unpack("!L")
+
+ # longlong
+ def encode_longlong(self, o):
+ self.pack("!Q", o)
+
+ def decode_longlong(self):
+ return self.unpack("!Q")
+
+ def enc_str(self, fmt, s):
+ size = len(s)
+ self.pack(fmt, size)
+ self.write(s)
+
+ def dec_str(self, fmt):
+ size = self.unpack(fmt)
+ return self.read(size)
+
+ # shortstr
+ def encode_shortstr(self, s):
+ self.enc_str("!B", s)
+
+ def decode_shortstr(self):
+ return self.dec_str("!B")
+
+ # longstr
+ def encode_longstr(self, s):
+ if isinstance(s, dict):
+ self.encode_table(s)
+ else:
+ self.enc_str("!L", s)
+
+ def decode_longstr(self):
+ return self.dec_str("!L")
+
+ # table
+ def encode_table(self, tbl):
+ enc = StringIO()
+ codec = Codec(enc)
+ for key, value in tbl.items():
+ codec.encode_shortstr(key)
+ if isinstance(value, basestring):
+ codec.write("S")
+ codec.encode_longstr(value)
+ else:
+ codec.write("I")
+ codec.encode_long(value)
+ s = enc.getvalue()
+ self.encode_long(len(s))
+ self.write(s)
+
+ def decode_table(self):
+ size = self.decode_long()
+ start = self.nread
+ result = {}
+ while self.nread - start < size:
+ key = self.decode_shortstr()
+ type = self.read(1)
+ if type == "S":
+ value = self.decode_longstr()
+ elif type == "I":
+ value = self.decode_long()
+ else:
+ raise ValueError(repr(type))
+ result[key] = value
+ return result
+
+def test(type, value):
+ if isinstance(value, (list, tuple)):
+ values = value
+ else:
+ values = [value]
+ stream = StringIO()
+ codec = Codec(stream)
+ for v in values:
+ codec.encode(type, v)
+ codec.flush()
+ enc = stream.getvalue()
+ stream.reset()
+ dup = []
+ for i in xrange(len(values)):
+ dup.append(codec.decode(type))
+ if values != dup:
+ raise AssertionError("%r --> %r --> %r" % (values, enc, dup))
+
+if __name__ == "__main__":
+ def dotest(type, value):
+ args = (type, value)
+ test(*args)
+
+ for value in ("1", "0", "110", "011", "11001", "10101", "10011"):
+ for i in range(10):
+ dotest("bit", map(lambda x: x == "1", value*i))
+
+ for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}):
+ dotest("table", value)
+
+ for type in ("octet", "short", "long", "longlong"):
+ for value in range(0, 256):
+ dotest(type, value)
+
+ for type in ("shortstr", "longstr"):
+ for value in ("", "a", "asdf"):
+ dotest(type, value)
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
new file mode 100644
index 0000000000..f4d0817e60
--- /dev/null
+++ b/python/qpid/connection.py
@@ -0,0 +1,265 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A Connection class containing socket code that uses the spec metadata
+to read and write Frame objects. This could be used by a client,
+server, or even a proxy implementation.
+"""
+
+import socket, codec
+from cStringIO import StringIO
+from spec import load, pythonize
+from codec import EOF
+
+class SockIO:
+
+ def __init__(self, sock):
+ self.sock = sock
+
+ def write(self, buf):
+# print "OUT: %r" % buf
+ self.sock.sendall(buf)
+
+ def read(self, n):
+ data = ""
+ while len(data) < n:
+ try:
+ s = self.sock.recv(n - len(data))
+ except socket.error:
+ break
+ if len(s) == 0:
+ break
+# print "IN: %r" % s
+ data += s
+ return data
+
+ def flush(self):
+ pass
+
+class Connection:
+
+ def __init__(self, host, port, spec):
+ self.host = host
+ self.port = port
+ self.spec = spec
+ self.FRAME_END = self.spec.constants.byname["frame end"].id
+
+ def connect(self):
+ sock = socket.socket()
+ sock.connect((self.host, self.port))
+ sock.setblocking(1)
+ self.codec = codec.Codec(SockIO(sock))
+
+ def flush(self):
+ self.codec.flush()
+
+ INIT="!4s4B"
+
+ def init(self):
+ self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
+ self.spec.minor)
+
+ def write(self, frame):
+ c = self.codec
+ c.encode_octet(self.spec.constants.byname[frame.payload.type].id)
+ c.encode_short(frame.channel)
+ frame.payload.encode(c)
+ c.encode_octet(self.FRAME_END)
+
+ def read(self):
+ c = self.codec
+ type = self.spec.constants.byid[c.decode_octet()].name
+ channel = c.decode_short()
+ payload = Frame.DECODERS[type].decode(self.spec, c)
+ end = c.decode_octet()
+ if end != self.FRAME_END:
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, end)
+ frame = Frame(channel, payload)
+ return frame
+
+class Frame:
+
+ METHOD = "frame method"
+ HEADER = "frame header"
+ BODY = "frame body"
+ OOB_METHOD = "frame oob method"
+ OOB_HEADER = "frame oob header"
+ OOB_BODY = "frame oob body"
+ TRACE = "frame trace"
+ HEARTBEAT = "frame heartbeat"
+
+ DECODERS = {}
+
+ def __init__(self, channel, payload):
+ self.channel = channel
+ self.payload = payload
+
+ def __str__(self):
+ return "[%d] %s" % (self.channel, self.payload)
+
+class Payload:
+
+ class __metaclass__(type):
+
+ def __new__(cls, name, bases, dict):
+ for req in ("encode", "decode", "type"):
+ if not dict.has_key(req):
+ raise TypeError("%s must define %s" % (name, req))
+ dict["decode"] = staticmethod(dict["decode"])
+ t = type.__new__(cls, name, bases, dict)
+ if t.type != None:
+ Frame.DECODERS[t.type] = t
+ return t
+
+ type = None
+
+ def encode(self, enc): abstract
+
+ def decode(spec, dec): abstract
+
+class Method(Payload):
+
+ type = Frame.METHOD
+
+ def __init__(self, method, *args):
+ if len(args) != len(method.fields):
+ argspec = ["%s: %s" % (pythonize(f.name), f.type)
+ for f in method.fields]
+ raise TypeError("%s.%s expecting (%s), got %s" %
+ (pythonize(method.klass.name),
+ pythonize(method.name), ", ".join(argspec), args))
+ self.method = method
+ self.args = args
+
+ def encode(self, enc):
+ buf = StringIO()
+ c = codec.Codec(buf)
+ c.encode_short(self.method.klass.id)
+ c.encode_short(self.method.id)
+ for field, arg in zip(self.method.fields, self.args):
+ c.encode(field.type, arg)
+ c.flush()
+ enc.encode_longstr(buf.getvalue())
+
+ def decode(spec, dec):
+ enc = dec.decode_longstr()
+ c = codec.Codec(StringIO(enc))
+ klass = spec.classes.byid[c.decode_short()]
+ meth = klass.methods.byid[c.decode_short()]
+ args = tuple([c.decode(f.type) for f in meth.fields])
+ return Method(meth, *args)
+
+ def __str__(self):
+ return "%s %s" % (self.method, ", ".join([str(a) for a in self.args]))
+
+class Header(Payload):
+
+ type = Frame.HEADER
+
+ def __init__(self, klass, weight, size, **properties):
+ self.klass = klass
+ self.weight = weight
+ self.size = size
+ self.properties = properties
+
+ def __getitem__(self, name):
+ return self.properties[name]
+
+ def __setitem__(self, name, value):
+ self.properties[name] = value
+
+ def __delitem__(self, name):
+ del self.properties[name]
+
+ def encode(self, enc):
+ buf = StringIO()
+ c = codec.Codec(buf)
+ c.encode_short(self.klass.id)
+ c.encode_short(self.weight)
+ c.encode_longlong(self.size)
+
+ # property flags
+ nprops = len(self.klass.fields)
+ flags = 0
+ for i in range(nprops):
+ f = self.klass.fields.items[i]
+ flags <<= 1
+ if self.properties.get(f.name) != None:
+ flags |= 1
+ # the last bit indicates more flags
+ if i > 0 and (i % 15) == 0:
+ flags <<= 1
+ if nprops > (i + 1):
+ flags |= 1
+ c.encode_short(flags)
+ flags = 0
+ flags <<= ((16 - (nprops % 15)) % 16)
+ c.encode_short(flags)
+
+ # properties
+ for f in self.klass.fields:
+ v = self.properties.get(f.name)
+ if v != None:
+ c.encode(f.type, v)
+ c.flush()
+ enc.encode_longstr(buf.getvalue())
+
+ def decode(spec, dec):
+ c = codec.Codec(StringIO(dec.decode_longstr()))
+ klass = spec.classes.byid[c.decode_short()]
+ weight = c.decode_short()
+ size = c.decode_longlong()
+
+ # property flags
+ bits = []
+ while True:
+ flags = c.decode_short()
+ for i in range(15, 0, -1):
+ if flags >> i & 0x1 != 0:
+ bits.append(True)
+ else:
+ bits.append(False)
+ if flags & 0x1 == 0:
+ break
+
+ # properties
+ properties = {}
+ for b, f in zip(bits, klass.fields):
+ if b:
+ properties[f.name] = c.decode(f.type)
+
+ return Header(klass, weight, size, **properties)
+
+ def __str__(self):
+ return "%s %s %s %s" % (self.klass, self.weight, self.size,
+ self.properties)
+
+class Body(Payload):
+
+ type = Frame.BODY
+
+ def __init__(self, content):
+ self.content = content
+
+ def encode(self, enc):
+ enc.encode_longstr(self.content)
+
+ def decode(spec, dec):
+ return Body(dec.decode_longstr())
+
+ def __str__(self):
+ return "Body(%r)" % self.content
diff --git a/python/qpid/content.py b/python/qpid/content.py
new file mode 100644
index 0000000000..33c9ec35f4
--- /dev/null
+++ b/python/qpid/content.py
@@ -0,0 +1,47 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple python representation for AMQP content.
+"""
+
+def default(val, defval):
+ if val == None:
+ return defval
+ else:
+ return val
+
+class Content:
+
+ def __init__(self, body = "", children = None, properties = None):
+ self.body = body
+ self.children = default(children, [])
+ self.properties = default(properties, {})
+
+ def size(self):
+ return len(self.body)
+
+ def weight(self):
+ return len(self.children)
+
+ def __getitem__(self, name):
+ return self.properties[name]
+
+ def __setitem__(self, name, value):
+ self.properties[name] = value
+
+ def __delitem__(self, name):
+ del self.properties[name]
diff --git a/python/qpid/delegate.py b/python/qpid/delegate.py
new file mode 100644
index 0000000000..0467162498
--- /dev/null
+++ b/python/qpid/delegate.py
@@ -0,0 +1,52 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Delegate implementation intended for use with the peer module.
+"""
+
+import threading, inspect
+from spec import pythonize
+
+class Delegate:
+
+ def __init__(self):
+ self.handlers = {}
+ self.invokers = {}
+ # initialize all the mixins
+ self.invoke_all("init")
+
+ def invoke_all(self, meth, *args, **kwargs):
+ for cls in inspect.getmro(self.__class__):
+ if hasattr(cls, meth):
+ getattr(cls, meth)(self, *args, **kwargs)
+
+ def dispatch(self, channel, message):
+ method = message.method
+ spec = method.klass.spec
+
+ try:
+ handler = self.handlers[method]
+ except KeyError:
+ name = "%s_%s" % (pythonize(method.klass.name),
+ pythonize(method.name))
+ handler = getattr(self, name)
+ self.handlers[method] = handler
+
+ return handler(channel, message)
+
+ def close(self, reason):
+ self.invoke_all("close", reason)
diff --git a/python/qpid/message.py b/python/qpid/message.py
new file mode 100644
index 0000000000..08b3e70c0b
--- /dev/null
+++ b/python/qpid/message.py
@@ -0,0 +1,81 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from sets import Set
+
+class Message:
+
+ COMMON_FIELDS = Set(("content", "method", "fields"))
+
+ def __init__(self, method, fields, content = None):
+ self.method = method
+ self.fields = fields
+ self.content = content
+
+ def __len__(self):
+ l = len(self.fields)
+ if method.content:
+ l += 1
+ return len(self.fields)
+
+ def _idx(self, idx):
+ if idx < 0: idx += len(self)
+ if idx < 0 or idx > len(self):
+ raise IndexError(idx)
+ return idx
+
+ def __getitem__(self, idx):
+ idx = self._idx(idx)
+ if idx == len(self.fields):
+ return self.content
+ else:
+ return self.fields[idx]
+
+ def __setitem__(self, idx, value):
+ idx = self._idx(idx)
+ if idx == len(self.fields):
+ self.content = value
+ else:
+ self.fields[idx] = value
+
+ def _slot(self, attr):
+ if attr in Message.COMMON_FIELDS:
+ env = self.__dict__
+ key = attr
+ else:
+ env = self.fields
+ try:
+ field = self.method.fields.bypyname[attr]
+ key = self.method.fields.index(field)
+ except KeyError:
+ raise AttributeError(attr)
+ return env, key
+
+ def __getattr__(self, attr):
+ env, key = self._slot(attr)
+ return env[key]
+
+ def __setattr__(self, attr, value):
+ env, key = self._slot(attr)
+ env[attr] = value
+
+ STR = "%s %s content = %s"
+ REPR = STR.replace("%s", "%r")
+
+ def __str__(self):
+ return Message.STR % (self.method, self.fields, self.content)
+
+ def __repr__(self):
+ return Message.REPR % (self.method, self.fields, self.content)
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
new file mode 100644
index 0000000000..4179a05568
--- /dev/null
+++ b/python/qpid/peer.py
@@ -0,0 +1,209 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module contains a skeletal peer implementation useful for
+implementing an AMQP server, client, or proxy. The peer implementation
+sorts incoming frames to their intended channels, and dispatches
+incoming method frames to a delegate.
+"""
+
+import thread, traceback, socket, sys, logging
+from connection import Frame, EOF, Method, Header, Body
+from message import Message
+from queue import Queue, Closed as QueueClosed
+from content import Content
+from cStringIO import StringIO
+
+class Peer:
+
+ def __init__(self, conn, delegate):
+ self.conn = conn
+ self.delegate = delegate
+ self.outgoing = Queue(0)
+ self.work = Queue(0)
+ self.channels = {}
+ self.Channel = type("Channel%s" % conn.spec.klass.__name__,
+ (Channel, conn.spec.klass), {})
+ self.lock = thread.allocate_lock()
+
+ def channel(self, id):
+ self.lock.acquire()
+ try:
+ try:
+ ch = self.channels[id]
+ except KeyError:
+ ch = self.Channel(id, self.outgoing)
+ self.channels[id] = ch
+ finally:
+ self.lock.release()
+ return ch
+
+ def start(self):
+ thread.start_new_thread(self.writer, ())
+ thread.start_new_thread(self.reader, ())
+ thread.start_new_thread(self.worker, ())
+
+ def fatal(message=None):
+ """Call when an unexpected exception occurs that will kill a thread.
+
+ In this case it's better to crash the process than to continue in
+ an invalid state with a missing thread."""
+ if message: print >> sys.stderr, message
+ traceback.print_exc()
+
+ def reader(self):
+ try:
+ while True:
+ try:
+ frame = self.conn.read()
+ except EOF, e:
+ self.close(e)
+ break
+ ch = self.channel(frame.channel)
+ ch.dispatch(frame, self.work)
+ except:
+ self.fatal()
+
+ def close(self, reason):
+ for ch in self.channels.values():
+ ch.close(reason)
+ self.delegate.close(reason)
+
+ def writer(self):
+ try:
+ while True:
+ try:
+ message = self.outgoing.get()
+ self.conn.write(message)
+ except socket.error, e:
+ self.close(e)
+ break
+ self.conn.flush()
+ except:
+ self.fatal()
+
+ def worker(self):
+ try:
+ while True:
+ self.dispatch(self.work.get())
+ except:
+ self.fatal()
+
+ def dispatch(self, queue):
+ frame = queue.get()
+ channel = self.channel(frame.channel)
+ payload = frame.payload
+ if payload.method.content:
+ content = read_content(queue)
+ else:
+ content = None
+ # Let the caller deal with exceptions thrown here.
+ message = Message(payload.method, payload.args, content)
+ self.delegate.dispatch(channel, message)
+
+class Closed(Exception): pass
+
+class Channel:
+
+ def __init__(self, id, outgoing):
+ self.id = id
+ self.outgoing = outgoing
+ self.incoming = Queue(0)
+ self.responses = Queue(0)
+ self.queue = None
+ self.closed = False
+ self.reason = None
+
+ def close(self, reason):
+ if self.closed:
+ return
+ self.closed = True
+ self.reason = reason
+ self.incoming.close()
+ self.responses.close()
+
+ def dispatch(self, frame, work):
+ payload = frame.payload
+ if isinstance(payload, Method):
+ if payload.method.response:
+ self.queue = self.responses
+ else:
+ self.queue = self.incoming
+ work.put(self.incoming)
+ self.queue.put(frame)
+
+ def invoke(self, method, args, content = None):
+ if self.closed:
+ raise Closed(self.reason)
+
+ frame = Frame(self.id, Method(method, *args))
+ self.outgoing.put(frame)
+
+ if method.content:
+ if content == None:
+ content = Content()
+ self.write_content(method.klass, content, self.outgoing)
+
+ try:
+ # here we depend on all nowait fields being named nowait
+ f = method.fields.byname["nowait"]
+ nowait = args[method.fields.index(f)]
+ except KeyError:
+ nowait = False
+
+ try:
+ if not nowait and method.responses:
+ resp = self.responses.get().payload
+ if resp.method.content:
+ content = read_content(self.responses)
+ else:
+ content = None
+ if resp.method in method.responses:
+ return Message(resp.method, resp.args, content)
+ else:
+ raise ValueError(resp)
+ except QueueClosed, e:
+ if self.closed:
+ raise Closed(self.reason)
+ else:
+ raise e
+
+ def write_content(self, klass, content, queue):
+ size = content.size()
+ header = Frame(self.id, Header(klass, content.weight(), size))
+ queue.put(header)
+ for child in content.children:
+ self.write_content(klass, child, queue)
+ # should split up if content.body exceeds max frame size
+ if size > 0:
+ queue.put(Frame(self.id, Body(content.body)))
+
+def read_content(queue):
+ frame = queue.get()
+ header = frame.payload
+ children = []
+ for i in range(header.weight):
+ children.append(read_content(queue))
+ size = header.size
+ read = 0
+ buf = StringIO()
+ while read < size:
+ body = queue.get()
+ content = body.payload.content
+ buf.write(content)
+ read += len(content)
+ return Content(buf.getvalue(), children, header.properties.copy())
diff --git a/python/qpid/queue.py b/python/qpid/queue.py
new file mode 100644
index 0000000000..491cc3947d
--- /dev/null
+++ b/python/qpid/queue.py
@@ -0,0 +1,42 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module augments the standard python multithreaded Queue
+implementation to add a close() method so that threads blocking on the
+content of a queue can be notified if the queue is no longer in use.
+"""
+
+from Queue import Queue as BaseQueue, Empty, Full
+
+class Closed(Exception): pass
+
+class Queue(BaseQueue):
+
+ END = object()
+
+ def close(self):
+ self.put(Queue.END)
+
+ def get(self, block = True, timeout = None):
+ result = BaseQueue.get(self, block, timeout)
+ if result == Queue.END:
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Closed exception
+ self.put(Queue.END)
+ raise Closed()
+ else:
+ return result
diff --git a/python/qpid/spec.py b/python/qpid/spec.py
new file mode 100644
index 0000000000..70e09aa1e9
--- /dev/null
+++ b/python/qpid/spec.py
@@ -0,0 +1,349 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module loads protocol metadata into python objects. It provides
+access to spec metadata via a python object model, and can also
+dynamically creating python methods, classes, and modules based on the
+spec metadata. All the generated methods have proper signatures and
+doc strings based on the spec metadata so the python help system can
+be used to browse the spec documentation. The generated methods all
+dispatch to the self.invoke(meth, args) callback of the containing
+class so that the generated code can be reused in a variety of
+situations.
+"""
+
+import re, textwrap, new, xmlutil
+
+class SpecContainer:
+
+ def __init__(self):
+ self.items = []
+ self.byname = {}
+ self.byid = {}
+ self.indexes = {}
+ self.bypyname = {}
+
+ def add(self, item):
+ if self.byname.has_key(item.name):
+ raise ValueError("duplicate name: %s" % item)
+ if self.byid.has_key(item.id):
+ raise ValueError("duplicate id: %s" % item)
+ pyname = pythonize(item.name)
+ if self.bypyname.has_key(pyname):
+ raise ValueError("duplicate pyname: %s" % item)
+ self.indexes[item] = len(self.items)
+ self.items.append(item)
+ self.byname[item.name] = item
+ self.byid[item.id] = item
+ self.bypyname[pyname] = item
+
+ def index(self, item):
+ try:
+ return self.indexes[item]
+ except KeyError:
+ raise ValueError(item)
+
+ def __iter__(self):
+ return iter(self.items)
+
+ def __len__(self):
+ return len(self.items)
+
+class Metadata:
+
+ PRINT = []
+
+ def __init__(self):
+ pass
+
+ def __str__(self):
+ args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT)
+ return "%s(%s)" % (self.__class__.__name__, ", ".join(args))
+
+ def __repr__(self):
+ return str(self)
+
+class Spec(Metadata):
+
+ PRINT=["major", "minor", "file"]
+
+ def __init__(self, major, minor, file):
+ Metadata.__init__(self)
+ self.major = major
+ self.minor = minor
+ self.file = file
+ self.constants = SpecContainer()
+ self.classes = SpecContainer()
+
+ def post_load(self):
+ self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
+ self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor))
+
+ def parse_method(self, name):
+ parts = re.split(r"\s*\.\s*", name)
+ if len(parts) != 2:
+ raise ValueError(name)
+ klass, meth = parts
+ return self.classes.byname[klass].methods.byname[meth]
+
+ def define_module(self, name, doc = None):
+ module = new.module(name, doc)
+ module.__file__ = self.file
+ for c in self.classes:
+ classname = pythonize(c.name)
+ cls = c.define_class(classname)
+ cls.__module__ = module.__name__
+ setattr(module, classname, cls)
+ return module
+
+ def define_class(self, name):
+ methods = {}
+ for c in self.classes:
+ for m in c.methods:
+ meth = pythonize(m.klass.name + "_" + m.name)
+ methods[meth] = m.define_method(meth)
+ return type(name, (), methods)
+
+class Constant(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, spec, name, id, klass, docs):
+ Metadata.__init__(self)
+ self.spec = spec
+ self.name = name
+ self.id = id
+ self.klass = klass
+ self.docs = docs
+
+class Class(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, spec, name, id, handler, docs):
+ Metadata.__init__(self)
+ self.spec = spec
+ self.name = name
+ self.id = id
+ self.handler = handler
+ self.fields = SpecContainer()
+ self.methods = SpecContainer()
+ self.docs = docs
+
+ def define_class(self, name):
+ methods = {}
+ for m in self.methods:
+ meth = pythonize(m.name)
+ methods[meth] = m.define_method(meth)
+ return type(name, (), methods)
+
+class Method(Metadata):
+
+ PRINT=["name", "id"]
+
+ def __init__(self, klass, name, id, content, responses, synchronous,
+ description, docs):
+ Metadata.__init__(self)
+ self.klass = klass
+ self.name = name
+ self.id = id
+ self.content = content
+ self.responses = responses
+ self.synchronous = synchronous
+ self.fields = SpecContainer()
+ self.description = description
+ self.docs = docs
+ self.response = False
+
+ def docstring(self):
+ s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs])
+ for f in self.fields:
+ if f.docs:
+ s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] +
+ [fill(d, 4) for d in f.docs[1:]])
+ return s
+
+ METHOD = "__method__"
+ DEFAULTS = {"bit": False,
+ "shortstr": "",
+ "longstr": "",
+ "table": {},
+ "octet": 0,
+ "short": 0,
+ "long": 0,
+ "longlong": 0}
+
+ def define_method(self, name):
+ g = {Method.METHOD: self}
+ l = {}
+ args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields]
+ if self.content:
+ args += [("content", None)]
+ code = "def %s(self, %s):\n" % \
+ (name, ", ".join(["%s = %r" % a for a in args]))
+ code += " %r\n" % self.docstring()
+ if self.content:
+ methargs = args[:-1]
+ else:
+ methargs = args
+ argnames = ", ".join([a[0] for a in methargs])
+ code += " return self.invoke(%s" % Method.METHOD
+ if argnames:
+ code += ", (%s,)" % argnames
+ if self.content:
+ code += ", content"
+ code += ")"
+ exec code in g, l
+ return l[name]
+
+class Field(Metadata):
+
+ PRINT=["name", "id", "type"]
+
+ def __init__(self, name, id, type, docs):
+ Metadata.__init__(self)
+ self.name = name
+ self.id = id
+ self.type = type
+ self.docs = docs
+
+def get_docs(nd):
+ return [n.text for n in nd["doc"]]
+
+def load_fields(nd, l, domains):
+ for f_nd in nd["field"]:
+ try:
+ type = f_nd["@type"]
+ except KeyError:
+ type = domains[f_nd["@domain"]]
+ l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd)))
+
+def load(specfile):
+ doc = xmlutil.parse(specfile)
+ root = doc["amqp"][0]
+ spec = Spec(int(root["@major"]), int(root["@minor"]), specfile)
+
+ # constants
+ for nd in root["constant"]:
+ const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"),
+ get_docs(nd))
+ spec.constants.add(const)
+
+ # domains are typedefs
+ domains = {}
+ for nd in root["domain"]:
+ domains[nd["@name"]] = nd["@type"]
+
+ # classes
+ for c_nd in root["class"]:
+ klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"],
+ get_docs(c_nd))
+ load_fields(c_nd, klass.fields, domains)
+ for m_nd in c_nd["method"]:
+ meth = Method(klass, m_nd["@name"],
+ int(m_nd["@index"]),
+ m_nd.get_bool("@content", False),
+ [nd["@name"] for nd in m_nd["response"]],
+ m_nd.get_bool("@synchronous", False),
+ m_nd.text,
+ get_docs(m_nd))
+ load_fields(m_nd, meth.fields, domains)
+ klass.methods.add(meth)
+ # resolve the responses
+ for m in klass.methods:
+ m.responses = [klass.methods.byname[r] for r in m.responses]
+ for resp in m.responses:
+ resp.response = True
+ spec.classes.add(klass)
+ spec.post_load()
+ return spec
+
+REPLACE = {" ": "_", "-": "_"}
+KEYWORDS = {"global": "global_",
+ "return": "return_"}
+
+def pythonize(name):
+ name = str(name)
+ for key, val in REPLACE.items():
+ name = name.replace(key, val)
+ try:
+ name = KEYWORDS[name]
+ except KeyError:
+ pass
+ return name
+
+def fill(text, indent, heading = None):
+ sub = indent * " "
+ if heading:
+ init = (indent - 2) * " " + heading + " -- "
+ else:
+ init = sub
+ w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub)
+ return w.fill(" ".join(text.split()))
+
+class Rule(Metadata):
+
+ PRINT = ["text", "implement", "tests"]
+
+ def __init__(self, text, implement, tests, path):
+ self.text = text
+ self.implement = implement
+ self.tests = tests
+ self.path = path
+
+def find_rules(node, rules):
+ if node.name == "rule":
+ rules.append(Rule(node.text, node.get("@implement"),
+ [ch.text for ch in node if ch.name == "test"],
+ node.path()))
+ if node.name == "doc" and node.get("@name") == "rule":
+ tests = []
+ if node.has("@test"):
+ tests.append(node["@test"])
+ rules.append(Rule(node.text, None, tests, node.path()))
+ for child in node:
+ find_rules(child, rules)
+
+def load_rules(specfile):
+ rules = []
+ find_rules(xmlutil.parse(specfile), rules)
+ return rules
+
+def test_summary():
+ template = """
+ <html><head><title>AMQP Tests</title></head>
+ <body>
+ <table width="80%%" align="center">
+ %s
+ </table>
+ </body>
+ </html>
+ """
+ rows = []
+ for rule in load_rules("amqp.org/specs/amqp7.xml"):
+ if rule.tests:
+ tests = ", ".join(rule.tests)
+ else:
+ tests = "&nbsp;"
+ rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>'
+ '<td><b>Implement:</b> %s</td>'
+ '<td><b>Tests:</b> %s</td></tr>' %
+ (rule.path[len("/root/amqp"):], rule.implement, tests))
+ rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text)
+ rows.append('<tr><td colspan="3">&nbsp;</td></tr>')
+
+ print template % "\n".join(rows)
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
new file mode 100644
index 0000000000..ff9ecbee8a
--- /dev/null
+++ b/python/qpid/testlib.py
@@ -0,0 +1,221 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Support library for qpid python tests.
+#
+
+import sys, re, unittest, os, random, logging
+import qpid.client, qpid.spec
+from getopt import getopt, GetoptError
+
+
+def findmodules(root):
+ """Find potential python modules under directory root"""
+ found = []
+ for dirpath, subdirs, files in os.walk(root):
+ modpath = dirpath.replace(os.sep, '.')
+ if not re.match(r'\.svn$', dirpath): # Avoid SVN directories
+ for f in files:
+ match = re.match(r'(.+)\.py$', f)
+ if match and f != '__init__.py':
+ found.append('.'.join([modpath, match.group(1)]))
+ return found
+
+def default(value, default):
+ if (value == None): return default
+ else: return value
+
+class TestRunner:
+ """Runs unit tests.
+
+ Parses command line arguments, provides utility functions for tests,
+ runs the selected test suite.
+ """
+
+ def _die(self, message = None):
+ if message: print message
+ print """
+run-tests [options] [test*]
+The name of a test is package.module.ClassName.testMethod
+Options:
+ -?/-h/--help : this message
+ -s/--spec <spec.xml> : file containing amqp XML spec
+ -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
+ -v/--verbose : verbose - lists tests as they are run.
+ -d/--debug : enable debug logging.
+ -i/--ignore <test> : ignore the named test.
+ -I/--ignore-file : file containing patterns to ignore.
+ """
+ sys.exit(1)
+
+ def setBroker(self, broker):
+ rex = re.compile(r"""
+ # [ <user> [ / <password> ] @] <host> [ :<port> ]
+ ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
+ match = rex.match(broker)
+ if not match: self._die("'%s' is not a valid broker" % (broker))
+ self.user, self.password, self.host, self.port = match.groups()
+ self.port = int(default(self.port, 5672))
+ self.user = default(self.user, "guest")
+ self.password = default(self.password, "guest")
+
+ def __init__(self):
+ # Defaults
+ self.setBroker("localhost")
+ self.spec = "../specs/amqp-8.0.xml"
+ self.verbose = 1
+ self.ignore = []
+
+ def ignoreFile(self, filename):
+ f = file(filename)
+ for line in f.readlines(): self.ignore.append(line.strip())
+ f.close()
+
+ def _parseargs(self, args):
+ try:
+ opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"])
+ except GetoptError, e:
+ self._die(str(e))
+ for opt, value in opts:
+ if opt in ("-?", "-h", "--help"): self._die()
+ if opt in ("-s", "--spec"): self.spec = value
+ if opt in ("-b", "--broker"): self.setBroker(value)
+ if opt in ("-v", "--verbose"): self.verbose = 2
+ if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
+ if opt in ("-i", "--ignore"): self.ignore.append(value)
+ if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
+
+ if len(self.tests) == 0: self.tests=findmodules("tests")
+
+ def testSuite(self):
+ class IgnoringTestSuite(unittest.TestSuite):
+ def addTest(self, test):
+ if isinstance(test, unittest.TestCase) and test.id() in testrunner.ignore:
+ return
+ unittest.TestSuite.addTest(self, test)
+
+ # Use our IgnoringTestSuite in the test loader.
+ unittest.TestLoader.suiteClass = IgnoringTestSuite
+ return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
+
+ def run(self, args=sys.argv[1:]):
+ self._parseargs(args)
+ runner = unittest.TextTestRunner(descriptions=False,
+ verbosity=self.verbose)
+ result = runner.run(self.testSuite())
+ if (self.ignore):
+ print "======================================="
+ print "NOTE: the following tests were ignored:"
+ for t in self.ignore: print t
+ print "======================================="
+ return result.wasSuccessful()
+
+ def connect(self, host=None, port=None, spec=None, user=None, password=None):
+ """Connect to the broker, returns a qpid.client.Client"""
+ host = host or self.host
+ port = port or self.port
+ spec = spec or self.spec
+ user = user or self.user
+ password = password or self.password
+ client = qpid.client.Client(host, port, qpid.spec.load(spec))
+ client.start({"LOGIN": user, "PASSWORD": password})
+ return client
+
+
+# Global instance for tests to call connect.
+testrunner = TestRunner()
+
+
+class TestBase(unittest.TestCase):
+ """Base class for Qpid test cases.
+
+ self.client is automatically connected with channel 1 open before
+ the test methods are run.
+
+ Deletes queues and exchanges after. Tests call
+ self.queue_declare(channel, ...) and self.exchange_declare(chanel,
+ ...) which are wrappers for the Channel functions that note
+ resources to clean up later.
+ """
+
+ def setUp(self):
+ self.queues = []
+ self.exchanges = []
+ self.client = self.connect()
+ self.channel = self.client.channel(1)
+ self.channel.channel_open()
+
+ def tearDown(self):
+ # TODO aconway 2006-09-05: Wrong behaviour here, we should
+ # close all open channels (checking for exceptions on the
+ # channesl) then open a channel to clean up qs and exs,
+ # finally close that channel.
+ for ch, q in self.queues:
+ ch.queue_delete(queue=q)
+ for ch, ex in self.exchanges:
+ ch.exchange_delete(exchange=ex)
+
+ def connect(self, *args, **keys):
+ """Create a new connction, return the Client object"""
+ return testrunner.connect(*args, **keys)
+
+ def queue_declare(self, channel=None, *args, **keys):
+ channel = channel or self.channel
+ reply = channel.queue_declare(*args, **keys)
+ self.queues.append((channel, reply.queue))
+ return reply
+
+ def exchange_declare(self, channel=None, ticket=0, exchange='',
+ type='', passive=False, durable=False,
+ auto_delete=False, internal=False, nowait=False,
+ arguments={}):
+ channel = channel or self.channel
+ reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
+ # TODO aconway 2006-09-14: Don't add exchange on failure.
+ self.exchanges.append((channel,exchange))
+ return reply
+
+ def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+ """
+ Publish a message and consume it, assert it comes back intact.
+
+ queue can be a single queue name or a list of queue names.
+ For a list assert the message appears on all queues.
+ Crude attempt to make unique messages so we can't consume
+ a message not really meant for us.
+ """
+ body = "TestMessage("+str(random.randint(999999, 1000000))+")"
+ self.channel.basic_publish(exchange=exchange,
+ content=qpid.content.Content(body),
+ routing_key=routing_key)
+ if not isinstance(queue, list): queue = [queue]
+ for q in queue:
+ reply = self.channel.basic_consume(queue=q, no_ack=True)
+ msg = self.client.queue(reply.consumer_tag).get(timeout=2)
+ self.assertEqual(body, msg.content.body)
+
+
+ def assertChannelException(self, expectedCode, message):
+ self.assertEqual(message.method.klass.name, "channel")
+ self.assertEqual(message.method.name, "close")
+ self.assertEqual(message.reply_code, expectedCode)
+
+
+ def assertConnectionException(self, expectedCode, message):
+ self.assertEqual(message.method.klass.name, "connection")
+ self.assertEqual(message.method.name, "close")
+ self.assertEqual(message.reply_code, expectedCode)
+
diff --git a/python/qpid/xmlutil.py b/python/qpid/xmlutil.py
new file mode 100644
index 0000000000..8f8a7116f5
--- /dev/null
+++ b/python/qpid/xmlutil.py
@@ -0,0 +1,116 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+XML utilities used by spec.py
+"""
+
+import xml.sax
+from xml.sax.handler import ContentHandler
+
+def parse(file):
+ doc = Node("root")
+ xml.sax.parse(file, Builder(doc))
+ return doc
+
+class Node:
+
+ def __init__(self, name, attrs = None, text = None, parent = None):
+ self.name = name
+ self.attrs = attrs
+ self.text = text
+ self.parent = parent
+ self.children = []
+ if parent != None:
+ parent.children.append(self)
+
+ def get_bool(self, key, default = False):
+ v = self.get(key)
+ if v == None:
+ return default
+ else:
+ return bool(int(v))
+
+ def index(self):
+ if self.parent:
+ return self.parent.children.index(self)
+ else:
+ return 0
+
+ def has(self, key):
+ try:
+ result = self[key]
+ return True
+ except KeyError:
+ return False
+ except IndexError:
+ return False
+
+ def get(self, key, default = None):
+ if self.has(key):
+ return self[key]
+ else:
+ return default
+
+ def __getitem__(self, key):
+ if callable(key):
+ return filter(key, self.children)
+ else:
+ t = key.__class__
+ meth = "__get%s__" % t.__name__
+ if hasattr(self, meth):
+ return getattr(self, meth)(key)
+ else:
+ raise KeyError(key)
+
+ def __getstr__(self, name):
+ if name[:1] == "@":
+ return self.attrs[name[1:]]
+ else:
+ return self[lambda nd: nd.name == name]
+
+ def __getint__(self, index):
+ return self.children[index]
+
+ def __iter__(self):
+ return iter(self.children)
+
+ def path(self):
+ if self.parent == None:
+ return "/%s" % self.name
+ else:
+ return "%s/%s" % (self.parent.path(), self.name)
+
+class Builder(ContentHandler):
+
+ def __init__(self, start = None):
+ self.node = start
+
+ def __setitem__(self, element, type):
+ self.types[element] = type
+
+ def startElement(self, name, attrs):
+ self.node = Node(name, attrs, None, self.node)
+
+ def endElement(self, name):
+ self.node = self.node.parent
+
+ def characters(self, content):
+ if self.node.text == None:
+ self.node.text = content
+ else:
+ self.node.text += content
+
diff --git a/python/rule2test b/python/rule2test
new file mode 100755
index 0000000000..b57ea9e24e
--- /dev/null
+++ b/python/rule2test
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+
+#
+# Convert rules to tests
+#
+import sys, re, os.path
+from getopt import getopt, GetoptError
+from string import capitalize
+from xml import dom
+from xml.dom.minidom import parse
+
+def camelcase(s):
+ """Convert 'string like this' to 'StringLikeThis'"""
+ return "".join([capitalize(w) for w in re.split(re.compile("\W*"), s)])
+
+def uncapitalize(s): return s[0].lower()+s[1:]
+
+def ancestors(node):
+ "Return iterator of ancestors from top-level element to node"
+ def generator(node):
+ while node and node.parentNode:
+ yield node
+ node = node.parentNode
+ return reversed(list(generator(node)))
+
+def tagAndName(element):
+ nameAttr = element.getAttribute("name");
+ if (nameAttr) : return camelcase(nameAttr) + camelcase(element.tagName)
+ else: return camelcase(element.tagName)
+
+def nodeText(n):
+ """Recursively collect text from all text nodes under n"""
+ if n.nodeType == dom.Node.TEXT_NODE:
+ return n.data
+ if n.childNodes:
+ return reduce(lambda t, c: t + nodeText(c), n.childNodes, "")
+ return ""
+
+def cleanup(docString, level=8):
+ unindent = re.sub("\n[ \t]*", "\n", docString.strip())
+ emptyLines = re.sub("\n\n\n", "\n\n", unindent)
+ indented = re.sub("\n", "\n"+level*" ", emptyLines)
+ return level*" " + indented
+
+def printTest(test, docstring):
+ print "class %s(TestBase):" % test
+ print ' """'
+ print docstring
+ print ' """'
+ print
+ print
+
+def printTests(doc, module):
+ """Returns dictionary { classname : [ (methodname, docstring)* ] * }"""
+ tests = {}
+ rules = doc.getElementsByTagName("rule")
+ for r in rules:
+ path = list(ancestors(r))
+ if module == path[1].getAttribute("name").lower():
+ test = "".join(map(tagAndName, path[2:])) + "Tests"
+ docstring = cleanup(nodeText(r), 4)
+ printTest(test, docstring)
+
+def usage(message=None):
+ if message: print >>sys.stderr, message
+ print >>sys.stderr, """
+rule2test [options] <amqpclass>
+
+Print test classes for each rule for the amqpclass in amqp.xml.
+
+Options:
+ -?/-h/--help : this message
+ -s/--spec <spec.xml> : file containing amqp XML spec
+"""
+ return 1
+
+def main(argv):
+ try: opts, args = getopt(argv[1:], "h?s:", ["help", "spec="])
+ except GetoptError, e: return usage(e)
+ spec = "../specs/amqp.xml" # Default
+ for opt, val in opts:
+ if (opt in ("-h", "-?", "--help")): return usage()
+ if (opt in ("-s", "--spec")): spec = val
+ doc = parse(spec)
+ if len(args) == 0: return usage()
+ printTests(doc, args[0])
+ return 0
+
+if (__name__ == "__main__"): sys.exit(main(sys.argv))
diff --git a/python/run-tests b/python/run-tests
new file mode 100755
index 0000000000..c49bd32a96
--- /dev/null
+++ b/python/run-tests
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from qpid.testlib import testrunner
+
+if not testrunner.run(): sys.exit(1)
+
+
+
diff --git a/python/tests/__init__.py b/python/tests/__init__.py
new file mode 100644
index 0000000000..d55ff3fd85
--- /dev/null
+++ b/python/tests/__init__.py
@@ -0,0 +1 @@
+# Do not delete - marks this directory as a python package.
diff --git a/python/tests/basic.py b/python/tests/basic.py
new file mode 100644
index 0000000000..b912fc40be
--- /dev/null
+++ b/python/tests/basic.py
@@ -0,0 +1,115 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class BasicTests(TestBase):
+ """Tests for 'methods' on the amqp basic 'class'"""
+
+ def test_consume_no_local(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare two queues:
+ channel.queue_declare(queue="test-queue-1a", exclusive=True)
+ channel.queue_declare(queue="test-queue-1b", exclusive=True)
+ #establish two consumers one of which excludes delivery of locally sent messages
+ channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
+ channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
+
+ #send a message
+ channel.basic_publish(exchange="amq.direct", routing_key="test-queue-1a", content=Content("consume_no_local"))
+ channel.basic_publish(exchange="amq.direct", routing_key="test-queue-1b", content=Content("consume_no_local"))
+
+ #check the queues of the two consumers
+ excluded = self.client.queue("local_excluded")
+ included = self.client.queue("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("consume_no_local", msg.content.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+
+ def test_consume_exclusive(self):
+ """
+ Test that the exclusive flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-2", exclusive=True)
+
+ #check that an exclusive consumer prevents other consumer being created:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
+ try:
+ channel.basic_consume(consumer_tag="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ #open new channel and cleanup last consumer:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #check that an exclusive consumer cannot be created if a consumer already exists:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-2")
+ try:
+ channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ def test_consume_queue_errors(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ channel = self.channel
+ try:
+ #queue specified but doesn't exist:
+ channel.basic_consume(queue="invalid-queue")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.basic_consume(queue="")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-3", exclusive=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+ try:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
diff --git a/python/tests/broker.py b/python/tests/broker.py
new file mode 100644
index 0000000000..1345076604
--- /dev/null
+++ b/python/tests/broker.py
@@ -0,0 +1,84 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class BrokerTests(TestBase):
+ """Tests for basic Broker functionality"""
+
+ def test_amqp_basic_13(self):
+ """
+ First, this test tries to receive a message with a no-ack
+ consumer. Second, this test tries to explicitely receive and
+ acknowledge a message with an acknowledging consumer.
+ """
+ ch = self.channel
+ self.queue_declare(ch, queue = "myqueue")
+
+ # No ack consumer
+ ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag
+ body = "test no-ack"
+ ch.basic_publish(routing_key = "myqueue", content = Content(body))
+ msg = self.client.queue(ctag).get(timeout = 5)
+ self.assert_(msg.content.body == body)
+
+ # Acknowleding consumer
+ self.queue_declare(ch, queue = "otherqueue")
+ ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag
+ body = "test ack"
+ ch.basic_publish(routing_key = "otherqueue", content = Content(body))
+ msg = self.client.queue(ctag).get(timeout = 5)
+ ch.basic_ack(delivery_tag = msg.delivery_tag)
+ self.assert_(msg.content.body == body)
+
+ # TODO: Ensure we get a failure if an ack consumer doesn't ack.
+
+ def test_basic_delivery_immediate(self):
+ """
+ Test basic message delivery where consume is issued before publish
+ """
+ channel = self.channel
+ self.exchange_declare(channel, exchange="test-exchange", type="direct")
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+
+ body = "Immediate Delivery"
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True)
+ msg = queue.get(timeout=5)
+ self.assert_(msg.content.body == body)
+
+ # TODO: Ensure we fail if immediate=True and there's no consumer.
+
+
+ def test_basic_delivery_queued(self):
+ """
+ Test basic message delivery where publish is issued before consume
+ (i.e. requires queueing of the message)
+ """
+ channel = self.channel
+ self.exchange_declare(channel, exchange="test-exchange", type="direct")
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ body = "Queued Delivery"
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body))
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=5)
+ self.assert_(msg.content.body == body)
+
diff --git a/python/tests/example.py b/python/tests/example.py
new file mode 100644
index 0000000000..47d07fe9c5
--- /dev/null
+++ b/python/tests/example.py
@@ -0,0 +1,91 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class ExampleTest (TestBase):
+ """
+ An example Qpid test, illustrating the unittest frameowkr and the
+ python Qpid client. The test class must inherit TestCase. The
+ test code uses the Qpid client to interact with a qpid broker and
+ verify it behaves as expected.
+ """
+
+ def test_example(self):
+ """
+ An example test. Note that test functions must start with 'test_'
+ to be recognized by the test framework.
+ """
+
+ # By inheriting TestBase, self.client is automatically connected
+ # and self.channel is automatically opened as channel(1)
+ # Other channel methods mimic the protocol.
+ channel = self.channel
+
+ # Now we can send regular commands. If you want to see what the method
+ # arguments mean or what other commands are available, you can use the
+ # python builtin help() method. For example:
+ #help(chan)
+ #help(chan.exchange_declare)
+
+ # If you want browse the available protocol methods without being
+ # connected to a live server you can use the amqp-doc utility:
+ #
+ # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+ #
+ # Options:
+ # -e, --regexp use regex instead of glob when matching
+
+ # Now that we know what commands are available we can use them to
+ # interact with the server.
+
+ # Here we use ordinal arguments.
+ self.exchange_declare(channel, 0, "test", "direct")
+
+ # Here we use keyword arguments.
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
+
+ # Call Channel.basic_consume to register as a consumer.
+ # All the protocol methods return a message object. The message object
+ # has fields corresponding to the reply method fields, plus a content
+ # field that is filled if the reply includes content. In this case the
+ # interesting field is the consumer_tag.
+ reply = channel.basic_consume(queue="test-queue")
+
+ # We can use the Client.queue(...) method to access the queue
+ # corresponding to our consumer_tag.
+ queue = self.client.queue(reply.consumer_tag)
+
+ # Now lets publish a message and see if our consumer gets it. To do
+ # this we need to import the Content class.
+ body = "Hello World!"
+ channel.basic_publish(exchange="test",
+ routing_key="key",
+ content=Content(body))
+
+ # Now we'll wait for the message to arrive. We can use the timeout
+ # argument in case the server hangs. By default queue.get() will wait
+ # until a message arrives or the connection to the server dies.
+ msg = queue.get(timeout=10)
+
+ # And check that we got the right response with assertEqual
+ self.assertEqual(body, msg.content.body)
+
+ # Now acknowledge the message.
+ channel.basic_ack(msg.delivery_tag, True)
+
diff --git a/python/tests/exchange.py b/python/tests/exchange.py
new file mode 100644
index 0000000000..b9b16bad78
--- /dev/null
+++ b/python/tests/exchange.py
@@ -0,0 +1,234 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import logging, Queue
+from qpid.testlib import TestBase
+from qpid.content import Content
+
+
+# TODO aconway 2006-09-01: Investigate and add tests as appropriate.
+# Observered on C++:
+#
+# No exception raised for basic_consume on non-existent queue name.
+# No exception for basic_publish with bad routing key.
+# No exception for binding to non-existent exchange?
+# queue_bind hangs with invalid exchange name
+#
+# Do server exceptions get propagated properly?
+# Do Java exceptions propagate with any data (or just Closed())
+
+class StandardExchangeVerifier:
+ """Verifies standard exchange behavior.
+
+ Used as base class for classes that test standard exchanges."""
+
+ def verifyDirectExchange(self, ex):
+ """Verify that ex behaves like a direct exchange."""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+ try:
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+ self.fail("Expected Empty exception")
+ except Queue.Empty: None # Expected
+
+ def verifyFanOutExchange(self, ex):
+ """Verify that ex behaves like a fanout exchange."""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex)
+ self.queue_declare(queue="p")
+ self.channel.queue_bind(queue="p", exchange=ex)
+ self.assertPublishConsume(exchange=ex, queue=["q","p"])
+
+
+class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server SHOULD implement these standard exchange types: topic, headers.
+
+ Client attempts to declare an exchange with each of these standard types.
+ """
+
+ def testDirect(self):
+ """Declare and test a direct exchange"""
+ self.exchange_declare(0, exchange="d", type="direct")
+ self.verifyDirectExchange("d")
+
+ def testFanout(self):
+ """Declare and test a fanout exchange"""
+ self.exchange_declare(0, exchange="f", type="fanout")
+ self.verifyFanOutExchange("f")
+
+
+class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server MUST, in each virtual host, pre-declare an exchange instance
+ for each standard exchange type that it implements, where the name of the
+ exchange instance is amq. followed by the exchange type name.
+
+ Client creates a temporary queue and attempts to bind to each required
+ exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if
+ those types are defined).
+ """
+ # TODO aconway 2006-09-01: Add tests for 3.1.3.1:
+ # - Test auto binding by q name
+ # - Test the nameless "default publish" exchange.
+ # - Auto created amq.fanout exchange
+
+ def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
+
+ def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
+
+ def testAmqTopic(self):
+ self.exchange_declare(0, exchange="amq.topic", passive="true")
+ # TODO aconway 2006-09-14: verify topic behavior
+
+ def testAmqHeaders(self):
+ self.exchange_declare(0, exchange="amq.headers", passive="true")
+ # TODO aconway 2006-09-14: verify headers behavior
+
+class DefaultExchangeRuleTests(TestBase):
+ """
+ The server MUST predeclare a direct exchange to act as the default exchange
+ for content Publish methods and for default queue bindings.
+
+ Client checks that the default exchange is active by specifying a queue
+ binding with no exchange name, and publishing a message with a suitable
+ routing key but without specifying the exchange name, then ensuring that
+ the message arrives in the queue correctly.
+ """
+
+
+class DefaultAccessRuleTests(TestBase):
+ """
+ The server MUST NOT allow clients to access the default exchange except
+ by specifying an empty exchange name in the Queue.Bind and content Publish
+ methods.
+ """
+
+
+class ExtensionsRuleTests(TestBase):
+ """
+ The server MAY implement other exchange types as wanted.
+ """
+
+
+class DeclareMethodMinimumRuleTests(TestBase):
+ """
+ The server SHOULD support a minimum of 16 exchanges per virtual host and
+ ideally, impose no limit except as defined by available resources.
+
+ The client creates as many exchanges as it can until the server reports
+ an error; the number of exchanges successfuly created must be at least
+ sixteen.
+ """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestBase):
+ """
+ The client MUST provide a valid access ticket giving "active" access to
+ the realm in which the exchange exists or will be created, or "passive"
+ access if the if-exists flag is set.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
+ """
+ Exchange names starting with "amq." are reserved for predeclared and
+ standardised exchanges. The client MUST NOT attempt to create an exchange
+ starting with "amq.".
+
+
+ """
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestBase):
+ """
+ Exchanges cannot be redeclared with different types. The client MUST not
+ attempt to redeclare an existing exchange with a different type than used
+ in the original Exchange.Declare method.
+
+
+ """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestBase):
+ """
+ The client MUST NOT attempt to create an exchange with a type that the
+ server does not support.
+
+
+ """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
+ """
+ If set, and the exchange does not already exist, the server MUST raise a
+ channel exception with reply code 404 (not found).
+
+
+ """
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestBase):
+ """
+ The server MUST support both durable and transient exchanges.
+
+
+ """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestBase):
+ """
+ The server MUST ignore the durable field if the exchange already exists.
+
+
+ """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
+ """
+ The server MUST ignore the auto-delete field if the exchange already
+ exists.
+
+
+ """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestBase):
+ """
+ The client MUST provide a valid access ticket giving "active" access
+ rights to the exchange's access realm.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
+ """
+ The client MUST NOT attempt to delete an exchange that does not exist.
+ """
+
+
diff --git a/python/tests/queue.py b/python/tests/queue.py
new file mode 100644
index 0000000000..92260a7d64
--- /dev/null
+++ b/python/tests/queue.py
@@ -0,0 +1,254 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class QueueTests(TestBase):
+ """Tests for 'methods' on the amqp queue 'class'"""
+
+ def test_purge(self):
+ """
+ Test that the purge method removes messages from the queue
+ """
+ channel = self.channel
+ #setup, declare a queue and add some messages to it:
+ channel.exchange_declare(exchange="test-exchange", type="direct")
+ channel.queue_declare(queue="test-queue", exclusive=True)
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one"))
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two"))
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three"))
+
+ #check that the queue now reports 3 messages:
+ reply = channel.queue_declare(queue="test-queue")
+ self.assertEqual(3, reply.message_count)
+
+ #now do the purge, then test that three messages are purged and the count drops to 0
+ reply = channel.queue_purge(queue="test-queue");
+ self.assertEqual(3, reply.message_count)
+ reply = channel.queue_declare(queue="test-queue")
+ self.assertEqual(0, reply.message_count)
+
+ #send a further message and consume it, ensuring that the other messages are really gone
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four"))
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEqual("four", msg.content.body)
+
+ #check error conditions (use new channels):
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ #queue specified but doesn't exist:
+ channel.queue_purge(queue="invalid-queue")
+ self.fail("Expected failure when purging non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(3)
+ channel.channel_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.queue_purge()
+ self.fail("Expected failure when purging unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ #cleanup
+ channel = self.client.channel(4)
+ channel.channel_open()
+ channel.exchange_delete(exchange="test-exchange")
+
+ def test_declare_exclusive(self):
+ """
+ Test that the exclusive field is honoured in queue.declare
+ """
+ # TestBase.setUp has already opened channel(1)
+ c1 = self.channel
+ # Here we open a second separate connection:
+ other = self.connect()
+ c2 = other.channel(1)
+ c2.channel_open()
+
+ #declare an exclusive queue:
+ c1.queue_declare(queue="exclusive-queue", exclusive="True")
+ try:
+ #other connection should not be allowed to declare this:
+ c2.queue_declare(queue="exclusive-queue", exclusive="True")
+ self.fail("Expected second exclusive queue_declare to raise a channel exception")
+ except Closed, e:
+ self.assertChannelException(405, e.args[0])
+
+
+ def test_declare_passive(self):
+ """
+ Test that the passive field is honoured in queue.declare
+ """
+ channel = self.channel
+ #declare an exclusive queue:
+ channel.queue_declare(queue="passive-queue-1", exclusive="True")
+ channel.queue_declare(queue="passive-queue-1", passive="True")
+ try:
+ #other connection should not be allowed to declare this:
+ channel.queue_declare(queue="passive-queue-2", passive="True")
+ self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+ def test_bind(self):
+ """
+ Test various permutations of the queue.bind method
+ """
+ channel = self.channel
+ channel.queue_declare(queue="queue-1", exclusive="True")
+
+ #straightforward case, both exchange & queue exist so no errors expected:
+ channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
+
+ #bind the default queue for the channel (i.e. last one declared):
+ channel.queue_bind(exchange="amq.direct", routing_key="key2")
+
+ #use the queue name where neither routing key nor queue are specified:
+ channel.queue_bind(exchange="amq.direct")
+
+ #try and bind to non-existant exchange
+ try:
+ channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
+ self.fail("Expected bind to non-existant exchange to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ #need to reopen a channel:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #try and bind non-existant queue:
+ try:
+ channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
+ self.fail("Expected bind of non-existant queue to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+ def test_delete_simple(self):
+ """
+ Test basic queue deletion
+ """
+ channel = self.client.channel(1)
+ channel.channel_open()
+
+ #straight-forward case:
+ channel.queue_declare(queue="delete-me")
+ channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("a"))
+ channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("b"))
+ channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("c"))
+ reply = channel.queue_delete(queue="delete-me")
+ self.assertEqual(3, reply.message_count)
+ #check that it has gone be declaring passively
+ try:
+ channel.queue_declare(queue="delete-me", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ #check attempted deletion of non-existant queue is handled correctly:
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ channel.queue_delete(queue="i-dont-exist", if_empty="True")
+ self.fail("Expected delete of non-existant queue to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+
+ def test_delete_ifempty(self):
+ """
+ Test that if_empty field of queue_delete is honoured
+ """
+ channel = self.client.channel(1)
+ channel.channel_open()
+
+ #create a queue and add a message to it (use default binding):
+ channel.queue_declare(queue="delete-me-2")
+ channel.queue_declare(queue="delete-me-2", passive="True")
+ channel.basic_publish(exchange="amq.direct", routing_key="delete-me-2", content=Content("message"))
+
+ #try to delete, but only if empty:
+ try:
+ channel.queue_delete(queue="delete-me-2", if_empty="True")
+ self.fail("Expected delete if_empty to fail for non-empty queue")
+ except Closed, e:
+ self.assertChannelException(406, e.args[0])
+
+ #need new channel now:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #empty queue:
+ reply = channel.basic_consume(queue="delete-me-2", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEqual("message", msg.content.body)
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+
+ #retry deletion on empty queue:
+ channel.queue_delete(queue="delete-me-2", if_empty="True")
+
+ #check that it has gone by declaring passively:
+ try:
+ channel.queue_declare(queue="delete-me-2", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ def test_delete_ifunused(self):
+ """
+ Test that if_unused field of queue_delete is honoured
+ """
+ channel = self.client.channel(1)
+ channel.channel_open()
+
+ #create a queue and register a consumer:
+ channel.queue_declare(queue="delete-me-3")
+ channel.queue_declare(queue="delete-me-3", passive="True")
+ reply = channel.basic_consume(queue="delete-me-3", no_ack=True)
+
+ #need new channel now:
+ channel2 = self.client.channel(2)
+ channel2.channel_open()
+ #try to delete, but only if empty:
+ try:
+ channel2.queue_delete(queue="delete-me-3", if_unused="True")
+ self.fail("Expected delete if_unused to fail for queue with existing consumer")
+ except Closed, e:
+ self.assertChannelException(406, e.args[0])
+
+
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+ channel.queue_delete(queue="delete-me-3", if_unused="True")
+ #check that it has gone by declaring passively:
+ try:
+ channel.queue_declare(queue="delete-me-3", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+