diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-02-05 15:27:54 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-05 15:27:54 +0000 |
commit | 4799ff20244f1313f15e34ca60f1ed06d81c232a (patch) | |
tree | 59dce384109d34c1e7c2e01f5383843099c37e57 /python/tests | |
parent | 9221f274a6a9b05802f4bd5403425d02ffc84133 (diff) | |
download | qpid-python-4799ff20244f1313f15e34ca60f1ed06d81c232a.tar.gz |
moved tests/*.py underneath qpid/tests/*.py
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@906969 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests')
-rw-r--r-- | python/tests/__init__.py | 22 | ||||
-rw-r--r-- | python/tests/codec.py | 601 | ||||
-rw-r--r-- | python/tests/codec010.py | 133 | ||||
-rw-r--r-- | python/tests/connection.py | 222 | ||||
-rw-r--r-- | python/tests/datatypes.py | 296 | ||||
-rw-r--r-- | python/tests/queue.py | 71 | ||||
-rw-r--r-- | python/tests/spec010.py | 74 |
7 files changed, 0 insertions, 1419 deletions
diff --git a/python/tests/__init__.py b/python/tests/__init__.py deleted file mode 100644 index 1e495f3af3..0000000000 --- a/python/tests/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import codec, queue, datatypes, connection, spec010, codec010 diff --git a/python/tests/codec.py b/python/tests/codec.py deleted file mode 100644 index 9b51b4713c..0000000000 --- a/python/tests/codec.py +++ /dev/null @@ -1,601 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest -from qpid.codec import Codec -from qpid.spec import load -from cStringIO import StringIO -from qpid.reference import ReferenceId - -__doc__ = """ - - This is a unit test script for qpid/codec.py - - It can be run standalone or as part of the existing test framework. - - To run standalone: - ------------------- - - Place in the qpid/python/tests/ directory and type... - - python codec.py - - A brief output will be printed on screen. The verbose output will be placed inn a file called - codec_unit_test_output.txt. [TODO: make this filename configurable] - - To run as part of the existing test framework: - ----------------------------------------------- - - python run-tests tests.codec - - Change History: - ----------------- - Jimmy John 05/19/2007 Initial draft - Jimmy John 05/22/2007 Implemented comments by Rafael Schloming - - -""" - -from qpid_config import amqp_spec_0_8 -SPEC = load(amqp_spec_0_8) - -# -------------------------------------- -# -------------------------------------- -class BaseDataTypes(unittest.TestCase): - - - """ - Base class containing common functions - """ - - # --------------- - def setUp(self): - """ - standard setUp for unitetest (refer unittest documentation for details) - """ - self.codec = Codec(StringIO(), SPEC) - - # ------------------ - def tearDown(self): - """ - standard tearDown for unitetest (refer unittest documentation for details) - """ - self.codec.stream.flush() - self.codec.stream.close() - - # ---------------------------------------- - def callFunc(self, functionName, *args): - """ - helper function - given a function name and arguments, calls the function with the args and - returns the contents of the stream - """ - getattr(self.codec, functionName)(args[0]) - return self.codec.stream.getvalue() - - # ---------------------------------------- - def readFunc(self, functionName, *args): - """ - helper function - creates a input stream and then calls the function with arguments as have been - supplied - """ - self.codec.stream = StringIO(args[0]) - return getattr(self.codec, functionName)() - - -# ---------------------------------------- -# ---------------------------------------- -class IntegerTestCase(BaseDataTypes): - - """ - Handles octet, short, long, long long - - """ - - # ------------------------- - def __init__(self, *args): - """ - sets constants for use in tests - """ - - BaseDataTypes.__init__(self, *args) - self.const_integer = 2 - self.const_integer_octet_encoded = '\x02' - self.const_integer_short_encoded = '\x00\x02' - self.const_integer_long_encoded = '\x00\x00\x00\x02' - self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02' - - # -------------------------- # - # Unsigned Octect - 8 bits # - # -------------------------- # - - # -------------------------- - def test_unsigned_octet(self): - """ - ubyte format requires 0<=number<=255 - """ - self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...') - - # ------------------------------------------- - def test_octet_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_octet, 256) - - # ------------------------------------------- - def test_uoctet_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_octet, -1) - - # --------------------------------- - def test_uoctet_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...') - - # ------------------------------------ - def test_unsigned_octet_decode(self): - """ - octet decoding - """ - self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...') - - # ----------------------------------- # - # Unsigned Short Integers - 16 bits # - # ----------------------------------- # - - # ----------------------- - def test_ushort_int(self): - """ - testing unsigned short integer - """ - self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...') - - # ------------------------------------------- - def test_ushort_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_short, 65536) - - # ------------------------------------------- - def test_ushort_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_short, -1) - - # --------------------------------- - def test_ushort_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...') - - # ------------------------------------ - def test_ushort_int_decode(self): - """ - unsigned short decoding - """ - self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...') - - - # ---------------------------------- # - # Unsigned Long Integers - 32 bits # - # ---------------------------------- # - - # ----------------------- - def test_ulong_int(self): - """ - testing unsigned long iteger - """ - self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...') - - # ------------------------------------------- - def test_ulong_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296) - - # ------------------------------------------- - def test_ulong_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_long, -1) - - # --------------------------------- - def test_ulong_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...') - - # ------------------------------- - def test_ulong_int_decode(self): - """ - unsigned long decoding - """ - self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...') - - - # --------------------------------------- # - # Unsigned Long Long Integers - 64 bits # - # --------------------------------------- # - - # ----------------------- - def test_ulong_long_int(self): - """ - testing unsinged long long integer - """ - self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...') - - # ------------------------------------------- - def test_ulong_long_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616) - - # ------------------------------------------- - def test_ulong_long_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_longlong, -1) - - # --------------------------------- - def test_ulong_long_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...') - - # ------------------------------------ - def test_ulong_long_int_decode(self): - """ - unsigned long long decoding - """ - self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...') - -# ----------------------------------- -# ----------------------------------- -class BitTestCase(BaseDataTypes): - - """ - Handles bits - """ - - # ---------------------------------------------- - def callFunc(self, functionName, *args): - """ - helper function - """ - for ele in args: - getattr(self.codec, functionName)(ele) - - self.codec.flush() - return self.codec.stream.getvalue() - - # ------------------- - def test_bit1(self): - """ - sends in 11 - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...') - - # ------------------- - def test_bit2(self): - """ - sends in 10011 - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...') - - # ------------------- - def test_bit3(self): - """ - sends in 1110100111 [10 bits(right to left), should be compressed into two octets] - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...') - - # ------------------------------------ - def test_bit_decode_1(self): - """ - decode bit 1 - """ - self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...') - - # ------------------------------------ - def test_bit_decode_0(self): - """ - decode bit 0 - """ - self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...') - -# ----------------------------------- -# ----------------------------------- -class StringTestCase(BaseDataTypes): - - """ - Handles short strings, long strings - """ - - # ------------------------------------------------------------- # - # Short Strings - 8 bit length followed by zero or more octets # - # ------------------------------------------------------------- # - - # --------------------------------------- - def test_short_string_zero_length(self): - """ - 0 length short string - """ - self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...') - - # ------------------------------------------- - def test_short_string_positive_length(self): - """ - positive length short string - """ - self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...') - - # ------------------------------------------- - def test_short_string_out_of_upper_range(self): - """ - string length > 255 - """ - self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256) - - # ------------------------------------ - def test_short_string_decode(self): - """ - short string decode - """ - self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...') - - - # ------------------------------------------------------------- # - # Long Strings - 32 bit length followed by zero or more octets # - # ------------------------------------------------------------- # - - # --------------------------------------- - def test_long_string_zero_length(self): - """ - 0 length long string - """ - self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...') - - # ------------------------------------------- - def test_long_string_positive_length(self): - """ - positive length long string - """ - self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...') - - # ------------------------------------ - def test_long_string_decode(self): - """ - long string decode - """ - self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...') - - -# -------------------------------------- -# -------------------------------------- -class TimestampTestCase(BaseDataTypes): - - """ - No need of any test cases here as timestamps are implemented as long long which is tested above - """ - pass - -# --------------------------------------- -# --------------------------------------- -class FieldTableTestCase(BaseDataTypes): - - """ - Handles Field Tables - - Only S/I type messages seem to be implemented currently - """ - - # ------------------------- - def __init__(self, *args): - """ - sets constants for use in tests - """ - - BaseDataTypes.__init__(self, *args) - self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'} - self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1' - - # ------------------------------------------- - def test_field_table_name_value_pair(self): - """ - valid name value pair - """ - self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...') - - # --------------------------------------------------- - def test_field_table_multiple_name_value_pair(self): - """ - multiple name value pair - """ - self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...') - - # ------------------------------------ - def test_field_table_decode(self): - """ - field table decode - """ - self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...') - - -# ------------------------------------ -# ------------------------------------ -class ContentTestCase(BaseDataTypes): - - """ - Handles Content data types - """ - - # ----------------------------- - def test_content_inline(self): - """ - inline content - """ - self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...') - - # -------------------------------- - def test_content_reference(self): - """ - reference content - """ - self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...') - - # ------------------------------------ - def test_content_inline_decode(self): - """ - inline content decode - """ - self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...') - - # ------------------------------------ - def test_content_reference_decode(self): - """ - reference content decode - """ - self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...') - -# ------------------------ # -# Pre - existing test code # -# ------------------------ # - -# --------------------- -def test(type, value): - """ - old test function cut/copy/paste from qpid/codec.py - """ - if isinstance(value, (list, tuple)): - values = value - else: - values = [value] - stream = StringIO() - codec = Codec(stream, SPEC) - for v in values: - codec.encode(type, v) - codec.flush() - enc = stream.getvalue() - stream.reset() - dup = [] - for i in xrange(len(values)): - dup.append(codec.decode(type)) - if values != dup: - raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) - -# ----------------------- -def dotest(type, value): - """ - old test function cut/copy/paste from qpid/codec.py - """ - args = (type, value) - test(*args) - -# ------------- -def oldtests(): - """ - old test function cut/copy/paste from qpid/codec.py - """ - for value in ("1", "0", "110", "011", "11001", "10101", "10011"): - for i in range(10): - dotest("bit", map(lambda x: x == "1", value*i)) - - for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): - dotest("table", value) - - for type in ("octet", "short", "long", "longlong"): - for value in range(0, 256): - dotest(type, value) - - for type in ("shortstr", "longstr"): - for value in ("", "a", "asdf"): - dotest(type, value) - -# ----------------------------------------- -class oldTests(unittest.TestCase): - - """ - class to handle pre-existing test cases - """ - - # --------------------------- - def test_oldtestcases(self): - """ - call the old tests - """ - return oldtests() - -# --------------------------- -# --------------------------- -if __name__ == '__main__': - - codec_test_suite = unittest.TestSuite() - - #adding all the test suites... - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase)) - - #loading pre-existing test case from qpid/codec.py - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests)) - - run_output_stream = StringIO() - test_runner = unittest.TextTestRunner(run_output_stream, '', '') - test_result = test_runner.run(codec_test_suite) - - print '\n%d test run...' % (test_result.testsRun) - - if test_result.wasSuccessful(): - print '\nAll tests successful\n' - - if test_result.failures: - print '\n----------' - print '%d FAILURES:' % (len(test_result.failures)) - print '----------\n' - for failure in test_result.failures: - print str(failure[0]) + ' ... FAIL' - - if test_result.errors: - print '\n---------' - print '%d ERRORS:' % (len(test_result.errors)) - print '---------\n' - - for error in test_result.errors: - print str(error[0]) + ' ... ERROR' - - f = open('codec_unit_test_output.txt', 'w') - f.write(str(run_output_stream.getvalue())) - f.close() diff --git a/python/tests/codec010.py b/python/tests/codec010.py deleted file mode 100644 index 787ebc146f..0000000000 --- a/python/tests/codec010.py +++ /dev/null @@ -1,133 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time - -from unittest import TestCase -from qpid.codec010 import StringCodec -from qpid.datatypes import timestamp, uuid4 -from qpid.ops import PRIMITIVE - -class CodecTest(TestCase): - - def check(self, type, value, compare=True): - t = PRIMITIVE[type] - sc = StringCodec() - sc.write_primitive(t, value) - decoded = sc.read_primitive(t) - if compare: - assert decoded == value, "%s, %s" % (decoded, value) - return decoded - - def testMapString(self): - self.check("map", {"string": "this is a test"}) - - def testMapUnicode(self): - self.check("map", {"unicode": u"this is a unicode test"}) - - def testMapBinary(self): - self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"}) - - def testMapBuffer(self): - s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5" - dec = self.check("map", {"buffer": buffer(s)}, False) - assert dec["buffer"] == s - - def testMapInt(self): - self.check("map", {"int": 3}) - - def testMapLong(self): - self.check("map", {"long": 2**32}) - self.check("map", {"long": 1 << 34}) - self.check("map", {"long": -(1 << 34)}) - - def testMapTimestamp(self): - decoded = self.check("map", {"timestamp": timestamp(0)}) - assert isinstance(decoded["timestamp"], timestamp) - - def testMapDatetime(self): - decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False) - assert isinstance(decoded["datetime"], timestamp) - assert decoded["datetime"] == 0.0 - - def testMapNone(self): - self.check("map", {"none": None}) - - def testMapNested(self): - self.check("map", {"map": {"string": "nested test"}}) - - def testMapList(self): - self.check("map", {"list": [1, "two", 3.0, -4]}) - - def testMapUUID(self): - self.check("map", {"uuid": uuid4()}) - - def testMapAll(self): - decoded = self.check("map", {"string": "this is a test", - "unicode": u"this is a unicode test", - "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5", - "int": 3, - "long": 2**32, - "timestamp": timestamp(0), - "none": None, - "map": {"string": "nested map"}, - "list": [1, "two", 3.0, -4], - "uuid": uuid4()}) - assert isinstance(decoded["timestamp"], timestamp) - - def testMapEmpty(self): - self.check("map", {}) - - def testMapNone(self): - self.check("map", None) - - def testList(self): - self.check("list", [1, "two", 3.0, -4]) - - def testListEmpty(self): - self.check("list", []) - - def testListNone(self): - self.check("list", None) - - def testArrayInt(self): - self.check("array", [1, 2, 3, 4]) - - def testArrayString(self): - self.check("array", ["one", "two", "three", "four"]) - - def testArrayEmpty(self): - self.check("array", []) - - def testArrayNone(self): - self.check("array", None) - - def testInt16(self): - self.check("int16", 3) - self.check("int16", -3) - - def testInt64(self): - self.check("int64", 3) - self.check("int64", -3) - self.check("int64", 1<<34) - self.check("int64", -(1<<34)) - - def testDatetime(self): - self.check("datetime", timestamp(0)) - self.check("datetime", timestamp(long(time.time()))) diff --git a/python/tests/connection.py b/python/tests/connection.py deleted file mode 100644 index 8c00df56e1..0000000000 --- a/python/tests/connection.py +++ /dev/null @@ -1,222 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from threading import * -from unittest import TestCase -from qpid.util import connect, listen -from qpid.connection import * -from qpid.datatypes import Message -from qpid.delegates import Server -from qpid.queue import Queue -from qpid.session import Delegate -from qpid.ops import QueueQueryResult - -PORT = 1234 - -class TestServer: - - def __init__(self, queue): - self.queue = queue - - def connection(self, connection): - return Server(connection, delegate=self.session) - - def session(self, session): - session.auto_sync = False - return TestSession(session, self.queue) - -class TestSession(Delegate): - - def __init__(self, session, queue): - self.session = session - self.queue = queue - - def execution_sync(self, es): - pass - - def queue_query(self, qq): - return QueueQueryResult(qq.queue) - - def message_transfer(self, cmd): - if cmd.destination == "echo": - m = Message(cmd.payload) - m.headers = cmd.headers - self.session.message_transfer(cmd.destination, cmd.accept_mode, - cmd.acquire_mode, m) - elif cmd.destination == "abort": - self.session.channel.connection.sock.close() - elif cmd.destination == "heartbeat": - self.session.channel.connection_heartbeat() - else: - self.queue.put(cmd) - -class ConnectionTest(TestCase): - - def setUp(self): - self.queue = Queue() - self.running = True - started = Event() - - def run(): - ts = TestServer(self.queue) - for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): - conn = Connection(s, delegate=ts.connection) - try: - conn.start(5) - except Closed: - pass - - self.server = Thread(target=run) - self.server.setDaemon(True) - self.server.start() - - started.wait(3) - assert started.isSet() - - def tearDown(self): - self.running = False - connect("127.0.0.1", PORT).close() - self.server.join(3) - - def connect(self, **kwargs): - return Connection(connect("127.0.0.1", PORT), **kwargs) - - def test(self): - c = self.connect() - c.start(10) - - ssn1 = c.session("test1", timeout=10) - ssn2 = c.session("test2", timeout=10) - - assert ssn1 == c.sessions["test1"] - assert ssn2 == c.sessions["test2"] - assert ssn1.channel != None - assert ssn2.channel != None - assert ssn1 in c.attached.values() - assert ssn2 in c.attached.values() - - ssn1.close(5) - - assert ssn1.channel == None - assert ssn1 not in c.attached.values() - assert ssn2 in c.sessions.values() - - ssn2.close(5) - - assert ssn2.channel == None - assert ssn2 not in c.attached.values() - assert ssn2 not in c.sessions.values() - - ssn = c.session("session", timeout=10) - - assert ssn.channel != None - assert ssn in c.sessions.values() - - destinations = ("one", "two", "three") - - for d in destinations: - ssn.message_transfer(d) - - for d in destinations: - cmd = self.queue.get(10) - assert cmd.destination == d - assert cmd.headers == None - assert cmd.payload == None - - msg = Message("this is a test") - ssn.message_transfer("four", message=msg) - cmd = self.queue.get(10) - assert cmd.destination == "four" - assert cmd.headers == None - assert cmd.payload == msg.body - - qq = ssn.queue_query("asdf") - assert qq.queue == "asdf" - c.close(5) - - def testCloseGet(self): - c = self.connect() - c.start(10) - ssn = c.session("test", timeout=10) - echos = ssn.incoming("echo") - - for i in range(10): - ssn.message_transfer("echo", message=Message("test%d" % i)) - - ssn.auto_sync=False - ssn.message_transfer("abort") - - for i in range(10): - m = echos.get(timeout=10) - assert m.body == "test%d" % i - - try: - m = echos.get(timeout=10) - assert False - except Closed, e: - pass - - def testCloseListen(self): - c = self.connect() - c.start(10) - ssn = c.session("test", timeout=10) - echos = ssn.incoming("echo") - - messages = [] - exceptions = [] - condition = Condition() - def listener(m): messages.append(m) - def exc_listener(e): - exceptions.append(e) - condition.acquire() - condition.notify() - condition.release() - - echos.listen(listener, exc_listener) - - for i in range(10): - ssn.message_transfer("echo", message=Message("test%d" % i)) - - ssn.auto_sync=False - ssn.message_transfer("abort") - - condition.acquire() - condition.wait(10) - condition.release() - - for i in range(10): - m = messages.pop(0) - assert m.body == "test%d" % i - - assert len(exceptions) == 1 - - def testSync(self): - c = self.connect() - c.start(10) - s = c.session("test") - s.auto_sync = False - s.message_transfer("echo", message=Message("test")) - s.sync(10) - - def testHeartbeat(self): - c = self.connect(heartbeat=10) - c.start(10) - s = c.session("test") - s.channel.connection_heartbeat() - s.message_transfer("heartbeat") diff --git a/python/tests/datatypes.py b/python/tests/datatypes.py deleted file mode 100644 index 00e649d6cf..0000000000 --- a/python/tests/datatypes.py +++ /dev/null @@ -1,296 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from unittest import TestCase -from qpid.datatypes import * -from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties - -class SerialTest(TestCase): - - def test(self): - for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)): - assert s + 1 > s - assert s - 1 < s - assert s < s + 1 - assert s > s - 1 - - assert serial(0xFFFFFFFFL) + 1 == serial(0) - - assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL) - assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0) - - def testIncr(self): - s = serial(0) - s += 1 - assert s == serial(1) - - def testIn(self): - l = [serial(1), serial(2), serial(3), serial(4)] - assert serial(1) in l - assert serial(0xFFFFFFFFL + 2) in l - assert 4 in l - - def testNone(self): - assert serial(0) != None - - def testHash(self): - d = {} - d[serial(0)] = "zero" - assert d[0] == "zero" - - def testAdd(self): - assert serial(2) + 2 == serial(4) - assert serial(2) + 2 == 4 - - def testSub(self): - delta = serial(4) - serial(2) - assert isinstance(delta, int) or isinstance(delta, long) - assert delta == 2 - - delta = serial(4) - 2 - assert isinstance(delta, Serial) - assert delta == serial(2) - -class RangedSetTest(TestCase): - - def check(self, ranges): - posts = [] - for range in ranges: - posts.append(range.lower) - posts.append(range.upper) - - sorted = posts[:] - sorted.sort() - - assert posts == sorted - - idx = 1 - while idx + 1 < len(posts): - assert posts[idx] + 1 != posts[idx+1] - idx += 2 - - def test(self): - rs = RangedSet() - - self.check(rs.ranges) - - rs.add(1) - - assert 1 in rs - assert 2 not in rs - assert 0 not in rs - self.check(rs.ranges) - - rs.add(2) - - assert 0 not in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - self.check(rs.ranges) - - rs.add(0) - - assert -1 not in rs - assert 0 in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - self.check(rs.ranges) - - rs.add(37) - - assert -1 not in rs - assert 0 in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - assert 36 not in rs - assert 37 in rs - assert 38 not in rs - self.check(rs.ranges) - - rs.add(-1) - self.check(rs.ranges) - - rs.add(-3) - self.check(rs.ranges) - - rs.add(1, 20) - assert 21 not in rs - assert 20 in rs - self.check(rs.ranges) - - def testAddSelf(self): - a = RangedSet() - a.add(0, 8) - self.check(a.ranges) - a.add(0, 8) - self.check(a.ranges) - assert len(a.ranges) == 1 - range = a.ranges[0] - assert range.lower == 0 - assert range.upper == 8 - - def testEmpty(self): - s = RangedSet() - assert s.empty() - s.add(0, -1) - assert s.empty() - s.add(0, 0) - assert not s.empty() - - def testMinMax(self): - s = RangedSet() - assert s.max() is None - assert s.min() is None - s.add(0, 10) - assert s.max() == 10 - assert s.min() == 0 - s.add(0, 5) - assert s.max() == 10 - assert s.min() == 0 - s.add(0, 11) - assert s.max() == 11 - assert s.min() == 0 - s.add(15, 20) - assert s.max() == 20 - assert s.min() == 0 - s.add(-10, -5) - assert s.max() == 20 - assert s.min() == -10 - -class RangeTest(TestCase): - - def testIntersect1(self): - a = Range(0, 10) - b = Range(9, 20) - i1 = a.intersect(b) - i2 = b.intersect(a) - assert i1.upper == 10 - assert i2.upper == 10 - assert i1.lower == 9 - assert i2.lower == 9 - - def testIntersect2(self): - a = Range(0, 10) - b = Range(11, 20) - assert a.intersect(b) == None - assert b.intersect(a) == None - - def testIntersect3(self): - a = Range(0, 10) - b = Range(3, 5) - i1 = a.intersect(b) - i2 = b.intersect(a) - assert i1.upper == 5 - assert i2.upper == 5 - assert i1.lower == 3 - assert i2.lower == 3 - -class UUIDTest(TestCase): - - def test(self): - # this test is kind of lame, but it does excercise the basic - # functionality of the class - u = uuid4() - for i in xrange(1024): - assert u != uuid4() - -class MessageTest(TestCase): - - def setUp(self): - self.mp = MessageProperties() - self.dp = DeliveryProperties() - self.fp = FragmentProperties() - - def testHas(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.has("message_properties") - assert m.has("delivery_properties") - assert m.has("fragment_properties") - - def testGet(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - assert m.get("fragment_properties") == self.fp - - def testSet(self): - m = Message(self.mp, self.dp, "body") - assert m.get("fragment_properties") is None - m.set(self.fp) - assert m.get("fragment_properties") == self.fp - - def testSetOnEmpty(self): - m = Message("body") - assert m.get("delivery_properties") is None - m.set(self.dp) - assert m.get("delivery_properties") == self.dp - - def testSetReplace(self): - m = Message(self.mp, self.dp, self.fp, "body") - dp = DeliveryProperties() - assert m.get("delivery_properties") == self.dp - assert m.get("delivery_properties") != dp - m.set(dp) - assert m.get("delivery_properties") != self.dp - assert m.get("delivery_properties") == dp - - def testClear(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - assert m.get("fragment_properties") == self.fp - m.clear("fragment_properties") - assert m.get("fragment_properties") is None - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - -class TimestampTest(TestCase): - - def check(self, expected, *values): - for v in values: - assert isinstance(v, timestamp) - assert v == expected - assert v == timestamp(expected) - - def testAdd(self): - self.check(4.0, - timestamp(2.0) + 2.0, - 2.0 + timestamp(2.0)) - - def testSub(self): - self.check(2.0, - timestamp(4.0) - 2.0, - 4.0 - timestamp(2.0)) - - def testNeg(self): - self.check(-4.0, -timestamp(4.0)) - - def testPos(self): - self.check(+4.0, +timestamp(4.0)) - - def testAbs(self): - self.check(4.0, abs(timestamp(-4.0))) - - def testConversion(self): - dt = timestamp(0).datetime() - t = timestamp(dt) - assert t == 0 diff --git a/python/tests/queue.py b/python/tests/queue.py deleted file mode 100644 index e12354eb43..0000000000 --- a/python/tests/queue.py +++ /dev/null @@ -1,71 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import threading, time -from unittest import TestCase -from qpid.queue import Queue, Empty, Closed - - -class QueueTest (TestCase): - - # The qpid queue class just provides sime simple extensions to - # python's standard queue data structure, so we don't need to test - # all the queue functionality. - - def test_listen(self): - values = [] - heard = threading.Event() - def listener(x): - values.append(x) - heard.set() - - q = Queue(0) - q.listen(listener) - heard.clear() - q.put(1) - heard.wait() - assert values[-1] == 1 - heard.clear() - q.put(2) - heard.wait() - assert values[-1] == 2 - - q.listen(None) - q.put(3) - assert q.get(3) == 3 - q.listen(listener) - - heard.clear() - q.put(4) - heard.wait() - assert values[-1] == 4 - - def test_close(self): - q = Queue(0) - q.put(1); q.put(2); q.put(3); q.close() - assert q.get() == 1 - assert q.get() == 2 - assert q.get() == 3 - for i in range(10): - try: - q.get() - raise AssertionError("expected Closed") - except Closed: - pass diff --git a/python/tests/spec010.py b/python/tests/spec010.py deleted file mode 100644 index ac04e1ee02..0000000000 --- a/python/tests/spec010.py +++ /dev/null @@ -1,74 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, tempfile, shutil, stat -from unittest import TestCase -from qpid.codec010 import Codec, StringCodec -from qpid.ops import * - -class SpecTest(TestCase): - - def testSessionHeader(self): - sc = StringCodec() - sc.write_compound(Header(sync=True)) - assert sc.encoded == "\x01\x01" - - sc = StringCodec() - sc.write_compound(Header(sync=False)) - assert sc.encoded == "\x01\x00" - - def encdec(self, value): - sc = StringCodec() - sc.write_compound(value) - decoded = sc.read_compound(value.__class__) - return decoded - - def testMessageProperties(self): - props = MessageProperties(content_length=3735928559L, - reply_to=ReplyTo(exchange="the exchange name", - routing_key="the routing key")) - dec = self.encdec(props) - assert props.content_length == dec.content_length - assert props.reply_to.exchange == dec.reply_to.exchange - assert props.reply_to.routing_key == dec.reply_to.routing_key - - def testMessageSubscribe(self): - cmd = MessageSubscribe(exclusive=True, destination="this is a test") - dec = self.encdec(cmd) - assert cmd.exclusive == dec.exclusive - assert cmd.destination == dec.destination - - def testXid(self): - sc = StringCodec() - xid = Xid(format=0, global_id="gid", branch_id="bid") - sc.write_compound(xid) - assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid' - dec = sc.read_compound(Xid) - assert xid.__dict__ == dec.__dict__ - -# def testLoadReadOnly(self): -# spec = "amqp.0-10-qpid-errata.xml" -# f = testrunner.get_spec_file(spec) -# dest = tempfile.mkdtemp() -# shutil.copy(f, dest) -# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) -# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) -# fname = os.path.join(dest, spec) -# load(fname) -# assert not os.path.exists("%s.pcl" % fname) |