summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/tests')
-rw-r--r--qpid/python/qpid/tests/__init__.py60
-rw-r--r--qpid/python/qpid/tests/codec.py601
-rw-r--r--qpid/python/qpid/tests/codec010.py133
-rw-r--r--qpid/python/qpid/tests/connection.py227
-rw-r--r--qpid/python/qpid/tests/datatypes.py296
-rw-r--r--qpid/python/qpid/tests/framing.py289
-rw-r--r--qpid/python/qpid/tests/messaging/__init__.py185
-rw-r--r--qpid/python/qpid/tests/messaging/address.py321
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py1335
-rw-r--r--qpid/python/qpid/tests/messaging/message.py155
-rw-r--r--qpid/python/qpid/tests/mimetype.py56
-rw-r--r--qpid/python/qpid/tests/parser.py37
-rw-r--r--qpid/python/qpid/tests/queue.py71
-rw-r--r--qpid/python/qpid/tests/spec010.py74
14 files changed, 3840 insertions, 0 deletions
diff --git a/qpid/python/qpid/tests/__init__.py b/qpid/python/qpid/tests/__init__.py
new file mode 100644
index 0000000000..101a0c3759
--- /dev/null
+++ b/qpid/python/qpid/tests/__init__.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Test:
+
+ def __init__(self, name):
+ self.name = name
+
+ def configure(self, config):
+ self.config = config
+
+# API Tests
+import qpid.tests.framing
+import qpid.tests.mimetype
+import qpid.tests.messaging
+
+# Legacy Tests
+import qpid.tests.codec
+import qpid.tests.queue
+import qpid.tests.datatypes
+import qpid.tests.connection
+import qpid.tests.spec010
+import qpid.tests.codec010
+
+class TestTestsXXX(Test):
+
+ def testFoo(self):
+ print "this test has output"
+
+ def testBar(self):
+ print "this test "*8
+ print "has"*10
+ print "a"*75
+ print "lot of"*10
+ print "output"*10
+
+ def testQux(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+
+ def testQuxFail(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+ fdsa
diff --git a/qpid/python/qpid/tests/codec.py b/qpid/python/qpid/tests/codec.py
new file mode 100644
index 0000000000..8fd0528636
--- /dev/null
+++ b/qpid/python/qpid/tests/codec.py
@@ -0,0 +1,601 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from qpid.codec import Codec
+from qpid.spec08 import load
+from cStringIO import StringIO
+from qpid.reference import ReferenceId
+
+__doc__ = """
+
+ This is a unit test script for qpid/codec.py
+
+ It can be run standalone or as part of the existing test framework.
+
+ To run standalone:
+ -------------------
+
+ Place in the qpid/python/tests/ directory and type...
+
+ python codec.py
+
+ A brief output will be printed on screen. The verbose output will be placed inn a file called
+ codec_unit_test_output.txt. [TODO: make this filename configurable]
+
+ To run as part of the existing test framework:
+ -----------------------------------------------
+
+ python run-tests tests.codec
+
+ Change History:
+ -----------------
+ Jimmy John 05/19/2007 Initial draft
+ Jimmy John 05/22/2007 Implemented comments by Rafael Schloming
+
+
+"""
+
+from qpid.specs_config import amqp_spec_0_8
+SPEC = load(amqp_spec_0_8)
+
+# --------------------------------------
+# --------------------------------------
+class BaseDataTypes(unittest.TestCase):
+
+
+ """
+ Base class containing common functions
+ """
+
+ # ---------------
+ def setUp(self):
+ """
+ standard setUp for unitetest (refer unittest documentation for details)
+ """
+ self.codec = Codec(StringIO(), SPEC)
+
+ # ------------------
+ def tearDown(self):
+ """
+ standard tearDown for unitetest (refer unittest documentation for details)
+ """
+ self.codec.stream.flush()
+ self.codec.stream.close()
+
+ # ----------------------------------------
+ def callFunc(self, functionName, *args):
+ """
+ helper function - given a function name and arguments, calls the function with the args and
+ returns the contents of the stream
+ """
+ getattr(self.codec, functionName)(args[0])
+ return self.codec.stream.getvalue()
+
+ # ----------------------------------------
+ def readFunc(self, functionName, *args):
+ """
+ helper function - creates a input stream and then calls the function with arguments as have been
+ supplied
+ """
+ self.codec.stream = StringIO(args[0])
+ return getattr(self.codec, functionName)()
+
+
+# ----------------------------------------
+# ----------------------------------------
+class IntegerTestCase(BaseDataTypes):
+
+ """
+ Handles octet, short, long, long long
+
+ """
+
+ # -------------------------
+ def __init__(self, *args):
+ """
+ sets constants for use in tests
+ """
+
+ BaseDataTypes.__init__(self, *args)
+ self.const_integer = 2
+ self.const_integer_octet_encoded = '\x02'
+ self.const_integer_short_encoded = '\x00\x02'
+ self.const_integer_long_encoded = '\x00\x00\x00\x02'
+ self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02'
+
+ # -------------------------- #
+ # Unsigned Octect - 8 bits #
+ # -------------------------- #
+
+ # --------------------------
+ def test_unsigned_octet(self):
+ """
+ ubyte format requires 0<=number<=255
+ """
+ self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...')
+
+ # -------------------------------------------
+ def test_octet_out_of_upper_range(self):
+ """
+ testing for input above acceptable range
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_octet, 256)
+
+ # -------------------------------------------
+ def test_uoctet_out_of_lower_range(self):
+ """
+ testing for input below acceptable range
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_octet, -1)
+
+ # ---------------------------------
+ def test_uoctet_with_fraction(self):
+ """
+ the fractional part should be ignored...
+ """
+ self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...')
+
+ # ------------------------------------
+ def test_unsigned_octet_decode(self):
+ """
+ octet decoding
+ """
+ self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...')
+
+ # ----------------------------------- #
+ # Unsigned Short Integers - 16 bits #
+ # ----------------------------------- #
+
+ # -----------------------
+ def test_ushort_int(self):
+ """
+ testing unsigned short integer
+ """
+ self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...')
+
+ # -------------------------------------------
+ def test_ushort_int_out_of_upper_range(self):
+ """
+ testing for input above acceptable range
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_short, 65536)
+
+ # -------------------------------------------
+ def test_ushort_int_out_of_lower_range(self):
+ """
+ testing for input below acceptable range
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_short, -1)
+
+ # ---------------------------------
+ def test_ushort_int_with_fraction(self):
+ """
+ the fractional part should be ignored...
+ """
+ self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...')
+
+ # ------------------------------------
+ def test_ushort_int_decode(self):
+ """
+ unsigned short decoding
+ """
+ self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...')
+
+
+ # ---------------------------------- #
+ # Unsigned Long Integers - 32 bits #
+ # ---------------------------------- #
+
+ # -----------------------
+ def test_ulong_int(self):
+ """
+ testing unsigned long iteger
+ """
+ self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...')
+
+ # -------------------------------------------
+ def test_ulong_int_out_of_upper_range(self):
+ """
+ testing for input above acceptable range
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296)
+
+ # -------------------------------------------
+ def test_ulong_int_out_of_lower_range(self):
+ """
+ testing for input below acceptable range
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_long, -1)
+
+ # ---------------------------------
+ def test_ulong_int_with_fraction(self):
+ """
+ the fractional part should be ignored...
+ """
+ self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...')
+
+ # -------------------------------
+ def test_ulong_int_decode(self):
+ """
+ unsigned long decoding
+ """
+ self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...')
+
+
+ # --------------------------------------- #
+ # Unsigned Long Long Integers - 64 bits #
+ # --------------------------------------- #
+
+ # -----------------------
+ def test_ulong_long_int(self):
+ """
+ testing unsinged long long integer
+ """
+ self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...')
+
+ # -------------------------------------------
+ def test_ulong_long_int_out_of_upper_range(self):
+ """
+ testing for input above acceptable range
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616)
+
+ # -------------------------------------------
+ def test_ulong_long_int_out_of_lower_range(self):
+ """
+ testing for input below acceptable range
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_longlong, -1)
+
+ # ---------------------------------
+ def test_ulong_long_int_with_fraction(self):
+ """
+ the fractional part should be ignored...
+ """
+ self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...')
+
+ # ------------------------------------
+ def test_ulong_long_int_decode(self):
+ """
+ unsigned long long decoding
+ """
+ self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...')
+
+# -----------------------------------
+# -----------------------------------
+class BitTestCase(BaseDataTypes):
+
+ """
+ Handles bits
+ """
+
+ # ----------------------------------------------
+ def callFunc(self, functionName, *args):
+ """
+ helper function
+ """
+ for ele in args:
+ getattr(self.codec, functionName)(ele)
+
+ self.codec.flush()
+ return self.codec.stream.getvalue()
+
+ # -------------------
+ def test_bit1(self):
+ """
+ sends in 11
+ """
+ self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...')
+
+ # -------------------
+ def test_bit2(self):
+ """
+ sends in 10011
+ """
+ self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...')
+
+ # -------------------
+ def test_bit3(self):
+ """
+ sends in 1110100111 [10 bits(right to left), should be compressed into two octets]
+ """
+ self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...')
+
+ # ------------------------------------
+ def test_bit_decode_1(self):
+ """
+ decode bit 1
+ """
+ self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...')
+
+ # ------------------------------------
+ def test_bit_decode_0(self):
+ """
+ decode bit 0
+ """
+ self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...')
+
+# -----------------------------------
+# -----------------------------------
+class StringTestCase(BaseDataTypes):
+
+ """
+ Handles short strings, long strings
+ """
+
+ # ------------------------------------------------------------- #
+ # Short Strings - 8 bit length followed by zero or more octets #
+ # ------------------------------------------------------------- #
+
+ # ---------------------------------------
+ def test_short_string_zero_length(self):
+ """
+ 0 length short string
+ """
+ self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...')
+
+ # -------------------------------------------
+ def test_short_string_positive_length(self):
+ """
+ positive length short string
+ """
+ self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...')
+
+ # -------------------------------------------
+ def test_short_string_out_of_upper_range(self):
+ """
+ string length > 255
+ """
+ self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256)
+
+ # ------------------------------------
+ def test_short_string_decode(self):
+ """
+ short string decode
+ """
+ self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...')
+
+
+ # ------------------------------------------------------------- #
+ # Long Strings - 32 bit length followed by zero or more octets #
+ # ------------------------------------------------------------- #
+
+ # ---------------------------------------
+ def test_long_string_zero_length(self):
+ """
+ 0 length long string
+ """
+ self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...')
+
+ # -------------------------------------------
+ def test_long_string_positive_length(self):
+ """
+ positive length long string
+ """
+ self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...')
+
+ # ------------------------------------
+ def test_long_string_decode(self):
+ """
+ long string decode
+ """
+ self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...')
+
+
+# --------------------------------------
+# --------------------------------------
+class TimestampTestCase(BaseDataTypes):
+
+ """
+ No need of any test cases here as timestamps are implemented as long long which is tested above
+ """
+ pass
+
+# ---------------------------------------
+# ---------------------------------------
+class FieldTableTestCase(BaseDataTypes):
+
+ """
+ Handles Field Tables
+
+ Only S/I type messages seem to be implemented currently
+ """
+
+ # -------------------------
+ def __init__(self, *args):
+ """
+ sets constants for use in tests
+ """
+
+ BaseDataTypes.__init__(self, *args)
+ self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'}
+ self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1'
+
+ # -------------------------------------------
+ def test_field_table_name_value_pair(self):
+ """
+ valid name value pair
+ """
+ self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...')
+
+ # ---------------------------------------------------
+ def test_field_table_multiple_name_value_pair(self):
+ """
+ multiple name value pair
+ """
+ self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...')
+
+ # ------------------------------------
+ def test_field_table_decode(self):
+ """
+ field table decode
+ """
+ self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...')
+
+
+# ------------------------------------
+# ------------------------------------
+class ContentTestCase(BaseDataTypes):
+
+ """
+ Handles Content data types
+ """
+
+ # -----------------------------
+ def test_content_inline(self):
+ """
+ inline content
+ """
+ self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...')
+
+ # --------------------------------
+ def test_content_reference(self):
+ """
+ reference content
+ """
+ self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...')
+
+ # ------------------------------------
+ def test_content_inline_decode(self):
+ """
+ inline content decode
+ """
+ self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...')
+
+ # ------------------------------------
+ def test_content_reference_decode(self):
+ """
+ reference content decode
+ """
+ self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...')
+
+# ------------------------ #
+# Pre - existing test code #
+# ------------------------ #
+
+# ---------------------
+def test(type, value):
+ """
+ old test function cut/copy/paste from qpid/codec.py
+ """
+ if isinstance(value, (list, tuple)):
+ values = value
+ else:
+ values = [value]
+ stream = StringIO()
+ codec = Codec(stream, SPEC)
+ for v in values:
+ codec.encode(type, v)
+ codec.flush()
+ enc = stream.getvalue()
+ stream.reset()
+ dup = []
+ for i in xrange(len(values)):
+ dup.append(codec.decode(type))
+ if values != dup:
+ raise AssertionError("%r --> %r --> %r" % (values, enc, dup))
+
+# -----------------------
+def dotest(type, value):
+ """
+ old test function cut/copy/paste from qpid/codec.py
+ """
+ args = (type, value)
+ test(*args)
+
+# -------------
+def oldtests():
+ """
+ old test function cut/copy/paste from qpid/codec.py
+ """
+ for value in ("1", "0", "110", "011", "11001", "10101", "10011"):
+ for i in range(10):
+ dotest("bit", map(lambda x: x == "1", value*i))
+
+ for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}):
+ dotest("table", value)
+
+ for type in ("octet", "short", "long", "longlong"):
+ for value in range(0, 256):
+ dotest(type, value)
+
+ for type in ("shortstr", "longstr"):
+ for value in ("", "a", "asdf"):
+ dotest(type, value)
+
+# -----------------------------------------
+class oldTests(unittest.TestCase):
+
+ """
+ class to handle pre-existing test cases
+ """
+
+ # ---------------------------
+ def test_oldtestcases(self):
+ """
+ call the old tests
+ """
+ return oldtests()
+
+# ---------------------------
+# ---------------------------
+if __name__ == '__main__':
+
+ codec_test_suite = unittest.TestSuite()
+
+ #adding all the test suites...
+ codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase))
+ codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase))
+ codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase))
+ codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase))
+ codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase))
+ codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase))
+
+ #loading pre-existing test case from qpid/codec.py
+ codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests))
+
+ run_output_stream = StringIO()
+ test_runner = unittest.TextTestRunner(run_output_stream, '', '')
+ test_result = test_runner.run(codec_test_suite)
+
+ print '\n%d test run...' % (test_result.testsRun)
+
+ if test_result.wasSuccessful():
+ print '\nAll tests successful\n'
+
+ if test_result.failures:
+ print '\n----------'
+ print '%d FAILURES:' % (len(test_result.failures))
+ print '----------\n'
+ for failure in test_result.failures:
+ print str(failure[0]) + ' ... FAIL'
+
+ if test_result.errors:
+ print '\n---------'
+ print '%d ERRORS:' % (len(test_result.errors))
+ print '---------\n'
+
+ for error in test_result.errors:
+ print str(error[0]) + ' ... ERROR'
+
+ f = open('codec_unit_test_output.txt', 'w')
+ f.write(str(run_output_stream.getvalue()))
+ f.close()
diff --git a/qpid/python/qpid/tests/codec010.py b/qpid/python/qpid/tests/codec010.py
new file mode 100644
index 0000000000..787ebc146f
--- /dev/null
+++ b/qpid/python/qpid/tests/codec010.py
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+
+from unittest import TestCase
+from qpid.codec010 import StringCodec
+from qpid.datatypes import timestamp, uuid4
+from qpid.ops import PRIMITIVE
+
+class CodecTest(TestCase):
+
+ def check(self, type, value, compare=True):
+ t = PRIMITIVE[type]
+ sc = StringCodec()
+ sc.write_primitive(t, value)
+ decoded = sc.read_primitive(t)
+ if compare:
+ assert decoded == value, "%s, %s" % (decoded, value)
+ return decoded
+
+ def testMapString(self):
+ self.check("map", {"string": "this is a test"})
+
+ def testMapUnicode(self):
+ self.check("map", {"unicode": u"this is a unicode test"})
+
+ def testMapBinary(self):
+ self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"})
+
+ def testMapBuffer(self):
+ s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"
+ dec = self.check("map", {"buffer": buffer(s)}, False)
+ assert dec["buffer"] == s
+
+ def testMapInt(self):
+ self.check("map", {"int": 3})
+
+ def testMapLong(self):
+ self.check("map", {"long": 2**32})
+ self.check("map", {"long": 1 << 34})
+ self.check("map", {"long": -(1 << 34)})
+
+ def testMapTimestamp(self):
+ decoded = self.check("map", {"timestamp": timestamp(0)})
+ assert isinstance(decoded["timestamp"], timestamp)
+
+ def testMapDatetime(self):
+ decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False)
+ assert isinstance(decoded["datetime"], timestamp)
+ assert decoded["datetime"] == 0.0
+
+ def testMapNone(self):
+ self.check("map", {"none": None})
+
+ def testMapNested(self):
+ self.check("map", {"map": {"string": "nested test"}})
+
+ def testMapList(self):
+ self.check("map", {"list": [1, "two", 3.0, -4]})
+
+ def testMapUUID(self):
+ self.check("map", {"uuid": uuid4()})
+
+ def testMapAll(self):
+ decoded = self.check("map", {"string": "this is a test",
+ "unicode": u"this is a unicode test",
+ "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5",
+ "int": 3,
+ "long": 2**32,
+ "timestamp": timestamp(0),
+ "none": None,
+ "map": {"string": "nested map"},
+ "list": [1, "two", 3.0, -4],
+ "uuid": uuid4()})
+ assert isinstance(decoded["timestamp"], timestamp)
+
+ def testMapEmpty(self):
+ self.check("map", {})
+
+ def testMapNone(self):
+ self.check("map", None)
+
+ def testList(self):
+ self.check("list", [1, "two", 3.0, -4])
+
+ def testListEmpty(self):
+ self.check("list", [])
+
+ def testListNone(self):
+ self.check("list", None)
+
+ def testArrayInt(self):
+ self.check("array", [1, 2, 3, 4])
+
+ def testArrayString(self):
+ self.check("array", ["one", "two", "three", "four"])
+
+ def testArrayEmpty(self):
+ self.check("array", [])
+
+ def testArrayNone(self):
+ self.check("array", None)
+
+ def testInt16(self):
+ self.check("int16", 3)
+ self.check("int16", -3)
+
+ def testInt64(self):
+ self.check("int64", 3)
+ self.check("int64", -3)
+ self.check("int64", 1<<34)
+ self.check("int64", -(1<<34))
+
+ def testDatetime(self):
+ self.check("datetime", timestamp(0))
+ self.check("datetime", timestamp(long(time.time())))
diff --git a/qpid/python/qpid/tests/connection.py b/qpid/python/qpid/tests/connection.py
new file mode 100644
index 0000000000..6847285f69
--- /dev/null
+++ b/qpid/python/qpid/tests/connection.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from threading import *
+from unittest import TestCase
+from qpid.util import connect, listen
+from qpid.connection import *
+from qpid.datatypes import Message
+from qpid.delegates import Server
+from qpid.queue import Queue
+from qpid.session import Delegate
+from qpid.ops import QueueQueryResult
+
+PORT = 1234
+
+class TestServer:
+
+ def __init__(self, queue):
+ self.queue = queue
+
+ def connection(self, connection):
+ return Server(connection, delegate=self.session)
+
+ def session(self, session):
+ session.auto_sync = False
+ return TestSession(session, self.queue)
+
+class TestSession(Delegate):
+
+ def __init__(self, session, queue):
+ self.session = session
+ self.queue = queue
+
+ def execution_sync(self, es):
+ pass
+
+ def queue_query(self, qq):
+ return QueueQueryResult(qq.queue)
+
+ def message_transfer(self, cmd):
+ if cmd.destination == "echo":
+ m = Message(cmd.payload)
+ m.headers = cmd.headers
+ self.session.message_transfer(cmd.destination, cmd.accept_mode,
+ cmd.acquire_mode, m)
+ elif cmd.destination == "abort":
+ self.session.channel.connection.sock.close()
+ elif cmd.destination == "heartbeat":
+ self.session.channel.connection_heartbeat()
+ else:
+ self.queue.put(cmd)
+
+class ConnectionTest(TestCase):
+
+ def setUp(self):
+ self.queue = Queue()
+ self.running = True
+ started = Event()
+
+ def run():
+ ts = TestServer(self.queue)
+ for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
+ conn = Connection(s, delegate=ts.connection)
+ try:
+ conn.start(5)
+ except Closed:
+ pass
+
+ self.server = Thread(target=run)
+ self.server.setDaemon(True)
+ self.server.start()
+
+ started.wait(3)
+ assert started.isSet()
+
+ def tearDown(self):
+ self.running = False
+ connect("127.0.0.1", PORT).close()
+ self.server.join(3)
+
+ def connect(self, **kwargs):
+ return Connection(connect("127.0.0.1", PORT), **kwargs)
+
+ def test(self):
+ c = self.connect()
+ c.start(10)
+
+ ssn1 = c.session("test1", timeout=10)
+ ssn2 = c.session("test2", timeout=10)
+
+ assert ssn1 == c.sessions["test1"]
+ assert ssn2 == c.sessions["test2"]
+ assert ssn1.channel != None
+ assert ssn2.channel != None
+ assert ssn1 in c.attached.values()
+ assert ssn2 in c.attached.values()
+
+ ssn1.close(5)
+
+ assert ssn1.channel == None
+ assert ssn1 not in c.attached.values()
+ assert ssn2 in c.sessions.values()
+
+ ssn2.close(5)
+
+ assert ssn2.channel == None
+ assert ssn2 not in c.attached.values()
+ assert ssn2 not in c.sessions.values()
+
+ ssn = c.session("session", timeout=10)
+
+ assert ssn.channel != None
+ assert ssn in c.sessions.values()
+
+ destinations = ("one", "two", "three")
+
+ for d in destinations:
+ ssn.message_transfer(d)
+
+ for d in destinations:
+ cmd = self.queue.get(10)
+ assert cmd.destination == d
+ assert cmd.headers == None
+ assert cmd.payload == None
+
+ msg = Message("this is a test")
+ ssn.message_transfer("four", message=msg)
+ cmd = self.queue.get(10)
+ assert cmd.destination == "four"
+ assert cmd.headers == None
+ assert cmd.payload == msg.body
+
+ qq = ssn.queue_query("asdf")
+ assert qq.queue == "asdf"
+ c.close(5)
+
+ def testCloseGet(self):
+ c = self.connect()
+ c.start(10)
+ ssn = c.session("test", timeout=10)
+ echos = ssn.incoming("echo")
+
+ for i in range(10):
+ ssn.message_transfer("echo", message=Message("test%d" % i))
+
+ ssn.auto_sync=False
+ ssn.message_transfer("abort")
+
+ for i in range(10):
+ m = echos.get(timeout=10)
+ assert m.body == "test%d" % i
+
+ try:
+ m = echos.get(timeout=10)
+ assert False
+ except Closed, e:
+ pass
+
+ def testCloseListen(self):
+ c = self.connect()
+ c.start(10)
+ ssn = c.session("test", timeout=10)
+ echos = ssn.incoming("echo")
+
+ messages = []
+ exceptions = []
+ condition = Condition()
+ def listener(m): messages.append(m)
+ def exc_listener(e):
+ condition.acquire()
+ exceptions.append(e)
+ condition.notify()
+ condition.release()
+
+ echos.listen(listener, exc_listener)
+
+ for i in range(10):
+ ssn.message_transfer("echo", message=Message("test%d" % i))
+
+ ssn.auto_sync=False
+ ssn.message_transfer("abort")
+
+ condition.acquire()
+ start = time.time()
+ elapsed = 0
+ while not exceptions and elapsed < 10:
+ condition.wait(10 - elapsed)
+ elapsed = time.time() - start
+ condition.release()
+
+ for i in range(10):
+ m = messages.pop(0)
+ assert m.body == "test%d" % i
+
+ assert len(exceptions) == 1
+
+ def testSync(self):
+ c = self.connect()
+ c.start(10)
+ s = c.session("test")
+ s.auto_sync = False
+ s.message_transfer("echo", message=Message("test"))
+ s.sync(10)
+
+ def testHeartbeat(self):
+ c = self.connect(heartbeat=10)
+ c.start(10)
+ s = c.session("test")
+ s.channel.connection_heartbeat()
+ s.message_transfer("heartbeat")
diff --git a/qpid/python/qpid/tests/datatypes.py b/qpid/python/qpid/tests/datatypes.py
new file mode 100644
index 0000000000..00e649d6cf
--- /dev/null
+++ b/qpid/python/qpid/tests/datatypes.py
@@ -0,0 +1,296 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase
+from qpid.datatypes import *
+from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties
+
+class SerialTest(TestCase):
+
+ def test(self):
+ for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)):
+ assert s + 1 > s
+ assert s - 1 < s
+ assert s < s + 1
+ assert s > s - 1
+
+ assert serial(0xFFFFFFFFL) + 1 == serial(0)
+
+ assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL)
+ assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0)
+
+ def testIncr(self):
+ s = serial(0)
+ s += 1
+ assert s == serial(1)
+
+ def testIn(self):
+ l = [serial(1), serial(2), serial(3), serial(4)]
+ assert serial(1) in l
+ assert serial(0xFFFFFFFFL + 2) in l
+ assert 4 in l
+
+ def testNone(self):
+ assert serial(0) != None
+
+ def testHash(self):
+ d = {}
+ d[serial(0)] = "zero"
+ assert d[0] == "zero"
+
+ def testAdd(self):
+ assert serial(2) + 2 == serial(4)
+ assert serial(2) + 2 == 4
+
+ def testSub(self):
+ delta = serial(4) - serial(2)
+ assert isinstance(delta, int) or isinstance(delta, long)
+ assert delta == 2
+
+ delta = serial(4) - 2
+ assert isinstance(delta, Serial)
+ assert delta == serial(2)
+
+class RangedSetTest(TestCase):
+
+ def check(self, ranges):
+ posts = []
+ for range in ranges:
+ posts.append(range.lower)
+ posts.append(range.upper)
+
+ sorted = posts[:]
+ sorted.sort()
+
+ assert posts == sorted
+
+ idx = 1
+ while idx + 1 < len(posts):
+ assert posts[idx] + 1 != posts[idx+1]
+ idx += 2
+
+ def test(self):
+ rs = RangedSet()
+
+ self.check(rs.ranges)
+
+ rs.add(1)
+
+ assert 1 in rs
+ assert 2 not in rs
+ assert 0 not in rs
+ self.check(rs.ranges)
+
+ rs.add(2)
+
+ assert 0 not in rs
+ assert 1 in rs
+ assert 2 in rs
+ assert 3 not in rs
+ self.check(rs.ranges)
+
+ rs.add(0)
+
+ assert -1 not in rs
+ assert 0 in rs
+ assert 1 in rs
+ assert 2 in rs
+ assert 3 not in rs
+ self.check(rs.ranges)
+
+ rs.add(37)
+
+ assert -1 not in rs
+ assert 0 in rs
+ assert 1 in rs
+ assert 2 in rs
+ assert 3 not in rs
+ assert 36 not in rs
+ assert 37 in rs
+ assert 38 not in rs
+ self.check(rs.ranges)
+
+ rs.add(-1)
+ self.check(rs.ranges)
+
+ rs.add(-3)
+ self.check(rs.ranges)
+
+ rs.add(1, 20)
+ assert 21 not in rs
+ assert 20 in rs
+ self.check(rs.ranges)
+
+ def testAddSelf(self):
+ a = RangedSet()
+ a.add(0, 8)
+ self.check(a.ranges)
+ a.add(0, 8)
+ self.check(a.ranges)
+ assert len(a.ranges) == 1
+ range = a.ranges[0]
+ assert range.lower == 0
+ assert range.upper == 8
+
+ def testEmpty(self):
+ s = RangedSet()
+ assert s.empty()
+ s.add(0, -1)
+ assert s.empty()
+ s.add(0, 0)
+ assert not s.empty()
+
+ def testMinMax(self):
+ s = RangedSet()
+ assert s.max() is None
+ assert s.min() is None
+ s.add(0, 10)
+ assert s.max() == 10
+ assert s.min() == 0
+ s.add(0, 5)
+ assert s.max() == 10
+ assert s.min() == 0
+ s.add(0, 11)
+ assert s.max() == 11
+ assert s.min() == 0
+ s.add(15, 20)
+ assert s.max() == 20
+ assert s.min() == 0
+ s.add(-10, -5)
+ assert s.max() == 20
+ assert s.min() == -10
+
+class RangeTest(TestCase):
+
+ def testIntersect1(self):
+ a = Range(0, 10)
+ b = Range(9, 20)
+ i1 = a.intersect(b)
+ i2 = b.intersect(a)
+ assert i1.upper == 10
+ assert i2.upper == 10
+ assert i1.lower == 9
+ assert i2.lower == 9
+
+ def testIntersect2(self):
+ a = Range(0, 10)
+ b = Range(11, 20)
+ assert a.intersect(b) == None
+ assert b.intersect(a) == None
+
+ def testIntersect3(self):
+ a = Range(0, 10)
+ b = Range(3, 5)
+ i1 = a.intersect(b)
+ i2 = b.intersect(a)
+ assert i1.upper == 5
+ assert i2.upper == 5
+ assert i1.lower == 3
+ assert i2.lower == 3
+
+class UUIDTest(TestCase):
+
+ def test(self):
+ # this test is kind of lame, but it does excercise the basic
+ # functionality of the class
+ u = uuid4()
+ for i in xrange(1024):
+ assert u != uuid4()
+
+class MessageTest(TestCase):
+
+ def setUp(self):
+ self.mp = MessageProperties()
+ self.dp = DeliveryProperties()
+ self.fp = FragmentProperties()
+
+ def testHas(self):
+ m = Message(self.mp, self.dp, self.fp, "body")
+ assert m.has("message_properties")
+ assert m.has("delivery_properties")
+ assert m.has("fragment_properties")
+
+ def testGet(self):
+ m = Message(self.mp, self.dp, self.fp, "body")
+ assert m.get("message_properties") == self.mp
+ assert m.get("delivery_properties") == self.dp
+ assert m.get("fragment_properties") == self.fp
+
+ def testSet(self):
+ m = Message(self.mp, self.dp, "body")
+ assert m.get("fragment_properties") is None
+ m.set(self.fp)
+ assert m.get("fragment_properties") == self.fp
+
+ def testSetOnEmpty(self):
+ m = Message("body")
+ assert m.get("delivery_properties") is None
+ m.set(self.dp)
+ assert m.get("delivery_properties") == self.dp
+
+ def testSetReplace(self):
+ m = Message(self.mp, self.dp, self.fp, "body")
+ dp = DeliveryProperties()
+ assert m.get("delivery_properties") == self.dp
+ assert m.get("delivery_properties") != dp
+ m.set(dp)
+ assert m.get("delivery_properties") != self.dp
+ assert m.get("delivery_properties") == dp
+
+ def testClear(self):
+ m = Message(self.mp, self.dp, self.fp, "body")
+ assert m.get("message_properties") == self.mp
+ assert m.get("delivery_properties") == self.dp
+ assert m.get("fragment_properties") == self.fp
+ m.clear("fragment_properties")
+ assert m.get("fragment_properties") is None
+ assert m.get("message_properties") == self.mp
+ assert m.get("delivery_properties") == self.dp
+
+class TimestampTest(TestCase):
+
+ def check(self, expected, *values):
+ for v in values:
+ assert isinstance(v, timestamp)
+ assert v == expected
+ assert v == timestamp(expected)
+
+ def testAdd(self):
+ self.check(4.0,
+ timestamp(2.0) + 2.0,
+ 2.0 + timestamp(2.0))
+
+ def testSub(self):
+ self.check(2.0,
+ timestamp(4.0) - 2.0,
+ 4.0 - timestamp(2.0))
+
+ def testNeg(self):
+ self.check(-4.0, -timestamp(4.0))
+
+ def testPos(self):
+ self.check(+4.0, +timestamp(4.0))
+
+ def testAbs(self):
+ self.check(4.0, abs(timestamp(-4.0)))
+
+ def testConversion(self):
+ dt = timestamp(0).datetime()
+ t = timestamp(dt)
+ assert t == 0
diff --git a/qpid/python/qpid/tests/framing.py b/qpid/python/qpid/tests/framing.py
new file mode 100644
index 0000000000..0b33df8b9a
--- /dev/null
+++ b/qpid/python/qpid/tests/framing.py
@@ -0,0 +1,289 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# setup, usage, teardown, errors(sync), errors(async), stress, soak,
+# boundary-conditions, config
+
+from qpid.tests import Test
+from qpid.framing import *
+
+class Base(Test):
+
+ def cmp_frames(self, frm1, frm2):
+ assert frm1.flags == frm2.flags, "expected: %r, got %r" % (frm1, frm2)
+ assert frm1.type == frm2.type, "expected: %r, got %r" % (frm1, frm2)
+ assert frm1.track == frm2.track, "expected: %r, got %r" % (frm1, frm2)
+ assert frm1.channel == frm2.channel, "expected: %r, got %r" % (frm1, frm2)
+ assert frm1.payload == frm2.payload, "expected: %r, got %r" % (frm1, frm2)
+
+ def cmp_segments(self, seg1, seg2):
+ assert seg1.first == seg2.first, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.last == seg2.last, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.type == seg2.type, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.track == seg2.track, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2)
+ assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2)
+
+ def cmp_list(self, l1, l2):
+ if l1 is None:
+ assert l2 is None
+ return
+
+ assert len(l1) == len(l2)
+ for v1, v2 in zip(l1, l2):
+ if isinstance(v1, Compound):
+ self.cmp_ops(v1, v2)
+ else:
+ assert v1 == v2
+
+ def cmp_ops(self, op1, op2):
+ if op1 is None:
+ assert op2 is None
+ return
+
+ assert op1.__class__ == op2.__class__
+ cls = op1.__class__
+ assert op1.NAME == op2.NAME
+ assert op1.CODE == op2.CODE
+ assert op1.FIELDS == op2.FIELDS
+ for f in cls.FIELDS:
+ v1 = getattr(op1, f.name)
+ v2 = getattr(op2, f.name)
+ if COMPOUND.has_key(f.type) or f.type == "struct32":
+ self.cmp_ops(v1, v2)
+ elif f.type in ("list", "array"):
+ self.cmp_list(v1, v2)
+ else:
+ assert v1 == v2, "expected: %r, got %r" % (v1, v2)
+
+ if issubclass(cls, Command) or issubclass(cls, Control):
+ assert op1.channel == op2.channel
+
+ if issubclass(cls, Command):
+ assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync)
+ assert (op1.headers is None and op2.headers is None) or \
+ (op1.headers is not None and op2.headers is not None)
+ if op1.headers is not None:
+ assert len(op1.headers) == len(op2.headers)
+ for h1, h2 in zip(op1.headers, op2.headers):
+ self.cmp_ops(h1, h2)
+
+class FrameTest(Base):
+
+ def enc_dec(self, frames, encoded=None):
+ enc = FrameEncoder()
+ dec = FrameDecoder()
+
+ enc.write(*frames)
+ bytes = enc.read()
+ if encoded is not None:
+ assert bytes == encoded, "expected %r, got %r" % (encoded, bytes)
+ dec.write(bytes)
+ dframes = dec.read()
+
+ assert len(frames) == len(dframes)
+ for f, df, in zip(frames, dframes):
+ self.cmp_frames(f, df)
+
+ def testEmpty(self):
+ self.enc_dec([Frame(0, 0, 0, 0, "")],
+ "\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x00")
+
+ def testSingle(self):
+ self.enc_dec([Frame(0, 0, 0, 1, "payload")],
+ "\x00\x00\x00\x13\x00\x00\x00\x01\x00\x00\x00\x00payload")
+
+ def testMaxChannel(self):
+ self.enc_dec([Frame(0, 0, 0, 65535, "max-channel")],
+ "\x00\x00\x00\x17\x00\x00\xff\xff\x00\x00\x00\x00max-channel")
+
+ def testMaxType(self):
+ self.enc_dec([Frame(0, 255, 0, 0, "max-type")],
+ "\x00\xff\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00max-type")
+
+ def testMaxTrack(self):
+ self.enc_dec([Frame(0, 0, 15, 0, "max-track")],
+ "\x00\x00\x00\x15\x00\x0f\x00\x00\x00\x00\x00\x00max-track")
+
+ def testSequence(self):
+ self.enc_dec([Frame(0, 0, 0, 0, "zero"),
+ Frame(0, 0, 0, 1, "one"),
+ Frame(0, 0, 1, 0, "two"),
+ Frame(0, 0, 1, 1, "three"),
+ Frame(0, 1, 0, 0, "four"),
+ Frame(0, 1, 0, 1, "five"),
+ Frame(0, 1, 1, 0, "six"),
+ Frame(0, 1, 1, 1, "seven"),
+ Frame(1, 0, 0, 0, "eight"),
+ Frame(1, 0, 0, 1, "nine"),
+ Frame(1, 0, 1, 0, "ten"),
+ Frame(1, 0, 1, 1, "eleven"),
+ Frame(1, 1, 0, 0, "twelve"),
+ Frame(1, 1, 0, 1, "thirteen"),
+ Frame(1, 1, 1, 0, "fourteen"),
+ Frame(1, 1, 1, 1, "fifteen")])
+
+class SegmentTest(Base):
+
+ def enc_dec(self, segments, frames=None, interleave=None, max_payload=Frame.MAX_PAYLOAD):
+ enc = SegmentEncoder(max_payload)
+ dec = SegmentDecoder()
+
+ enc.write(*segments)
+ frms = enc.read()
+ if frames is not None:
+ assert len(frames) == len(frms), "expected %s, got %s" % (frames, frms)
+ for f1, f2 in zip(frames, frms):
+ self.cmp_frames(f1, f2)
+ if interleave is not None:
+ ilvd = []
+ for f in frms:
+ ilvd.append(f)
+ if interleave:
+ ilvd.append(interleave.pop(0))
+ ilvd.extend(interleave)
+ dec.write(*ilvd)
+ else:
+ dec.write(*frms)
+ segs = dec.read()
+ assert len(segments) == len(segs)
+ for s1, s2 in zip(segments, segs):
+ self.cmp_segments(s1, s2)
+
+ def testEmpty(self):
+ self.enc_dec([Segment(True, True, 0, 0, 0, "")],
+ [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0,
+ "")])
+
+ def testSingle(self):
+ self.enc_dec([Segment(True, True, 0, 0, 0, "payload")],
+ [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0,
+ "payload")])
+
+ def testMaxChannel(self):
+ self.enc_dec([Segment(False, False, 0, 0, 65535, "max-channel")],
+ [Frame(FIRST_FRM | LAST_FRM, 0, 0, 65535, "max-channel")])
+
+ def testMaxType(self):
+ self.enc_dec([Segment(False, False, 255, 0, 0, "max-type")],
+ [Frame(FIRST_FRM | LAST_FRM, 255, 0, 0, "max-type")])
+
+ def testMaxTrack(self):
+ self.enc_dec([Segment(False, False, 0, 15, 0, "max-track")],
+ [Frame(FIRST_FRM | LAST_FRM, 0, 15, 0, "max-track")])
+
+ def testSequence(self):
+ self.enc_dec([Segment(True, False, 0, 0, 0, "one"),
+ Segment(False, False, 0, 0, 0, "two"),
+ Segment(False, True, 0, 0, 0, "three")],
+ [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG, 0, 0, 0, "one"),
+ Frame(FIRST_FRM | LAST_FRM, 0, 0, 0, "two"),
+ Frame(FIRST_FRM | LAST_FRM | LAST_SEG, 0, 0, 0, "three")])
+
+ def testInterleaveChannel(self):
+ frames = [Frame(0, 0, 0, 0, chr(ord("a") + i)) for i in range(7)]
+ frames[0].flags |= FIRST_FRM
+ frames[-1].flags |= LAST_FRM
+
+ ilvd = [Frame(0, 0, 0, 1, chr(ord("a") + i)) for i in range(7)]
+
+ self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefg")], frames, ilvd, max_payload=1)
+
+ def testInterleaveTrack(self):
+ frames = [Frame(0, 0, 0, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1))
+ for i in range(0, 8, 2)]
+ frames[0].flags |= FIRST_FRM
+ frames[-1].flags |= LAST_FRM
+
+ ilvd = [Frame(0, 0, 1, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1))
+ for i in range(0, 8, 2)]
+
+ self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2)
+
+from qpid.ops import *
+
+class OpTest(Base):
+
+ def enc_dec(self, ops):
+ enc = OpEncoder()
+ dec = OpDecoder()
+ enc.write(*ops)
+ segs = enc.read()
+ dec.write(*segs)
+ dops = dec.read()
+ assert len(ops) == len(dops)
+ for op1, op2 in zip(ops, dops):
+ self.cmp_ops(op1, op2)
+
+ def testEmtpyMT(self):
+ self.enc_dec([MessageTransfer()])
+
+ def testEmptyMTSync(self):
+ self.enc_dec([MessageTransfer(sync=True)])
+
+ def testMT(self):
+ self.enc_dec([MessageTransfer(destination="asdf")])
+
+ def testSyncMT(self):
+ self.enc_dec([MessageTransfer(destination="asdf", sync=True)])
+
+ def testEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(payload="")])
+
+ def testPayloadMT(self):
+ self.enc_dec([MessageTransfer(payload="test payload")])
+
+ def testHeadersEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])])
+
+ def testHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")])
+
+ def testMultiHeadersEmptyPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])])
+
+ def testMultiHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")])
+
+ def testContentTypeHeadersPayloadMT(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")])
+
+ def testMulti(self):
+ self.enc_dec([MessageTransfer(),
+ MessageTransfer(sync=True),
+ MessageTransfer(destination="one"),
+ MessageTransfer(destination="two", sync=True),
+ MessageTransfer(destination="three", payload="test payload")])
+
+ def testControl(self):
+ self.enc_dec([SessionAttach(name="asdf")])
+
+ def testMixed(self):
+ self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")])
+
+ def testChannel(self):
+ self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)])
+
+ def testCompound(self):
+ self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])])
+
+ def testListCompound(self):
+ self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"),
+ Xid(global_id="two"),
+ Xid(global_id="three")]))])
diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py
new file mode 100644
index 0000000000..8f6680d5e3
--- /dev/null
+++ b/qpid/python/qpid/tests/messaging/__init__.py
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from math import ceil
+from qpid.harness import Skipped
+from qpid.messaging import *
+from qpid.tests import Test
+
+class Base(Test):
+
+ def setup_connection(self):
+ return None
+
+ def setup_session(self):
+ return None
+
+ def setup_sender(self):
+ return None
+
+ def setup_receiver(self):
+ return None
+
+ def setup(self):
+ self.test_id = uuid4()
+ self.broker = self.config.broker
+ try:
+ self.conn = self.setup_connection()
+ except ConnectError, e:
+ raise Skipped(e)
+ self.ssn = self.setup_session()
+ self.snd = self.setup_sender()
+ if self.snd is not None:
+ self.snd.durable = self.durable()
+ self.rcv = self.setup_receiver()
+
+ def teardown(self):
+ if self.conn is not None and self.conn.attached():
+ self.teardown_connection(self.conn)
+ self.conn = None
+
+ def teardown_connection(self, conn):
+ conn.close(timeout=self.timeout())
+
+ def content(self, base, count = None):
+ if count is None:
+ return "%s[%s]" % (base, self.test_id)
+ else:
+ return "%s[%s, %s]" % (base, count, self.test_id)
+
+ def message(self, base, count = None, **kwargs):
+ return Message(content=self.content(base, count), **kwargs)
+
+ def ping(self, ssn):
+ PING_Q = 'ping-queue; {create: always, delete: always}'
+ # send a message
+ sender = ssn.sender(PING_Q, durable=self.durable())
+ content = self.content("ping")
+ sender.send(content)
+ receiver = ssn.receiver(PING_Q)
+ msg = receiver.fetch(0)
+ ssn.acknowledge()
+ assert msg.content == content, "expected %r, got %r" % (content, msg.content)
+
+ def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False):
+ messages = []
+ try:
+ while limit is None or len(messages) < limit:
+ messages.append(rcv.fetch(timeout=timeout))
+ except Empty:
+ pass
+ if expected is not None:
+ self.assertEchos(expected, messages, redelivered)
+ return messages
+
+ def diff(self, m1, m2, excluded_properties=()):
+ result = {}
+ for attr in ("id", "subject", "user_id", "reply_to",
+ "correlation_id", "durable", "priority", "ttl",
+ "redelivered", "content_type", "content"):
+ a1 = getattr(m1, attr)
+ a2 = getattr(m2, attr)
+ if a1 != a2:
+ result[attr] = (a1, a2)
+ p1 = dict(m1.properties)
+ p2 = dict(m2.properties)
+ for ep in excluded_properties:
+ p1.pop(ep, None)
+ p2.pop(ep, None)
+ if p1 != p2:
+ result["properties"] = (p1, p2)
+ return result
+
+ def assertEcho(self, msg, echo, redelivered=False):
+ if not isinstance(msg, Message) or not isinstance(echo, Message):
+ if isinstance(msg, Message):
+ msg = msg.content
+ if isinstance(echo, Message):
+ echo = echo.content
+ assert msg == echo, "expected %s, got %s" % (msg, echo)
+ else:
+ delta = self.diff(msg, echo, ("x-amqp-0-10.routing-key",))
+ mttl, ettl = delta.pop("ttl", (0, 0))
+ if redelivered:
+ assert echo.redelivered, \
+ "expected %s to be redelivered: %s" % (msg, echo)
+ if delta.has_key("redelivered"):
+ del delta["redelivered"]
+ assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl)
+ assert mttl >= ettl, "%s, %s" % (mttl, ettl)
+ assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta)
+
+ def assertEchos(self, msgs, echoes, redelivered=False):
+ assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes)
+ for m, e in zip(msgs, echoes):
+ self.assertEcho(m, e, redelivered)
+
+ def assertEmpty(self, rcv):
+ contents = self.drain(rcv)
+ assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
+
+ def assertAvailable(self, rcv, expected=None, lower=None, upper=None):
+ if expected is not None:
+ if lower is not None or upper is not None:
+ raise ValueError("cannot specify lower or upper when specifying expected")
+ lower = expected
+ upper = expected
+ else:
+ if lower is None:
+ lower = int(ceil(rcv.threshold*rcv.capacity))
+ if upper is None:
+ upper = rcv.capacity
+
+ p = rcv.available()
+ if upper == lower:
+ assert p == lower, "expected %s, got %s" % (lower, p)
+ else:
+ assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper)
+
+ def sleep(self):
+ time.sleep(self.delay())
+
+ def delay(self):
+ return float(self.config.defines.get("delay", "2"))
+
+ def timeout(self):
+ return float(self.config.defines.get("timeout", "60"))
+
+ def get_bool(self, name):
+ return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+ def durable(self):
+ return self.get_bool("durable")
+
+ def reconnect(self):
+ return self.get_bool("reconnect")
+
+
+ def transport(self):
+ if self.broker.scheme == self.broker.AMQPS:
+ return "ssl"
+ else:
+ return "tcp"
+
+ def connection_options(self):
+ return {"reconnect": self.reconnect(),
+ "transport": self.transport()}
+
+import address, endpoints, message
diff --git a/qpid/python/qpid/tests/messaging/address.py b/qpid/python/qpid/tests/messaging/address.py
new file mode 100644
index 0000000000..aa9562a717
--- /dev/null
+++ b/qpid/python/qpid/tests/messaging/address.py
@@ -0,0 +1,321 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from qpid.tests import Test
+from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, \
+ SYM, WSPACE, LEXER
+from qpid.lexer import Token
+from qpid.harness import Skipped
+from qpid.tests.parser import ParserBase
+
+def indent(st):
+ return " " + st.replace("\n", "\n ")
+
+def pprint_address(name, subject, options):
+ return "NAME: %s\nSUBJECT: %s\nOPTIONS: %s" % \
+ (pprint(name), pprint(subject), pprint(options))
+
+def pprint(o):
+ if isinstance(o, dict):
+ return pprint_map(o)
+ elif isinstance(o, list):
+ return pprint_list(o)
+ elif isinstance(o, basestring):
+ return pprint_string(o)
+ else:
+ return repr(o)
+
+def pprint_map(m):
+ items = ["%s: %s" % (pprint(k), pprint(v)) for k, v in m.items()]
+ items.sort()
+ return pprint_items("{", items, "}")
+
+def pprint_list(l):
+ return pprint_items("[", [pprint(x) for x in l], "]")
+
+def pprint_items(start, items, end):
+ if items:
+ return "%s\n%s\n%s" % (start, ",\n".join([indent(i) for i in items]), end)
+ else:
+ return "%s%s" % (start, end)
+
+def pprint_string(s):
+ result = "'"
+ for c in s:
+ if c == "'":
+ result += "\\'"
+ elif c == "\n":
+ result += "\\n"
+ elif ord(c) >= 0x80:
+ result += "\\u%04x" % ord(c)
+ else:
+ result += c
+ result += "'"
+ return result
+
+class AddressTests(ParserBase, Test):
+
+ EXCLUDE = (WSPACE, EOF)
+
+ def fields(self, line, n):
+ result = line.split(":", n - 1)
+ result.extend([None]*(n - len(result)))
+ return result
+
+ def call(self, parser, mode, input):
+ try:
+ from subprocess import Popen, PIPE, STDOUT
+ po = Popen([parser, mode], stdin=PIPE, stdout=PIPE, stderr=STDOUT)
+ except ImportError, e:
+ raise Skipped("%s" % e)
+ except OSError, e:
+ raise Skipped("%s: %s" % (e, parser))
+ out, _ = po.communicate(input=input)
+ return out
+
+ def parser(self):
+ return self.config.defines.get("address.parser")
+
+ def do_lex(self, st):
+ parser = self.parser()
+ if parser:
+ out = self.call(parser, "lex", st)
+ lines = out.split("\n")
+ toks = []
+ for line in lines:
+ if line.strip():
+ name, position, value = self.fields(line, 3)
+ toks.append(Token(LEXER.type(name), value, position, st))
+ return toks
+ else:
+ return lex(st)
+
+ def do_parse(self, st):
+ return parse(st)
+
+ def valid(self, addr, name=None, subject=None, options=None):
+ parser = self.parser()
+ if parser:
+ got = self.call(parser, "parse", addr)
+ expected = "%s\n" % pprint_address(name, subject, options)
+ assert expected == got, "expected\n<EXP>%s</EXP>\ngot\n<GOT>%s</GOT>" % (expected, got)
+ else:
+ ParserBase.valid(self, addr, (name, subject, options))
+
+ def invalid(self, addr, error=None):
+ parser = self.parser()
+ if parser:
+ got = self.call(parser, "parse", addr)
+ expected = "ERROR: %s\n" % error
+ assert expected == got, "expected %r, got %r" % (expected, got)
+ else:
+ ParserBase.invalid(self, addr, error)
+
+ def testDashInId1(self):
+ self.lex("foo-bar", ID)
+
+ def testDashInId2(self):
+ self.lex("foo-3", ID)
+
+ def testDashAlone1(self):
+ self.lex("foo - bar", ID, SYM, ID)
+
+ def testDashAlone2(self):
+ self.lex("foo - 3", ID, SYM, NUMBER)
+
+ def testLeadingDash(self):
+ self.lex("-foo", SYM, ID)
+
+ def testTrailingDash(self):
+ self.lex("foo-", ID, SYM)
+
+ def testNegativeNum(self):
+ self.lex("-3", NUMBER)
+
+ def testIdNum(self):
+ self.lex("id1", ID)
+
+ def testIdSpaceNum(self):
+ self.lex("id 1", ID, NUMBER)
+
+ def testHash(self):
+ self.valid("foo/bar.#", "foo", "bar.#")
+
+ def testStar(self):
+ self.valid("foo/bar.*", "foo", "bar.*")
+
+ def testColon(self):
+ self.valid("foo.bar/baz.qux:moo:arf", "foo.bar", "baz.qux:moo:arf")
+
+ def testOptions(self):
+ self.valid("foo.bar/baz.qux:moo:arf; {key: value}",
+ "foo.bar", "baz.qux:moo:arf", {"key": "value"})
+
+ def testOptionsTrailingComma(self):
+ self.valid("name/subject; {key: value,}", "name", "subject",
+ {"key": "value"})
+
+ def testOptionsNone(self):
+ self.valid("name/subject; {key: None}", "name", "subject",
+ {"key": None})
+
+ def testSemiSubject(self):
+ self.valid("foo.bar/'baz.qux;moo:arf'; {key: value}",
+ "foo.bar", "baz.qux;moo:arf", {"key": "value"})
+
+ def testCommaSubject(self):
+ self.valid("foo.bar/baz.qux.{moo,arf}", "foo.bar", "baz.qux.{moo,arf}")
+
+ def testCommaSubjectOptions(self):
+ self.valid("foo.bar/baz.qux.{moo,arf}; {key: value}", "foo.bar",
+ "baz.qux.{moo,arf}", {"key": "value"})
+
+ def testUnbalanced(self):
+ self.valid("foo.bar/baz.qux.{moo,arf; {key: value}", "foo.bar",
+ "baz.qux.{moo,arf", {"key": "value"})
+
+ def testSlashQuote(self):
+ self.valid("foo.bar\\/baz.qux.{moo,arf; {key: value}",
+ "foo.bar/baz.qux.{moo,arf",
+ None, {"key": "value"})
+
+ def testSlashHexEsc1(self):
+ self.valid("foo.bar\\x00baz.qux.{moo,arf; {key: value}",
+ "foo.bar\x00baz.qux.{moo,arf",
+ None, {"key": "value"})
+
+ def testSlashHexEsc2(self):
+ self.valid("foo.bar\\xffbaz.qux.{moo,arf; {key: value}",
+ "foo.bar\xffbaz.qux.{moo,arf",
+ None, {"key": "value"})
+
+ def testSlashHexEsc3(self):
+ self.valid("foo.bar\\xFFbaz.qux.{moo,arf; {key: value}",
+ "foo.bar\xFFbaz.qux.{moo,arf",
+ None, {"key": "value"})
+
+ def testSlashUnicode1(self):
+ self.valid("foo.bar\\u1234baz.qux.{moo,arf; {key: value}",
+ u"foo.bar\u1234baz.qux.{moo,arf", None, {"key": "value"})
+
+ def testSlashUnicode2(self):
+ self.valid("foo.bar\\u0000baz.qux.{moo,arf; {key: value}",
+ u"foo.bar\u0000baz.qux.{moo,arf", None, {"key": "value"})
+
+ def testSlashUnicode3(self):
+ self.valid("foo.bar\\uffffbaz.qux.{moo,arf; {key: value}",
+ u"foo.bar\uffffbaz.qux.{moo,arf", None, {"key": "value"})
+
+ def testSlashUnicode4(self):
+ self.valid("foo.bar\\uFFFFbaz.qux.{moo,arf; {key: value}",
+ u"foo.bar\uFFFFbaz.qux.{moo,arf", None, {"key": "value"})
+
+ def testNoName(self):
+ self.invalid("; {key: value}",
+ "unexpected token SEMI(;) line:1,0:; {key: value}")
+
+ def testEmpty(self):
+ self.invalid("", "unexpected token EOF line:1,0:")
+
+ def testNoNameSlash(self):
+ self.invalid("/asdf; {key: value}",
+ "unexpected token SLASH(/) line:1,0:/asdf; {key: value}")
+
+ def testBadOptions1(self):
+ self.invalid("name/subject; {",
+ "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), "
+ "got EOF line:1,15:name/subject; {")
+
+ def testBadOptions2(self):
+ self.invalid("name/subject; { 3",
+ "expecting COLON, got EOF "
+ "line:1,17:name/subject; { 3")
+
+ def testBadOptions3(self):
+ self.invalid("name/subject; { key:",
+ "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), got EOF "
+ "line:1,20:name/subject; { key:")
+
+ def testBadOptions4(self):
+ self.invalid("name/subject; { key: value",
+ "expecting (COMMA, RBRACE), got EOF "
+ "line:1,26:name/subject; { key: value")
+
+ def testBadOptions5(self):
+ self.invalid("name/subject; { key: value asdf",
+ "expecting (COMMA, RBRACE), got ID(asdf) "
+ "line:1,27:name/subject; { key: value asdf")
+
+ def testBadOptions6(self):
+ self.invalid("name/subject; { key: value,",
+ "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), got EOF "
+ "line:1,27:name/subject; { key: value,")
+
+ def testBadOptions7(self):
+ self.invalid("name/subject; { key: value } asdf",
+ "expecting EOF, got ID(asdf) "
+ "line:1,29:name/subject; { key: value } asdf")
+
+ def testList1(self):
+ self.valid("name/subject; { key: [] }", "name", "subject", {"key": []})
+
+ def testList2(self):
+ self.valid("name/subject; { key: ['one'] }", "name", "subject", {"key": ['one']})
+
+ def testList3(self):
+ self.valid("name/subject; { key: [1, 2, 3] }", "name", "subject",
+ {"key": [1, 2, 3]})
+
+ def testList4(self):
+ self.valid("name/subject; { key: [1, [2, 3], 4] }", "name", "subject",
+ {"key": [1, [2, 3], 4]})
+
+ def testBadList1(self):
+ self.invalid("name/subject; { key: [ }", "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), "
+ "got RBRACE(}) line:1,23:name/subject; { key: [ }")
+
+ def testBadList2(self):
+ self.invalid("name/subject; { key: [ 1 }", "expecting (COMMA, RBRACK), "
+ "got RBRACE(}) line:1,25:name/subject; { key: [ 1 }")
+
+ def testBadList3(self):
+ self.invalid("name/subject; { key: [ 1 2 }", "expecting (COMMA, RBRACK), "
+ "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 }")
+
+ def testBadList4(self):
+ self.invalid("name/subject; { key: [ 1 2 ] }", "expecting (COMMA, RBRACK), "
+ "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 ] }")
+
+ def testMap1(self):
+ self.valid("name/subject; { 'key': value }",
+ "name", "subject", {"key": "value"})
+
+ def testMap2(self):
+ self.valid("name/subject; { 1: value }", "name", "subject", {1: "value"})
+
+ def testMap3(self):
+ self.valid('name/subject; { "foo.bar": value }',
+ "name", "subject", {"foo.bar": "value"})
+
+ def testBoolean(self):
+ self.valid("name/subject; { true1: True, true2: true, "
+ "false1: False, false2: false }",
+ "name", "subject", {"true1": True, "true2": True,
+ "false1": False, "false2": False})
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
new file mode 100644
index 0000000000..db5ec03df2
--- /dev/null
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -0,0 +1,1335 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# setup, usage, teardown, errors(sync), errors(async), stress, soak,
+# boundary-conditions, config
+
+import errno, os, socket, sys, time
+from qpid import compat
+from qpid.compat import set
+from qpid.messaging import *
+from qpid.messaging.transports import TRANSPORTS
+from qpid.tests.messaging import Base
+from threading import Thread
+
+class SetupTests(Base):
+
+ def testEstablish(self):
+ self.conn = Connection.establish(self.broker, **self.connection_options())
+ self.ping(self.conn.session())
+
+ def testOpen(self):
+ self.conn = Connection(self.broker, **self.connection_options())
+ self.conn.open()
+ self.ping(self.conn.session())
+
+ def testOpenReconnectURLs(self):
+ options = self.connection_options()
+ options["reconnect_urls"] = [self.broker, self.broker]
+ self.conn = Connection(self.broker, **options)
+ self.conn.open()
+ self.ping(self.conn.session())
+
+ def testTcpNodelay(self):
+ self.conn = Connection.establish(self.broker, tcp_nodelay=True)
+ assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+
+ def testConnectError(self):
+ try:
+ # Specifying port 0 yields a bad address on Windows; port 4 is unassigned
+ self.conn = Connection.establish("localhost:4")
+ assert False, "connect succeeded"
+ except ConnectError, e:
+ assert "refused" in str(e)
+
+ def testGetError(self):
+ self.conn = Connection("localhost:0")
+ try:
+ self.conn.open()
+ assert False, "connect succeeded"
+ except ConnectError, e:
+ assert self.conn.get_error() == e
+
+ def use_fds(self):
+ fds = []
+ try:
+ while True:
+ fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY))
+ except OSError, e:
+ if e.errno != errno.EMFILE:
+ raise e
+ else:
+ return fds
+
+ def testOpenCloseResourceLeaks(self):
+ fds = self.use_fds()
+ try:
+ for i in range(32):
+ if fds: os.close(fds.pop())
+ for i in xrange(64):
+ conn = Connection.establish(self.broker, **self.connection_options())
+ conn.close()
+ finally:
+ while fds:
+ os.close(fds.pop())
+
+ def testOpenFailResourceLeaks(self):
+ fds = self.use_fds()
+ try:
+ for i in range(32):
+ if fds: os.close(fds.pop())
+ for i in xrange(64):
+ conn = Connection("localhost:0", **self.connection_options())
+ # XXX: we need to force a waiter to be created for this test
+ # to work
+ conn._lock.acquire()
+ conn._wait(lambda: False, timeout=0.001)
+ conn._lock.release()
+ try:
+ conn.open()
+ except ConnectError, e:
+ pass
+ finally:
+ while fds:
+ os.close(fds.pop())
+
+ def testReconnect(self):
+ options = self.connection_options()
+ real = TRANSPORTS["tcp"]
+
+ class flaky:
+
+ def __init__(self, conn, host, port):
+ self.real = real(conn, host, port)
+ self.sent_count = 0
+ self.recv_count = 0
+
+ def fileno(self):
+ return self.real.fileno()
+
+ def reading(self, reading):
+ return self.real.reading(reading)
+
+ def writing(self, writing):
+ return self.real.writing(writing)
+
+ def send(self, bytes):
+ if self.sent_count > 2048:
+ raise socket.error("fake error")
+ n = self.real.send(bytes)
+ self.sent_count += n
+ return n
+
+ def recv(self, n):
+ if self.recv_count > 2048:
+ return ""
+ bytes = self.real.recv(n)
+ self.recv_count += len(bytes)
+ return bytes
+
+ def close(self):
+ self.real.close()
+
+ TRANSPORTS["flaky"] = flaky
+
+ options["reconnect"] = True
+ options["reconnect_interval"] = 0
+ options["reconnect_limit"] = 100
+ options["reconnect_log"] = False
+ options["transport"] = "flaky"
+
+ self.conn = Connection.establish(self.broker, **options)
+ ssn = self.conn.session()
+ snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
+ rcv = ssn.receiver(snd.target)
+
+ msgs = [self.message("testReconnect", i) for i in range(20)]
+ for m in msgs:
+ snd.send(m)
+
+ content = set()
+ drained = []
+ duplicates = []
+ try:
+ while True:
+ m = rcv.fetch(timeout=0)
+ if m.content not in content:
+ content.add(m.content)
+ drained.append(m)
+ else:
+ duplicates.append(m)
+ ssn.acknowledge(m)
+ except Empty:
+ pass
+ # XXX: apparently we don't always get duplicates, should figure out why
+ #assert duplicates, "no duplicates"
+ assert len(drained) == len(msgs)
+ for m, d in zip(msgs, drained):
+ # XXX: we should figure out how to provide proper end to end
+ # redelivered
+ self.assertEcho(m, d, d.redelivered)
+
+class ConnectionTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def testCheckClosed(self):
+ assert not self.conn.check_closed()
+
+ def testSessionAnon(self):
+ ssn1 = self.conn.session()
+ ssn2 = self.conn.session()
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+
+ def testSessionNamed(self):
+ ssn1 = self.conn.session("one")
+ ssn2 = self.conn.session("two")
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+ assert ssn1 is self.conn.session("one")
+ assert ssn2 is self.conn.session("two")
+
+ def testDetach(self):
+ ssn = self.conn.session()
+ self.ping(ssn)
+ self.conn.detach()
+ try:
+ self.ping(ssn)
+ assert False, "ping succeeded"
+ except Detached:
+ # this is the expected failure when pinging on a detached
+ # connection
+ pass
+ self.conn.attach()
+ self.ping(ssn)
+
+ def testClose(self):
+ self.conn.close()
+ assert not self.conn.attached()
+
+ def testSimultaneousClose(self):
+ ssns = [self.conn.session() for i in range(3)]
+ for s in ssns:
+ for i in range(3):
+ s.receiver("amq.topic")
+ s.sender("amq.topic")
+
+ def closer(errors):
+ try:
+ self.conn.close()
+ except:
+ _, e, _ = sys.exc_info()
+ errors.append(compat.format_exc(e))
+
+ t1_errors = []
+ t2_errors = []
+ t1 = Thread(target=lambda: closer(t1_errors))
+ t2 = Thread(target=lambda: closer(t2_errors))
+ t1.start()
+ t2.start()
+ t1.join(self.delay())
+ t2.join(self.delay())
+
+ assert not t1_errors, t1_errors[0]
+ assert not t2_errors, t2_errors[0]
+
+class hangable:
+
+ def __init__(self, conn, host, port):
+ self.tcp = TRANSPORTS["tcp"](conn, host, port)
+ self.hung = False
+
+ def hang(self):
+ self.hung = True
+
+ def fileno(self):
+ return self.tcp.fileno()
+
+ def reading(self, reading):
+ if self.hung:
+ return True
+ else:
+ return self.tcp.reading(reading)
+
+ def writing(self, writing):
+ if self.hung:
+ return False
+ else:
+ return self.tcp.writing(writing)
+
+ def send(self, bytes):
+ if self.hung:
+ return 0
+ else:
+ return self.tcp.send(bytes)
+
+ def recv(self, n):
+ if self.hung:
+ return ""
+ else:
+ return self.tcp.recv(n)
+
+ def close(self):
+ self.tcp.close()
+
+TRANSPORTS["hangable"] = hangable
+
+class TimeoutTests(Base):
+
+ def setup_connection(self):
+ options = self.connection_options()
+ options["transport"] = "hangable"
+ return Connection.establish(self.broker, **options)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender("amq.topic")
+
+ def setup_receiver(self):
+ return self.ssn.receiver("amq.topic; {link: {reliability: unreliable}}")
+
+ def teardown_connection(self, conn):
+ try:
+ conn.detach(timeout=0)
+ except Timeout:
+ pass
+
+ def hang(self):
+ self.conn._driver._transport.hang()
+
+ def timeoutTest(self, method):
+ self.hang()
+ try:
+ method(timeout=self.delay())
+ assert False, "did not time out"
+ except Timeout:
+ pass
+
+ def testSenderSync(self):
+ self.snd.send(self.content("testSenderSync"), sync=False)
+ self.timeoutTest(self.snd.sync)
+
+ def testSenderClose(self):
+ self.snd.send(self.content("testSenderClose"), sync=False)
+ self.timeoutTest(self.snd.close)
+
+ def testReceiverClose(self):
+ self.timeoutTest(self.rcv.close)
+
+ def testSessionSync(self):
+ self.snd.send(self.content("testSessionSync"), sync=False)
+ self.timeoutTest(self.ssn.sync)
+
+ def testSessionClose(self):
+ self.timeoutTest(self.ssn.close)
+
+ def testConnectionDetach(self):
+ self.timeoutTest(self.conn.detach)
+
+ def testConnectionClose(self):
+ self.timeoutTest(self.conn.close)
+
+ACK_QC = 'test-ack-queue; {create: always}'
+ACK_QD = 'test-ack-queue; {delete: always}'
+
+class SessionTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def testSender(self):
+ snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
+ durable=self.durable())
+ snd2 = self.ssn.sender(snd.target, durable=self.durable())
+ assert snd is not snd2
+ snd2.close()
+
+ content = self.content("testSender")
+ snd.send(content)
+ rcv = self.ssn.receiver(snd.target)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+
+ def testReceiver(self):
+ rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
+ rcv2 = self.ssn.receiver(rcv.source)
+ assert rcv is not rcv2
+ rcv2.close()
+
+ content = self.content("testReceiver")
+ snd = self.ssn.sender(rcv.source, durable=self.durable())
+ snd.send(content)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+ snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
+
+ def testDetachedReceiver(self):
+ self.conn.detach()
+ rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
+ m = self.content("testDetachedReceiver")
+ self.conn.attach()
+ snd = self.ssn.sender("test-dis-rcv-queue")
+ snd.send(m)
+ self.drain(rcv, expected=[m])
+
+ def testNextReceiver(self):
+ ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
+ rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+ rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+ rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+
+ snd = self.ssn.sender(ADDR)
+
+ msgs = []
+ for i in range(10):
+ content = self.content("testNextReceiver", i)
+ snd.send(content)
+ msgs.append(content)
+
+ fetched = []
+ try:
+ while True:
+ rcv = self.ssn.next_receiver(timeout=self.delay())
+ assert rcv in (rcv1, rcv2, rcv3)
+ assert rcv.available() > 0
+ fetched.append(rcv.fetch().content)
+ except Empty:
+ pass
+ assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
+ self.ssn.acknowledge()
+ #we set the capacity to 0 to prevent the deletion of the queue -
+ #triggered the deletion policy when the first receiver is closed -
+ #resulting in session exceptions being issued for the remaining
+ #active subscriptions:
+ for r in [rcv1, rcv2, rcv3]:
+ r.capacity = 0
+
+ # XXX, we need a convenient way to assert that required queues are
+ # empty on setup, and possibly also to drain queues on teardown
+ def ackTest(self, acker, ack_capacity=None):
+ # send a bunch of messages
+ snd = self.ssn.sender(ACK_QC, durable=self.durable())
+ contents = [self.content("ackTest", i) for i in range(15)]
+ for c in contents:
+ snd.send(c)
+
+ # drain the queue, verify the messages are there and then close
+ # without acking
+ rcv = self.ssn.receiver(ACK_QC)
+ self.drain(rcv, expected=contents)
+ self.ssn.close()
+
+ # drain the queue again, verify that they are all the messages
+ # were requeued, and ack this time before closing
+ self.ssn = self.conn.session()
+ if ack_capacity is not None:
+ self.ssn.ack_capacity = ack_capacity
+ rcv = self.ssn.receiver(ACK_QC)
+ self.drain(rcv, expected=contents)
+ acker(self.ssn)
+ self.ssn.close()
+
+ # drain the queue a final time and verify that the messages were
+ # dequeued
+ self.ssn = self.conn.session()
+ rcv = self.ssn.receiver(ACK_QD)
+ self.assertEmpty(rcv)
+
+ def testAcknowledge(self):
+ self.ackTest(lambda ssn: ssn.acknowledge())
+
+ def testAcknowledgeAsync(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+ def testAcknowledgeAsyncAckCap0(self):
+ try:
+ try:
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+ assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+ except InsufficientCapacity:
+ pass
+ finally:
+ self.ssn.ack_capacity = UNLIMITED
+ self.drain(self.ssn.receiver(ACK_QD))
+ self.ssn.acknowledge()
+
+ def testAcknowledgeAsyncAckCap1(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+ def testAcknowledgeAsyncAckCap5(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+ def testAcknowledgeAsyncAckCapUNLIMITED(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
+ def testRelease(self):
+ msgs = [self.message("testRelease", i) for i in range(3)]
+ snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+ self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+ self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+ self.drain(rcv, expected=msgs[2:3])
+ self.ssn.acknowledge()
+
+ def testReject(self):
+ msgs = [self.message("testReject", i) for i in range(3)]
+ snd = self.ssn.sender("""
+ test-reject-queue; {
+ create: always,
+ delete: always,
+ node: {
+ x-declare: {
+ alternate-exchange: 'amq.topic'
+ }
+ }
+ }
+""")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ rej = self.ssn.receiver("amq.topic")
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+ self.ssn.acknowledge(echos[2],
+ Disposition(REJECTED, code=3, text="test-reject"))
+ self.drain(rej, expected=msgs[1:])
+ self.ssn.acknowledge()
+
+ def send(self, ssn, target, base, count=1):
+ snd = ssn.sender(target, durable=self.durable())
+ messages = []
+ for i in range(count):
+ c = self.message(base, i)
+ snd.send(c)
+ messages.append(c)
+ snd.close()
+ return messages
+
+ def txTest(self, commit):
+ TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
+ TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
+ txssn = self.conn.session(transactional=True)
+ messages = self.send(self.ssn, TX_Q, "txTest", 3)
+ txrcv = txssn.receiver(TX_Q)
+ txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
+ rcv = self.ssn.receiver(txrcv.source)
+ copy_rcv = self.ssn.receiver(txsnd.target)
+ self.assertEmpty(copy_rcv)
+ for i in range(3):
+ m = txrcv.fetch(0)
+ txsnd.send(m)
+ self.assertEmpty(copy_rcv)
+ txssn.acknowledge()
+ if commit:
+ txssn.commit()
+ self.assertEmpty(rcv)
+ self.drain(copy_rcv, expected=messages)
+ else:
+ txssn.rollback()
+ self.drain(rcv, expected=messages, redelivered=True)
+ self.assertEmpty(copy_rcv)
+ self.ssn.acknowledge()
+
+ def testCommit(self):
+ self.txTest(True)
+
+ def testRollback(self):
+ self.txTest(False)
+
+ def txTestSend(self, commit):
+ TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
+ txssn = self.conn.session(transactional=True)
+ messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ rcv = self.ssn.receiver(TX_SEND_Q)
+ self.assertEmpty(rcv)
+
+ if commit:
+ txssn.commit()
+ self.drain(rcv, expected=messages)
+ self.ssn.acknowledge()
+ else:
+ txssn.rollback()
+ self.assertEmpty(rcv)
+ txssn.commit()
+ self.assertEmpty(rcv)
+
+ def testCommitSend(self):
+ self.txTestSend(True)
+
+ def testRollbackSend(self):
+ self.txTestSend(False)
+
+ def txTestAck(self, commit):
+ TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
+ TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
+ txssn = self.conn.session(transactional=True)
+ txrcv = txssn.receiver(TX_ACK_QC)
+ self.assertEmpty(txrcv)
+ messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+ self.drain(txrcv, expected=messages)
+
+ if commit:
+ txssn.acknowledge()
+ else:
+ txssn.rollback()
+ self.drain(txrcv, expected=messages, redelivered=True)
+ txssn.acknowledge()
+ txssn.rollback()
+ self.drain(txrcv, expected=messages, redelivered=True)
+ txssn.commit() # commit without ack
+ self.assertEmpty(txrcv)
+
+ txssn.close()
+
+ txssn = self.conn.session(transactional=True)
+ txrcv = txssn.receiver(TX_ACK_QC)
+ self.drain(txrcv, expected=messages, redelivered=True)
+ txssn.acknowledge()
+ txssn.commit()
+ rcv = self.ssn.receiver(TX_ACK_QD)
+ self.assertEmpty(rcv)
+ txssn.close()
+ self.assertEmpty(rcv)
+
+ def testCommitAck(self):
+ self.txTestAck(True)
+
+ def testRollbackAck(self):
+ self.txTestAck(False)
+
+ def testDoubleCommit(self):
+ ssn = self.conn.session(transactional=True)
+ snd = ssn.sender("amq.direct")
+ rcv = ssn.receiver("amq.direct")
+ msgs = [self.message("testDoubleCommit", i) for i in range(3)]
+ for m in msgs:
+ snd.send(m)
+ ssn.commit()
+ self.drain(rcv, expected=msgs)
+ ssn.acknowledge()
+ ssn.commit()
+
+ def testClose(self):
+ self.ssn.close()
+ try:
+ self.ping(self.ssn)
+ assert False, "ping succeeded"
+ except Detached:
+ pass
+
+RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
+
+class ReceiverTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(RECEIVER_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(RECEIVER_Q)
+
+ def send(self, base, count = None, sync=True):
+ content = self.content(base, count)
+ self.snd.send(content, sync=sync)
+ return content
+
+ def testFetch(self):
+ try:
+ msg = self.rcv.fetch(0)
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ pass
+ try:
+ start = time.time()
+ msg = self.rcv.fetch(self.delay())
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ elapsed = time.time() - start
+ assert elapsed >= self.delay()
+
+ one = self.send("testFetch", 1)
+ two = self.send("testFetch", 2)
+ three = self.send("testFetch", 3)
+ msg = self.rcv.fetch(0)
+ assert msg.content == one
+ msg = self.rcv.fetch(self.delay())
+ assert msg.content == two
+ msg = self.rcv.fetch()
+ assert msg.content == three
+ self.ssn.acknowledge()
+
+ def fetchFromClosedTest(self, entry):
+ entry.close()
+ try:
+ msg = self.rcv.fetch(0)
+ assert False, "unexpected result: %s" % msg
+ except Empty, e:
+ assert False, "unexpected exception: %s" % e
+ except LinkClosed, e:
+ pass
+
+ def testFetchFromClosedReceiver(self):
+ self.fetchFromClosedTest(self.rcv)
+
+ def testFetchFromClosedSession(self):
+ self.fetchFromClosedTest(self.ssn)
+
+ def testFetchFromClosedConnection(self):
+ self.fetchFromClosedTest(self.conn)
+
+ def fetchFromConcurrentCloseTest(self, entry):
+ def closer():
+ self.sleep()
+ entry.close()
+ t = Thread(target=closer)
+ t.start()
+ try:
+ msg = self.rcv.fetch()
+ assert False, "unexpected result: %s" % msg
+ except Empty, e:
+ assert False, "unexpected exception: %s" % e
+ except LinkClosed, e:
+ pass
+ t.join()
+
+ def testFetchFromConcurrentCloseReceiver(self):
+ self.fetchFromConcurrentCloseTest(self.rcv)
+
+ def testFetchFromConcurrentCloseSession(self):
+ self.fetchFromConcurrentCloseTest(self.ssn)
+
+ def testFetchFromConcurrentCloseConnection(self):
+ self.fetchFromConcurrentCloseTest(self.conn)
+
+ def testCapacityIncrease(self):
+ content = self.send("testCapacityIncrease")
+ self.sleep()
+ assert self.rcv.available() == 0
+ self.rcv.capacity = UNLIMITED
+ self.sleep()
+ assert self.rcv.available() == 1
+ msg = self.rcv.fetch(0)
+ assert msg.content == content
+ assert self.rcv.available() == 0
+ self.ssn.acknowledge()
+
+ def testCapacityDecrease(self):
+ self.rcv.capacity = UNLIMITED
+ one = self.send("testCapacityDecrease", 1)
+ self.sleep()
+ assert self.rcv.available() == 1
+ msg = self.rcv.fetch(0)
+ assert msg.content == one
+
+ self.rcv.capacity = 0
+
+ two = self.send("testCapacityDecrease", 2)
+ self.sleep()
+ assert self.rcv.available() == 0
+ msg = self.rcv.fetch(0)
+ assert msg.content == two
+
+ self.ssn.acknowledge()
+
+ def capacityTest(self, capacity, threshold=None):
+ if threshold is not None:
+ self.rcv.threshold = threshold
+ self.rcv.capacity = capacity
+ self.assertAvailable(self.rcv, 0)
+
+ for i in range(2*capacity):
+ self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False)
+ self.snd.sync()
+ self.sleep()
+ self.assertAvailable(self.rcv)
+
+ first = capacity/2
+ second = capacity - first
+ self.drain(self.rcv, limit = first)
+ self.sleep()
+ self.assertAvailable(self.rcv)
+ self.drain(self.rcv, limit = second)
+ self.sleep()
+ self.assertAvailable(self.rcv)
+
+ drained = self.drain(self.rcv)
+ assert len(drained) == capacity, "%s, %s" % (len(drained), drained)
+ self.assertAvailable(self.rcv, 0)
+
+ self.ssn.acknowledge()
+
+ def testCapacity5(self):
+ self.capacityTest(5)
+
+ def testCapacity5Threshold1(self):
+ self.capacityTest(5, 1)
+
+ def testCapacity10(self):
+ self.capacityTest(10)
+
+ def testCapacity10Threshold1(self):
+ self.capacityTest(10, 1)
+
+ def testCapacity100(self):
+ self.capacityTest(100)
+
+ def testCapacity100Threshold1(self):
+ self.capacityTest(100, 1)
+
+ def testCapacityUNLIMITED(self):
+ self.rcv.capacity = UNLIMITED
+ self.assertAvailable(self.rcv, 0)
+
+ for i in range(10):
+ self.send("testCapacityUNLIMITED", i)
+ self.sleep()
+ self.assertAvailable(self.rcv, 10)
+
+ self.drain(self.rcv)
+ self.assertAvailable(self.rcv, 0)
+
+ self.ssn.acknowledge()
+
+ def testAvailable(self):
+ self.rcv.capacity = UNLIMITED
+ assert self.rcv.available() == 0
+
+ for i in range(3):
+ self.send("testAvailable", i)
+ self.sleep()
+ assert self.rcv.available() == 3
+
+ for i in range(3, 10):
+ self.send("testAvailable", i)
+ self.sleep()
+ assert self.rcv.available() == 10
+
+ self.drain(self.rcv, limit=3)
+ assert self.rcv.available() == 7
+
+ self.drain(self.rcv)
+ assert self.rcv.available() == 0
+
+ self.ssn.acknowledge()
+
+ def testDoubleClose(self):
+ m1 = self.content("testDoubleClose", 1)
+ m2 = self.content("testDoubleClose", 2)
+
+ snd = self.ssn.sender("""test-double-close; {
+ create: always,
+ delete: sender,
+ node: {
+ type: topic
+ }
+}
+""")
+ r1 = self.ssn.receiver(snd.target)
+ r2 = self.ssn.receiver(snd.target)
+ snd.send(m1)
+ self.drain(r1, expected=[m1])
+ self.drain(r2, expected=[m1])
+ r1.close()
+ snd.send(m2)
+ self.drain(r2, expected=[m2])
+ r2.close()
+
+ # XXX: need testClose
+
+ def testMode(self):
+ msgs = [self.content("testMode", 1),
+ self.content("testMode", 2),
+ self.content("testMode", 3)]
+
+ for m in msgs:
+ self.snd.send(m)
+
+ rb = self.ssn.receiver('test-receiver-queue; {mode: browse}')
+ rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
+ self.drain(rb, expected=msgs)
+ self.drain(rc, expected=msgs)
+ rb2 = self.ssn.receiver(rb.source)
+ self.assertEmpty(rb2)
+ self.drain(self.rcv, expected=[])
+
+ # XXX: need testUnsettled()
+
+ def unreliabilityTest(self, mode="unreliable"):
+ msgs = [self.message("testUnreliable", i) for i in range(3)]
+ snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}")
+ rcv = self.ssn.receiver(snd.target)
+ for m in msgs:
+ snd.send(m)
+
+ # close without ack on reliable receiver, messages should be requeued
+ ssn = self.conn.session()
+ rrcv = ssn.receiver("test-unreliability-queue")
+ self.drain(rrcv, expected=msgs)
+ ssn.close()
+
+ # close without ack on unreliable receiver, messages should not be requeued
+ ssn = self.conn.session()
+ urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode)
+ self.drain(urcv, expected=msgs, redelivered=True)
+ ssn.close()
+
+ self.assertEmpty(rcv)
+
+ def testUnreliable(self):
+ self.unreliabilityTest(mode="unreliable")
+
+ def testAtMostOnce(self):
+ self.unreliabilityTest(mode="at-most-once")
+
+class AddressTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def badOption(self, options, error):
+ try:
+ self.ssn.sender("test-bad-options-snd; %s" % options)
+ assert False
+ except InvalidOption, e:
+ assert "error in options: %s" % error == str(e), e
+
+ try:
+ self.ssn.receiver("test-bad-options-rcv; %s" % options)
+ assert False
+ except InvalidOption, e:
+ assert "error in options: %s" % error == str(e), e
+
+ def testIllegalKey(self):
+ self.badOption("{create: always, node: "
+ "{this-property-does-not-exist: 3}}",
+ "node: this-property-does-not-exist: "
+ "illegal key")
+
+ def testWrongValue(self):
+ self.badOption("{create: asdf}", "create: asdf not in "
+ "('always', 'sender', 'receiver', 'never')")
+
+ def testWrongType1(self):
+ self.badOption("{node: asdf}",
+ "node: asdf is not a map")
+
+ def testWrongType2(self):
+ self.badOption("{node: {durable: []}}",
+ "node: durable: [] is not a bool")
+
+ def testCreateQueue(self):
+ snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
+ "node: {type: queue, durable: False, "
+ "x-declare: {auto_delete: true}}}")
+ content = self.content("testCreateQueue")
+ snd.send(content)
+ rcv = self.ssn.receiver("test-create-queue")
+ self.drain(rcv, expected=[content])
+
+ def createExchangeTest(self, props=""):
+ addr = """test-create-exchange; {
+ create: always,
+ delete: always,
+ node: {
+ type: topic,
+ durable: False,
+ x-declare: {auto_delete: true, %s}
+ }
+ }""" % props
+ snd = self.ssn.sender(addr)
+ snd.send("ping")
+ rcv1 = self.ssn.receiver("test-create-exchange/first")
+ rcv2 = self.ssn.receiver("test-create-exchange/first")
+ rcv3 = self.ssn.receiver("test-create-exchange/second")
+ for r in (rcv1, rcv2, rcv3):
+ try:
+ r.fetch(0)
+ assert False
+ except Empty:
+ pass
+ msg1 = Message(self.content("testCreateExchange", 1), subject="first")
+ msg2 = Message(self.content("testCreateExchange", 2), subject="second")
+ snd.send(msg1)
+ snd.send(msg2)
+ self.drain(rcv1, expected=[msg1.content])
+ self.drain(rcv2, expected=[msg1.content])
+ self.drain(rcv3, expected=[msg2.content])
+
+ def testCreateExchange(self):
+ self.createExchangeTest()
+
+ def testCreateExchangeDirect(self):
+ self.createExchangeTest("type: direct")
+
+ def testCreateExchangeTopic(self):
+ self.createExchangeTest("type: topic")
+
+ def testDeleteBySender(self):
+ snd = self.ssn.sender("test-delete; {create: always}")
+ snd.send("ping")
+ snd.close()
+ snd = self.ssn.sender("test-delete; {delete: always}")
+ snd.send("ping")
+ snd.close()
+ try:
+ self.ssn.sender("test-delete")
+ except NotFound, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteByReceiver(self):
+ rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
+ try:
+ rcv.fetch(0)
+ except Empty:
+ pass
+ rcv.close()
+
+ try:
+ self.ssn.receiver("test-delete")
+ assert False
+ except NotFound, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteSpecial(self):
+ snd = self.ssn.sender("amq.topic; {delete: always}")
+ snd.send("asdf")
+ try:
+ snd.close()
+ assert False, "successfully deleted amq.topic"
+ except SessionError, e:
+ assert "Cannot delete default exchange" in str(e)
+ # XXX: need to figure out close after error
+ self.conn._remove_session(self.ssn)
+
+ def testNodeBindingsQueue(self):
+ snd = self.ssn.sender("""
+test-node-bindings-queue; {
+ create: always,
+ delete: always,
+ node: {
+ x-bindings: [{exchange: "amq.topic", key: "a.#"},
+ {exchange: "amq.direct", key: "b"},
+ {exchange: "amq.topic", key: "c.*"}]
+ }
+}
+""")
+ snd.send("one")
+ snd_a = self.ssn.sender("amq.topic/a.foo")
+ snd_b = self.ssn.sender("amq.direct/b")
+ snd_c = self.ssn.sender("amq.topic/c.bar")
+ snd_a.send("two")
+ snd_b.send("three")
+ snd_c.send("four")
+ rcv = self.ssn.receiver("test-node-bindings-queue")
+ self.drain(rcv, expected=["one", "two", "three", "four"])
+
+ def testNodeBindingsTopic(self):
+ rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
+ rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
+ rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
+ rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
+ snd = self.ssn.sender("""
+test-node-bindings-topic; {
+ create: always,
+ delete: always,
+ node: {
+ type: topic,
+ x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
+ {queue: test-node-bindings-topic-queue-a, key: "a.#"},
+ {queue: test-node-bindings-topic-queue-b, key: "b"},
+ {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
+ }
+}
+""")
+ m1 = Message("one")
+ m2 = Message(subject="a.foo", content="two")
+ m3 = Message(subject="b", content="three")
+ m4 = Message(subject="c.bar", content="four")
+ snd.send(m1)
+ snd.send(m2)
+ snd.send(m3)
+ snd.send(m4)
+ self.drain(rcv, expected=[m1, m2, m3, m4])
+ self.drain(rcv_a, expected=[m2])
+ self.drain(rcv_b, expected=[m3])
+ self.drain(rcv_c, expected=[m4])
+
+ def testLinkBindings(self):
+ m_a = self.message("testLinkBindings", 1, subject="a")
+ m_b = self.message("testLinkBindings", 2, subject="b")
+
+ self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
+ snd = self.ssn.sender("amq.topic")
+
+ snd.send(m_a)
+ snd.send(m_b)
+ snd.close()
+
+ rcv = self.ssn.receiver("test-link-bindings-queue")
+ self.assertEmpty(rcv)
+
+ snd = self.ssn.sender("""
+amq.topic; {
+ link: {
+ x-bindings: [{queue: test-link-bindings-queue, key: a}]
+ }
+}
+""")
+
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a])
+ rcv.close()
+
+ rcv = self.ssn.receiver("""
+test-link-bindings-queue; {
+ link: {
+ x-bindings: [{exchange: "amq.topic", key: b}]
+ }
+}
+""")
+
+ snd.send(m_a)
+ snd.send(m_b)
+
+ self.drain(rcv, expected=[m_a, m_b])
+
+ def testSubjectOverride(self):
+ snd = self.ssn.sender("amq.topic/a")
+ rcv_a = self.ssn.receiver("amq.topic/a")
+ rcv_b = self.ssn.receiver("amq.topic/b")
+ m1 = self.content("testSubjectOverride", 1)
+ m2 = self.content("testSubjectOverride", 2)
+ snd.send(m1)
+ snd.send(Message(subject="b", content=m2))
+ self.drain(rcv_a, expected=[m1])
+ self.drain(rcv_b, expected=[m2])
+
+ def testSubjectDefault(self):
+ m1 = self.content("testSubjectDefault", 1)
+ m2 = self.content("testSubjectDefault", 2)
+ snd = self.ssn.sender("amq.topic/a")
+ rcv = self.ssn.receiver("amq.topic")
+ snd.send(m1)
+ snd.send(Message(subject="b", content=m2))
+ e1 = rcv.fetch(timeout=0)
+ e2 = rcv.fetch(timeout=0)
+ assert e1.subject == "a", "subject: %s" % e1.subject
+ assert e2.subject == "b", "subject: %s" % e2.subject
+ self.assertEmpty(rcv)
+
+ def doReliabilityTest(self, reliability, messages, expected):
+ snd = self.ssn.sender("amq.topic")
+ rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
+ for m in messages:
+ snd.send(m)
+ self.conn.detach()
+ self.conn.attach()
+ self.drain(rcv, expected=expected)
+
+ def testReliabilityUnreliable(self):
+ msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
+ self.doReliabilityTest("unreliable", msgs, [])
+
+ def testReliabilityAtLeastOnce(self):
+ msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
+ self.doReliabilityTest("at-least-once", msgs, msgs)
+
+ def testLinkName(self):
+ msgs = [self.message("testLinkName", i) for i in range(3)]
+ snd = self.ssn.sender("amq.topic")
+ trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
+ qrcv = self.ssn.receiver("test-link-name")
+ for m in msgs:
+ snd.send(m)
+ self.drain(qrcv, expected=msgs)
+
+ def testAssert1(self):
+ try:
+ snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}")
+ assert 0, "assertion failed to trigger"
+ except AssertionFailed, e:
+ pass
+
+ def testAssert2(self):
+ snd = self.ssn.sender("amq.topic; {assert: always}")
+
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "name/subject; {bad options"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def senderErrorTest(self, addr, exc, check=lambda e: True):
+ try:
+ self.ssn.sender(addr, durable=self.durable())
+ assert False, "sender creation succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % compat.format_exc(e)
+
+ def receiverErrorTest(self, addr, exc, check=lambda e: True):
+ try:
+ self.ssn.receiver(addr)
+ assert False, "receiver creation succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % compat.format_exc(e)
+
+ def testNoneTarget(self):
+ self.senderErrorTest(None, MalformedAddress)
+
+ def testNoneSource(self):
+ self.receiverErrorTest(None, MalformedAddress)
+
+ def testNoTarget(self):
+ self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
+
+ def testNoSource(self):
+ self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
+
+ def testUnparseableTarget(self):
+ self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
+ lambda e: "expecting COLON" in str(e))
+
+ def testUnparseableSource(self):
+ self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
+ lambda e: "expecting COLON" in str(e))
+
+ def testUnlexableTarget(self):
+ self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress,
+ lambda e: "unrecognized characters" in str(e))
+
+ def testUnlexableSource(self):
+ self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress,
+ lambda e: "unrecognized characters" in str(e))
+
+ def testInvalidMode(self):
+ self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
+ InvalidOption,
+ lambda e: "not in ('browse', 'consume')" in str(e))
+
+SENDER_Q = 'test-sender-q; {create: always, delete: always}'
+
+class SenderTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(SENDER_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(SENDER_Q)
+
+ def checkContent(self, content):
+ self.snd.send(content)
+ msg = self.rcv.fetch(0)
+ assert msg.content == content
+
+ out = Message(content)
+ self.snd.send(out)
+ echo = self.rcv.fetch(0)
+ assert out.content == echo.content
+ assert echo.content == msg.content
+ self.ssn.acknowledge()
+
+ def testSendString(self):
+ self.checkContent(self.content("testSendString"))
+
+ def testSendList(self):
+ self.checkContent(["testSendList", 1, 3.14, self.test_id])
+
+ def testSendMap(self):
+ self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
+
+ def asyncTest(self, capacity):
+ self.snd.capacity = capacity
+ msgs = [self.content("asyncTest", i) for i in range(15)]
+ for m in msgs:
+ self.snd.send(m, sync=False)
+ self.drain(self.rcv, timeout=self.delay(), expected=msgs)
+ self.ssn.acknowledge()
+
+ def testSendAsyncCapacity0(self):
+ try:
+ self.asyncTest(0)
+ assert False, "send shouldn't succeed with zero capacity"
+ except InsufficientCapacity:
+ # this is expected
+ pass
+
+ def testSendAsyncCapacity1(self):
+ self.asyncTest(1)
+
+ def testSendAsyncCapacity5(self):
+ self.asyncTest(5)
+
+ def testSendAsyncCapacityUNLIMITED(self):
+ self.asyncTest(UNLIMITED)
+
+ def testCapacityTimeout(self):
+ self.snd.capacity = 1
+ msgs = []
+ caught = False
+ while len(msgs) < 100:
+ m = self.content("testCapacity", len(msgs))
+ try:
+ self.snd.send(m, sync=False, timeout=0)
+ msgs.append(m)
+ except InsufficientCapacity:
+ caught = True
+ break
+ self.snd.sync()
+ self.drain(self.rcv, expected=msgs)
+ self.ssn.acknowledge()
+ assert caught, "did not exceed capacity"
diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py
new file mode 100644
index 0000000000..297374b82b
--- /dev/null
+++ b/qpid/python/qpid/tests/messaging/message.py
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+
+class MessageTests(Base):
+
+ def testCreateString(self):
+ m = Message("string")
+ assert m.content == "string"
+ assert m.content_type is None
+
+ def testCreateUnicode(self):
+ m = Message(u"unicode")
+ assert m.content == u"unicode"
+ assert m.content_type == "text/plain"
+
+ def testCreateMap(self):
+ m = Message({})
+ assert m.content == {}
+ assert m.content_type == "amqp/map"
+
+ def testCreateList(self):
+ m = Message([])
+ assert m.content == []
+ assert m.content_type == "amqp/list"
+
+ def testContentTypeOverride(self):
+ m = Message()
+ m.content_type = "text/html; charset=utf8"
+ m.content = u"<html/>"
+ assert m.content_type == "text/html; charset=utf8"
+
+ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
+
+class MessageEchoTests(Base):
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(ECHO_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(ECHO_Q)
+
+ def check(self, msg):
+ self.snd.send(msg)
+ echo = self.rcv.fetch(0)
+ self.assertEcho(msg, echo)
+ self.ssn.acknowledge(echo)
+
+ def testStringContent(self):
+ self.check(Message("string"))
+
+ def testUnicodeContent(self):
+ self.check(Message(u"unicode"))
+
+
+ TEST_MAP = {"key1": "string",
+ "key2": u"unicode",
+ "key3": 3,
+ "key4": -3,
+ "key5": 3.14,
+ "key6": -3.14,
+ "key7": ["one", 2, 3.14],
+ "key8": [],
+ "key9": {"sub-key0": 3},
+ "key10": True,
+ "key11": False,
+ "x-amqp-0-10.app-id": "test-app-id",
+ "x-amqp-0-10.content-encoding": "test-content-encoding"}
+
+ def testMapContent(self):
+ self.check(Message(MessageEchoTests.TEST_MAP))
+
+ def testListContent(self):
+ self.check(Message([]))
+ self.check(Message([1, 2, 3]))
+ self.check(Message(["one", 2, 3.14, {"four": 4}]))
+
+ def testProperties(self):
+ msg = Message()
+ msg.subject = "subject"
+ msg.correlation_id = str(self.test_id)
+ msg.durable = True
+ msg.priority = 7
+ msg.ttl = 60
+ msg.properties = MessageEchoTests.TEST_MAP
+ msg.reply_to = "reply-address"
+ self.check(msg)
+
+ def testContentTypeUnknown(self):
+ msg = Message(content_type = "this-content-type-does-not-exist")
+ self.check(msg)
+
+ def testTextPlain(self):
+ self.check(Message(content_type="text/plain", content="asdf"))
+
+ def testTextPlainEmpty(self):
+ self.check(Message(content_type="text/plain"))
+
+ def check_rt(self, addr, expected=None):
+ if expected is None:
+ expected = addr
+ msg = Message(reply_to=addr)
+ self.snd.send(msg)
+ echo = self.rcv.fetch(0)
+ assert echo.reply_to == expected, echo.reply_to
+ self.ssn.acknowledge(echo)
+
+ def testReplyTo(self):
+ self.check_rt("name")
+
+ def testReplyToQueue(self):
+ self.check_rt("name; {node: {type: queue}}", "name")
+
+ def testReplyToQueueSubject(self):
+ self.check_rt("name/subject; {node: {type: queue}}", "name")
+
+ def testReplyToTopic(self):
+ self.check_rt("name; {node: {type: topic}}")
+
+ def testReplyToTopicSubject(self):
+ self.check_rt("name/subject; {node: {type: topic}}")
+
+ def testBooleanEncoding(self):
+ msg = Message({"true": True, "false": False})
+ self.snd.send(msg)
+ echo = self.rcv.fetch(0)
+ self.assertEcho(msg, echo)
+ t = echo.content["true"]
+ f = echo.content["false"]
+ assert isinstance(t, bool), t
+ assert isinstance(f, bool), f
diff --git a/qpid/python/qpid/tests/mimetype.py b/qpid/python/qpid/tests/mimetype.py
new file mode 100644
index 0000000000..22760316f0
--- /dev/null
+++ b/qpid/python/qpid/tests/mimetype.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.tests import Test
+from qpid.mimetype import lex, parse, ParseError, EOF, WSPACE
+from parser import ParserBase
+
+class MimeTypeTests(ParserBase, Test):
+
+ EXCLUDE = (WSPACE, EOF)
+
+ def do_lex(self, st):
+ return lex(st)
+
+ def do_parse(self, st):
+ return parse(st)
+
+ def valid(self, addr, type=None, subtype=None, parameters=None):
+ ParserBase.valid(self, addr, (type, subtype, parameters))
+
+ def testTypeOnly(self):
+ self.invalid("type", "expecting SLASH, got EOF line:1,4:type")
+
+ def testTypeSubtype(self):
+ self.valid("type/subtype", "type", "subtype", [])
+
+ def testTypeSubtypeParam(self):
+ self.valid("type/subtype ; name=value",
+ "type", "subtype", [("name", "value")])
+
+ def testTypeSubtypeParamComment(self):
+ self.valid("type/subtype ; name(This is a comment.)=value",
+ "type", "subtype", [("name", "value")])
+
+ def testMultipleParams(self):
+ self.valid("type/subtype ; name1=value1 ; name2=value2",
+ "type", "subtype", [("name1", "value1"), ("name2", "value2")])
+
+ def testCaseInsensitivity(self):
+ self.valid("Type/Subtype", "type", "subtype", [])
diff --git a/qpid/python/qpid/tests/parser.py b/qpid/python/qpid/tests/parser.py
new file mode 100644
index 0000000000..a4865cc9fe
--- /dev/null
+++ b/qpid/python/qpid/tests/parser.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.parser import ParseError
+
+class ParserBase:
+
+ def lex(self, addr, *types):
+ toks = [t.type for t in self.do_lex(addr) if t.type not in self.EXCLUDE]
+ assert list(types) == toks, "expected %s, got %s" % (types, toks)
+
+ def valid(self, addr, expected):
+ got = self.do_parse(addr)
+ assert expected == got, "expected %s, got %s" % (expected, got)
+
+ def invalid(self, addr, error=None):
+ try:
+ p = self.do_parse(addr)
+ assert False, "invalid address parsed: %s" % p
+ except ParseError, e:
+ assert error == str(e), "expected %r, got %r" % (error, str(e))
diff --git a/qpid/python/qpid/tests/queue.py b/qpid/python/qpid/tests/queue.py
new file mode 100644
index 0000000000..e12354eb43
--- /dev/null
+++ b/qpid/python/qpid/tests/queue.py
@@ -0,0 +1,71 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import threading, time
+from unittest import TestCase
+from qpid.queue import Queue, Empty, Closed
+
+
+class QueueTest (TestCase):
+
+ # The qpid queue class just provides sime simple extensions to
+ # python's standard queue data structure, so we don't need to test
+ # all the queue functionality.
+
+ def test_listen(self):
+ values = []
+ heard = threading.Event()
+ def listener(x):
+ values.append(x)
+ heard.set()
+
+ q = Queue(0)
+ q.listen(listener)
+ heard.clear()
+ q.put(1)
+ heard.wait()
+ assert values[-1] == 1
+ heard.clear()
+ q.put(2)
+ heard.wait()
+ assert values[-1] == 2
+
+ q.listen(None)
+ q.put(3)
+ assert q.get(3) == 3
+ q.listen(listener)
+
+ heard.clear()
+ q.put(4)
+ heard.wait()
+ assert values[-1] == 4
+
+ def test_close(self):
+ q = Queue(0)
+ q.put(1); q.put(2); q.put(3); q.close()
+ assert q.get() == 1
+ assert q.get() == 2
+ assert q.get() == 3
+ for i in range(10):
+ try:
+ q.get()
+ raise AssertionError("expected Closed")
+ except Closed:
+ pass
diff --git a/qpid/python/qpid/tests/spec010.py b/qpid/python/qpid/tests/spec010.py
new file mode 100644
index 0000000000..ac04e1ee02
--- /dev/null
+++ b/qpid/python/qpid/tests/spec010.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, tempfile, shutil, stat
+from unittest import TestCase
+from qpid.codec010 import Codec, StringCodec
+from qpid.ops import *
+
+class SpecTest(TestCase):
+
+ def testSessionHeader(self):
+ sc = StringCodec()
+ sc.write_compound(Header(sync=True))
+ assert sc.encoded == "\x01\x01"
+
+ sc = StringCodec()
+ sc.write_compound(Header(sync=False))
+ assert sc.encoded == "\x01\x00"
+
+ def encdec(self, value):
+ sc = StringCodec()
+ sc.write_compound(value)
+ decoded = sc.read_compound(value.__class__)
+ return decoded
+
+ def testMessageProperties(self):
+ props = MessageProperties(content_length=3735928559L,
+ reply_to=ReplyTo(exchange="the exchange name",
+ routing_key="the routing key"))
+ dec = self.encdec(props)
+ assert props.content_length == dec.content_length
+ assert props.reply_to.exchange == dec.reply_to.exchange
+ assert props.reply_to.routing_key == dec.reply_to.routing_key
+
+ def testMessageSubscribe(self):
+ cmd = MessageSubscribe(exclusive=True, destination="this is a test")
+ dec = self.encdec(cmd)
+ assert cmd.exclusive == dec.exclusive
+ assert cmd.destination == dec.destination
+
+ def testXid(self):
+ sc = StringCodec()
+ xid = Xid(format=0, global_id="gid", branch_id="bid")
+ sc.write_compound(xid)
+ assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
+ dec = sc.read_compound(Xid)
+ assert xid.__dict__ == dec.__dict__
+
+# def testLoadReadOnly(self):
+# spec = "amqp.0-10-qpid-errata.xml"
+# f = testrunner.get_spec_file(spec)
+# dest = tempfile.mkdtemp()
+# shutil.copy(f, dest)
+# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
+# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
+# fname = os.path.join(dest, spec)
+# load(fname)
+# assert not os.path.exists("%s.pcl" % fname)