diff options
Diffstat (limited to 'qpid/python/qpid/tests')
-rw-r--r-- | qpid/python/qpid/tests/__init__.py | 60 | ||||
-rw-r--r-- | qpid/python/qpid/tests/codec.py | 601 | ||||
-rw-r--r-- | qpid/python/qpid/tests/codec010.py | 133 | ||||
-rw-r--r-- | qpid/python/qpid/tests/connection.py | 227 | ||||
-rw-r--r-- | qpid/python/qpid/tests/datatypes.py | 296 | ||||
-rw-r--r-- | qpid/python/qpid/tests/framing.py | 289 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/__init__.py | 185 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/address.py | 321 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 1335 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/message.py | 155 | ||||
-rw-r--r-- | qpid/python/qpid/tests/mimetype.py | 56 | ||||
-rw-r--r-- | qpid/python/qpid/tests/parser.py | 37 | ||||
-rw-r--r-- | qpid/python/qpid/tests/queue.py | 71 | ||||
-rw-r--r-- | qpid/python/qpid/tests/spec010.py | 74 |
14 files changed, 3840 insertions, 0 deletions
diff --git a/qpid/python/qpid/tests/__init__.py b/qpid/python/qpid/tests/__init__.py new file mode 100644 index 0000000000..101a0c3759 --- /dev/null +++ b/qpid/python/qpid/tests/__init__.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Test: + + def __init__(self, name): + self.name = name + + def configure(self, config): + self.config = config + +# API Tests +import qpid.tests.framing +import qpid.tests.mimetype +import qpid.tests.messaging + +# Legacy Tests +import qpid.tests.codec +import qpid.tests.queue +import qpid.tests.datatypes +import qpid.tests.connection +import qpid.tests.spec010 +import qpid.tests.codec010 + +class TestTestsXXX(Test): + + def testFoo(self): + print "this test has output" + + def testBar(self): + print "this test "*8 + print "has"*10 + print "a"*75 + print "lot of"*10 + print "output"*10 + + def testQux(self): + import sys + sys.stdout.write("this test has output with no newline") + + def testQuxFail(self): + import sys + sys.stdout.write("this test has output with no newline") + fdsa diff --git a/qpid/python/qpid/tests/codec.py b/qpid/python/qpid/tests/codec.py new file mode 100644 index 0000000000..8fd0528636 --- /dev/null +++ b/qpid/python/qpid/tests/codec.py @@ -0,0 +1,601 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +from qpid.codec import Codec +from qpid.spec08 import load +from cStringIO import StringIO +from qpid.reference import ReferenceId + +__doc__ = """ + + This is a unit test script for qpid/codec.py + + It can be run standalone or as part of the existing test framework. + + To run standalone: + ------------------- + + Place in the qpid/python/tests/ directory and type... + + python codec.py + + A brief output will be printed on screen. The verbose output will be placed inn a file called + codec_unit_test_output.txt. [TODO: make this filename configurable] + + To run as part of the existing test framework: + ----------------------------------------------- + + python run-tests tests.codec + + Change History: + ----------------- + Jimmy John 05/19/2007 Initial draft + Jimmy John 05/22/2007 Implemented comments by Rafael Schloming + + +""" + +from qpid.specs_config import amqp_spec_0_8 +SPEC = load(amqp_spec_0_8) + +# -------------------------------------- +# -------------------------------------- +class BaseDataTypes(unittest.TestCase): + + + """ + Base class containing common functions + """ + + # --------------- + def setUp(self): + """ + standard setUp for unitetest (refer unittest documentation for details) + """ + self.codec = Codec(StringIO(), SPEC) + + # ------------------ + def tearDown(self): + """ + standard tearDown for unitetest (refer unittest documentation for details) + """ + self.codec.stream.flush() + self.codec.stream.close() + + # ---------------------------------------- + def callFunc(self, functionName, *args): + """ + helper function - given a function name and arguments, calls the function with the args and + returns the contents of the stream + """ + getattr(self.codec, functionName)(args[0]) + return self.codec.stream.getvalue() + + # ---------------------------------------- + def readFunc(self, functionName, *args): + """ + helper function - creates a input stream and then calls the function with arguments as have been + supplied + """ + self.codec.stream = StringIO(args[0]) + return getattr(self.codec, functionName)() + + +# ---------------------------------------- +# ---------------------------------------- +class IntegerTestCase(BaseDataTypes): + + """ + Handles octet, short, long, long long + + """ + + # ------------------------- + def __init__(self, *args): + """ + sets constants for use in tests + """ + + BaseDataTypes.__init__(self, *args) + self.const_integer = 2 + self.const_integer_octet_encoded = '\x02' + self.const_integer_short_encoded = '\x00\x02' + self.const_integer_long_encoded = '\x00\x00\x00\x02' + self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02' + + # -------------------------- # + # Unsigned Octect - 8 bits # + # -------------------------- # + + # -------------------------- + def test_unsigned_octet(self): + """ + ubyte format requires 0<=number<=255 + """ + self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...') + + # ------------------------------------------- + def test_octet_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_octet, 256) + + # ------------------------------------------- + def test_uoctet_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_octet, -1) + + # --------------------------------- + def test_uoctet_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...') + + # ------------------------------------ + def test_unsigned_octet_decode(self): + """ + octet decoding + """ + self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...') + + # ----------------------------------- # + # Unsigned Short Integers - 16 bits # + # ----------------------------------- # + + # ----------------------- + def test_ushort_int(self): + """ + testing unsigned short integer + """ + self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...') + + # ------------------------------------------- + def test_ushort_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_short, 65536) + + # ------------------------------------------- + def test_ushort_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_short, -1) + + # --------------------------------- + def test_ushort_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...') + + # ------------------------------------ + def test_ushort_int_decode(self): + """ + unsigned short decoding + """ + self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...') + + + # ---------------------------------- # + # Unsigned Long Integers - 32 bits # + # ---------------------------------- # + + # ----------------------- + def test_ulong_int(self): + """ + testing unsigned long iteger + """ + self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...') + + # ------------------------------------------- + def test_ulong_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296) + + # ------------------------------------------- + def test_ulong_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_long, -1) + + # --------------------------------- + def test_ulong_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...') + + # ------------------------------- + def test_ulong_int_decode(self): + """ + unsigned long decoding + """ + self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...') + + + # --------------------------------------- # + # Unsigned Long Long Integers - 64 bits # + # --------------------------------------- # + + # ----------------------- + def test_ulong_long_int(self): + """ + testing unsinged long long integer + """ + self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...') + + # ------------------------------------------- + def test_ulong_long_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616) + + # ------------------------------------------- + def test_ulong_long_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_longlong, -1) + + # --------------------------------- + def test_ulong_long_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...') + + # ------------------------------------ + def test_ulong_long_int_decode(self): + """ + unsigned long long decoding + """ + self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...') + +# ----------------------------------- +# ----------------------------------- +class BitTestCase(BaseDataTypes): + + """ + Handles bits + """ + + # ---------------------------------------------- + def callFunc(self, functionName, *args): + """ + helper function + """ + for ele in args: + getattr(self.codec, functionName)(ele) + + self.codec.flush() + return self.codec.stream.getvalue() + + # ------------------- + def test_bit1(self): + """ + sends in 11 + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...') + + # ------------------- + def test_bit2(self): + """ + sends in 10011 + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...') + + # ------------------- + def test_bit3(self): + """ + sends in 1110100111 [10 bits(right to left), should be compressed into two octets] + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...') + + # ------------------------------------ + def test_bit_decode_1(self): + """ + decode bit 1 + """ + self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...') + + # ------------------------------------ + def test_bit_decode_0(self): + """ + decode bit 0 + """ + self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...') + +# ----------------------------------- +# ----------------------------------- +class StringTestCase(BaseDataTypes): + + """ + Handles short strings, long strings + """ + + # ------------------------------------------------------------- # + # Short Strings - 8 bit length followed by zero or more octets # + # ------------------------------------------------------------- # + + # --------------------------------------- + def test_short_string_zero_length(self): + """ + 0 length short string + """ + self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...') + + # ------------------------------------------- + def test_short_string_positive_length(self): + """ + positive length short string + """ + self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...') + + # ------------------------------------------- + def test_short_string_out_of_upper_range(self): + """ + string length > 255 + """ + self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256) + + # ------------------------------------ + def test_short_string_decode(self): + """ + short string decode + """ + self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...') + + + # ------------------------------------------------------------- # + # Long Strings - 32 bit length followed by zero or more octets # + # ------------------------------------------------------------- # + + # --------------------------------------- + def test_long_string_zero_length(self): + """ + 0 length long string + """ + self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...') + + # ------------------------------------------- + def test_long_string_positive_length(self): + """ + positive length long string + """ + self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...') + + # ------------------------------------ + def test_long_string_decode(self): + """ + long string decode + """ + self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...') + + +# -------------------------------------- +# -------------------------------------- +class TimestampTestCase(BaseDataTypes): + + """ + No need of any test cases here as timestamps are implemented as long long which is tested above + """ + pass + +# --------------------------------------- +# --------------------------------------- +class FieldTableTestCase(BaseDataTypes): + + """ + Handles Field Tables + + Only S/I type messages seem to be implemented currently + """ + + # ------------------------- + def __init__(self, *args): + """ + sets constants for use in tests + """ + + BaseDataTypes.__init__(self, *args) + self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'} + self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1' + + # ------------------------------------------- + def test_field_table_name_value_pair(self): + """ + valid name value pair + """ + self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...') + + # --------------------------------------------------- + def test_field_table_multiple_name_value_pair(self): + """ + multiple name value pair + """ + self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...') + + # ------------------------------------ + def test_field_table_decode(self): + """ + field table decode + """ + self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...') + + +# ------------------------------------ +# ------------------------------------ +class ContentTestCase(BaseDataTypes): + + """ + Handles Content data types + """ + + # ----------------------------- + def test_content_inline(self): + """ + inline content + """ + self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...') + + # -------------------------------- + def test_content_reference(self): + """ + reference content + """ + self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...') + + # ------------------------------------ + def test_content_inline_decode(self): + """ + inline content decode + """ + self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...') + + # ------------------------------------ + def test_content_reference_decode(self): + """ + reference content decode + """ + self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...') + +# ------------------------ # +# Pre - existing test code # +# ------------------------ # + +# --------------------- +def test(type, value): + """ + old test function cut/copy/paste from qpid/codec.py + """ + if isinstance(value, (list, tuple)): + values = value + else: + values = [value] + stream = StringIO() + codec = Codec(stream, SPEC) + for v in values: + codec.encode(type, v) + codec.flush() + enc = stream.getvalue() + stream.reset() + dup = [] + for i in xrange(len(values)): + dup.append(codec.decode(type)) + if values != dup: + raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) + +# ----------------------- +def dotest(type, value): + """ + old test function cut/copy/paste from qpid/codec.py + """ + args = (type, value) + test(*args) + +# ------------- +def oldtests(): + """ + old test function cut/copy/paste from qpid/codec.py + """ + for value in ("1", "0", "110", "011", "11001", "10101", "10011"): + for i in range(10): + dotest("bit", map(lambda x: x == "1", value*i)) + + for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): + dotest("table", value) + + for type in ("octet", "short", "long", "longlong"): + for value in range(0, 256): + dotest(type, value) + + for type in ("shortstr", "longstr"): + for value in ("", "a", "asdf"): + dotest(type, value) + +# ----------------------------------------- +class oldTests(unittest.TestCase): + + """ + class to handle pre-existing test cases + """ + + # --------------------------- + def test_oldtestcases(self): + """ + call the old tests + """ + return oldtests() + +# --------------------------- +# --------------------------- +if __name__ == '__main__': + + codec_test_suite = unittest.TestSuite() + + #adding all the test suites... + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase)) + + #loading pre-existing test case from qpid/codec.py + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests)) + + run_output_stream = StringIO() + test_runner = unittest.TextTestRunner(run_output_stream, '', '') + test_result = test_runner.run(codec_test_suite) + + print '\n%d test run...' % (test_result.testsRun) + + if test_result.wasSuccessful(): + print '\nAll tests successful\n' + + if test_result.failures: + print '\n----------' + print '%d FAILURES:' % (len(test_result.failures)) + print '----------\n' + for failure in test_result.failures: + print str(failure[0]) + ' ... FAIL' + + if test_result.errors: + print '\n---------' + print '%d ERRORS:' % (len(test_result.errors)) + print '---------\n' + + for error in test_result.errors: + print str(error[0]) + ' ... ERROR' + + f = open('codec_unit_test_output.txt', 'w') + f.write(str(run_output_stream.getvalue())) + f.close() diff --git a/qpid/python/qpid/tests/codec010.py b/qpid/python/qpid/tests/codec010.py new file mode 100644 index 0000000000..787ebc146f --- /dev/null +++ b/qpid/python/qpid/tests/codec010.py @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time + +from unittest import TestCase +from qpid.codec010 import StringCodec +from qpid.datatypes import timestamp, uuid4 +from qpid.ops import PRIMITIVE + +class CodecTest(TestCase): + + def check(self, type, value, compare=True): + t = PRIMITIVE[type] + sc = StringCodec() + sc.write_primitive(t, value) + decoded = sc.read_primitive(t) + if compare: + assert decoded == value, "%s, %s" % (decoded, value) + return decoded + + def testMapString(self): + self.check("map", {"string": "this is a test"}) + + def testMapUnicode(self): + self.check("map", {"unicode": u"this is a unicode test"}) + + def testMapBinary(self): + self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"}) + + def testMapBuffer(self): + s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5" + dec = self.check("map", {"buffer": buffer(s)}, False) + assert dec["buffer"] == s + + def testMapInt(self): + self.check("map", {"int": 3}) + + def testMapLong(self): + self.check("map", {"long": 2**32}) + self.check("map", {"long": 1 << 34}) + self.check("map", {"long": -(1 << 34)}) + + def testMapTimestamp(self): + decoded = self.check("map", {"timestamp": timestamp(0)}) + assert isinstance(decoded["timestamp"], timestamp) + + def testMapDatetime(self): + decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False) + assert isinstance(decoded["datetime"], timestamp) + assert decoded["datetime"] == 0.0 + + def testMapNone(self): + self.check("map", {"none": None}) + + def testMapNested(self): + self.check("map", {"map": {"string": "nested test"}}) + + def testMapList(self): + self.check("map", {"list": [1, "two", 3.0, -4]}) + + def testMapUUID(self): + self.check("map", {"uuid": uuid4()}) + + def testMapAll(self): + decoded = self.check("map", {"string": "this is a test", + "unicode": u"this is a unicode test", + "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5", + "int": 3, + "long": 2**32, + "timestamp": timestamp(0), + "none": None, + "map": {"string": "nested map"}, + "list": [1, "two", 3.0, -4], + "uuid": uuid4()}) + assert isinstance(decoded["timestamp"], timestamp) + + def testMapEmpty(self): + self.check("map", {}) + + def testMapNone(self): + self.check("map", None) + + def testList(self): + self.check("list", [1, "two", 3.0, -4]) + + def testListEmpty(self): + self.check("list", []) + + def testListNone(self): + self.check("list", None) + + def testArrayInt(self): + self.check("array", [1, 2, 3, 4]) + + def testArrayString(self): + self.check("array", ["one", "two", "three", "four"]) + + def testArrayEmpty(self): + self.check("array", []) + + def testArrayNone(self): + self.check("array", None) + + def testInt16(self): + self.check("int16", 3) + self.check("int16", -3) + + def testInt64(self): + self.check("int64", 3) + self.check("int64", -3) + self.check("int64", 1<<34) + self.check("int64", -(1<<34)) + + def testDatetime(self): + self.check("datetime", timestamp(0)) + self.check("datetime", timestamp(long(time.time()))) diff --git a/qpid/python/qpid/tests/connection.py b/qpid/python/qpid/tests/connection.py new file mode 100644 index 0000000000..6847285f69 --- /dev/null +++ b/qpid/python/qpid/tests/connection.py @@ -0,0 +1,227 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time +from threading import * +from unittest import TestCase +from qpid.util import connect, listen +from qpid.connection import * +from qpid.datatypes import Message +from qpid.delegates import Server +from qpid.queue import Queue +from qpid.session import Delegate +from qpid.ops import QueueQueryResult + +PORT = 1234 + +class TestServer: + + def __init__(self, queue): + self.queue = queue + + def connection(self, connection): + return Server(connection, delegate=self.session) + + def session(self, session): + session.auto_sync = False + return TestSession(session, self.queue) + +class TestSession(Delegate): + + def __init__(self, session, queue): + self.session = session + self.queue = queue + + def execution_sync(self, es): + pass + + def queue_query(self, qq): + return QueueQueryResult(qq.queue) + + def message_transfer(self, cmd): + if cmd.destination == "echo": + m = Message(cmd.payload) + m.headers = cmd.headers + self.session.message_transfer(cmd.destination, cmd.accept_mode, + cmd.acquire_mode, m) + elif cmd.destination == "abort": + self.session.channel.connection.sock.close() + elif cmd.destination == "heartbeat": + self.session.channel.connection_heartbeat() + else: + self.queue.put(cmd) + +class ConnectionTest(TestCase): + + def setUp(self): + self.queue = Queue() + self.running = True + started = Event() + + def run(): + ts = TestServer(self.queue) + for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): + conn = Connection(s, delegate=ts.connection) + try: + conn.start(5) + except Closed: + pass + + self.server = Thread(target=run) + self.server.setDaemon(True) + self.server.start() + + started.wait(3) + assert started.isSet() + + def tearDown(self): + self.running = False + connect("127.0.0.1", PORT).close() + self.server.join(3) + + def connect(self, **kwargs): + return Connection(connect("127.0.0.1", PORT), **kwargs) + + def test(self): + c = self.connect() + c.start(10) + + ssn1 = c.session("test1", timeout=10) + ssn2 = c.session("test2", timeout=10) + + assert ssn1 == c.sessions["test1"] + assert ssn2 == c.sessions["test2"] + assert ssn1.channel != None + assert ssn2.channel != None + assert ssn1 in c.attached.values() + assert ssn2 in c.attached.values() + + ssn1.close(5) + + assert ssn1.channel == None + assert ssn1 not in c.attached.values() + assert ssn2 in c.sessions.values() + + ssn2.close(5) + + assert ssn2.channel == None + assert ssn2 not in c.attached.values() + assert ssn2 not in c.sessions.values() + + ssn = c.session("session", timeout=10) + + assert ssn.channel != None + assert ssn in c.sessions.values() + + destinations = ("one", "two", "three") + + for d in destinations: + ssn.message_transfer(d) + + for d in destinations: + cmd = self.queue.get(10) + assert cmd.destination == d + assert cmd.headers == None + assert cmd.payload == None + + msg = Message("this is a test") + ssn.message_transfer("four", message=msg) + cmd = self.queue.get(10) + assert cmd.destination == "four" + assert cmd.headers == None + assert cmd.payload == msg.body + + qq = ssn.queue_query("asdf") + assert qq.queue == "asdf" + c.close(5) + + def testCloseGet(self): + c = self.connect() + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + for i in range(10): + m = echos.get(timeout=10) + assert m.body == "test%d" % i + + try: + m = echos.get(timeout=10) + assert False + except Closed, e: + pass + + def testCloseListen(self): + c = self.connect() + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + messages = [] + exceptions = [] + condition = Condition() + def listener(m): messages.append(m) + def exc_listener(e): + condition.acquire() + exceptions.append(e) + condition.notify() + condition.release() + + echos.listen(listener, exc_listener) + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + condition.acquire() + start = time.time() + elapsed = 0 + while not exceptions and elapsed < 10: + condition.wait(10 - elapsed) + elapsed = time.time() - start + condition.release() + + for i in range(10): + m = messages.pop(0) + assert m.body == "test%d" % i + + assert len(exceptions) == 1 + + def testSync(self): + c = self.connect() + c.start(10) + s = c.session("test") + s.auto_sync = False + s.message_transfer("echo", message=Message("test")) + s.sync(10) + + def testHeartbeat(self): + c = self.connect(heartbeat=10) + c.start(10) + s = c.session("test") + s.channel.connection_heartbeat() + s.message_transfer("heartbeat") diff --git a/qpid/python/qpid/tests/datatypes.py b/qpid/python/qpid/tests/datatypes.py new file mode 100644 index 0000000000..00e649d6cf --- /dev/null +++ b/qpid/python/qpid/tests/datatypes.py @@ -0,0 +1,296 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from unittest import TestCase +from qpid.datatypes import * +from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties + +class SerialTest(TestCase): + + def test(self): + for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)): + assert s + 1 > s + assert s - 1 < s + assert s < s + 1 + assert s > s - 1 + + assert serial(0xFFFFFFFFL) + 1 == serial(0) + + assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL) + assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0) + + def testIncr(self): + s = serial(0) + s += 1 + assert s == serial(1) + + def testIn(self): + l = [serial(1), serial(2), serial(3), serial(4)] + assert serial(1) in l + assert serial(0xFFFFFFFFL + 2) in l + assert 4 in l + + def testNone(self): + assert serial(0) != None + + def testHash(self): + d = {} + d[serial(0)] = "zero" + assert d[0] == "zero" + + def testAdd(self): + assert serial(2) + 2 == serial(4) + assert serial(2) + 2 == 4 + + def testSub(self): + delta = serial(4) - serial(2) + assert isinstance(delta, int) or isinstance(delta, long) + assert delta == 2 + + delta = serial(4) - 2 + assert isinstance(delta, Serial) + assert delta == serial(2) + +class RangedSetTest(TestCase): + + def check(self, ranges): + posts = [] + for range in ranges: + posts.append(range.lower) + posts.append(range.upper) + + sorted = posts[:] + sorted.sort() + + assert posts == sorted + + idx = 1 + while idx + 1 < len(posts): + assert posts[idx] + 1 != posts[idx+1] + idx += 2 + + def test(self): + rs = RangedSet() + + self.check(rs.ranges) + + rs.add(1) + + assert 1 in rs + assert 2 not in rs + assert 0 not in rs + self.check(rs.ranges) + + rs.add(2) + + assert 0 not in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + self.check(rs.ranges) + + rs.add(0) + + assert -1 not in rs + assert 0 in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + self.check(rs.ranges) + + rs.add(37) + + assert -1 not in rs + assert 0 in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + assert 36 not in rs + assert 37 in rs + assert 38 not in rs + self.check(rs.ranges) + + rs.add(-1) + self.check(rs.ranges) + + rs.add(-3) + self.check(rs.ranges) + + rs.add(1, 20) + assert 21 not in rs + assert 20 in rs + self.check(rs.ranges) + + def testAddSelf(self): + a = RangedSet() + a.add(0, 8) + self.check(a.ranges) + a.add(0, 8) + self.check(a.ranges) + assert len(a.ranges) == 1 + range = a.ranges[0] + assert range.lower == 0 + assert range.upper == 8 + + def testEmpty(self): + s = RangedSet() + assert s.empty() + s.add(0, -1) + assert s.empty() + s.add(0, 0) + assert not s.empty() + + def testMinMax(self): + s = RangedSet() + assert s.max() is None + assert s.min() is None + s.add(0, 10) + assert s.max() == 10 + assert s.min() == 0 + s.add(0, 5) + assert s.max() == 10 + assert s.min() == 0 + s.add(0, 11) + assert s.max() == 11 + assert s.min() == 0 + s.add(15, 20) + assert s.max() == 20 + assert s.min() == 0 + s.add(-10, -5) + assert s.max() == 20 + assert s.min() == -10 + +class RangeTest(TestCase): + + def testIntersect1(self): + a = Range(0, 10) + b = Range(9, 20) + i1 = a.intersect(b) + i2 = b.intersect(a) + assert i1.upper == 10 + assert i2.upper == 10 + assert i1.lower == 9 + assert i2.lower == 9 + + def testIntersect2(self): + a = Range(0, 10) + b = Range(11, 20) + assert a.intersect(b) == None + assert b.intersect(a) == None + + def testIntersect3(self): + a = Range(0, 10) + b = Range(3, 5) + i1 = a.intersect(b) + i2 = b.intersect(a) + assert i1.upper == 5 + assert i2.upper == 5 + assert i1.lower == 3 + assert i2.lower == 3 + +class UUIDTest(TestCase): + + def test(self): + # this test is kind of lame, but it does excercise the basic + # functionality of the class + u = uuid4() + for i in xrange(1024): + assert u != uuid4() + +class MessageTest(TestCase): + + def setUp(self): + self.mp = MessageProperties() + self.dp = DeliveryProperties() + self.fp = FragmentProperties() + + def testHas(self): + m = Message(self.mp, self.dp, self.fp, "body") + assert m.has("message_properties") + assert m.has("delivery_properties") + assert m.has("fragment_properties") + + def testGet(self): + m = Message(self.mp, self.dp, self.fp, "body") + assert m.get("message_properties") == self.mp + assert m.get("delivery_properties") == self.dp + assert m.get("fragment_properties") == self.fp + + def testSet(self): + m = Message(self.mp, self.dp, "body") + assert m.get("fragment_properties") is None + m.set(self.fp) + assert m.get("fragment_properties") == self.fp + + def testSetOnEmpty(self): + m = Message("body") + assert m.get("delivery_properties") is None + m.set(self.dp) + assert m.get("delivery_properties") == self.dp + + def testSetReplace(self): + m = Message(self.mp, self.dp, self.fp, "body") + dp = DeliveryProperties() + assert m.get("delivery_properties") == self.dp + assert m.get("delivery_properties") != dp + m.set(dp) + assert m.get("delivery_properties") != self.dp + assert m.get("delivery_properties") == dp + + def testClear(self): + m = Message(self.mp, self.dp, self.fp, "body") + assert m.get("message_properties") == self.mp + assert m.get("delivery_properties") == self.dp + assert m.get("fragment_properties") == self.fp + m.clear("fragment_properties") + assert m.get("fragment_properties") is None + assert m.get("message_properties") == self.mp + assert m.get("delivery_properties") == self.dp + +class TimestampTest(TestCase): + + def check(self, expected, *values): + for v in values: + assert isinstance(v, timestamp) + assert v == expected + assert v == timestamp(expected) + + def testAdd(self): + self.check(4.0, + timestamp(2.0) + 2.0, + 2.0 + timestamp(2.0)) + + def testSub(self): + self.check(2.0, + timestamp(4.0) - 2.0, + 4.0 - timestamp(2.0)) + + def testNeg(self): + self.check(-4.0, -timestamp(4.0)) + + def testPos(self): + self.check(+4.0, +timestamp(4.0)) + + def testAbs(self): + self.check(4.0, abs(timestamp(-4.0))) + + def testConversion(self): + dt = timestamp(0).datetime() + t = timestamp(dt) + assert t == 0 diff --git a/qpid/python/qpid/tests/framing.py b/qpid/python/qpid/tests/framing.py new file mode 100644 index 0000000000..0b33df8b9a --- /dev/null +++ b/qpid/python/qpid/tests/framing.py @@ -0,0 +1,289 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# setup, usage, teardown, errors(sync), errors(async), stress, soak, +# boundary-conditions, config + +from qpid.tests import Test +from qpid.framing import * + +class Base(Test): + + def cmp_frames(self, frm1, frm2): + assert frm1.flags == frm2.flags, "expected: %r, got %r" % (frm1, frm2) + assert frm1.type == frm2.type, "expected: %r, got %r" % (frm1, frm2) + assert frm1.track == frm2.track, "expected: %r, got %r" % (frm1, frm2) + assert frm1.channel == frm2.channel, "expected: %r, got %r" % (frm1, frm2) + assert frm1.payload == frm2.payload, "expected: %r, got %r" % (frm1, frm2) + + def cmp_segments(self, seg1, seg2): + assert seg1.first == seg2.first, "expected: %r, got %r" % (seg1, seg2) + assert seg1.last == seg2.last, "expected: %r, got %r" % (seg1, seg2) + assert seg1.type == seg2.type, "expected: %r, got %r" % (seg1, seg2) + assert seg1.track == seg2.track, "expected: %r, got %r" % (seg1, seg2) + assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2) + assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2) + + def cmp_list(self, l1, l2): + if l1 is None: + assert l2 is None + return + + assert len(l1) == len(l2) + for v1, v2 in zip(l1, l2): + if isinstance(v1, Compound): + self.cmp_ops(v1, v2) + else: + assert v1 == v2 + + def cmp_ops(self, op1, op2): + if op1 is None: + assert op2 is None + return + + assert op1.__class__ == op2.__class__ + cls = op1.__class__ + assert op1.NAME == op2.NAME + assert op1.CODE == op2.CODE + assert op1.FIELDS == op2.FIELDS + for f in cls.FIELDS: + v1 = getattr(op1, f.name) + v2 = getattr(op2, f.name) + if COMPOUND.has_key(f.type) or f.type == "struct32": + self.cmp_ops(v1, v2) + elif f.type in ("list", "array"): + self.cmp_list(v1, v2) + else: + assert v1 == v2, "expected: %r, got %r" % (v1, v2) + + if issubclass(cls, Command) or issubclass(cls, Control): + assert op1.channel == op2.channel + + if issubclass(cls, Command): + assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync) + assert (op1.headers is None and op2.headers is None) or \ + (op1.headers is not None and op2.headers is not None) + if op1.headers is not None: + assert len(op1.headers) == len(op2.headers) + for h1, h2 in zip(op1.headers, op2.headers): + self.cmp_ops(h1, h2) + +class FrameTest(Base): + + def enc_dec(self, frames, encoded=None): + enc = FrameEncoder() + dec = FrameDecoder() + + enc.write(*frames) + bytes = enc.read() + if encoded is not None: + assert bytes == encoded, "expected %r, got %r" % (encoded, bytes) + dec.write(bytes) + dframes = dec.read() + + assert len(frames) == len(dframes) + for f, df, in zip(frames, dframes): + self.cmp_frames(f, df) + + def testEmpty(self): + self.enc_dec([Frame(0, 0, 0, 0, "")], + "\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x00") + + def testSingle(self): + self.enc_dec([Frame(0, 0, 0, 1, "payload")], + "\x00\x00\x00\x13\x00\x00\x00\x01\x00\x00\x00\x00payload") + + def testMaxChannel(self): + self.enc_dec([Frame(0, 0, 0, 65535, "max-channel")], + "\x00\x00\x00\x17\x00\x00\xff\xff\x00\x00\x00\x00max-channel") + + def testMaxType(self): + self.enc_dec([Frame(0, 255, 0, 0, "max-type")], + "\x00\xff\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00max-type") + + def testMaxTrack(self): + self.enc_dec([Frame(0, 0, 15, 0, "max-track")], + "\x00\x00\x00\x15\x00\x0f\x00\x00\x00\x00\x00\x00max-track") + + def testSequence(self): + self.enc_dec([Frame(0, 0, 0, 0, "zero"), + Frame(0, 0, 0, 1, "one"), + Frame(0, 0, 1, 0, "two"), + Frame(0, 0, 1, 1, "three"), + Frame(0, 1, 0, 0, "four"), + Frame(0, 1, 0, 1, "five"), + Frame(0, 1, 1, 0, "six"), + Frame(0, 1, 1, 1, "seven"), + Frame(1, 0, 0, 0, "eight"), + Frame(1, 0, 0, 1, "nine"), + Frame(1, 0, 1, 0, "ten"), + Frame(1, 0, 1, 1, "eleven"), + Frame(1, 1, 0, 0, "twelve"), + Frame(1, 1, 0, 1, "thirteen"), + Frame(1, 1, 1, 0, "fourteen"), + Frame(1, 1, 1, 1, "fifteen")]) + +class SegmentTest(Base): + + def enc_dec(self, segments, frames=None, interleave=None, max_payload=Frame.MAX_PAYLOAD): + enc = SegmentEncoder(max_payload) + dec = SegmentDecoder() + + enc.write(*segments) + frms = enc.read() + if frames is not None: + assert len(frames) == len(frms), "expected %s, got %s" % (frames, frms) + for f1, f2 in zip(frames, frms): + self.cmp_frames(f1, f2) + if interleave is not None: + ilvd = [] + for f in frms: + ilvd.append(f) + if interleave: + ilvd.append(interleave.pop(0)) + ilvd.extend(interleave) + dec.write(*ilvd) + else: + dec.write(*frms) + segs = dec.read() + assert len(segments) == len(segs) + for s1, s2 in zip(segments, segs): + self.cmp_segments(s1, s2) + + def testEmpty(self): + self.enc_dec([Segment(True, True, 0, 0, 0, "")], + [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0, + "")]) + + def testSingle(self): + self.enc_dec([Segment(True, True, 0, 0, 0, "payload")], + [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0, + "payload")]) + + def testMaxChannel(self): + self.enc_dec([Segment(False, False, 0, 0, 65535, "max-channel")], + [Frame(FIRST_FRM | LAST_FRM, 0, 0, 65535, "max-channel")]) + + def testMaxType(self): + self.enc_dec([Segment(False, False, 255, 0, 0, "max-type")], + [Frame(FIRST_FRM | LAST_FRM, 255, 0, 0, "max-type")]) + + def testMaxTrack(self): + self.enc_dec([Segment(False, False, 0, 15, 0, "max-track")], + [Frame(FIRST_FRM | LAST_FRM, 0, 15, 0, "max-track")]) + + def testSequence(self): + self.enc_dec([Segment(True, False, 0, 0, 0, "one"), + Segment(False, False, 0, 0, 0, "two"), + Segment(False, True, 0, 0, 0, "three")], + [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG, 0, 0, 0, "one"), + Frame(FIRST_FRM | LAST_FRM, 0, 0, 0, "two"), + Frame(FIRST_FRM | LAST_FRM | LAST_SEG, 0, 0, 0, "three")]) + + def testInterleaveChannel(self): + frames = [Frame(0, 0, 0, 0, chr(ord("a") + i)) for i in range(7)] + frames[0].flags |= FIRST_FRM + frames[-1].flags |= LAST_FRM + + ilvd = [Frame(0, 0, 0, 1, chr(ord("a") + i)) for i in range(7)] + + self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefg")], frames, ilvd, max_payload=1) + + def testInterleaveTrack(self): + frames = [Frame(0, 0, 0, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1)) + for i in range(0, 8, 2)] + frames[0].flags |= FIRST_FRM + frames[-1].flags |= LAST_FRM + + ilvd = [Frame(0, 0, 1, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1)) + for i in range(0, 8, 2)] + + self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2) + +from qpid.ops import * + +class OpTest(Base): + + def enc_dec(self, ops): + enc = OpEncoder() + dec = OpDecoder() + enc.write(*ops) + segs = enc.read() + dec.write(*segs) + dops = dec.read() + assert len(ops) == len(dops) + for op1, op2 in zip(ops, dops): + self.cmp_ops(op1, op2) + + def testEmtpyMT(self): + self.enc_dec([MessageTransfer()]) + + def testEmptyMTSync(self): + self.enc_dec([MessageTransfer(sync=True)]) + + def testMT(self): + self.enc_dec([MessageTransfer(destination="asdf")]) + + def testSyncMT(self): + self.enc_dec([MessageTransfer(destination="asdf", sync=True)]) + + def testEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(payload="")]) + + def testPayloadMT(self): + self.enc_dec([MessageTransfer(payload="test payload")]) + + def testHeadersEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])]) + + def testHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")]) + + def testMultiHeadersEmptyPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])]) + + def testMultiHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")]) + + def testContentTypeHeadersPayloadMT(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")]) + + def testMulti(self): + self.enc_dec([MessageTransfer(), + MessageTransfer(sync=True), + MessageTransfer(destination="one"), + MessageTransfer(destination="two", sync=True), + MessageTransfer(destination="three", payload="test payload")]) + + def testControl(self): + self.enc_dec([SessionAttach(name="asdf")]) + + def testMixed(self): + self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")]) + + def testChannel(self): + self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)]) + + def testCompound(self): + self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])]) + + def testListCompound(self): + self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"), + Xid(global_id="two"), + Xid(global_id="three")]))]) diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py new file mode 100644 index 0000000000..8f6680d5e3 --- /dev/null +++ b/qpid/python/qpid/tests/messaging/__init__.py @@ -0,0 +1,185 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time +from math import ceil +from qpid.harness import Skipped +from qpid.messaging import * +from qpid.tests import Test + +class Base(Test): + + def setup_connection(self): + return None + + def setup_session(self): + return None + + def setup_sender(self): + return None + + def setup_receiver(self): + return None + + def setup(self): + self.test_id = uuid4() + self.broker = self.config.broker + try: + self.conn = self.setup_connection() + except ConnectError, e: + raise Skipped(e) + self.ssn = self.setup_session() + self.snd = self.setup_sender() + if self.snd is not None: + self.snd.durable = self.durable() + self.rcv = self.setup_receiver() + + def teardown(self): + if self.conn is not None and self.conn.attached(): + self.teardown_connection(self.conn) + self.conn = None + + def teardown_connection(self, conn): + conn.close(timeout=self.timeout()) + + def content(self, base, count = None): + if count is None: + return "%s[%s]" % (base, self.test_id) + else: + return "%s[%s, %s]" % (base, count, self.test_id) + + def message(self, base, count = None, **kwargs): + return Message(content=self.content(base, count), **kwargs) + + def ping(self, ssn): + PING_Q = 'ping-queue; {create: always, delete: always}' + # send a message + sender = ssn.sender(PING_Q, durable=self.durable()) + content = self.content("ping") + sender.send(content) + receiver = ssn.receiver(PING_Q) + msg = receiver.fetch(0) + ssn.acknowledge() + assert msg.content == content, "expected %r, got %r" % (content, msg.content) + + def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False): + messages = [] + try: + while limit is None or len(messages) < limit: + messages.append(rcv.fetch(timeout=timeout)) + except Empty: + pass + if expected is not None: + self.assertEchos(expected, messages, redelivered) + return messages + + def diff(self, m1, m2, excluded_properties=()): + result = {} + for attr in ("id", "subject", "user_id", "reply_to", + "correlation_id", "durable", "priority", "ttl", + "redelivered", "content_type", "content"): + a1 = getattr(m1, attr) + a2 = getattr(m2, attr) + if a1 != a2: + result[attr] = (a1, a2) + p1 = dict(m1.properties) + p2 = dict(m2.properties) + for ep in excluded_properties: + p1.pop(ep, None) + p2.pop(ep, None) + if p1 != p2: + result["properties"] = (p1, p2) + return result + + def assertEcho(self, msg, echo, redelivered=False): + if not isinstance(msg, Message) or not isinstance(echo, Message): + if isinstance(msg, Message): + msg = msg.content + if isinstance(echo, Message): + echo = echo.content + assert msg == echo, "expected %s, got %s" % (msg, echo) + else: + delta = self.diff(msg, echo, ("x-amqp-0-10.routing-key",)) + mttl, ettl = delta.pop("ttl", (0, 0)) + if redelivered: + assert echo.redelivered, \ + "expected %s to be redelivered: %s" % (msg, echo) + if delta.has_key("redelivered"): + del delta["redelivered"] + assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl) + assert mttl >= ettl, "%s, %s" % (mttl, ettl) + assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta) + + def assertEchos(self, msgs, echoes, redelivered=False): + assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes) + for m, e in zip(msgs, echoes): + self.assertEcho(m, e, redelivered) + + def assertEmpty(self, rcv): + contents = self.drain(rcv) + assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) + + def assertAvailable(self, rcv, expected=None, lower=None, upper=None): + if expected is not None: + if lower is not None or upper is not None: + raise ValueError("cannot specify lower or upper when specifying expected") + lower = expected + upper = expected + else: + if lower is None: + lower = int(ceil(rcv.threshold*rcv.capacity)) + if upper is None: + upper = rcv.capacity + + p = rcv.available() + if upper == lower: + assert p == lower, "expected %s, got %s" % (lower, p) + else: + assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper) + + def sleep(self): + time.sleep(self.delay()) + + def delay(self): + return float(self.config.defines.get("delay", "2")) + + def timeout(self): + return float(self.config.defines.get("timeout", "60")) + + def get_bool(self, name): + return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") + + def durable(self): + return self.get_bool("durable") + + def reconnect(self): + return self.get_bool("reconnect") + + + def transport(self): + if self.broker.scheme == self.broker.AMQPS: + return "ssl" + else: + return "tcp" + + def connection_options(self): + return {"reconnect": self.reconnect(), + "transport": self.transport()} + +import address, endpoints, message diff --git a/qpid/python/qpid/tests/messaging/address.py b/qpid/python/qpid/tests/messaging/address.py new file mode 100644 index 0000000000..aa9562a717 --- /dev/null +++ b/qpid/python/qpid/tests/messaging/address.py @@ -0,0 +1,321 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +from qpid.tests import Test +from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, \ + SYM, WSPACE, LEXER +from qpid.lexer import Token +from qpid.harness import Skipped +from qpid.tests.parser import ParserBase + +def indent(st): + return " " + st.replace("\n", "\n ") + +def pprint_address(name, subject, options): + return "NAME: %s\nSUBJECT: %s\nOPTIONS: %s" % \ + (pprint(name), pprint(subject), pprint(options)) + +def pprint(o): + if isinstance(o, dict): + return pprint_map(o) + elif isinstance(o, list): + return pprint_list(o) + elif isinstance(o, basestring): + return pprint_string(o) + else: + return repr(o) + +def pprint_map(m): + items = ["%s: %s" % (pprint(k), pprint(v)) for k, v in m.items()] + items.sort() + return pprint_items("{", items, "}") + +def pprint_list(l): + return pprint_items("[", [pprint(x) for x in l], "]") + +def pprint_items(start, items, end): + if items: + return "%s\n%s\n%s" % (start, ",\n".join([indent(i) for i in items]), end) + else: + return "%s%s" % (start, end) + +def pprint_string(s): + result = "'" + for c in s: + if c == "'": + result += "\\'" + elif c == "\n": + result += "\\n" + elif ord(c) >= 0x80: + result += "\\u%04x" % ord(c) + else: + result += c + result += "'" + return result + +class AddressTests(ParserBase, Test): + + EXCLUDE = (WSPACE, EOF) + + def fields(self, line, n): + result = line.split(":", n - 1) + result.extend([None]*(n - len(result))) + return result + + def call(self, parser, mode, input): + try: + from subprocess import Popen, PIPE, STDOUT + po = Popen([parser, mode], stdin=PIPE, stdout=PIPE, stderr=STDOUT) + except ImportError, e: + raise Skipped("%s" % e) + except OSError, e: + raise Skipped("%s: %s" % (e, parser)) + out, _ = po.communicate(input=input) + return out + + def parser(self): + return self.config.defines.get("address.parser") + + def do_lex(self, st): + parser = self.parser() + if parser: + out = self.call(parser, "lex", st) + lines = out.split("\n") + toks = [] + for line in lines: + if line.strip(): + name, position, value = self.fields(line, 3) + toks.append(Token(LEXER.type(name), value, position, st)) + return toks + else: + return lex(st) + + def do_parse(self, st): + return parse(st) + + def valid(self, addr, name=None, subject=None, options=None): + parser = self.parser() + if parser: + got = self.call(parser, "parse", addr) + expected = "%s\n" % pprint_address(name, subject, options) + assert expected == got, "expected\n<EXP>%s</EXP>\ngot\n<GOT>%s</GOT>" % (expected, got) + else: + ParserBase.valid(self, addr, (name, subject, options)) + + def invalid(self, addr, error=None): + parser = self.parser() + if parser: + got = self.call(parser, "parse", addr) + expected = "ERROR: %s\n" % error + assert expected == got, "expected %r, got %r" % (expected, got) + else: + ParserBase.invalid(self, addr, error) + + def testDashInId1(self): + self.lex("foo-bar", ID) + + def testDashInId2(self): + self.lex("foo-3", ID) + + def testDashAlone1(self): + self.lex("foo - bar", ID, SYM, ID) + + def testDashAlone2(self): + self.lex("foo - 3", ID, SYM, NUMBER) + + def testLeadingDash(self): + self.lex("-foo", SYM, ID) + + def testTrailingDash(self): + self.lex("foo-", ID, SYM) + + def testNegativeNum(self): + self.lex("-3", NUMBER) + + def testIdNum(self): + self.lex("id1", ID) + + def testIdSpaceNum(self): + self.lex("id 1", ID, NUMBER) + + def testHash(self): + self.valid("foo/bar.#", "foo", "bar.#") + + def testStar(self): + self.valid("foo/bar.*", "foo", "bar.*") + + def testColon(self): + self.valid("foo.bar/baz.qux:moo:arf", "foo.bar", "baz.qux:moo:arf") + + def testOptions(self): + self.valid("foo.bar/baz.qux:moo:arf; {key: value}", + "foo.bar", "baz.qux:moo:arf", {"key": "value"}) + + def testOptionsTrailingComma(self): + self.valid("name/subject; {key: value,}", "name", "subject", + {"key": "value"}) + + def testOptionsNone(self): + self.valid("name/subject; {key: None}", "name", "subject", + {"key": None}) + + def testSemiSubject(self): + self.valid("foo.bar/'baz.qux;moo:arf'; {key: value}", + "foo.bar", "baz.qux;moo:arf", {"key": "value"}) + + def testCommaSubject(self): + self.valid("foo.bar/baz.qux.{moo,arf}", "foo.bar", "baz.qux.{moo,arf}") + + def testCommaSubjectOptions(self): + self.valid("foo.bar/baz.qux.{moo,arf}; {key: value}", "foo.bar", + "baz.qux.{moo,arf}", {"key": "value"}) + + def testUnbalanced(self): + self.valid("foo.bar/baz.qux.{moo,arf; {key: value}", "foo.bar", + "baz.qux.{moo,arf", {"key": "value"}) + + def testSlashQuote(self): + self.valid("foo.bar\\/baz.qux.{moo,arf; {key: value}", + "foo.bar/baz.qux.{moo,arf", + None, {"key": "value"}) + + def testSlashHexEsc1(self): + self.valid("foo.bar\\x00baz.qux.{moo,arf; {key: value}", + "foo.bar\x00baz.qux.{moo,arf", + None, {"key": "value"}) + + def testSlashHexEsc2(self): + self.valid("foo.bar\\xffbaz.qux.{moo,arf; {key: value}", + "foo.bar\xffbaz.qux.{moo,arf", + None, {"key": "value"}) + + def testSlashHexEsc3(self): + self.valid("foo.bar\\xFFbaz.qux.{moo,arf; {key: value}", + "foo.bar\xFFbaz.qux.{moo,arf", + None, {"key": "value"}) + + def testSlashUnicode1(self): + self.valid("foo.bar\\u1234baz.qux.{moo,arf; {key: value}", + u"foo.bar\u1234baz.qux.{moo,arf", None, {"key": "value"}) + + def testSlashUnicode2(self): + self.valid("foo.bar\\u0000baz.qux.{moo,arf; {key: value}", + u"foo.bar\u0000baz.qux.{moo,arf", None, {"key": "value"}) + + def testSlashUnicode3(self): + self.valid("foo.bar\\uffffbaz.qux.{moo,arf; {key: value}", + u"foo.bar\uffffbaz.qux.{moo,arf", None, {"key": "value"}) + + def testSlashUnicode4(self): + self.valid("foo.bar\\uFFFFbaz.qux.{moo,arf; {key: value}", + u"foo.bar\uFFFFbaz.qux.{moo,arf", None, {"key": "value"}) + + def testNoName(self): + self.invalid("; {key: value}", + "unexpected token SEMI(;) line:1,0:; {key: value}") + + def testEmpty(self): + self.invalid("", "unexpected token EOF line:1,0:") + + def testNoNameSlash(self): + self.invalid("/asdf; {key: value}", + "unexpected token SLASH(/) line:1,0:/asdf; {key: value}") + + def testBadOptions1(self): + self.invalid("name/subject; {", + "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), " + "got EOF line:1,15:name/subject; {") + + def testBadOptions2(self): + self.invalid("name/subject; { 3", + "expecting COLON, got EOF " + "line:1,17:name/subject; { 3") + + def testBadOptions3(self): + self.invalid("name/subject; { key:", + "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), got EOF " + "line:1,20:name/subject; { key:") + + def testBadOptions4(self): + self.invalid("name/subject; { key: value", + "expecting (COMMA, RBRACE), got EOF " + "line:1,26:name/subject; { key: value") + + def testBadOptions5(self): + self.invalid("name/subject; { key: value asdf", + "expecting (COMMA, RBRACE), got ID(asdf) " + "line:1,27:name/subject; { key: value asdf") + + def testBadOptions6(self): + self.invalid("name/subject; { key: value,", + "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), got EOF " + "line:1,27:name/subject; { key: value,") + + def testBadOptions7(self): + self.invalid("name/subject; { key: value } asdf", + "expecting EOF, got ID(asdf) " + "line:1,29:name/subject; { key: value } asdf") + + def testList1(self): + self.valid("name/subject; { key: [] }", "name", "subject", {"key": []}) + + def testList2(self): + self.valid("name/subject; { key: ['one'] }", "name", "subject", {"key": ['one']}) + + def testList3(self): + self.valid("name/subject; { key: [1, 2, 3] }", "name", "subject", + {"key": [1, 2, 3]}) + + def testList4(self): + self.valid("name/subject; { key: [1, [2, 3], 4] }", "name", "subject", + {"key": [1, [2, 3], 4]}) + + def testBadList1(self): + self.invalid("name/subject; { key: [ }", "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), " + "got RBRACE(}) line:1,23:name/subject; { key: [ }") + + def testBadList2(self): + self.invalid("name/subject; { key: [ 1 }", "expecting (COMMA, RBRACK), " + "got RBRACE(}) line:1,25:name/subject; { key: [ 1 }") + + def testBadList3(self): + self.invalid("name/subject; { key: [ 1 2 }", "expecting (COMMA, RBRACK), " + "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 }") + + def testBadList4(self): + self.invalid("name/subject; { key: [ 1 2 ] }", "expecting (COMMA, RBRACK), " + "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 ] }") + + def testMap1(self): + self.valid("name/subject; { 'key': value }", + "name", "subject", {"key": "value"}) + + def testMap2(self): + self.valid("name/subject; { 1: value }", "name", "subject", {1: "value"}) + + def testMap3(self): + self.valid('name/subject; { "foo.bar": value }', + "name", "subject", {"foo.bar": "value"}) + + def testBoolean(self): + self.valid("name/subject; { true1: True, true2: true, " + "false1: False, false2: false }", + "name", "subject", {"true1": True, "true2": True, + "false1": False, "false2": False}) diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py new file mode 100644 index 0000000000..db5ec03df2 --- /dev/null +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -0,0 +1,1335 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# setup, usage, teardown, errors(sync), errors(async), stress, soak, +# boundary-conditions, config + +import errno, os, socket, sys, time +from qpid import compat +from qpid.compat import set +from qpid.messaging import * +from qpid.messaging.transports import TRANSPORTS +from qpid.tests.messaging import Base +from threading import Thread + +class SetupTests(Base): + + def testEstablish(self): + self.conn = Connection.establish(self.broker, **self.connection_options()) + self.ping(self.conn.session()) + + def testOpen(self): + self.conn = Connection(self.broker, **self.connection_options()) + self.conn.open() + self.ping(self.conn.session()) + + def testOpenReconnectURLs(self): + options = self.connection_options() + options["reconnect_urls"] = [self.broker, self.broker] + self.conn = Connection(self.broker, **options) + self.conn.open() + self.ping(self.conn.session()) + + def testTcpNodelay(self): + self.conn = Connection.establish(self.broker, tcp_nodelay=True) + assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) + + def testConnectError(self): + try: + # Specifying port 0 yields a bad address on Windows; port 4 is unassigned + self.conn = Connection.establish("localhost:4") + assert False, "connect succeeded" + except ConnectError, e: + assert "refused" in str(e) + + def testGetError(self): + self.conn = Connection("localhost:0") + try: + self.conn.open() + assert False, "connect succeeded" + except ConnectError, e: + assert self.conn.get_error() == e + + def use_fds(self): + fds = [] + try: + while True: + fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY)) + except OSError, e: + if e.errno != errno.EMFILE: + raise e + else: + return fds + + def testOpenCloseResourceLeaks(self): + fds = self.use_fds() + try: + for i in range(32): + if fds: os.close(fds.pop()) + for i in xrange(64): + conn = Connection.establish(self.broker, **self.connection_options()) + conn.close() + finally: + while fds: + os.close(fds.pop()) + + def testOpenFailResourceLeaks(self): + fds = self.use_fds() + try: + for i in range(32): + if fds: os.close(fds.pop()) + for i in xrange(64): + conn = Connection("localhost:0", **self.connection_options()) + # XXX: we need to force a waiter to be created for this test + # to work + conn._lock.acquire() + conn._wait(lambda: False, timeout=0.001) + conn._lock.release() + try: + conn.open() + except ConnectError, e: + pass + finally: + while fds: + os.close(fds.pop()) + + def testReconnect(self): + options = self.connection_options() + real = TRANSPORTS["tcp"] + + class flaky: + + def __init__(self, conn, host, port): + self.real = real(conn, host, port) + self.sent_count = 0 + self.recv_count = 0 + + def fileno(self): + return self.real.fileno() + + def reading(self, reading): + return self.real.reading(reading) + + def writing(self, writing): + return self.real.writing(writing) + + def send(self, bytes): + if self.sent_count > 2048: + raise socket.error("fake error") + n = self.real.send(bytes) + self.sent_count += n + return n + + def recv(self, n): + if self.recv_count > 2048: + return "" + bytes = self.real.recv(n) + self.recv_count += len(bytes) + return bytes + + def close(self): + self.real.close() + + TRANSPORTS["flaky"] = flaky + + options["reconnect"] = True + options["reconnect_interval"] = 0 + options["reconnect_limit"] = 100 + options["reconnect_log"] = False + options["transport"] = "flaky" + + self.conn = Connection.establish(self.broker, **options) + ssn = self.conn.session() + snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}") + rcv = ssn.receiver(snd.target) + + msgs = [self.message("testReconnect", i) for i in range(20)] + for m in msgs: + snd.send(m) + + content = set() + drained = [] + duplicates = [] + try: + while True: + m = rcv.fetch(timeout=0) + if m.content not in content: + content.add(m.content) + drained.append(m) + else: + duplicates.append(m) + ssn.acknowledge(m) + except Empty: + pass + # XXX: apparently we don't always get duplicates, should figure out why + #assert duplicates, "no duplicates" + assert len(drained) == len(msgs) + for m, d in zip(msgs, drained): + # XXX: we should figure out how to provide proper end to end + # redelivered + self.assertEcho(m, d, d.redelivered) + +class ConnectionTests(Base): + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def testCheckClosed(self): + assert not self.conn.check_closed() + + def testSessionAnon(self): + ssn1 = self.conn.session() + ssn2 = self.conn.session() + self.ping(ssn1) + self.ping(ssn2) + assert ssn1 is not ssn2 + + def testSessionNamed(self): + ssn1 = self.conn.session("one") + ssn2 = self.conn.session("two") + self.ping(ssn1) + self.ping(ssn2) + assert ssn1 is not ssn2 + assert ssn1 is self.conn.session("one") + assert ssn2 is self.conn.session("two") + + def testDetach(self): + ssn = self.conn.session() + self.ping(ssn) + self.conn.detach() + try: + self.ping(ssn) + assert False, "ping succeeded" + except Detached: + # this is the expected failure when pinging on a detached + # connection + pass + self.conn.attach() + self.ping(ssn) + + def testClose(self): + self.conn.close() + assert not self.conn.attached() + + def testSimultaneousClose(self): + ssns = [self.conn.session() for i in range(3)] + for s in ssns: + for i in range(3): + s.receiver("amq.topic") + s.sender("amq.topic") + + def closer(errors): + try: + self.conn.close() + except: + _, e, _ = sys.exc_info() + errors.append(compat.format_exc(e)) + + t1_errors = [] + t2_errors = [] + t1 = Thread(target=lambda: closer(t1_errors)) + t2 = Thread(target=lambda: closer(t2_errors)) + t1.start() + t2.start() + t1.join(self.delay()) + t2.join(self.delay()) + + assert not t1_errors, t1_errors[0] + assert not t2_errors, t2_errors[0] + +class hangable: + + def __init__(self, conn, host, port): + self.tcp = TRANSPORTS["tcp"](conn, host, port) + self.hung = False + + def hang(self): + self.hung = True + + def fileno(self): + return self.tcp.fileno() + + def reading(self, reading): + if self.hung: + return True + else: + return self.tcp.reading(reading) + + def writing(self, writing): + if self.hung: + return False + else: + return self.tcp.writing(writing) + + def send(self, bytes): + if self.hung: + return 0 + else: + return self.tcp.send(bytes) + + def recv(self, n): + if self.hung: + return "" + else: + return self.tcp.recv(n) + + def close(self): + self.tcp.close() + +TRANSPORTS["hangable"] = hangable + +class TimeoutTests(Base): + + def setup_connection(self): + options = self.connection_options() + options["transport"] = "hangable" + return Connection.establish(self.broker, **options) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender("amq.topic") + + def setup_receiver(self): + return self.ssn.receiver("amq.topic; {link: {reliability: unreliable}}") + + def teardown_connection(self, conn): + try: + conn.detach(timeout=0) + except Timeout: + pass + + def hang(self): + self.conn._driver._transport.hang() + + def timeoutTest(self, method): + self.hang() + try: + method(timeout=self.delay()) + assert False, "did not time out" + except Timeout: + pass + + def testSenderSync(self): + self.snd.send(self.content("testSenderSync"), sync=False) + self.timeoutTest(self.snd.sync) + + def testSenderClose(self): + self.snd.send(self.content("testSenderClose"), sync=False) + self.timeoutTest(self.snd.close) + + def testReceiverClose(self): + self.timeoutTest(self.rcv.close) + + def testSessionSync(self): + self.snd.send(self.content("testSessionSync"), sync=False) + self.timeoutTest(self.ssn.sync) + + def testSessionClose(self): + self.timeoutTest(self.ssn.close) + + def testConnectionDetach(self): + self.timeoutTest(self.conn.detach) + + def testConnectionClose(self): + self.timeoutTest(self.conn.close) + +ACK_QC = 'test-ack-queue; {create: always}' +ACK_QD = 'test-ack-queue; {delete: always}' + +class SessionTests(Base): + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def testSender(self): + snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}', + durable=self.durable()) + snd2 = self.ssn.sender(snd.target, durable=self.durable()) + assert snd is not snd2 + snd2.close() + + content = self.content("testSender") + snd.send(content) + rcv = self.ssn.receiver(snd.target) + msg = rcv.fetch(0) + assert msg.content == content + self.ssn.acknowledge(msg) + + def testReceiver(self): + rcv = self.ssn.receiver('test-rcv-queue; {create: always}') + rcv2 = self.ssn.receiver(rcv.source) + assert rcv is not rcv2 + rcv2.close() + + content = self.content("testReceiver") + snd = self.ssn.sender(rcv.source, durable=self.durable()) + snd.send(content) + msg = rcv.fetch(0) + assert msg.content == content + self.ssn.acknowledge(msg) + snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') + + def testDetachedReceiver(self): + self.conn.detach() + rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}") + m = self.content("testDetachedReceiver") + self.conn.attach() + snd = self.ssn.sender("test-dis-rcv-queue") + snd.send(m) + self.drain(rcv, expected=[m]) + + def testNextReceiver(self): + ADDR = 'test-next-rcv-queue; {create: always, delete: always}' + rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + + snd = self.ssn.sender(ADDR) + + msgs = [] + for i in range(10): + content = self.content("testNextReceiver", i) + snd.send(content) + msgs.append(content) + + fetched = [] + try: + while True: + rcv = self.ssn.next_receiver(timeout=self.delay()) + assert rcv in (rcv1, rcv2, rcv3) + assert rcv.available() > 0 + fetched.append(rcv.fetch().content) + except Empty: + pass + assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) + self.ssn.acknowledge() + #we set the capacity to 0 to prevent the deletion of the queue - + #triggered the deletion policy when the first receiver is closed - + #resulting in session exceptions being issued for the remaining + #active subscriptions: + for r in [rcv1, rcv2, rcv3]: + r.capacity = 0 + + # XXX, we need a convenient way to assert that required queues are + # empty on setup, and possibly also to drain queues on teardown + def ackTest(self, acker, ack_capacity=None): + # send a bunch of messages + snd = self.ssn.sender(ACK_QC, durable=self.durable()) + contents = [self.content("ackTest", i) for i in range(15)] + for c in contents: + snd.send(c) + + # drain the queue, verify the messages are there and then close + # without acking + rcv = self.ssn.receiver(ACK_QC) + self.drain(rcv, expected=contents) + self.ssn.close() + + # drain the queue again, verify that they are all the messages + # were requeued, and ack this time before closing + self.ssn = self.conn.session() + if ack_capacity is not None: + self.ssn.ack_capacity = ack_capacity + rcv = self.ssn.receiver(ACK_QC) + self.drain(rcv, expected=contents) + acker(self.ssn) + self.ssn.close() + + # drain the queue a final time and verify that the messages were + # dequeued + self.ssn = self.conn.session() + rcv = self.ssn.receiver(ACK_QD) + self.assertEmpty(rcv) + + def testAcknowledge(self): + self.ackTest(lambda ssn: ssn.acknowledge()) + + def testAcknowledgeAsync(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) + + def testAcknowledgeAsyncAckCap0(self): + try: + try: + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) + assert False, "acknowledge shouldn't succeed with ack_capacity of zero" + except InsufficientCapacity: + pass + finally: + self.ssn.ack_capacity = UNLIMITED + self.drain(self.ssn.receiver(ACK_QD)) + self.ssn.acknowledge() + + def testAcknowledgeAsyncAckCap1(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) + + def testAcknowledgeAsyncAckCap5(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) + + def testAcknowledgeAsyncAckCapUNLIMITED(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) + + def testRelease(self): + msgs = [self.message("testRelease", i) for i in range(3)] + snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) + self.ssn.acknowledge(echos[2], Disposition(RELEASED)) + self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) + self.drain(rcv, expected=msgs[2:3]) + self.ssn.acknowledge() + + def testReject(self): + msgs = [self.message("testReject", i) for i in range(3)] + snd = self.ssn.sender(""" + test-reject-queue; { + create: always, + delete: always, + node: { + x-declare: { + alternate-exchange: 'amq.topic' + } + } + } +""") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + rej = self.ssn.receiver("amq.topic") + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(REJECTED)) + self.ssn.acknowledge(echos[2], + Disposition(REJECTED, code=3, text="test-reject")) + self.drain(rej, expected=msgs[1:]) + self.ssn.acknowledge() + + def send(self, ssn, target, base, count=1): + snd = ssn.sender(target, durable=self.durable()) + messages = [] + for i in range(count): + c = self.message(base, i) + snd.send(c) + messages.append(c) + snd.close() + return messages + + def txTest(self, commit): + TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' + TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' + txssn = self.conn.session(transactional=True) + messages = self.send(self.ssn, TX_Q, "txTest", 3) + txrcv = txssn.receiver(TX_Q) + txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) + rcv = self.ssn.receiver(txrcv.source) + copy_rcv = self.ssn.receiver(txsnd.target) + self.assertEmpty(copy_rcv) + for i in range(3): + m = txrcv.fetch(0) + txsnd.send(m) + self.assertEmpty(copy_rcv) + txssn.acknowledge() + if commit: + txssn.commit() + self.assertEmpty(rcv) + self.drain(copy_rcv, expected=messages) + else: + txssn.rollback() + self.drain(rcv, expected=messages, redelivered=True) + self.assertEmpty(copy_rcv) + self.ssn.acknowledge() + + def testCommit(self): + self.txTest(True) + + def testRollback(self): + self.txTest(False) + + def txTestSend(self, commit): + TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' + txssn = self.conn.session(transactional=True) + messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + rcv = self.ssn.receiver(TX_SEND_Q) + self.assertEmpty(rcv) + + if commit: + txssn.commit() + self.drain(rcv, expected=messages) + self.ssn.acknowledge() + else: + txssn.rollback() + self.assertEmpty(rcv) + txssn.commit() + self.assertEmpty(rcv) + + def testCommitSend(self): + self.txTestSend(True) + + def testRollbackSend(self): + self.txTestSend(False) + + def txTestAck(self, commit): + TX_ACK_QC = 'test-tx-ack-queue; {create: always}' + TX_ACK_QD = 'test-tx-ack-queue; {delete: always}' + txssn = self.conn.session(transactional=True) + txrcv = txssn.receiver(TX_ACK_QC) + self.assertEmpty(txrcv) + messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) + self.drain(txrcv, expected=messages) + + if commit: + txssn.acknowledge() + else: + txssn.rollback() + self.drain(txrcv, expected=messages, redelivered=True) + txssn.acknowledge() + txssn.rollback() + self.drain(txrcv, expected=messages, redelivered=True) + txssn.commit() # commit without ack + self.assertEmpty(txrcv) + + txssn.close() + + txssn = self.conn.session(transactional=True) + txrcv = txssn.receiver(TX_ACK_QC) + self.drain(txrcv, expected=messages, redelivered=True) + txssn.acknowledge() + txssn.commit() + rcv = self.ssn.receiver(TX_ACK_QD) + self.assertEmpty(rcv) + txssn.close() + self.assertEmpty(rcv) + + def testCommitAck(self): + self.txTestAck(True) + + def testRollbackAck(self): + self.txTestAck(False) + + def testDoubleCommit(self): + ssn = self.conn.session(transactional=True) + snd = ssn.sender("amq.direct") + rcv = ssn.receiver("amq.direct") + msgs = [self.message("testDoubleCommit", i) for i in range(3)] + for m in msgs: + snd.send(m) + ssn.commit() + self.drain(rcv, expected=msgs) + ssn.acknowledge() + ssn.commit() + + def testClose(self): + self.ssn.close() + try: + self.ping(self.ssn) + assert False, "ping succeeded" + except Detached: + pass + +RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' + +class ReceiverTests(Base): + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender(RECEIVER_Q) + + def setup_receiver(self): + return self.ssn.receiver(RECEIVER_Q) + + def send(self, base, count = None, sync=True): + content = self.content(base, count) + self.snd.send(content, sync=sync) + return content + + def testFetch(self): + try: + msg = self.rcv.fetch(0) + assert False, "unexpected message: %s" % msg + except Empty: + pass + try: + start = time.time() + msg = self.rcv.fetch(self.delay()) + assert False, "unexpected message: %s" % msg + except Empty: + elapsed = time.time() - start + assert elapsed >= self.delay() + + one = self.send("testFetch", 1) + two = self.send("testFetch", 2) + three = self.send("testFetch", 3) + msg = self.rcv.fetch(0) + assert msg.content == one + msg = self.rcv.fetch(self.delay()) + assert msg.content == two + msg = self.rcv.fetch() + assert msg.content == three + self.ssn.acknowledge() + + def fetchFromClosedTest(self, entry): + entry.close() + try: + msg = self.rcv.fetch(0) + assert False, "unexpected result: %s" % msg + except Empty, e: + assert False, "unexpected exception: %s" % e + except LinkClosed, e: + pass + + def testFetchFromClosedReceiver(self): + self.fetchFromClosedTest(self.rcv) + + def testFetchFromClosedSession(self): + self.fetchFromClosedTest(self.ssn) + + def testFetchFromClosedConnection(self): + self.fetchFromClosedTest(self.conn) + + def fetchFromConcurrentCloseTest(self, entry): + def closer(): + self.sleep() + entry.close() + t = Thread(target=closer) + t.start() + try: + msg = self.rcv.fetch() + assert False, "unexpected result: %s" % msg + except Empty, e: + assert False, "unexpected exception: %s" % e + except LinkClosed, e: + pass + t.join() + + def testFetchFromConcurrentCloseReceiver(self): + self.fetchFromConcurrentCloseTest(self.rcv) + + def testFetchFromConcurrentCloseSession(self): + self.fetchFromConcurrentCloseTest(self.ssn) + + def testFetchFromConcurrentCloseConnection(self): + self.fetchFromConcurrentCloseTest(self.conn) + + def testCapacityIncrease(self): + content = self.send("testCapacityIncrease") + self.sleep() + assert self.rcv.available() == 0 + self.rcv.capacity = UNLIMITED + self.sleep() + assert self.rcv.available() == 1 + msg = self.rcv.fetch(0) + assert msg.content == content + assert self.rcv.available() == 0 + self.ssn.acknowledge() + + def testCapacityDecrease(self): + self.rcv.capacity = UNLIMITED + one = self.send("testCapacityDecrease", 1) + self.sleep() + assert self.rcv.available() == 1 + msg = self.rcv.fetch(0) + assert msg.content == one + + self.rcv.capacity = 0 + + two = self.send("testCapacityDecrease", 2) + self.sleep() + assert self.rcv.available() == 0 + msg = self.rcv.fetch(0) + assert msg.content == two + + self.ssn.acknowledge() + + def capacityTest(self, capacity, threshold=None): + if threshold is not None: + self.rcv.threshold = threshold + self.rcv.capacity = capacity + self.assertAvailable(self.rcv, 0) + + for i in range(2*capacity): + self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False) + self.snd.sync() + self.sleep() + self.assertAvailable(self.rcv) + + first = capacity/2 + second = capacity - first + self.drain(self.rcv, limit = first) + self.sleep() + self.assertAvailable(self.rcv) + self.drain(self.rcv, limit = second) + self.sleep() + self.assertAvailable(self.rcv) + + drained = self.drain(self.rcv) + assert len(drained) == capacity, "%s, %s" % (len(drained), drained) + self.assertAvailable(self.rcv, 0) + + self.ssn.acknowledge() + + def testCapacity5(self): + self.capacityTest(5) + + def testCapacity5Threshold1(self): + self.capacityTest(5, 1) + + def testCapacity10(self): + self.capacityTest(10) + + def testCapacity10Threshold1(self): + self.capacityTest(10, 1) + + def testCapacity100(self): + self.capacityTest(100) + + def testCapacity100Threshold1(self): + self.capacityTest(100, 1) + + def testCapacityUNLIMITED(self): + self.rcv.capacity = UNLIMITED + self.assertAvailable(self.rcv, 0) + + for i in range(10): + self.send("testCapacityUNLIMITED", i) + self.sleep() + self.assertAvailable(self.rcv, 10) + + self.drain(self.rcv) + self.assertAvailable(self.rcv, 0) + + self.ssn.acknowledge() + + def testAvailable(self): + self.rcv.capacity = UNLIMITED + assert self.rcv.available() == 0 + + for i in range(3): + self.send("testAvailable", i) + self.sleep() + assert self.rcv.available() == 3 + + for i in range(3, 10): + self.send("testAvailable", i) + self.sleep() + assert self.rcv.available() == 10 + + self.drain(self.rcv, limit=3) + assert self.rcv.available() == 7 + + self.drain(self.rcv) + assert self.rcv.available() == 0 + + self.ssn.acknowledge() + + def testDoubleClose(self): + m1 = self.content("testDoubleClose", 1) + m2 = self.content("testDoubleClose", 2) + + snd = self.ssn.sender("""test-double-close; { + create: always, + delete: sender, + node: { + type: topic + } +} +""") + r1 = self.ssn.receiver(snd.target) + r2 = self.ssn.receiver(snd.target) + snd.send(m1) + self.drain(r1, expected=[m1]) + self.drain(r2, expected=[m1]) + r1.close() + snd.send(m2) + self.drain(r2, expected=[m2]) + r2.close() + + # XXX: need testClose + + def testMode(self): + msgs = [self.content("testMode", 1), + self.content("testMode", 2), + self.content("testMode", 3)] + + for m in msgs: + self.snd.send(m) + + rb = self.ssn.receiver('test-receiver-queue; {mode: browse}') + rc = self.ssn.receiver('test-receiver-queue; {mode: consume}') + self.drain(rb, expected=msgs) + self.drain(rc, expected=msgs) + rb2 = self.ssn.receiver(rb.source) + self.assertEmpty(rb2) + self.drain(self.rcv, expected=[]) + + # XXX: need testUnsettled() + + def unreliabilityTest(self, mode="unreliable"): + msgs = [self.message("testUnreliable", i) for i in range(3)] + snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}") + rcv = self.ssn.receiver(snd.target) + for m in msgs: + snd.send(m) + + # close without ack on reliable receiver, messages should be requeued + ssn = self.conn.session() + rrcv = ssn.receiver("test-unreliability-queue") + self.drain(rrcv, expected=msgs) + ssn.close() + + # close without ack on unreliable receiver, messages should not be requeued + ssn = self.conn.session() + urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode) + self.drain(urcv, expected=msgs, redelivered=True) + ssn.close() + + self.assertEmpty(rcv) + + def testUnreliable(self): + self.unreliabilityTest(mode="unreliable") + + def testAtMostOnce(self): + self.unreliabilityTest(mode="at-most-once") + +class AddressTests(Base): + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def badOption(self, options, error): + try: + self.ssn.sender("test-bad-options-snd; %s" % options) + assert False + except InvalidOption, e: + assert "error in options: %s" % error == str(e), e + + try: + self.ssn.receiver("test-bad-options-rcv; %s" % options) + assert False + except InvalidOption, e: + assert "error in options: %s" % error == str(e), e + + def testIllegalKey(self): + self.badOption("{create: always, node: " + "{this-property-does-not-exist: 3}}", + "node: this-property-does-not-exist: " + "illegal key") + + def testWrongValue(self): + self.badOption("{create: asdf}", "create: asdf not in " + "('always', 'sender', 'receiver', 'never')") + + def testWrongType1(self): + self.badOption("{node: asdf}", + "node: asdf is not a map") + + def testWrongType2(self): + self.badOption("{node: {durable: []}}", + "node: durable: [] is not a bool") + + def testCreateQueue(self): + snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " + "node: {type: queue, durable: False, " + "x-declare: {auto_delete: true}}}") + content = self.content("testCreateQueue") + snd.send(content) + rcv = self.ssn.receiver("test-create-queue") + self.drain(rcv, expected=[content]) + + def createExchangeTest(self, props=""): + addr = """test-create-exchange; { + create: always, + delete: always, + node: { + type: topic, + durable: False, + x-declare: {auto_delete: true, %s} + } + }""" % props + snd = self.ssn.sender(addr) + snd.send("ping") + rcv1 = self.ssn.receiver("test-create-exchange/first") + rcv2 = self.ssn.receiver("test-create-exchange/first") + rcv3 = self.ssn.receiver("test-create-exchange/second") + for r in (rcv1, rcv2, rcv3): + try: + r.fetch(0) + assert False + except Empty: + pass + msg1 = Message(self.content("testCreateExchange", 1), subject="first") + msg2 = Message(self.content("testCreateExchange", 2), subject="second") + snd.send(msg1) + snd.send(msg2) + self.drain(rcv1, expected=[msg1.content]) + self.drain(rcv2, expected=[msg1.content]) + self.drain(rcv3, expected=[msg2.content]) + + def testCreateExchange(self): + self.createExchangeTest() + + def testCreateExchangeDirect(self): + self.createExchangeTest("type: direct") + + def testCreateExchangeTopic(self): + self.createExchangeTest("type: topic") + + def testDeleteBySender(self): + snd = self.ssn.sender("test-delete; {create: always}") + snd.send("ping") + snd.close() + snd = self.ssn.sender("test-delete; {delete: always}") + snd.send("ping") + snd.close() + try: + self.ssn.sender("test-delete") + except NotFound, e: + assert "no such queue" in str(e) + + def testDeleteByReceiver(self): + rcv = self.ssn.receiver("test-delete; {create: always, delete: always}") + try: + rcv.fetch(0) + except Empty: + pass + rcv.close() + + try: + self.ssn.receiver("test-delete") + assert False + except NotFound, e: + assert "no such queue" in str(e) + + def testDeleteSpecial(self): + snd = self.ssn.sender("amq.topic; {delete: always}") + snd.send("asdf") + try: + snd.close() + assert False, "successfully deleted amq.topic" + except SessionError, e: + assert "Cannot delete default exchange" in str(e) + # XXX: need to figure out close after error + self.conn._remove_session(self.ssn) + + def testNodeBindingsQueue(self): + snd = self.ssn.sender(""" +test-node-bindings-queue; { + create: always, + delete: always, + node: { + x-bindings: [{exchange: "amq.topic", key: "a.#"}, + {exchange: "amq.direct", key: "b"}, + {exchange: "amq.topic", key: "c.*"}] + } +} +""") + snd.send("one") + snd_a = self.ssn.sender("amq.topic/a.foo") + snd_b = self.ssn.sender("amq.direct/b") + snd_c = self.ssn.sender("amq.topic/c.bar") + snd_a.send("two") + snd_b.send("three") + snd_c.send("four") + rcv = self.ssn.receiver("test-node-bindings-queue") + self.drain(rcv, expected=["one", "two", "three", "four"]) + + def testNodeBindingsTopic(self): + rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}") + rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}") + rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}") + rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}") + snd = self.ssn.sender(""" +test-node-bindings-topic; { + create: always, + delete: always, + node: { + type: topic, + x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"}, + {queue: test-node-bindings-topic-queue-a, key: "a.#"}, + {queue: test-node-bindings-topic-queue-b, key: "b"}, + {queue: test-node-bindings-topic-queue-c, key: "c.*"}] + } +} +""") + m1 = Message("one") + m2 = Message(subject="a.foo", content="two") + m3 = Message(subject="b", content="three") + m4 = Message(subject="c.bar", content="four") + snd.send(m1) + snd.send(m2) + snd.send(m3) + snd.send(m4) + self.drain(rcv, expected=[m1, m2, m3, m4]) + self.drain(rcv_a, expected=[m2]) + self.drain(rcv_b, expected=[m3]) + self.drain(rcv_c, expected=[m4]) + + def testLinkBindings(self): + m_a = self.message("testLinkBindings", 1, subject="a") + m_b = self.message("testLinkBindings", 2, subject="b") + + self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}") + snd = self.ssn.sender("amq.topic") + + snd.send(m_a) + snd.send(m_b) + snd.close() + + rcv = self.ssn.receiver("test-link-bindings-queue") + self.assertEmpty(rcv) + + snd = self.ssn.sender(""" +amq.topic; { + link: { + x-bindings: [{queue: test-link-bindings-queue, key: a}] + } +} +""") + + snd.send(m_a) + snd.send(m_b) + + self.drain(rcv, expected=[m_a]) + rcv.close() + + rcv = self.ssn.receiver(""" +test-link-bindings-queue; { + link: { + x-bindings: [{exchange: "amq.topic", key: b}] + } +} +""") + + snd.send(m_a) + snd.send(m_b) + + self.drain(rcv, expected=[m_a, m_b]) + + def testSubjectOverride(self): + snd = self.ssn.sender("amq.topic/a") + rcv_a = self.ssn.receiver("amq.topic/a") + rcv_b = self.ssn.receiver("amq.topic/b") + m1 = self.content("testSubjectOverride", 1) + m2 = self.content("testSubjectOverride", 2) + snd.send(m1) + snd.send(Message(subject="b", content=m2)) + self.drain(rcv_a, expected=[m1]) + self.drain(rcv_b, expected=[m2]) + + def testSubjectDefault(self): + m1 = self.content("testSubjectDefault", 1) + m2 = self.content("testSubjectDefault", 2) + snd = self.ssn.sender("amq.topic/a") + rcv = self.ssn.receiver("amq.topic") + snd.send(m1) + snd.send(Message(subject="b", content=m2)) + e1 = rcv.fetch(timeout=0) + e2 = rcv.fetch(timeout=0) + assert e1.subject == "a", "subject: %s" % e1.subject + assert e2.subject == "b", "subject: %s" % e2.subject + self.assertEmpty(rcv) + + def doReliabilityTest(self, reliability, messages, expected): + snd = self.ssn.sender("amq.topic") + rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability) + for m in messages: + snd.send(m) + self.conn.detach() + self.conn.attach() + self.drain(rcv, expected=expected) + + def testReliabilityUnreliable(self): + msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)] + self.doReliabilityTest("unreliable", msgs, []) + + def testReliabilityAtLeastOnce(self): + msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)] + self.doReliabilityTest("at-least-once", msgs, msgs) + + def testLinkName(self): + msgs = [self.message("testLinkName", i) for i in range(3)] + snd = self.ssn.sender("amq.topic") + trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}") + qrcv = self.ssn.receiver("test-link-name") + for m in msgs: + snd.send(m) + self.drain(qrcv, expected=msgs) + + def testAssert1(self): + try: + snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}") + assert 0, "assertion failed to trigger" + except AssertionFailed, e: + pass + + def testAssert2(self): + snd = self.ssn.sender("amq.topic; {assert: always}") + +NOSUCH_Q = "this-queue-should-not-exist" +UNPARSEABLE_ADDR = "name/subject; {bad options" +UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" + +class AddressErrorTests(Base): + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def senderErrorTest(self, addr, exc, check=lambda e: True): + try: + self.ssn.sender(addr, durable=self.durable()) + assert False, "sender creation succeeded" + except exc, e: + assert check(e), "unexpected error: %s" % compat.format_exc(e) + + def receiverErrorTest(self, addr, exc, check=lambda e: True): + try: + self.ssn.receiver(addr) + assert False, "receiver creation succeeded" + except exc, e: + assert check(e), "unexpected error: %s" % compat.format_exc(e) + + def testNoneTarget(self): + self.senderErrorTest(None, MalformedAddress) + + def testNoneSource(self): + self.receiverErrorTest(None, MalformedAddress) + + def testNoTarget(self): + self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) + + def testNoSource(self): + self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) + + def testUnparseableTarget(self): + self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress, + lambda e: "expecting COLON" in str(e)) + + def testUnparseableSource(self): + self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress, + lambda e: "expecting COLON" in str(e)) + + def testUnlexableTarget(self): + self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress, + lambda e: "unrecognized characters" in str(e)) + + def testUnlexableSource(self): + self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress, + lambda e: "unrecognized characters" in str(e)) + + def testInvalidMode(self): + self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}', + InvalidOption, + lambda e: "not in ('browse', 'consume')" in str(e)) + +SENDER_Q = 'test-sender-q; {create: always, delete: always}' + +class SenderTests(Base): + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender(SENDER_Q) + + def setup_receiver(self): + return self.ssn.receiver(SENDER_Q) + + def checkContent(self, content): + self.snd.send(content) + msg = self.rcv.fetch(0) + assert msg.content == content + + out = Message(content) + self.snd.send(out) + echo = self.rcv.fetch(0) + assert out.content == echo.content + assert echo.content == msg.content + self.ssn.acknowledge() + + def testSendString(self): + self.checkContent(self.content("testSendString")) + + def testSendList(self): + self.checkContent(["testSendList", 1, 3.14, self.test_id]) + + def testSendMap(self): + self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) + + def asyncTest(self, capacity): + self.snd.capacity = capacity + msgs = [self.content("asyncTest", i) for i in range(15)] + for m in msgs: + self.snd.send(m, sync=False) + self.drain(self.rcv, timeout=self.delay(), expected=msgs) + self.ssn.acknowledge() + + def testSendAsyncCapacity0(self): + try: + self.asyncTest(0) + assert False, "send shouldn't succeed with zero capacity" + except InsufficientCapacity: + # this is expected + pass + + def testSendAsyncCapacity1(self): + self.asyncTest(1) + + def testSendAsyncCapacity5(self): + self.asyncTest(5) + + def testSendAsyncCapacityUNLIMITED(self): + self.asyncTest(UNLIMITED) + + def testCapacityTimeout(self): + self.snd.capacity = 1 + msgs = [] + caught = False + while len(msgs) < 100: + m = self.content("testCapacity", len(msgs)) + try: + self.snd.send(m, sync=False, timeout=0) + msgs.append(m) + except InsufficientCapacity: + caught = True + break + self.snd.sync() + self.drain(self.rcv, expected=msgs) + self.ssn.acknowledge() + assert caught, "did not exceed capacity" diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py new file mode 100644 index 0000000000..297374b82b --- /dev/null +++ b/qpid/python/qpid/tests/messaging/message.py @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base + +class MessageTests(Base): + + def testCreateString(self): + m = Message("string") + assert m.content == "string" + assert m.content_type is None + + def testCreateUnicode(self): + m = Message(u"unicode") + assert m.content == u"unicode" + assert m.content_type == "text/plain" + + def testCreateMap(self): + m = Message({}) + assert m.content == {} + assert m.content_type == "amqp/map" + + def testCreateList(self): + m = Message([]) + assert m.content == [] + assert m.content_type == "amqp/list" + + def testContentTypeOverride(self): + m = Message() + m.content_type = "text/html; charset=utf8" + m.content = u"<html/>" + assert m.content_type == "text/html; charset=utf8" + +ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' + +class MessageEchoTests(Base): + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender(ECHO_Q) + + def setup_receiver(self): + return self.ssn.receiver(ECHO_Q) + + def check(self, msg): + self.snd.send(msg) + echo = self.rcv.fetch(0) + self.assertEcho(msg, echo) + self.ssn.acknowledge(echo) + + def testStringContent(self): + self.check(Message("string")) + + def testUnicodeContent(self): + self.check(Message(u"unicode")) + + + TEST_MAP = {"key1": "string", + "key2": u"unicode", + "key3": 3, + "key4": -3, + "key5": 3.14, + "key6": -3.14, + "key7": ["one", 2, 3.14], + "key8": [], + "key9": {"sub-key0": 3}, + "key10": True, + "key11": False, + "x-amqp-0-10.app-id": "test-app-id", + "x-amqp-0-10.content-encoding": "test-content-encoding"} + + def testMapContent(self): + self.check(Message(MessageEchoTests.TEST_MAP)) + + def testListContent(self): + self.check(Message([])) + self.check(Message([1, 2, 3])) + self.check(Message(["one", 2, 3.14, {"four": 4}])) + + def testProperties(self): + msg = Message() + msg.subject = "subject" + msg.correlation_id = str(self.test_id) + msg.durable = True + msg.priority = 7 + msg.ttl = 60 + msg.properties = MessageEchoTests.TEST_MAP + msg.reply_to = "reply-address" + self.check(msg) + + def testContentTypeUnknown(self): + msg = Message(content_type = "this-content-type-does-not-exist") + self.check(msg) + + def testTextPlain(self): + self.check(Message(content_type="text/plain", content="asdf")) + + def testTextPlainEmpty(self): + self.check(Message(content_type="text/plain")) + + def check_rt(self, addr, expected=None): + if expected is None: + expected = addr + msg = Message(reply_to=addr) + self.snd.send(msg) + echo = self.rcv.fetch(0) + assert echo.reply_to == expected, echo.reply_to + self.ssn.acknowledge(echo) + + def testReplyTo(self): + self.check_rt("name") + + def testReplyToQueue(self): + self.check_rt("name; {node: {type: queue}}", "name") + + def testReplyToQueueSubject(self): + self.check_rt("name/subject; {node: {type: queue}}", "name") + + def testReplyToTopic(self): + self.check_rt("name; {node: {type: topic}}") + + def testReplyToTopicSubject(self): + self.check_rt("name/subject; {node: {type: topic}}") + + def testBooleanEncoding(self): + msg = Message({"true": True, "false": False}) + self.snd.send(msg) + echo = self.rcv.fetch(0) + self.assertEcho(msg, echo) + t = echo.content["true"] + f = echo.content["false"] + assert isinstance(t, bool), t + assert isinstance(f, bool), f diff --git a/qpid/python/qpid/tests/mimetype.py b/qpid/python/qpid/tests/mimetype.py new file mode 100644 index 0000000000..22760316f0 --- /dev/null +++ b/qpid/python/qpid/tests/mimetype.py @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.tests import Test +from qpid.mimetype import lex, parse, ParseError, EOF, WSPACE +from parser import ParserBase + +class MimeTypeTests(ParserBase, Test): + + EXCLUDE = (WSPACE, EOF) + + def do_lex(self, st): + return lex(st) + + def do_parse(self, st): + return parse(st) + + def valid(self, addr, type=None, subtype=None, parameters=None): + ParserBase.valid(self, addr, (type, subtype, parameters)) + + def testTypeOnly(self): + self.invalid("type", "expecting SLASH, got EOF line:1,4:type") + + def testTypeSubtype(self): + self.valid("type/subtype", "type", "subtype", []) + + def testTypeSubtypeParam(self): + self.valid("type/subtype ; name=value", + "type", "subtype", [("name", "value")]) + + def testTypeSubtypeParamComment(self): + self.valid("type/subtype ; name(This is a comment.)=value", + "type", "subtype", [("name", "value")]) + + def testMultipleParams(self): + self.valid("type/subtype ; name1=value1 ; name2=value2", + "type", "subtype", [("name1", "value1"), ("name2", "value2")]) + + def testCaseInsensitivity(self): + self.valid("Type/Subtype", "type", "subtype", []) diff --git a/qpid/python/qpid/tests/parser.py b/qpid/python/qpid/tests/parser.py new file mode 100644 index 0000000000..a4865cc9fe --- /dev/null +++ b/qpid/python/qpid/tests/parser.py @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.parser import ParseError + +class ParserBase: + + def lex(self, addr, *types): + toks = [t.type for t in self.do_lex(addr) if t.type not in self.EXCLUDE] + assert list(types) == toks, "expected %s, got %s" % (types, toks) + + def valid(self, addr, expected): + got = self.do_parse(addr) + assert expected == got, "expected %s, got %s" % (expected, got) + + def invalid(self, addr, error=None): + try: + p = self.do_parse(addr) + assert False, "invalid address parsed: %s" % p + except ParseError, e: + assert error == str(e), "expected %r, got %r" % (error, str(e)) diff --git a/qpid/python/qpid/tests/queue.py b/qpid/python/qpid/tests/queue.py new file mode 100644 index 0000000000..e12354eb43 --- /dev/null +++ b/qpid/python/qpid/tests/queue.py @@ -0,0 +1,71 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import threading, time +from unittest import TestCase +from qpid.queue import Queue, Empty, Closed + + +class QueueTest (TestCase): + + # The qpid queue class just provides sime simple extensions to + # python's standard queue data structure, so we don't need to test + # all the queue functionality. + + def test_listen(self): + values = [] + heard = threading.Event() + def listener(x): + values.append(x) + heard.set() + + q = Queue(0) + q.listen(listener) + heard.clear() + q.put(1) + heard.wait() + assert values[-1] == 1 + heard.clear() + q.put(2) + heard.wait() + assert values[-1] == 2 + + q.listen(None) + q.put(3) + assert q.get(3) == 3 + q.listen(listener) + + heard.clear() + q.put(4) + heard.wait() + assert values[-1] == 4 + + def test_close(self): + q = Queue(0) + q.put(1); q.put(2); q.put(3); q.close() + assert q.get() == 1 + assert q.get() == 2 + assert q.get() == 3 + for i in range(10): + try: + q.get() + raise AssertionError("expected Closed") + except Closed: + pass diff --git a/qpid/python/qpid/tests/spec010.py b/qpid/python/qpid/tests/spec010.py new file mode 100644 index 0000000000..ac04e1ee02 --- /dev/null +++ b/qpid/python/qpid/tests/spec010.py @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, tempfile, shutil, stat +from unittest import TestCase +from qpid.codec010 import Codec, StringCodec +from qpid.ops import * + +class SpecTest(TestCase): + + def testSessionHeader(self): + sc = StringCodec() + sc.write_compound(Header(sync=True)) + assert sc.encoded == "\x01\x01" + + sc = StringCodec() + sc.write_compound(Header(sync=False)) + assert sc.encoded == "\x01\x00" + + def encdec(self, value): + sc = StringCodec() + sc.write_compound(value) + decoded = sc.read_compound(value.__class__) + return decoded + + def testMessageProperties(self): + props = MessageProperties(content_length=3735928559L, + reply_to=ReplyTo(exchange="the exchange name", + routing_key="the routing key")) + dec = self.encdec(props) + assert props.content_length == dec.content_length + assert props.reply_to.exchange == dec.reply_to.exchange + assert props.reply_to.routing_key == dec.reply_to.routing_key + + def testMessageSubscribe(self): + cmd = MessageSubscribe(exclusive=True, destination="this is a test") + dec = self.encdec(cmd) + assert cmd.exclusive == dec.exclusive + assert cmd.destination == dec.destination + + def testXid(self): + sc = StringCodec() + xid = Xid(format=0, global_id="gid", branch_id="bid") + sc.write_compound(xid) + assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid' + dec = sc.read_compound(Xid) + assert xid.__dict__ == dec.__dict__ + +# def testLoadReadOnly(self): +# spec = "amqp.0-10-qpid-errata.xml" +# f = testrunner.get_spec_file(spec) +# dest = tempfile.mkdtemp() +# shutil.copy(f, dest) +# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) +# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) +# fname = os.path.join(dest, spec) +# load(fname) +# assert not os.path.exists("%s.pcl" % fname) |