summaryrefslogtreecommitdiff
path: root/python/tests
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests')
-rw-r--r--python/tests/__init__.py22
-rw-r--r--python/tests/codec.py601
-rw-r--r--python/tests/codec010.py133
-rw-r--r--python/tests/connection.py222
-rw-r--r--python/tests/datatypes.py296
-rw-r--r--python/tests/queue.py71
-rw-r--r--python/tests/spec010.py74
7 files changed, 0 insertions, 1419 deletions
diff --git a/python/tests/__init__.py b/python/tests/__init__.py
deleted file mode 100644
index 1e495f3af3..0000000000
--- a/python/tests/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import codec, queue, datatypes, connection, spec010, codec010
diff --git a/python/tests/codec.py b/python/tests/codec.py
deleted file mode 100644
index 9b51b4713c..0000000000
--- a/python/tests/codec.py
+++ /dev/null
@@ -1,601 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import unittest
-from qpid.codec import Codec
-from qpid.spec import load
-from cStringIO import StringIO
-from qpid.reference import ReferenceId
-
-__doc__ = """
-
- This is a unit test script for qpid/codec.py
-
- It can be run standalone or as part of the existing test framework.
-
- To run standalone:
- -------------------
-
- Place in the qpid/python/tests/ directory and type...
-
- python codec.py
-
- A brief output will be printed on screen. The verbose output will be placed inn a file called
- codec_unit_test_output.txt. [TODO: make this filename configurable]
-
- To run as part of the existing test framework:
- -----------------------------------------------
-
- python run-tests tests.codec
-
- Change History:
- -----------------
- Jimmy John 05/19/2007 Initial draft
- Jimmy John 05/22/2007 Implemented comments by Rafael Schloming
-
-
-"""
-
-from qpid_config import amqp_spec_0_8
-SPEC = load(amqp_spec_0_8)
-
-# --------------------------------------
-# --------------------------------------
-class BaseDataTypes(unittest.TestCase):
-
-
- """
- Base class containing common functions
- """
-
- # ---------------
- def setUp(self):
- """
- standard setUp for unitetest (refer unittest documentation for details)
- """
- self.codec = Codec(StringIO(), SPEC)
-
- # ------------------
- def tearDown(self):
- """
- standard tearDown for unitetest (refer unittest documentation for details)
- """
- self.codec.stream.flush()
- self.codec.stream.close()
-
- # ----------------------------------------
- def callFunc(self, functionName, *args):
- """
- helper function - given a function name and arguments, calls the function with the args and
- returns the contents of the stream
- """
- getattr(self.codec, functionName)(args[0])
- return self.codec.stream.getvalue()
-
- # ----------------------------------------
- def readFunc(self, functionName, *args):
- """
- helper function - creates a input stream and then calls the function with arguments as have been
- supplied
- """
- self.codec.stream = StringIO(args[0])
- return getattr(self.codec, functionName)()
-
-
-# ----------------------------------------
-# ----------------------------------------
-class IntegerTestCase(BaseDataTypes):
-
- """
- Handles octet, short, long, long long
-
- """
-
- # -------------------------
- def __init__(self, *args):
- """
- sets constants for use in tests
- """
-
- BaseDataTypes.__init__(self, *args)
- self.const_integer = 2
- self.const_integer_octet_encoded = '\x02'
- self.const_integer_short_encoded = '\x00\x02'
- self.const_integer_long_encoded = '\x00\x00\x00\x02'
- self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02'
-
- # -------------------------- #
- # Unsigned Octect - 8 bits #
- # -------------------------- #
-
- # --------------------------
- def test_unsigned_octet(self):
- """
- ubyte format requires 0<=number<=255
- """
- self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...')
-
- # -------------------------------------------
- def test_octet_out_of_upper_range(self):
- """
- testing for input above acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_octet, 256)
-
- # -------------------------------------------
- def test_uoctet_out_of_lower_range(self):
- """
- testing for input below acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_octet, -1)
-
- # ---------------------------------
- def test_uoctet_with_fraction(self):
- """
- the fractional part should be ignored...
- """
- self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...')
-
- # ------------------------------------
- def test_unsigned_octet_decode(self):
- """
- octet decoding
- """
- self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...')
-
- # ----------------------------------- #
- # Unsigned Short Integers - 16 bits #
- # ----------------------------------- #
-
- # -----------------------
- def test_ushort_int(self):
- """
- testing unsigned short integer
- """
- self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...')
-
- # -------------------------------------------
- def test_ushort_int_out_of_upper_range(self):
- """
- testing for input above acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_short, 65536)
-
- # -------------------------------------------
- def test_ushort_int_out_of_lower_range(self):
- """
- testing for input below acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_short, -1)
-
- # ---------------------------------
- def test_ushort_int_with_fraction(self):
- """
- the fractional part should be ignored...
- """
- self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...')
-
- # ------------------------------------
- def test_ushort_int_decode(self):
- """
- unsigned short decoding
- """
- self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...')
-
-
- # ---------------------------------- #
- # Unsigned Long Integers - 32 bits #
- # ---------------------------------- #
-
- # -----------------------
- def test_ulong_int(self):
- """
- testing unsigned long iteger
- """
- self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...')
-
- # -------------------------------------------
- def test_ulong_int_out_of_upper_range(self):
- """
- testing for input above acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296)
-
- # -------------------------------------------
- def test_ulong_int_out_of_lower_range(self):
- """
- testing for input below acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_long, -1)
-
- # ---------------------------------
- def test_ulong_int_with_fraction(self):
- """
- the fractional part should be ignored...
- """
- self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...')
-
- # -------------------------------
- def test_ulong_int_decode(self):
- """
- unsigned long decoding
- """
- self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...')
-
-
- # --------------------------------------- #
- # Unsigned Long Long Integers - 64 bits #
- # --------------------------------------- #
-
- # -----------------------
- def test_ulong_long_int(self):
- """
- testing unsinged long long integer
- """
- self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...')
-
- # -------------------------------------------
- def test_ulong_long_int_out_of_upper_range(self):
- """
- testing for input above acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616)
-
- # -------------------------------------------
- def test_ulong_long_int_out_of_lower_range(self):
- """
- testing for input below acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_longlong, -1)
-
- # ---------------------------------
- def test_ulong_long_int_with_fraction(self):
- """
- the fractional part should be ignored...
- """
- self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...')
-
- # ------------------------------------
- def test_ulong_long_int_decode(self):
- """
- unsigned long long decoding
- """
- self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...')
-
-# -----------------------------------
-# -----------------------------------
-class BitTestCase(BaseDataTypes):
-
- """
- Handles bits
- """
-
- # ----------------------------------------------
- def callFunc(self, functionName, *args):
- """
- helper function
- """
- for ele in args:
- getattr(self.codec, functionName)(ele)
-
- self.codec.flush()
- return self.codec.stream.getvalue()
-
- # -------------------
- def test_bit1(self):
- """
- sends in 11
- """
- self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...')
-
- # -------------------
- def test_bit2(self):
- """
- sends in 10011
- """
- self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...')
-
- # -------------------
- def test_bit3(self):
- """
- sends in 1110100111 [10 bits(right to left), should be compressed into two octets]
- """
- self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...')
-
- # ------------------------------------
- def test_bit_decode_1(self):
- """
- decode bit 1
- """
- self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...')
-
- # ------------------------------------
- def test_bit_decode_0(self):
- """
- decode bit 0
- """
- self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...')
-
-# -----------------------------------
-# -----------------------------------
-class StringTestCase(BaseDataTypes):
-
- """
- Handles short strings, long strings
- """
-
- # ------------------------------------------------------------- #
- # Short Strings - 8 bit length followed by zero or more octets #
- # ------------------------------------------------------------- #
-
- # ---------------------------------------
- def test_short_string_zero_length(self):
- """
- 0 length short string
- """
- self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...')
-
- # -------------------------------------------
- def test_short_string_positive_length(self):
- """
- positive length short string
- """
- self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...')
-
- # -------------------------------------------
- def test_short_string_out_of_upper_range(self):
- """
- string length > 255
- """
- self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256)
-
- # ------------------------------------
- def test_short_string_decode(self):
- """
- short string decode
- """
- self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...')
-
-
- # ------------------------------------------------------------- #
- # Long Strings - 32 bit length followed by zero or more octets #
- # ------------------------------------------------------------- #
-
- # ---------------------------------------
- def test_long_string_zero_length(self):
- """
- 0 length long string
- """
- self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...')
-
- # -------------------------------------------
- def test_long_string_positive_length(self):
- """
- positive length long string
- """
- self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...')
-
- # ------------------------------------
- def test_long_string_decode(self):
- """
- long string decode
- """
- self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...')
-
-
-# --------------------------------------
-# --------------------------------------
-class TimestampTestCase(BaseDataTypes):
-
- """
- No need of any test cases here as timestamps are implemented as long long which is tested above
- """
- pass
-
-# ---------------------------------------
-# ---------------------------------------
-class FieldTableTestCase(BaseDataTypes):
-
- """
- Handles Field Tables
-
- Only S/I type messages seem to be implemented currently
- """
-
- # -------------------------
- def __init__(self, *args):
- """
- sets constants for use in tests
- """
-
- BaseDataTypes.__init__(self, *args)
- self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'}
- self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1'
-
- # -------------------------------------------
- def test_field_table_name_value_pair(self):
- """
- valid name value pair
- """
- self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...')
-
- # ---------------------------------------------------
- def test_field_table_multiple_name_value_pair(self):
- """
- multiple name value pair
- """
- self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...')
-
- # ------------------------------------
- def test_field_table_decode(self):
- """
- field table decode
- """
- self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...')
-
-
-# ------------------------------------
-# ------------------------------------
-class ContentTestCase(BaseDataTypes):
-
- """
- Handles Content data types
- """
-
- # -----------------------------
- def test_content_inline(self):
- """
- inline content
- """
- self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...')
-
- # --------------------------------
- def test_content_reference(self):
- """
- reference content
- """
- self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...')
-
- # ------------------------------------
- def test_content_inline_decode(self):
- """
- inline content decode
- """
- self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...')
-
- # ------------------------------------
- def test_content_reference_decode(self):
- """
- reference content decode
- """
- self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...')
-
-# ------------------------ #
-# Pre - existing test code #
-# ------------------------ #
-
-# ---------------------
-def test(type, value):
- """
- old test function cut/copy/paste from qpid/codec.py
- """
- if isinstance(value, (list, tuple)):
- values = value
- else:
- values = [value]
- stream = StringIO()
- codec = Codec(stream, SPEC)
- for v in values:
- codec.encode(type, v)
- codec.flush()
- enc = stream.getvalue()
- stream.reset()
- dup = []
- for i in xrange(len(values)):
- dup.append(codec.decode(type))
- if values != dup:
- raise AssertionError("%r --> %r --> %r" % (values, enc, dup))
-
-# -----------------------
-def dotest(type, value):
- """
- old test function cut/copy/paste from qpid/codec.py
- """
- args = (type, value)
- test(*args)
-
-# -------------
-def oldtests():
- """
- old test function cut/copy/paste from qpid/codec.py
- """
- for value in ("1", "0", "110", "011", "11001", "10101", "10011"):
- for i in range(10):
- dotest("bit", map(lambda x: x == "1", value*i))
-
- for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}):
- dotest("table", value)
-
- for type in ("octet", "short", "long", "longlong"):
- for value in range(0, 256):
- dotest(type, value)
-
- for type in ("shortstr", "longstr"):
- for value in ("", "a", "asdf"):
- dotest(type, value)
-
-# -----------------------------------------
-class oldTests(unittest.TestCase):
-
- """
- class to handle pre-existing test cases
- """
-
- # ---------------------------
- def test_oldtestcases(self):
- """
- call the old tests
- """
- return oldtests()
-
-# ---------------------------
-# ---------------------------
-if __name__ == '__main__':
-
- codec_test_suite = unittest.TestSuite()
-
- #adding all the test suites...
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase))
-
- #loading pre-existing test case from qpid/codec.py
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests))
-
- run_output_stream = StringIO()
- test_runner = unittest.TextTestRunner(run_output_stream, '', '')
- test_result = test_runner.run(codec_test_suite)
-
- print '\n%d test run...' % (test_result.testsRun)
-
- if test_result.wasSuccessful():
- print '\nAll tests successful\n'
-
- if test_result.failures:
- print '\n----------'
- print '%d FAILURES:' % (len(test_result.failures))
- print '----------\n'
- for failure in test_result.failures:
- print str(failure[0]) + ' ... FAIL'
-
- if test_result.errors:
- print '\n---------'
- print '%d ERRORS:' % (len(test_result.errors))
- print '---------\n'
-
- for error in test_result.errors:
- print str(error[0]) + ' ... ERROR'
-
- f = open('codec_unit_test_output.txt', 'w')
- f.write(str(run_output_stream.getvalue()))
- f.close()
diff --git a/python/tests/codec010.py b/python/tests/codec010.py
deleted file mode 100644
index 787ebc146f..0000000000
--- a/python/tests/codec010.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time
-
-from unittest import TestCase
-from qpid.codec010 import StringCodec
-from qpid.datatypes import timestamp, uuid4
-from qpid.ops import PRIMITIVE
-
-class CodecTest(TestCase):
-
- def check(self, type, value, compare=True):
- t = PRIMITIVE[type]
- sc = StringCodec()
- sc.write_primitive(t, value)
- decoded = sc.read_primitive(t)
- if compare:
- assert decoded == value, "%s, %s" % (decoded, value)
- return decoded
-
- def testMapString(self):
- self.check("map", {"string": "this is a test"})
-
- def testMapUnicode(self):
- self.check("map", {"unicode": u"this is a unicode test"})
-
- def testMapBinary(self):
- self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"})
-
- def testMapBuffer(self):
- s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"
- dec = self.check("map", {"buffer": buffer(s)}, False)
- assert dec["buffer"] == s
-
- def testMapInt(self):
- self.check("map", {"int": 3})
-
- def testMapLong(self):
- self.check("map", {"long": 2**32})
- self.check("map", {"long": 1 << 34})
- self.check("map", {"long": -(1 << 34)})
-
- def testMapTimestamp(self):
- decoded = self.check("map", {"timestamp": timestamp(0)})
- assert isinstance(decoded["timestamp"], timestamp)
-
- def testMapDatetime(self):
- decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False)
- assert isinstance(decoded["datetime"], timestamp)
- assert decoded["datetime"] == 0.0
-
- def testMapNone(self):
- self.check("map", {"none": None})
-
- def testMapNested(self):
- self.check("map", {"map": {"string": "nested test"}})
-
- def testMapList(self):
- self.check("map", {"list": [1, "two", 3.0, -4]})
-
- def testMapUUID(self):
- self.check("map", {"uuid": uuid4()})
-
- def testMapAll(self):
- decoded = self.check("map", {"string": "this is a test",
- "unicode": u"this is a unicode test",
- "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5",
- "int": 3,
- "long": 2**32,
- "timestamp": timestamp(0),
- "none": None,
- "map": {"string": "nested map"},
- "list": [1, "two", 3.0, -4],
- "uuid": uuid4()})
- assert isinstance(decoded["timestamp"], timestamp)
-
- def testMapEmpty(self):
- self.check("map", {})
-
- def testMapNone(self):
- self.check("map", None)
-
- def testList(self):
- self.check("list", [1, "two", 3.0, -4])
-
- def testListEmpty(self):
- self.check("list", [])
-
- def testListNone(self):
- self.check("list", None)
-
- def testArrayInt(self):
- self.check("array", [1, 2, 3, 4])
-
- def testArrayString(self):
- self.check("array", ["one", "two", "three", "four"])
-
- def testArrayEmpty(self):
- self.check("array", [])
-
- def testArrayNone(self):
- self.check("array", None)
-
- def testInt16(self):
- self.check("int16", 3)
- self.check("int16", -3)
-
- def testInt64(self):
- self.check("int64", 3)
- self.check("int64", -3)
- self.check("int64", 1<<34)
- self.check("int64", -(1<<34))
-
- def testDatetime(self):
- self.check("datetime", timestamp(0))
- self.check("datetime", timestamp(long(time.time())))
diff --git a/python/tests/connection.py b/python/tests/connection.py
deleted file mode 100644
index 8c00df56e1..0000000000
--- a/python/tests/connection.py
+++ /dev/null
@@ -1,222 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from threading import *
-from unittest import TestCase
-from qpid.util import connect, listen
-from qpid.connection import *
-from qpid.datatypes import Message
-from qpid.delegates import Server
-from qpid.queue import Queue
-from qpid.session import Delegate
-from qpid.ops import QueueQueryResult
-
-PORT = 1234
-
-class TestServer:
-
- def __init__(self, queue):
- self.queue = queue
-
- def connection(self, connection):
- return Server(connection, delegate=self.session)
-
- def session(self, session):
- session.auto_sync = False
- return TestSession(session, self.queue)
-
-class TestSession(Delegate):
-
- def __init__(self, session, queue):
- self.session = session
- self.queue = queue
-
- def execution_sync(self, es):
- pass
-
- def queue_query(self, qq):
- return QueueQueryResult(qq.queue)
-
- def message_transfer(self, cmd):
- if cmd.destination == "echo":
- m = Message(cmd.payload)
- m.headers = cmd.headers
- self.session.message_transfer(cmd.destination, cmd.accept_mode,
- cmd.acquire_mode, m)
- elif cmd.destination == "abort":
- self.session.channel.connection.sock.close()
- elif cmd.destination == "heartbeat":
- self.session.channel.connection_heartbeat()
- else:
- self.queue.put(cmd)
-
-class ConnectionTest(TestCase):
-
- def setUp(self):
- self.queue = Queue()
- self.running = True
- started = Event()
-
- def run():
- ts = TestServer(self.queue)
- for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
- conn = Connection(s, delegate=ts.connection)
- try:
- conn.start(5)
- except Closed:
- pass
-
- self.server = Thread(target=run)
- self.server.setDaemon(True)
- self.server.start()
-
- started.wait(3)
- assert started.isSet()
-
- def tearDown(self):
- self.running = False
- connect("127.0.0.1", PORT).close()
- self.server.join(3)
-
- def connect(self, **kwargs):
- return Connection(connect("127.0.0.1", PORT), **kwargs)
-
- def test(self):
- c = self.connect()
- c.start(10)
-
- ssn1 = c.session("test1", timeout=10)
- ssn2 = c.session("test2", timeout=10)
-
- assert ssn1 == c.sessions["test1"]
- assert ssn2 == c.sessions["test2"]
- assert ssn1.channel != None
- assert ssn2.channel != None
- assert ssn1 in c.attached.values()
- assert ssn2 in c.attached.values()
-
- ssn1.close(5)
-
- assert ssn1.channel == None
- assert ssn1 not in c.attached.values()
- assert ssn2 in c.sessions.values()
-
- ssn2.close(5)
-
- assert ssn2.channel == None
- assert ssn2 not in c.attached.values()
- assert ssn2 not in c.sessions.values()
-
- ssn = c.session("session", timeout=10)
-
- assert ssn.channel != None
- assert ssn in c.sessions.values()
-
- destinations = ("one", "two", "three")
-
- for d in destinations:
- ssn.message_transfer(d)
-
- for d in destinations:
- cmd = self.queue.get(10)
- assert cmd.destination == d
- assert cmd.headers == None
- assert cmd.payload == None
-
- msg = Message("this is a test")
- ssn.message_transfer("four", message=msg)
- cmd = self.queue.get(10)
- assert cmd.destination == "four"
- assert cmd.headers == None
- assert cmd.payload == msg.body
-
- qq = ssn.queue_query("asdf")
- assert qq.queue == "asdf"
- c.close(5)
-
- def testCloseGet(self):
- c = self.connect()
- c.start(10)
- ssn = c.session("test", timeout=10)
- echos = ssn.incoming("echo")
-
- for i in range(10):
- ssn.message_transfer("echo", message=Message("test%d" % i))
-
- ssn.auto_sync=False
- ssn.message_transfer("abort")
-
- for i in range(10):
- m = echos.get(timeout=10)
- assert m.body == "test%d" % i
-
- try:
- m = echos.get(timeout=10)
- assert False
- except Closed, e:
- pass
-
- def testCloseListen(self):
- c = self.connect()
- c.start(10)
- ssn = c.session("test", timeout=10)
- echos = ssn.incoming("echo")
-
- messages = []
- exceptions = []
- condition = Condition()
- def listener(m): messages.append(m)
- def exc_listener(e):
- exceptions.append(e)
- condition.acquire()
- condition.notify()
- condition.release()
-
- echos.listen(listener, exc_listener)
-
- for i in range(10):
- ssn.message_transfer("echo", message=Message("test%d" % i))
-
- ssn.auto_sync=False
- ssn.message_transfer("abort")
-
- condition.acquire()
- condition.wait(10)
- condition.release()
-
- for i in range(10):
- m = messages.pop(0)
- assert m.body == "test%d" % i
-
- assert len(exceptions) == 1
-
- def testSync(self):
- c = self.connect()
- c.start(10)
- s = c.session("test")
- s.auto_sync = False
- s.message_transfer("echo", message=Message("test"))
- s.sync(10)
-
- def testHeartbeat(self):
- c = self.connect(heartbeat=10)
- c.start(10)
- s = c.session("test")
- s.channel.connection_heartbeat()
- s.message_transfer("heartbeat")
diff --git a/python/tests/datatypes.py b/python/tests/datatypes.py
deleted file mode 100644
index 00e649d6cf..0000000000
--- a/python/tests/datatypes.py
+++ /dev/null
@@ -1,296 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from unittest import TestCase
-from qpid.datatypes import *
-from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties
-
-class SerialTest(TestCase):
-
- def test(self):
- for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)):
- assert s + 1 > s
- assert s - 1 < s
- assert s < s + 1
- assert s > s - 1
-
- assert serial(0xFFFFFFFFL) + 1 == serial(0)
-
- assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL)
- assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0)
-
- def testIncr(self):
- s = serial(0)
- s += 1
- assert s == serial(1)
-
- def testIn(self):
- l = [serial(1), serial(2), serial(3), serial(4)]
- assert serial(1) in l
- assert serial(0xFFFFFFFFL + 2) in l
- assert 4 in l
-
- def testNone(self):
- assert serial(0) != None
-
- def testHash(self):
- d = {}
- d[serial(0)] = "zero"
- assert d[0] == "zero"
-
- def testAdd(self):
- assert serial(2) + 2 == serial(4)
- assert serial(2) + 2 == 4
-
- def testSub(self):
- delta = serial(4) - serial(2)
- assert isinstance(delta, int) or isinstance(delta, long)
- assert delta == 2
-
- delta = serial(4) - 2
- assert isinstance(delta, Serial)
- assert delta == serial(2)
-
-class RangedSetTest(TestCase):
-
- def check(self, ranges):
- posts = []
- for range in ranges:
- posts.append(range.lower)
- posts.append(range.upper)
-
- sorted = posts[:]
- sorted.sort()
-
- assert posts == sorted
-
- idx = 1
- while idx + 1 < len(posts):
- assert posts[idx] + 1 != posts[idx+1]
- idx += 2
-
- def test(self):
- rs = RangedSet()
-
- self.check(rs.ranges)
-
- rs.add(1)
-
- assert 1 in rs
- assert 2 not in rs
- assert 0 not in rs
- self.check(rs.ranges)
-
- rs.add(2)
-
- assert 0 not in rs
- assert 1 in rs
- assert 2 in rs
- assert 3 not in rs
- self.check(rs.ranges)
-
- rs.add(0)
-
- assert -1 not in rs
- assert 0 in rs
- assert 1 in rs
- assert 2 in rs
- assert 3 not in rs
- self.check(rs.ranges)
-
- rs.add(37)
-
- assert -1 not in rs
- assert 0 in rs
- assert 1 in rs
- assert 2 in rs
- assert 3 not in rs
- assert 36 not in rs
- assert 37 in rs
- assert 38 not in rs
- self.check(rs.ranges)
-
- rs.add(-1)
- self.check(rs.ranges)
-
- rs.add(-3)
- self.check(rs.ranges)
-
- rs.add(1, 20)
- assert 21 not in rs
- assert 20 in rs
- self.check(rs.ranges)
-
- def testAddSelf(self):
- a = RangedSet()
- a.add(0, 8)
- self.check(a.ranges)
- a.add(0, 8)
- self.check(a.ranges)
- assert len(a.ranges) == 1
- range = a.ranges[0]
- assert range.lower == 0
- assert range.upper == 8
-
- def testEmpty(self):
- s = RangedSet()
- assert s.empty()
- s.add(0, -1)
- assert s.empty()
- s.add(0, 0)
- assert not s.empty()
-
- def testMinMax(self):
- s = RangedSet()
- assert s.max() is None
- assert s.min() is None
- s.add(0, 10)
- assert s.max() == 10
- assert s.min() == 0
- s.add(0, 5)
- assert s.max() == 10
- assert s.min() == 0
- s.add(0, 11)
- assert s.max() == 11
- assert s.min() == 0
- s.add(15, 20)
- assert s.max() == 20
- assert s.min() == 0
- s.add(-10, -5)
- assert s.max() == 20
- assert s.min() == -10
-
-class RangeTest(TestCase):
-
- def testIntersect1(self):
- a = Range(0, 10)
- b = Range(9, 20)
- i1 = a.intersect(b)
- i2 = b.intersect(a)
- assert i1.upper == 10
- assert i2.upper == 10
- assert i1.lower == 9
- assert i2.lower == 9
-
- def testIntersect2(self):
- a = Range(0, 10)
- b = Range(11, 20)
- assert a.intersect(b) == None
- assert b.intersect(a) == None
-
- def testIntersect3(self):
- a = Range(0, 10)
- b = Range(3, 5)
- i1 = a.intersect(b)
- i2 = b.intersect(a)
- assert i1.upper == 5
- assert i2.upper == 5
- assert i1.lower == 3
- assert i2.lower == 3
-
-class UUIDTest(TestCase):
-
- def test(self):
- # this test is kind of lame, but it does excercise the basic
- # functionality of the class
- u = uuid4()
- for i in xrange(1024):
- assert u != uuid4()
-
-class MessageTest(TestCase):
-
- def setUp(self):
- self.mp = MessageProperties()
- self.dp = DeliveryProperties()
- self.fp = FragmentProperties()
-
- def testHas(self):
- m = Message(self.mp, self.dp, self.fp, "body")
- assert m.has("message_properties")
- assert m.has("delivery_properties")
- assert m.has("fragment_properties")
-
- def testGet(self):
- m = Message(self.mp, self.dp, self.fp, "body")
- assert m.get("message_properties") == self.mp
- assert m.get("delivery_properties") == self.dp
- assert m.get("fragment_properties") == self.fp
-
- def testSet(self):
- m = Message(self.mp, self.dp, "body")
- assert m.get("fragment_properties") is None
- m.set(self.fp)
- assert m.get("fragment_properties") == self.fp
-
- def testSetOnEmpty(self):
- m = Message("body")
- assert m.get("delivery_properties") is None
- m.set(self.dp)
- assert m.get("delivery_properties") == self.dp
-
- def testSetReplace(self):
- m = Message(self.mp, self.dp, self.fp, "body")
- dp = DeliveryProperties()
- assert m.get("delivery_properties") == self.dp
- assert m.get("delivery_properties") != dp
- m.set(dp)
- assert m.get("delivery_properties") != self.dp
- assert m.get("delivery_properties") == dp
-
- def testClear(self):
- m = Message(self.mp, self.dp, self.fp, "body")
- assert m.get("message_properties") == self.mp
- assert m.get("delivery_properties") == self.dp
- assert m.get("fragment_properties") == self.fp
- m.clear("fragment_properties")
- assert m.get("fragment_properties") is None
- assert m.get("message_properties") == self.mp
- assert m.get("delivery_properties") == self.dp
-
-class TimestampTest(TestCase):
-
- def check(self, expected, *values):
- for v in values:
- assert isinstance(v, timestamp)
- assert v == expected
- assert v == timestamp(expected)
-
- def testAdd(self):
- self.check(4.0,
- timestamp(2.0) + 2.0,
- 2.0 + timestamp(2.0))
-
- def testSub(self):
- self.check(2.0,
- timestamp(4.0) - 2.0,
- 4.0 - timestamp(2.0))
-
- def testNeg(self):
- self.check(-4.0, -timestamp(4.0))
-
- def testPos(self):
- self.check(+4.0, +timestamp(4.0))
-
- def testAbs(self):
- self.check(4.0, abs(timestamp(-4.0)))
-
- def testConversion(self):
- dt = timestamp(0).datetime()
- t = timestamp(dt)
- assert t == 0
diff --git a/python/tests/queue.py b/python/tests/queue.py
deleted file mode 100644
index e12354eb43..0000000000
--- a/python/tests/queue.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import threading, time
-from unittest import TestCase
-from qpid.queue import Queue, Empty, Closed
-
-
-class QueueTest (TestCase):
-
- # The qpid queue class just provides sime simple extensions to
- # python's standard queue data structure, so we don't need to test
- # all the queue functionality.
-
- def test_listen(self):
- values = []
- heard = threading.Event()
- def listener(x):
- values.append(x)
- heard.set()
-
- q = Queue(0)
- q.listen(listener)
- heard.clear()
- q.put(1)
- heard.wait()
- assert values[-1] == 1
- heard.clear()
- q.put(2)
- heard.wait()
- assert values[-1] == 2
-
- q.listen(None)
- q.put(3)
- assert q.get(3) == 3
- q.listen(listener)
-
- heard.clear()
- q.put(4)
- heard.wait()
- assert values[-1] == 4
-
- def test_close(self):
- q = Queue(0)
- q.put(1); q.put(2); q.put(3); q.close()
- assert q.get() == 1
- assert q.get() == 2
- assert q.get() == 3
- for i in range(10):
- try:
- q.get()
- raise AssertionError("expected Closed")
- except Closed:
- pass
diff --git a/python/tests/spec010.py b/python/tests/spec010.py
deleted file mode 100644
index ac04e1ee02..0000000000
--- a/python/tests/spec010.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, tempfile, shutil, stat
-from unittest import TestCase
-from qpid.codec010 import Codec, StringCodec
-from qpid.ops import *
-
-class SpecTest(TestCase):
-
- def testSessionHeader(self):
- sc = StringCodec()
- sc.write_compound(Header(sync=True))
- assert sc.encoded == "\x01\x01"
-
- sc = StringCodec()
- sc.write_compound(Header(sync=False))
- assert sc.encoded == "\x01\x00"
-
- def encdec(self, value):
- sc = StringCodec()
- sc.write_compound(value)
- decoded = sc.read_compound(value.__class__)
- return decoded
-
- def testMessageProperties(self):
- props = MessageProperties(content_length=3735928559L,
- reply_to=ReplyTo(exchange="the exchange name",
- routing_key="the routing key"))
- dec = self.encdec(props)
- assert props.content_length == dec.content_length
- assert props.reply_to.exchange == dec.reply_to.exchange
- assert props.reply_to.routing_key == dec.reply_to.routing_key
-
- def testMessageSubscribe(self):
- cmd = MessageSubscribe(exclusive=True, destination="this is a test")
- dec = self.encdec(cmd)
- assert cmd.exclusive == dec.exclusive
- assert cmd.destination == dec.destination
-
- def testXid(self):
- sc = StringCodec()
- xid = Xid(format=0, global_id="gid", branch_id="bid")
- sc.write_compound(xid)
- assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
- dec = sc.read_compound(Xid)
- assert xid.__dict__ == dec.__dict__
-
-# def testLoadReadOnly(self):
-# spec = "amqp.0-10-qpid-errata.xml"
-# f = testrunner.get_spec_file(spec)
-# dest = tempfile.mkdtemp()
-# shutil.copy(f, dest)
-# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
-# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
-# fname = os.path.join(dest, spec)
-# load(fname)
-# assert not os.path.exists("%s.pcl" % fname)