diff options
Diffstat (limited to 'qpid/python/qpid/tests/codec.py')
-rw-r--r-- | qpid/python/qpid/tests/codec.py | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/qpid/python/qpid/tests/codec.py b/qpid/python/qpid/tests/codec.py new file mode 100644 index 0000000000..8fd0528636 --- /dev/null +++ b/qpid/python/qpid/tests/codec.py @@ -0,0 +1,601 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +from qpid.codec import Codec +from qpid.spec08 import load +from cStringIO import StringIO +from qpid.reference import ReferenceId + +__doc__ = """ + + This is a unit test script for qpid/codec.py + + It can be run standalone or as part of the existing test framework. + + To run standalone: + ------------------- + + Place in the qpid/python/tests/ directory and type... + + python codec.py + + A brief output will be printed on screen. The verbose output will be placed inn a file called + codec_unit_test_output.txt. [TODO: make this filename configurable] + + To run as part of the existing test framework: + ----------------------------------------------- + + python run-tests tests.codec + + Change History: + ----------------- + Jimmy John 05/19/2007 Initial draft + Jimmy John 05/22/2007 Implemented comments by Rafael Schloming + + +""" + +from qpid.specs_config import amqp_spec_0_8 +SPEC = load(amqp_spec_0_8) + +# -------------------------------------- +# -------------------------------------- +class BaseDataTypes(unittest.TestCase): + + + """ + Base class containing common functions + """ + + # --------------- + def setUp(self): + """ + standard setUp for unitetest (refer unittest documentation for details) + """ + self.codec = Codec(StringIO(), SPEC) + + # ------------------ + def tearDown(self): + """ + standard tearDown for unitetest (refer unittest documentation for details) + """ + self.codec.stream.flush() + self.codec.stream.close() + + # ---------------------------------------- + def callFunc(self, functionName, *args): + """ + helper function - given a function name and arguments, calls the function with the args and + returns the contents of the stream + """ + getattr(self.codec, functionName)(args[0]) + return self.codec.stream.getvalue() + + # ---------------------------------------- + def readFunc(self, functionName, *args): + """ + helper function - creates a input stream and then calls the function with arguments as have been + supplied + """ + self.codec.stream = StringIO(args[0]) + return getattr(self.codec, functionName)() + + +# ---------------------------------------- +# ---------------------------------------- +class IntegerTestCase(BaseDataTypes): + + """ + Handles octet, short, long, long long + + """ + + # ------------------------- + def __init__(self, *args): + """ + sets constants for use in tests + """ + + BaseDataTypes.__init__(self, *args) + self.const_integer = 2 + self.const_integer_octet_encoded = '\x02' + self.const_integer_short_encoded = '\x00\x02' + self.const_integer_long_encoded = '\x00\x00\x00\x02' + self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02' + + # -------------------------- # + # Unsigned Octect - 8 bits # + # -------------------------- # + + # -------------------------- + def test_unsigned_octet(self): + """ + ubyte format requires 0<=number<=255 + """ + self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...') + + # ------------------------------------------- + def test_octet_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_octet, 256) + + # ------------------------------------------- + def test_uoctet_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_octet, -1) + + # --------------------------------- + def test_uoctet_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...') + + # ------------------------------------ + def test_unsigned_octet_decode(self): + """ + octet decoding + """ + self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...') + + # ----------------------------------- # + # Unsigned Short Integers - 16 bits # + # ----------------------------------- # + + # ----------------------- + def test_ushort_int(self): + """ + testing unsigned short integer + """ + self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...') + + # ------------------------------------------- + def test_ushort_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_short, 65536) + + # ------------------------------------------- + def test_ushort_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_short, -1) + + # --------------------------------- + def test_ushort_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...') + + # ------------------------------------ + def test_ushort_int_decode(self): + """ + unsigned short decoding + """ + self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...') + + + # ---------------------------------- # + # Unsigned Long Integers - 32 bits # + # ---------------------------------- # + + # ----------------------- + def test_ulong_int(self): + """ + testing unsigned long iteger + """ + self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...') + + # ------------------------------------------- + def test_ulong_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296) + + # ------------------------------------------- + def test_ulong_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_long, -1) + + # --------------------------------- + def test_ulong_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...') + + # ------------------------------- + def test_ulong_int_decode(self): + """ + unsigned long decoding + """ + self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...') + + + # --------------------------------------- # + # Unsigned Long Long Integers - 64 bits # + # --------------------------------------- # + + # ----------------------- + def test_ulong_long_int(self): + """ + testing unsinged long long integer + """ + self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...') + + # ------------------------------------------- + def test_ulong_long_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616) + + # ------------------------------------------- + def test_ulong_long_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_longlong, -1) + + # --------------------------------- + def test_ulong_long_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...') + + # ------------------------------------ + def test_ulong_long_int_decode(self): + """ + unsigned long long decoding + """ + self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...') + +# ----------------------------------- +# ----------------------------------- +class BitTestCase(BaseDataTypes): + + """ + Handles bits + """ + + # ---------------------------------------------- + def callFunc(self, functionName, *args): + """ + helper function + """ + for ele in args: + getattr(self.codec, functionName)(ele) + + self.codec.flush() + return self.codec.stream.getvalue() + + # ------------------- + def test_bit1(self): + """ + sends in 11 + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...') + + # ------------------- + def test_bit2(self): + """ + sends in 10011 + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...') + + # ------------------- + def test_bit3(self): + """ + sends in 1110100111 [10 bits(right to left), should be compressed into two octets] + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...') + + # ------------------------------------ + def test_bit_decode_1(self): + """ + decode bit 1 + """ + self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...') + + # ------------------------------------ + def test_bit_decode_0(self): + """ + decode bit 0 + """ + self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...') + +# ----------------------------------- +# ----------------------------------- +class StringTestCase(BaseDataTypes): + + """ + Handles short strings, long strings + """ + + # ------------------------------------------------------------- # + # Short Strings - 8 bit length followed by zero or more octets # + # ------------------------------------------------------------- # + + # --------------------------------------- + def test_short_string_zero_length(self): + """ + 0 length short string + """ + self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...') + + # ------------------------------------------- + def test_short_string_positive_length(self): + """ + positive length short string + """ + self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...') + + # ------------------------------------------- + def test_short_string_out_of_upper_range(self): + """ + string length > 255 + """ + self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256) + + # ------------------------------------ + def test_short_string_decode(self): + """ + short string decode + """ + self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...') + + + # ------------------------------------------------------------- # + # Long Strings - 32 bit length followed by zero or more octets # + # ------------------------------------------------------------- # + + # --------------------------------------- + def test_long_string_zero_length(self): + """ + 0 length long string + """ + self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...') + + # ------------------------------------------- + def test_long_string_positive_length(self): + """ + positive length long string + """ + self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...') + + # ------------------------------------ + def test_long_string_decode(self): + """ + long string decode + """ + self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...') + + +# -------------------------------------- +# -------------------------------------- +class TimestampTestCase(BaseDataTypes): + + """ + No need of any test cases here as timestamps are implemented as long long which is tested above + """ + pass + +# --------------------------------------- +# --------------------------------------- +class FieldTableTestCase(BaseDataTypes): + + """ + Handles Field Tables + + Only S/I type messages seem to be implemented currently + """ + + # ------------------------- + def __init__(self, *args): + """ + sets constants for use in tests + """ + + BaseDataTypes.__init__(self, *args) + self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'} + self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1' + + # ------------------------------------------- + def test_field_table_name_value_pair(self): + """ + valid name value pair + """ + self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...') + + # --------------------------------------------------- + def test_field_table_multiple_name_value_pair(self): + """ + multiple name value pair + """ + self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...') + + # ------------------------------------ + def test_field_table_decode(self): + """ + field table decode + """ + self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...') + + +# ------------------------------------ +# ------------------------------------ +class ContentTestCase(BaseDataTypes): + + """ + Handles Content data types + """ + + # ----------------------------- + def test_content_inline(self): + """ + inline content + """ + self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...') + + # -------------------------------- + def test_content_reference(self): + """ + reference content + """ + self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...') + + # ------------------------------------ + def test_content_inline_decode(self): + """ + inline content decode + """ + self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...') + + # ------------------------------------ + def test_content_reference_decode(self): + """ + reference content decode + """ + self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...') + +# ------------------------ # +# Pre - existing test code # +# ------------------------ # + +# --------------------- +def test(type, value): + """ + old test function cut/copy/paste from qpid/codec.py + """ + if isinstance(value, (list, tuple)): + values = value + else: + values = [value] + stream = StringIO() + codec = Codec(stream, SPEC) + for v in values: + codec.encode(type, v) + codec.flush() + enc = stream.getvalue() + stream.reset() + dup = [] + for i in xrange(len(values)): + dup.append(codec.decode(type)) + if values != dup: + raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) + +# ----------------------- +def dotest(type, value): + """ + old test function cut/copy/paste from qpid/codec.py + """ + args = (type, value) + test(*args) + +# ------------- +def oldtests(): + """ + old test function cut/copy/paste from qpid/codec.py + """ + for value in ("1", "0", "110", "011", "11001", "10101", "10011"): + for i in range(10): + dotest("bit", map(lambda x: x == "1", value*i)) + + for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): + dotest("table", value) + + for type in ("octet", "short", "long", "longlong"): + for value in range(0, 256): + dotest(type, value) + + for type in ("shortstr", "longstr"): + for value in ("", "a", "asdf"): + dotest(type, value) + +# ----------------------------------------- +class oldTests(unittest.TestCase): + + """ + class to handle pre-existing test cases + """ + + # --------------------------- + def test_oldtestcases(self): + """ + call the old tests + """ + return oldtests() + +# --------------------------- +# --------------------------- +if __name__ == '__main__': + + codec_test_suite = unittest.TestSuite() + + #adding all the test suites... + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase)) + + #loading pre-existing test case from qpid/codec.py + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests)) + + run_output_stream = StringIO() + test_runner = unittest.TextTestRunner(run_output_stream, '', '') + test_result = test_runner.run(codec_test_suite) + + print '\n%d test run...' % (test_result.testsRun) + + if test_result.wasSuccessful(): + print '\nAll tests successful\n' + + if test_result.failures: + print '\n----------' + print '%d FAILURES:' % (len(test_result.failures)) + print '----------\n' + for failure in test_result.failures: + print str(failure[0]) + ' ... FAIL' + + if test_result.errors: + print '\n---------' + print '%d ERRORS:' % (len(test_result.errors)) + print '---------\n' + + for error in test_result.errors: + print str(error[0]) + ' ... ERROR' + + f = open('codec_unit_test_output.txt', 'w') + f.write(str(run_output_stream.getvalue())) + f.close() |