From 10e5e39ff9739fa3ce589ad9d451260be0f3842c Mon Sep 17 00:00:00 2001 From: Inada Naoki Date: Thu, 5 Dec 2019 18:51:45 +0900 Subject: blacken test --- test/test_buffer.py | 17 +++---- test/test_case.py | 116 ++++++++++++++++++++++++++++++---------------- test/test_extension.py | 55 ++++++++++++---------- test/test_format.py | 92 ++++++++++++++++++++++-------------- test/test_limits.py | 46 ++++++++++-------- test/test_memoryview.py | 47 ++++++++++--------- test/test_newspec.py | 60 ++++++++++++------------ test/test_obj.py | 42 +++++++++++------ test/test_pack.py | 102 ++++++++++++++++++++++++++++++---------- test/test_read_size.py | 49 +++++++++++--------- test/test_seq.py | 11 +++-- test/test_sequnpack.py | 121 +++++++++++++++++++++++++++--------------------- test/test_stricttype.py | 29 ++++++------ test/test_subtype.py | 7 ++- test/test_timestamp.py | 14 +++--- test/test_unpack.py | 27 ++++++----- 16 files changed, 501 insertions(+), 334 deletions(-) diff --git a/test/test_buffer.py b/test/test_buffer.py index d723e8d..64fbdef 100644 --- a/test/test_buffer.py +++ b/test/test_buffer.py @@ -6,27 +6,28 @@ from msgpack import packb, unpackb def test_unpack_buffer(): from array import array - buf = array('b') + + buf = array("b") try: - buf.frombytes(packb((b'foo', b'bar'))) + buf.frombytes(packb((b"foo", b"bar"))) except AttributeError: # PY2 - buf.fromstring(packb((b'foo', b'bar'))) + buf.fromstring(packb((b"foo", b"bar"))) obj = unpackb(buf, use_list=1) - assert [b'foo', b'bar'] == obj + assert [b"foo", b"bar"] == obj def test_unpack_bytearray(): - buf = bytearray(packb(('foo', 'bar'))) + buf = bytearray(packb(("foo", "bar"))) obj = unpackb(buf, use_list=1) - assert [b'foo', b'bar'] == obj + assert [b"foo", b"bar"] == obj expected_type = bytes assert all(type(s) == expected_type for s in obj) def test_unpack_memoryview(): - buf = bytearray(packb(('foo', 'bar'))) + buf = bytearray(packb(("foo", "bar"))) view = memoryview(buf) obj = unpackb(view, use_list=1) - assert [b'foo', b'bar'] == obj + assert [b"foo", b"bar"] == obj expected_type = bytes assert all(type(s) == expected_type for s in obj) diff --git a/test/test_case.py b/test/test_case.py index 5a4bb6c..3bc1b26 100644 --- a/test/test_case.py +++ b/test/test_case.py @@ -6,97 +6,133 @@ from msgpack import packb, unpackb def check(length, obj): v = packb(obj) - assert len(v) == length, \ - "%r length should be %r but get %r" % (obj, length, len(v)) + assert len(v) == length, "%r length should be %r but get %r" % (obj, length, len(v)) assert unpackb(v, use_list=0) == obj + def test_1(): - for o in [None, True, False, 0, 1, (1 << 6), (1 << 7) - 1, -1, - -((1<<5)-1), -(1<<5)]: + for o in [ + None, + True, + False, + 0, + 1, + (1 << 6), + (1 << 7) - 1, + -1, + -((1 << 5) - 1), + -(1 << 5), + ]: check(1, o) + def test_2(): - for o in [1 << 7, (1 << 8) - 1, - -((1<<5)+1), -(1<<7) - ]: + for o in [1 << 7, (1 << 8) - 1, -((1 << 5) + 1), -(1 << 7)]: check(2, o) + def test_3(): - for o in [1 << 8, (1 << 16) - 1, - -((1<<7)+1), -(1<<15)]: + for o in [1 << 8, (1 << 16) - 1, -((1 << 7) + 1), -(1 << 15)]: check(3, o) + def test_5(): - for o in [1 << 16, (1 << 32) - 1, - -((1<<15)+1), -(1<<31)]: + for o in [1 << 16, (1 << 32) - 1, -((1 << 15) + 1), -(1 << 31)]: check(5, o) + def test_9(): - for o in [1 << 32, (1 << 64) - 1, - -((1<<31)+1), -(1<<63), - 1.0, 0.1, -0.1, -1.0]: + for o in [ + 1 << 32, + (1 << 64) - 1, + -((1 << 31) + 1), + -(1 << 63), + 1.0, + 0.1, + -0.1, + -1.0, + ]: check(9, o) def check_raw(overhead, num): check(num + overhead, b" " * num) + def test_fixraw(): check_raw(1, 0) - check_raw(1, (1<<5) - 1) + check_raw(1, (1 << 5) - 1) + def test_raw16(): - check_raw(3, 1<<5) - check_raw(3, (1<<16) - 1) + check_raw(3, 1 << 5) + check_raw(3, (1 << 16) - 1) + def test_raw32(): - check_raw(5, 1<<16) + check_raw(5, 1 << 16) def check_array(overhead, num): check(num + overhead, (None,) * num) + def test_fixarray(): check_array(1, 0) check_array(1, (1 << 4) - 1) + def test_array16(): check_array(3, 1 << 4) - check_array(3, (1<<16)-1) + check_array(3, (1 << 16) - 1) + def test_array32(): - check_array(5, (1<<16)) + check_array(5, (1 << 16)) def match(obj, buf): assert packb(obj) == buf assert unpackb(buf, use_list=0) == obj + def test_match(): cases = [ - (None, b'\xc0'), - (False, b'\xc2'), - (True, b'\xc3'), - (0, b'\x00'), - (127, b'\x7f'), - (128, b'\xcc\x80'), - (256, b'\xcd\x01\x00'), - (-1, b'\xff'), - (-33, b'\xd0\xdf'), - (-129, b'\xd1\xff\x7f'), - ({1:1}, b'\x81\x01\x01'), + (None, b"\xc0"), + (False, b"\xc2"), + (True, b"\xc3"), + (0, b"\x00"), + (127, b"\x7f"), + (128, b"\xcc\x80"), + (256, b"\xcd\x01\x00"), + (-1, b"\xff"), + (-33, b"\xd0\xdf"), + (-129, b"\xd1\xff\x7f"), + ({1: 1}, b"\x81\x01\x01"), (1.0, b"\xcb\x3f\xf0\x00\x00\x00\x00\x00\x00"), - ((), b'\x90'), - (tuple(range(15)),b"\x9f\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e"), - (tuple(range(16)),b"\xdc\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"), - ({}, b'\x80'), - (dict([(x,x) for x in range(15)]), b'\x8f\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06\x07\x07\x08\x08\t\t\n\n\x0b\x0b\x0c\x0c\r\r\x0e\x0e'), - (dict([(x,x) for x in range(16)]), b'\xde\x00\x10\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06\x07\x07\x08\x08\t\t\n\n\x0b\x0b\x0c\x0c\r\r\x0e\x0e\x0f\x0f'), - ] + ((), b"\x90"), + ( + tuple(range(15)), + b"\x9f\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e", + ), + ( + tuple(range(16)), + b"\xdc\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f", + ), + ({}, b"\x80"), + ( + dict([(x, x) for x in range(15)]), + b"\x8f\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06\x07\x07\x08\x08\t\t\n\n\x0b\x0b\x0c\x0c\r\r\x0e\x0e", + ), + ( + dict([(x, x) for x in range(16)]), + b"\xde\x00\x10\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06\x07\x07\x08\x08\t\t\n\n\x0b\x0b\x0c\x0c\r\r\x0e\x0e\x0f\x0f", + ), + ] for v, p in cases: match(v, p) -def test_unicode(): - assert unpackb(packb('foobar'), use_list=1) == b'foobar' +def test_unicode(): + assert unpackb(packb("foobar"), use_list=1) == b"foobar" diff --git a/test/test_extension.py b/test/test_extension.py index 8aa0cbb..6b36575 100644 --- a/test/test_extension.py +++ b/test/test_extension.py @@ -9,37 +9,41 @@ def test_pack_ext_type(): packer = msgpack.Packer() packer.pack_ext_type(0x42, s) return packer.bytes() - assert p(b'A') == b'\xd4\x42A' # fixext 1 - assert p(b'AB') == b'\xd5\x42AB' # fixext 2 - assert p(b'ABCD') == b'\xd6\x42ABCD' # fixext 4 - assert p(b'ABCDEFGH') == b'\xd7\x42ABCDEFGH' # fixext 8 - assert p(b'A'*16) == b'\xd8\x42' + b'A'*16 # fixext 16 - assert p(b'ABC') == b'\xc7\x03\x42ABC' # ext 8 - assert p(b'A'*0x0123) == b'\xc8\x01\x23\x42' + b'A'*0x0123 # ext 16 - assert p(b'A'*0x00012345) == b'\xc9\x00\x01\x23\x45\x42' + b'A'*0x00012345 # ext 32 + + assert p(b"A") == b"\xd4\x42A" # fixext 1 + assert p(b"AB") == b"\xd5\x42AB" # fixext 2 + assert p(b"ABCD") == b"\xd6\x42ABCD" # fixext 4 + assert p(b"ABCDEFGH") == b"\xd7\x42ABCDEFGH" # fixext 8 + assert p(b"A" * 16) == b"\xd8\x42" + b"A" * 16 # fixext 16 + assert p(b"ABC") == b"\xc7\x03\x42ABC" # ext 8 + assert p(b"A" * 0x0123) == b"\xc8\x01\x23\x42" + b"A" * 0x0123 # ext 16 + assert ( + p(b"A" * 0x00012345) == b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345 + ) # ext 32 def test_unpack_ext_type(): def check(b, expected): assert msgpack.unpackb(b) == expected - check(b'\xd4\x42A', ExtType(0x42, b'A')) # fixext 1 - check(b'\xd5\x42AB', ExtType(0x42, b'AB')) # fixext 2 - check(b'\xd6\x42ABCD', ExtType(0x42, b'ABCD')) # fixext 4 - check(b'\xd7\x42ABCDEFGH', ExtType(0x42, b'ABCDEFGH')) # fixext 8 - check(b'\xd8\x42' + b'A'*16, ExtType(0x42, b'A'*16)) # fixext 16 - check(b'\xc7\x03\x42ABC', ExtType(0x42, b'ABC')) # ext 8 - check(b'\xc8\x01\x23\x42' + b'A'*0x0123, - ExtType(0x42, b'A'*0x0123)) # ext 16 - check(b'\xc9\x00\x01\x23\x45\x42' + b'A'*0x00012345, - ExtType(0x42, b'A'*0x00012345)) # ext 32 + check(b"\xd4\x42A", ExtType(0x42, b"A")) # fixext 1 + check(b"\xd5\x42AB", ExtType(0x42, b"AB")) # fixext 2 + check(b"\xd6\x42ABCD", ExtType(0x42, b"ABCD")) # fixext 4 + check(b"\xd7\x42ABCDEFGH", ExtType(0x42, b"ABCDEFGH")) # fixext 8 + check(b"\xd8\x42" + b"A" * 16, ExtType(0x42, b"A" * 16)) # fixext 16 + check(b"\xc7\x03\x42ABC", ExtType(0x42, b"ABC")) # ext 8 + check(b"\xc8\x01\x23\x42" + b"A" * 0x0123, ExtType(0x42, b"A" * 0x0123)) # ext 16 + check( + b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345, + ExtType(0x42, b"A" * 0x00012345), + ) # ext 32 def test_extension_type(): def default(obj): - print('default called', obj) + print("default called", obj) if isinstance(obj, array.array): - typecode = 123 # application specific typecode + typecode = 123 # application specific typecode try: data = obj.tobytes() except AttributeError: @@ -48,24 +52,27 @@ def test_extension_type(): raise TypeError("Unknown type object %r" % (obj,)) def ext_hook(code, data): - print('ext_hook called', code, data) + print("ext_hook called", code, data) assert code == 123 - obj = array.array('d') + obj = array.array("d") try: obj.frombytes(data) except AttributeError: # PY2 obj.fromstring(data) return obj - obj = [42, b'hello', array.array('d', [1.1, 2.2, 3.3])] + obj = [42, b"hello", array.array("d", [1.1, 2.2, 3.3])] s = msgpack.packb(obj, default=default) obj2 = msgpack.unpackb(s, ext_hook=ext_hook) assert obj == obj2 + import sys -if sys.version > '3': + +if sys.version > "3": long = int + def test_overriding_hooks(): def default(obj): if isinstance(obj, long): diff --git a/test/test_format.py b/test/test_format.py index 5fec0c3..c2cdfbd 100644 --- a/test/test_format.py +++ b/test/test_format.py @@ -3,68 +3,88 @@ from msgpack import unpackb + def check(src, should, use_list=0): assert unpackb(src, use_list=use_list) == should + def testSimpleValue(): - check(b"\x93\xc0\xc2\xc3", - (None, False, True,)) + check(b"\x93\xc0\xc2\xc3", (None, False, True,)) + def testFixnum(): - check(b"\x92\x93\x00\x40\x7f\x93\xe0\xf0\xff", - ((0,64,127,), (-32,-16,-1,),) - ) + check(b"\x92\x93\x00\x40\x7f\x93\xe0\xf0\xff", ((0, 64, 127,), (-32, -16, -1,),)) + def testFixArray(): - check(b"\x92\x90\x91\x91\xc0", - ((),((None,),),), - ) + check( + b"\x92\x90\x91\x91\xc0", ((), ((None,),),), + ) + def testFixRaw(): - check(b"\x94\xa0\xa1a\xa2bc\xa3def", - (b"", b"a", b"bc", b"def",), - ) + check( + b"\x94\xa0\xa1a\xa2bc\xa3def", (b"", b"a", b"bc", b"def",), + ) + def testFixMap(): check( - b"\x82\xc2\x81\xc0\xc0\xc3\x81\xc0\x80", - {False: {None: None}, True:{None:{}}}, - ) + b"\x82\xc2\x81\xc0\xc0\xc3\x81\xc0\x80", + {False: {None: None}, True: {None: {}}}, + ) + def testUnsignedInt(): check( - b"\x99\xcc\x00\xcc\x80\xcc\xff\xcd\x00\x00\xcd\x80\x00" - b"\xcd\xff\xff\xce\x00\x00\x00\x00\xce\x80\x00\x00\x00" - b"\xce\xff\xff\xff\xff", - (0, 128, 255, 0, 32768, 65535, 0, 2147483648, 4294967295,), - ) + b"\x99\xcc\x00\xcc\x80\xcc\xff\xcd\x00\x00\xcd\x80\x00" + b"\xcd\xff\xff\xce\x00\x00\x00\x00\xce\x80\x00\x00\x00" + b"\xce\xff\xff\xff\xff", + (0, 128, 255, 0, 32768, 65535, 0, 2147483648, 4294967295,), + ) + def testSignedInt(): - check(b"\x99\xd0\x00\xd0\x80\xd0\xff\xd1\x00\x00\xd1\x80\x00" - b"\xd1\xff\xff\xd2\x00\x00\x00\x00\xd2\x80\x00\x00\x00" - b"\xd2\xff\xff\xff\xff", - (0, -128, -1, 0, -32768, -1, 0, -2147483648, -1,)) + check( + b"\x99\xd0\x00\xd0\x80\xd0\xff\xd1\x00\x00\xd1\x80\x00" + b"\xd1\xff\xff\xd2\x00\x00\x00\x00\xd2\x80\x00\x00\x00" + b"\xd2\xff\xff\xff\xff", + (0, -128, -1, 0, -32768, -1, 0, -2147483648, -1,), + ) + def testRaw(): - check(b"\x96\xda\x00\x00\xda\x00\x01a\xda\x00\x02ab\xdb\x00\x00" + check( + b"\x96\xda\x00\x00\xda\x00\x01a\xda\x00\x02ab\xdb\x00\x00" b"\x00\x00\xdb\x00\x00\x00\x01a\xdb\x00\x00\x00\x02ab", - (b"", b"a", b"ab", b"", b"a", b"ab")) + (b"", b"a", b"ab", b"", b"a", b"ab"), + ) + def testArray(): - check(b"\x96\xdc\x00\x00\xdc\x00\x01\xc0\xdc\x00\x02\xc2\xc3\xdd\x00" + check( + b"\x96\xdc\x00\x00\xdc\x00\x01\xc0\xdc\x00\x02\xc2\xc3\xdd\x00" b"\x00\x00\x00\xdd\x00\x00\x00\x01\xc0\xdd\x00\x00\x00\x02" b"\xc2\xc3", - ((), (None,), (False,True), (), (None,), (False,True)) - ) + ((), (None,), (False, True), (), (None,), (False, True)), + ) + def testMap(): check( b"\x96" - b"\xde\x00\x00" - b"\xde\x00\x01\xc0\xc2" - b"\xde\x00\x02\xc0\xc2\xc3\xc2" - b"\xdf\x00\x00\x00\x00" - b"\xdf\x00\x00\x00\x01\xc0\xc2" - b"\xdf\x00\x00\x00\x02\xc0\xc2\xc3\xc2", - ({}, {None: False}, {True: False, None: False}, {}, - {None: False}, {True: False, None: False})) + b"\xde\x00\x00" + b"\xde\x00\x01\xc0\xc2" + b"\xde\x00\x02\xc0\xc2\xc3\xc2" + b"\xdf\x00\x00\x00\x00" + b"\xdf\x00\x00\x00\x01\xc0\xc2" + b"\xdf\x00\x00\x00\x02\xc0\xc2\xc3\xc2", + ( + {}, + {None: False}, + {True: False, None: False}, + {}, + {None: False}, + {True: False, None: False}, + ), + ) diff --git a/test/test_limits.py b/test/test_limits.py index 8c7606f..6e85030 100644 --- a/test/test_limits.py +++ b/test/test_limits.py @@ -4,8 +4,14 @@ from __future__ import absolute_import, division, print_function, unicode_litera import pytest from msgpack import ( - packb, unpackb, Packer, Unpacker, ExtType, - PackOverflowError, PackValueError, UnpackValueError, + packb, + unpackb, + Packer, + Unpacker, + ExtType, + PackOverflowError, + PackValueError, + UnpackValueError, ) @@ -13,30 +19,30 @@ def test_integer(): x = -(2 ** 63) assert unpackb(packb(x)) == x with pytest.raises(PackOverflowError): - packb(x-1) + packb(x - 1) x = 2 ** 64 - 1 assert unpackb(packb(x)) == x with pytest.raises(PackOverflowError): - packb(x+1) + packb(x + 1) def test_array_header(): packer = Packer() - packer.pack_array_header(2**32-1) + packer.pack_array_header(2 ** 32 - 1) with pytest.raises(PackValueError): - packer.pack_array_header(2**32) + packer.pack_array_header(2 ** 32) def test_map_header(): packer = Packer() - packer.pack_map_header(2**32-1) + packer.pack_map_header(2 ** 32 - 1) with pytest.raises(PackValueError): - packer.pack_array_header(2**32) + packer.pack_array_header(2 ** 32) def test_max_str_len(): - d = 'x' * 3 + d = "x" * 3 packed = packb(d) unpacker = Unpacker(max_str_len=3, raw=False) @@ -50,7 +56,7 @@ def test_max_str_len(): def test_max_bin_len(): - d = b'x' * 3 + d = b"x" * 3 packed = packb(d, use_bin_type=True) unpacker = Unpacker(max_bin_len=3) @@ -64,7 +70,7 @@ def test_max_bin_len(): def test_max_array_len(): - d = [1,2,3] + d = [1, 2, 3] packed = packb(d) unpacker = Unpacker(max_array_len=3) @@ -107,8 +113,8 @@ def test_max_ext_len(): # PyPy fails following tests because of constant folding? # https://bugs.pypy.org/issue1721 -#@pytest.mark.skipif(True, reason="Requires very large memory.") -#def test_binary(): +# @pytest.mark.skipif(True, reason="Requires very large memory.") +# def test_binary(): # x = b'x' * (2**32 - 1) # assert unpackb(packb(x)) == x # del x @@ -117,8 +123,8 @@ def test_max_ext_len(): # packb(x) # # -#@pytest.mark.skipif(True, reason="Requires very large memory.") -#def test_string(): +# @pytest.mark.skipif(True, reason="Requires very large memory.") +# def test_string(): # x = 'x' * (2**32 - 1) # assert unpackb(packb(x)) == x # x += 'y' @@ -126,8 +132,8 @@ def test_max_ext_len(): # packb(x) # # -#@pytest.mark.skipif(True, reason="Requires very large memory.") -#def test_array(): +# @pytest.mark.skipif(True, reason="Requires very large memory.") +# def test_array(): # x = [0] * (2**32 - 1) # assert unpackb(packb(x)) == x # x.append(0) @@ -137,8 +143,9 @@ def test_max_ext_len(): # auto max len + def test_auto_max_array_len(): - packed = b'\xde\x00\x06zz' + packed = b"\xde\x00\x06zz" with pytest.raises(UnpackValueError): unpackb(packed, raw=False) @@ -147,9 +154,10 @@ def test_auto_max_array_len(): with pytest.raises(UnpackValueError): unpacker.unpack() + def test_auto_max_map_len(): # len(packed) == 6 -> max_map_len == 3 - packed = b'\xde\x00\x04zzz' + packed = b"\xde\x00\x04zzz" with pytest.raises(UnpackValueError): unpackb(packed, raw=False) diff --git a/test/test_memoryview.py b/test/test_memoryview.py index f6d74ed..e1b63b8 100644 --- a/test/test_memoryview.py +++ b/test/test_memoryview.py @@ -10,6 +10,7 @@ import sys # - array type only supports old buffer interface # - array.frombytes is not available, must use deprecated array.fromstring if sys.version_info[0] < 3: + def make_memoryview(obj): return memoryview(buffer(obj)) @@ -20,6 +21,8 @@ if sys.version_info[0] < 3: def get_data(a): return a.tostring() + + else: make_memoryview = memoryview @@ -49,64 +52,64 @@ def _runtest(format, nbytes, expected_header, expected_prefix, use_bin_type): # check packed header assert packed[:1] == expected_header # check packed length prefix, if any - assert packed[1:1+len(expected_prefix)] == expected_prefix + assert packed[1 : 1 + len(expected_prefix)] == expected_prefix # check packed data - assert packed[1+len(expected_prefix):] == original_data + assert packed[1 + len(expected_prefix) :] == original_data # check array unpacked correctly assert original_array == reconstructed_array def test_fixstr_from_byte(): - _runtest('B', 1, b'\xa1', b'', False) - _runtest('B', 31, b'\xbf', b'', False) + _runtest("B", 1, b"\xa1", b"", False) + _runtest("B", 31, b"\xbf", b"", False) def test_fixstr_from_float(): - _runtest('f', 4, b'\xa4', b'', False) - _runtest('f', 28, b'\xbc', b'', False) + _runtest("f", 4, b"\xa4", b"", False) + _runtest("f", 28, b"\xbc", b"", False) def test_str16_from_byte(): - _runtest('B', 2**8, b'\xda', b'\x01\x00', False) - _runtest('B', 2**16-1, b'\xda', b'\xff\xff', False) + _runtest("B", 2 ** 8, b"\xda", b"\x01\x00", False) + _runtest("B", 2 ** 16 - 1, b"\xda", b"\xff\xff", False) def test_str16_from_float(): - _runtest('f', 2**8, b'\xda', b'\x01\x00', False) - _runtest('f', 2**16-4, b'\xda', b'\xff\xfc', False) + _runtest("f", 2 ** 8, b"\xda", b"\x01\x00", False) + _runtest("f", 2 ** 16 - 4, b"\xda", b"\xff\xfc", False) def test_str32_from_byte(): - _runtest('B', 2**16, b'\xdb', b'\x00\x01\x00\x00', False) + _runtest("B", 2 ** 16, b"\xdb", b"\x00\x01\x00\x00", False) def test_str32_from_float(): - _runtest('f', 2**16, b'\xdb', b'\x00\x01\x00\x00', False) + _runtest("f", 2 ** 16, b"\xdb", b"\x00\x01\x00\x00", False) def test_bin8_from_byte(): - _runtest('B', 1, b'\xc4', b'\x01', True) - _runtest('B', 2**8-1, b'\xc4', b'\xff', True) + _runtest("B", 1, b"\xc4", b"\x01", True) + _runtest("B", 2 ** 8 - 1, b"\xc4", b"\xff", True) def test_bin8_from_float(): - _runtest('f', 4, b'\xc4', b'\x04', True) - _runtest('f', 2**8-4, b'\xc4', b'\xfc', True) + _runtest("f", 4, b"\xc4", b"\x04", True) + _runtest("f", 2 ** 8 - 4, b"\xc4", b"\xfc", True) def test_bin16_from_byte(): - _runtest('B', 2**8, b'\xc5', b'\x01\x00', True) - _runtest('B', 2**16-1, b'\xc5', b'\xff\xff', True) + _runtest("B", 2 ** 8, b"\xc5", b"\x01\x00", True) + _runtest("B", 2 ** 16 - 1, b"\xc5", b"\xff\xff", True) def test_bin16_from_float(): - _runtest('f', 2**8, b'\xc5', b'\x01\x00', True) - _runtest('f', 2**16-4, b'\xc5', b'\xff\xfc', True) + _runtest("f", 2 ** 8, b"\xc5", b"\x01\x00", True) + _runtest("f", 2 ** 16 - 4, b"\xc5", b"\xff\xfc", True) def test_bin32_from_byte(): - _runtest('B', 2**16, b'\xc6', b'\x00\x01\x00\x00', True) + _runtest("B", 2 ** 16, b"\xc6", b"\x00\x01\x00\x00", True) def test_bin32_from_float(): - _runtest('f', 2**16, b'\xc6', b'\x00\x01\x00\x00', True) + _runtest("f", 2 ** 16, b"\xc6", b"\x00\x01\x00\x00", True) diff --git a/test/test_newspec.py b/test/test_newspec.py index ab05029..f4f2a23 100644 --- a/test/test_newspec.py +++ b/test/test_newspec.py @@ -4,85 +4,87 @@ from msgpack import packb, unpackb, ExtType def test_str8(): - header = b'\xd9' - data = b'x' * 32 + header = b"\xd9" + data = b"x" * 32 b = packb(data.decode(), use_bin_type=True) assert len(b) == len(data) + 2 - assert b[0:2] == header + b'\x20' + assert b[0:2] == header + b"\x20" assert b[2:] == data assert unpackb(b) == data - data = b'x' * 255 + data = b"x" * 255 b = packb(data.decode(), use_bin_type=True) assert len(b) == len(data) + 2 - assert b[0:2] == header + b'\xff' + assert b[0:2] == header + b"\xff" assert b[2:] == data assert unpackb(b) == data def test_bin8(): - header = b'\xc4' - data = b'' + header = b"\xc4" + data = b"" b = packb(data, use_bin_type=True) assert len(b) == len(data) + 2 - assert b[0:2] == header + b'\x00' + assert b[0:2] == header + b"\x00" assert b[2:] == data assert unpackb(b) == data - data = b'x' * 255 + data = b"x" * 255 b = packb(data, use_bin_type=True) assert len(b) == len(data) + 2 - assert b[0:2] == header + b'\xff' + assert b[0:2] == header + b"\xff" assert b[2:] == data assert unpackb(b) == data def test_bin16(): - header = b'\xc5' - data = b'x' * 256 + header = b"\xc5" + data = b"x" * 256 b = packb(data, use_bin_type=True) assert len(b) == len(data) + 3 assert b[0:1] == header - assert b[1:3] == b'\x01\x00' + assert b[1:3] == b"\x01\x00" assert b[3:] == data assert unpackb(b) == data - data = b'x' * 65535 + data = b"x" * 65535 b = packb(data, use_bin_type=True) assert len(b) == len(data) + 3 assert b[0:1] == header - assert b[1:3] == b'\xff\xff' + assert b[1:3] == b"\xff\xff" assert b[3:] == data assert unpackb(b) == data def test_bin32(): - header = b'\xc6' - data = b'x' * 65536 + header = b"\xc6" + data = b"x" * 65536 b = packb(data, use_bin_type=True) assert len(b) == len(data) + 5 assert b[0:1] == header - assert b[1:5] == b'\x00\x01\x00\x00' + assert b[1:5] == b"\x00\x01\x00\x00" assert b[5:] == data assert unpackb(b) == data + def test_ext(): def check(ext, packed): assert packb(ext) == packed assert unpackb(packed) == ext - check(ExtType(0x42, b'Z'), b'\xd4\x42Z') # fixext 1 - check(ExtType(0x42, b'ZZ'), b'\xd5\x42ZZ') # fixext 2 - check(ExtType(0x42, b'Z'*4), b'\xd6\x42' + b'Z'*4) # fixext 4 - check(ExtType(0x42, b'Z'*8), b'\xd7\x42' + b'Z'*8) # fixext 8 - check(ExtType(0x42, b'Z'*16), b'\xd8\x42' + b'Z'*16) # fixext 16 + + check(ExtType(0x42, b"Z"), b"\xd4\x42Z") # fixext 1 + check(ExtType(0x42, b"ZZ"), b"\xd5\x42ZZ") # fixext 2 + check(ExtType(0x42, b"Z" * 4), b"\xd6\x42" + b"Z" * 4) # fixext 4 + check(ExtType(0x42, b"Z" * 8), b"\xd7\x42" + b"Z" * 8) # fixext 8 + check(ExtType(0x42, b"Z" * 16), b"\xd8\x42" + b"Z" * 16) # fixext 16 # ext 8 - check(ExtType(0x42, b''), b'\xc7\x00\x42') - check(ExtType(0x42, b'Z'*255), b'\xc7\xff\x42' + b'Z'*255) + check(ExtType(0x42, b""), b"\xc7\x00\x42") + check(ExtType(0x42, b"Z" * 255), b"\xc7\xff\x42" + b"Z" * 255) # ext 16 - check(ExtType(0x42, b'Z'*256), b'\xc8\x01\x00\x42' + b'Z'*256) - check(ExtType(0x42, b'Z'*0xffff), b'\xc8\xff\xff\x42' + b'Z'*0xffff) + check(ExtType(0x42, b"Z" * 256), b"\xc8\x01\x00\x42" + b"Z" * 256) + check(ExtType(0x42, b"Z" * 0xFFFF), b"\xc8\xff\xff\x42" + b"Z" * 0xFFFF) # ext 32 - check(ExtType(0x42, b'Z'*0x10000), b'\xc9\x00\x01\x00\x00\x42' + b'Z'*0x10000) + check(ExtType(0x42, b"Z" * 0x10000), b"\xc9\x00\x01\x00\x00\x42" + b"Z" * 0x10000) # needs large memory - #check(ExtType(0x42, b'Z'*0xffffffff), + # check(ExtType(0x42, b'Z'*0xffffffff), # b'\xc9\xff\xff\xff\xff\x42' + b'Z'*0xffffffff) diff --git a/test/test_obj.py b/test/test_obj.py index 390c1b6..0b99cea 100644 --- a/test/test_obj.py +++ b/test/test_obj.py @@ -4,64 +4,76 @@ from pytest import raises from msgpack import packb, unpackb + def _decode_complex(obj): - if b'__complex__' in obj: - return complex(obj[b'real'], obj[b'imag']) + if b"__complex__" in obj: + return complex(obj[b"real"], obj[b"imag"]) return obj + def _encode_complex(obj): if isinstance(obj, complex): - return {b'__complex__': True, b'real': 1, b'imag': 2} + return {b"__complex__": True, b"real": 1, b"imag": 2} return obj + def test_encode_hook(): - packed = packb([3, 1+2j], default=_encode_complex) + packed = packb([3, 1 + 2j], default=_encode_complex) unpacked = unpackb(packed, use_list=1) - assert unpacked[1] == {b'__complex__': True, b'real': 1, b'imag': 2} + assert unpacked[1] == {b"__complex__": True, b"real": 1, b"imag": 2} + def test_decode_hook(): - packed = packb([3, {b'__complex__': True, b'real': 1, b'imag': 2}]) + packed = packb([3, {b"__complex__": True, b"real": 1, b"imag": 2}]) unpacked = unpackb(packed, object_hook=_decode_complex, use_list=1) - assert unpacked[1] == 1+2j + assert unpacked[1] == 1 + 2j + def test_decode_pairs_hook(): packed = packb([3, {1: 2, 3: 4}]) prod_sum = 1 * 2 + 3 * 4 - unpacked = unpackb(packed, object_pairs_hook=lambda l: sum(k * v for k, v in l), use_list=1) + unpacked = unpackb( + packed, object_pairs_hook=lambda l: sum(k * v for k, v in l), use_list=1 + ) assert unpacked[1] == prod_sum + def test_only_one_obj_hook(): with raises(TypeError): - unpackb(b'', object_hook=lambda x: x, object_pairs_hook=lambda x: x) + unpackb(b"", object_hook=lambda x: x, object_pairs_hook=lambda x: x) + def test_bad_hook(): with raises(TypeError): - packed = packb([3, 1+2j], default=lambda o: o) + packed = packb([3, 1 + 2j], default=lambda o: o) unpacked = unpackb(packed, use_list=1) + def _arr_to_str(arr): - return ''.join(str(c) for c in arr) + return "".join(str(c) for c in arr) + def test_array_hook(): - packed = packb([1,2,3]) + packed = packb([1, 2, 3]) unpacked = unpackb(packed, list_hook=_arr_to_str, use_list=1) - assert unpacked == '123' + assert unpacked == "123" class DecodeError(Exception): pass + def bad_complex_decoder(o): raise DecodeError("Ooops!") def test_an_exception_in_objecthook1(): with raises(DecodeError): - packed = packb({1: {'__complex__': True, 'real': 1, 'imag': 2}}) + packed = packb({1: {"__complex__": True, "real": 1, "imag": 2}}) unpackb(packed, object_hook=bad_complex_decoder) def test_an_exception_in_objecthook2(): with raises(DecodeError): - packed = packb({1: [{'__complex__': True, 'real': 1, 'imag': 2}]}) + packed = packb({1: [{"__complex__": True, "real": 1, "imag": 2}]}) unpackb(packed, list_hook=bad_complex_decoder, use_list=1) diff --git a/test/test_pack.py b/test/test_pack.py index b6752e5..de212ef 100644 --- a/test/test_pack.py +++ b/test/test_pack.py @@ -17,20 +17,46 @@ def check(data, use_list=False): re = unpackb(packb(data), use_list=use_list) assert re == data + def testPack(): test_data = [ - 0, 1, 127, 128, 255, 256, 65535, 65536, 4294967295, 4294967296, - -1, -32, -33, -128, -129, -32768, -32769, -4294967296, -4294967297, - 1.0, - b"", b"a", b"a"*31, b"a"*32, - None, True, False, - (), ((),), ((), None,), + 0, + 1, + 127, + 128, + 255, + 256, + 65535, + 65536, + 4294967295, + 4294967296, + -1, + -32, + -33, + -128, + -129, + -32768, + -32769, + -4294967296, + -4294967297, + 1.0, + b"", + b"a", + b"a" * 31, + b"a" * 32, + None, + True, + False, + (), + ((),), + ((), None,), {None: 0}, - (1<<23), - ] + (1 << 23), + ] for td in test_data: check(td) + def testPackUnicode(): test_data = ["", "abcd", ["defgh"], "Русский текст"] for td in test_data: @@ -41,43 +67,64 @@ def testPackUnicode(): re = Unpacker(BytesIO(data), raw=False, use_list=1).unpack() assert re == td + def testPackBytes(): test_data = [ - b"", b"abcd", (b"defgh",), - ] + b"", + b"abcd", + (b"defgh",), + ] for td in test_data: check(td) + def testPackByteArrays(): test_data = [ - bytearray(b""), bytearray(b"abcd"), (bytearray(b"defgh"),), - ] + bytearray(b""), + bytearray(b"abcd"), + (bytearray(b"defgh"),), + ] for td in test_data: check(td) -@pytest.mark.skipif(sys.version_info < (3,0), reason="Python 2 passes invalid surrogates") + +@pytest.mark.skipif( + sys.version_info < (3, 0), reason="Python 2 passes invalid surrogates" +) def testIgnoreUnicodeErrors(): - re = unpackb(packb(b'abc\xeddef', use_bin_type=False), - raw=False, unicode_errors='ignore') + re = unpackb( + packb(b"abc\xeddef", use_bin_type=False), raw=False, unicode_errors="ignore" + ) assert re == "abcdef" + def testStrictUnicodeUnpack(): - packed = packb(b'abc\xeddef', use_bin_type=False) + packed = packb(b"abc\xeddef", use_bin_type=False) with pytest.raises(UnicodeDecodeError): unpackb(packed, raw=False, use_list=1) -@pytest.mark.skipif(sys.version_info < (3,0), reason="Python 2 passes invalid surrogates") + +@pytest.mark.skipif( + sys.version_info < (3, 0), reason="Python 2 passes invalid surrogates" +) def testIgnoreErrorsPack(): - re = unpackb(packb(u"abc\uDC80\uDCFFdef", use_bin_type=True, unicode_errors='ignore'), raw=False, use_list=1) + re = unpackb( + packb("abc\uDC80\uDCFFdef", use_bin_type=True, unicode_errors="ignore"), + raw=False, + use_list=1, + ) assert re == "abcdef" + def testDecodeBinary(): re = unpackb(packb(b"abc"), use_list=1) assert re == b"abc" + def testPackFloat(): - assert packb(1.0, use_single_float=True) == b'\xca' + struct.pack(str('>f'), 1.0) - assert packb(1.0, use_single_float=False) == b'\xcb' + struct.pack(str('>d'), 1.0) + assert packb(1.0, use_single_float=True) == b"\xca" + struct.pack(str(">f"), 1.0) + assert packb(1.0, use_single_float=False) == b"\xcb" + struct.pack(str(">d"), 1.0) + def testArraySize(sizes=[0, 5, 50, 1000]): bio = BytesIO() @@ -92,6 +139,7 @@ def testArraySize(sizes=[0, 5, 50, 1000]): for size in sizes: assert unpacker.unpack() == list(range(size)) + def test_manualreset(sizes=[0, 5, 50, 1000]): packer = Packer(autoreset=False) for size in sizes: @@ -105,7 +153,8 @@ def test_manualreset(sizes=[0, 5, 50, 1000]): assert unpacker.unpack() == list(range(size)) packer.reset() - assert packer.bytes() == b'' + assert packer.bytes() == b"" + def testMapSize(sizes=[0, 5, 50, 1000]): bio = BytesIO() @@ -113,8 +162,8 @@ def testMapSize(sizes=[0, 5, 50, 1000]): for size in sizes: bio.write(packer.pack_map_header(size)) for i in range(size): - bio.write(packer.pack(i)) # key - bio.write(packer.pack(i * 2)) # value + bio.write(packer.pack(i)) # key + bio.write(packer.pack(i * 2)) # value bio.seek(0) unpacker = Unpacker(bio) @@ -123,21 +172,24 @@ def testMapSize(sizes=[0, 5, 50, 1000]): def test_odict(): - seq = [(b'one', 1), (b'two', 2), (b'three', 3), (b'four', 4)] + seq = [(b"one", 1), (b"two", 2), (b"three", 3), (b"four", 4)] od = OrderedDict(seq) assert unpackb(packb(od), use_list=1) == dict(seq) + def pair_hook(seq): return list(seq) + assert unpackb(packb(od), object_pairs_hook=pair_hook, use_list=1) == seq def test_pairlist(): - pairlist = [(b'a', 1), (2, b'b'), (b'foo', b'bar')] + pairlist = [(b"a", 1), (2, b"b"), (b"foo", b"bar")] packer = Packer() packed = packer.pack_map_pairs(pairlist) unpacked = unpackb(packed, object_pairs_hook=list) assert pairlist == unpacked + def test_get_buffer(): packer = Packer(autoreset=0, use_bin_type=True) packer.pack([1, 2]) diff --git a/test/test_read_size.py b/test/test_read_size.py index 4e6c2b9..8d8df64 100644 --- a/test/test_read_size.py +++ b/test/test_read_size.py @@ -1,66 +1,71 @@ """Test Unpacker's read_array_header and read_map_header methods""" from msgpack import packb, Unpacker, OutOfData + UnexpectedTypeException = ValueError + def test_read_array_header(): unpacker = Unpacker() - unpacker.feed(packb(['a', 'b', 'c'])) + unpacker.feed(packb(["a", "b", "c"])) assert unpacker.read_array_header() == 3 - assert unpacker.unpack() == b'a' - assert unpacker.unpack() == b'b' - assert unpacker.unpack() == b'c' + assert unpacker.unpack() == b"a" + assert unpacker.unpack() == b"b" + assert unpacker.unpack() == b"c" try: unpacker.unpack() - assert 0, 'should raise exception' + assert 0, "should raise exception" except OutOfData: - assert 1, 'okay' + assert 1, "okay" def test_read_map_header(): unpacker = Unpacker() - unpacker.feed(packb({'a': 'A'})) + unpacker.feed(packb({"a": "A"})) assert unpacker.read_map_header() == 1 - assert unpacker.unpack() == B'a' - assert unpacker.unpack() == B'A' + assert unpacker.unpack() == b"a" + assert unpacker.unpack() == b"A" try: unpacker.unpack() - assert 0, 'should raise exception' + assert 0, "should raise exception" except OutOfData: - assert 1, 'okay' + assert 1, "okay" + def test_incorrect_type_array(): unpacker = Unpacker() unpacker.feed(packb(1)) try: unpacker.read_array_header() - assert 0, 'should raise exception' + assert 0, "should raise exception" except UnexpectedTypeException: - assert 1, 'okay' + assert 1, "okay" + def test_incorrect_type_map(): unpacker = Unpacker() unpacker.feed(packb(1)) try: unpacker.read_map_header() - assert 0, 'should raise exception' + assert 0, "should raise exception" except UnexpectedTypeException: - assert 1, 'okay' + assert 1, "okay" + def test_correct_type_nested_array(): unpacker = Unpacker() - unpacker.feed(packb({'a': ['b', 'c', 'd']})) + unpacker.feed(packb({"a": ["b", "c", "d"]})) try: unpacker.read_array_header() - assert 0, 'should raise exception' + assert 0, "should raise exception" except UnexpectedTypeException: - assert 1, 'okay' + assert 1, "okay" + def test_incorrect_type_nested_map(): unpacker = Unpacker() - unpacker.feed(packb([{'a': 'b'}])) + unpacker.feed(packb([{"a": "b"}])) try: unpacker.read_map_header() - assert 0, 'should raise exception' + assert 0, "should raise exception" except UnexpectedTypeException: - assert 1, 'okay' - + assert 1, "okay" diff --git a/test/test_seq.py b/test/test_seq.py index fed9ff4..0d5d806 100644 --- a/test/test_seq.py +++ b/test/test_seq.py @@ -7,8 +7,9 @@ import msgpack binarydata = bytes(bytearray(range(256))) + def gen_binary_data(idx): - return binarydata[:idx % 300] + return binarydata[: idx % 300] def test_exceeding_unpacker_read_size(): @@ -18,10 +19,10 @@ def test_exceeding_unpacker_read_size(): NUMBER_OF_STRINGS = 6 read_size = 16 - # 5 ok for read_size=16, while 6 glibc detected *** python: double free or corruption (fasttop): - # 20 ok for read_size=256, while 25 segfaults / glibc detected *** python: double free or corruption (!prev) - # 40 ok for read_size=1024, while 50 introduces errors - # 7000 ok for read_size=1024*1024, while 8000 leads to glibc detected *** python: double free or corruption (!prev): + # 5 ok for read_size=16, while 6 glibc detected *** python: double free or corruption (fasttop): + # 20 ok for read_size=256, while 25 segfaults / glibc detected *** python: double free or corruption (!prev) + # 40 ok for read_size=1024, while 50 introduces errors + # 7000 ok for read_size=1024*1024, while 8000 leads to glibc detected *** python: double free or corruption (!prev): for idx in range(NUMBER_OF_STRINGS): data = gen_binary_data(idx) diff --git a/test/test_sequnpack.py b/test/test_sequnpack.py index 59718f5..e576571 100644 --- a/test/test_sequnpack.py +++ b/test/test_sequnpack.py @@ -10,102 +10,115 @@ from pytest import raises def test_partialdata(): unpacker = Unpacker() - unpacker.feed(b'\xa5') - with raises(StopIteration): next(iter(unpacker)) - unpacker.feed(b'h') - with raises(StopIteration): next(iter(unpacker)) - unpacker.feed(b'a') - with raises(StopIteration): next(iter(unpacker)) - unpacker.feed(b'l') - with raises(StopIteration): next(iter(unpacker)) - unpacker.feed(b'l') - with raises(StopIteration): next(iter(unpacker)) - unpacker.feed(b'o') - assert next(iter(unpacker)) == b'hallo' + unpacker.feed(b"\xa5") + with raises(StopIteration): + next(iter(unpacker)) + unpacker.feed(b"h") + with raises(StopIteration): + next(iter(unpacker)) + unpacker.feed(b"a") + with raises(StopIteration): + next(iter(unpacker)) + unpacker.feed(b"l") + with raises(StopIteration): + next(iter(unpacker)) + unpacker.feed(b"l") + with raises(StopIteration): + next(iter(unpacker)) + unpacker.feed(b"o") + assert next(iter(unpacker)) == b"hallo" + def test_foobar(): unpacker = Unpacker(read_size=3, use_list=1) - unpacker.feed(b'foobar') - assert unpacker.unpack() == ord(b'f') - assert unpacker.unpack() == ord(b'o') - assert unpacker.unpack() == ord(b'o') - assert unpacker.unpack() == ord(b'b') - assert unpacker.unpack() == ord(b'a') - assert unpacker.unpack() == ord(b'r') + unpacker.feed(b"foobar") + assert unpacker.unpack() == ord(b"f") + assert unpacker.unpack() == ord(b"o") + assert unpacker.unpack() == ord(b"o") + assert unpacker.unpack() == ord(b"b") + assert unpacker.unpack() == ord(b"a") + assert unpacker.unpack() == ord(b"r") with raises(OutOfData): unpacker.unpack() - unpacker.feed(b'foo') - unpacker.feed(b'bar') + unpacker.feed(b"foo") + unpacker.feed(b"bar") k = 0 - for o, e in zip(unpacker, 'foobarbaz'): + for o, e in zip(unpacker, "foobarbaz"): assert o == ord(e) k += 1 - assert k == len(b'foobar') + assert k == len(b"foobar") + def test_foobar_skip(): unpacker = Unpacker(read_size=3, use_list=1) - unpacker.feed(b'foobar') - assert unpacker.unpack() == ord(b'f') + unpacker.feed(b"foobar") + assert unpacker.unpack() == ord(b"f") unpacker.skip() - assert unpacker.unpack() == ord(b'o') + assert unpacker.unpack() == ord(b"o") unpacker.skip() - assert unpacker.unpack() == ord(b'a') + assert unpacker.unpack() == ord(b"a") unpacker.skip() with raises(OutOfData): unpacker.unpack() + def test_maxbuffersize(): with raises(ValueError): Unpacker(read_size=5, max_buffer_size=3) unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1) - unpacker.feed(b'fo') + unpacker.feed(b"fo") with raises(BufferFull): - unpacker.feed(b'ob') - unpacker.feed(b'o') - assert ord('f') == next(unpacker) - unpacker.feed(b'b') - assert ord('o') == next(unpacker) - assert ord('o') == next(unpacker) - assert ord('b') == next(unpacker) + unpacker.feed(b"ob") + unpacker.feed(b"o") + assert ord("f") == next(unpacker) + unpacker.feed(b"b") + assert ord("o") == next(unpacker) + assert ord("o") == next(unpacker) + assert ord("b") == next(unpacker) def test_readbytes(): unpacker = Unpacker(read_size=3) - unpacker.feed(b'foobar') - assert unpacker.unpack() == ord(b'f') - assert unpacker.read_bytes(3) == b'oob' - assert unpacker.unpack() == ord(b'a') - assert unpacker.unpack() == ord(b'r') + unpacker.feed(b"foobar") + assert unpacker.unpack() == ord(b"f") + assert unpacker.read_bytes(3) == b"oob" + assert unpacker.unpack() == ord(b"a") + assert unpacker.unpack() == ord(b"r") # Test buffer refill - unpacker = Unpacker(io.BytesIO(b'foobar'), read_size=3) - assert unpacker.unpack() == ord(b'f') - assert unpacker.read_bytes(3) == b'oob' - assert unpacker.unpack() == ord(b'a') - assert unpacker.unpack() == ord(b'r') + unpacker = Unpacker(io.BytesIO(b"foobar"), read_size=3) + assert unpacker.unpack() == ord(b"f") + assert unpacker.read_bytes(3) == b"oob" + assert unpacker.unpack() == ord(b"a") + assert unpacker.unpack() == ord(b"r") + def test_issue124(): unpacker = Unpacker() - unpacker.feed(b'\xa1?\xa1!') - assert tuple(unpacker) == (b'?', b'!') + unpacker.feed(b"\xa1?\xa1!") + assert tuple(unpacker) == (b"?", b"!") assert tuple(unpacker) == () unpacker.feed(b"\xa1?\xa1") - assert tuple(unpacker) == (b'?',) + assert tuple(unpacker) == (b"?",) assert tuple(unpacker) == () unpacker.feed(b"!") - assert tuple(unpacker) == (b'!',) + assert tuple(unpacker) == (b"!",) assert tuple(unpacker) == () def test_unpack_tell(): stream = io.BytesIO() - messages = [2**i-1 for i in range(65)] - messages += [-(2**i) for i in range(1, 64)] - messages += [b'hello', b'hello'*1000, list(range(20)), - {i: bytes(i)*i for i in range(10)}, - {i: bytes(i)*i for i in range(32)}] + messages = [2 ** i - 1 for i in range(65)] + messages += [-(2 ** i) for i in range(1, 64)] + messages += [ + b"hello", + b"hello" * 1000, + list(range(20)), + {i: bytes(i) * i for i in range(10)}, + {i: bytes(i) * i for i in range(32)}, + ] offsets = [] for m in messages: pack(m, stream) diff --git a/test/test_stricttype.py b/test/test_stricttype.py index 87e7c1c..78e1723 100644 --- a/test/test_stricttype.py +++ b/test/test_stricttype.py @@ -5,30 +5,32 @@ from msgpack import packb, unpackb, ExtType def test_namedtuple(): - T = namedtuple('T', "foo bar") + T = namedtuple("T", "foo bar") + def default(o): if isinstance(o, T): return dict(o._asdict()) - raise TypeError('Unsupported type %s' % (type(o),)) + raise TypeError("Unsupported type %s" % (type(o),)) + packed = packb(T(1, 42), strict_types=True, use_bin_type=True, default=default) unpacked = unpackb(packed, raw=False) - assert unpacked == {'foo': 1, 'bar': 42} + assert unpacked == {"foo": 1, "bar": 42} def test_tuple(): - t = ('one', 2, b'three', (4, )) + t = ("one", 2, b"three", (4,)) def default(o): if isinstance(o, tuple): return { - '__type__': 'tuple', - 'value': list(o), - } - raise TypeError('Unsupported type %s' % (type(o),)) + "__type__": "tuple", + "value": list(o), + } + raise TypeError("Unsupported type %s" % (type(o),)) def convert(o): - if o.get('__type__') == 'tuple': - return tuple(o['value']) + if o.get("__type__") == "tuple": + return tuple(o["value"]) return o data = packb(t, strict_types=True, use_bin_type=True, default=default) @@ -38,7 +40,7 @@ def test_tuple(): def test_tuple_ext(): - t = ('one', 2, b'three', (4, )) + t = ("one", 2, b"three", (4,)) MSGPACK_EXT_TYPE_TUPLE = 0 @@ -46,7 +48,8 @@ def test_tuple_ext(): if isinstance(o, tuple): # Convert to list and pack payload = packb( - list(o), strict_types=True, use_bin_type=True, default=default) + list(o), strict_types=True, use_bin_type=True, default=default + ) return ExtType(MSGPACK_EXT_TYPE_TUPLE, payload) raise TypeError(repr(o)) @@ -54,7 +57,7 @@ def test_tuple_ext(): if code == MSGPACK_EXT_TYPE_TUPLE: # Unpack and convert to tuple return tuple(unpackb(payload, raw=False, ext_hook=convert)) - raise ValueError('Unknown Ext code {}'.format(code)) + raise ValueError("Unknown Ext code {}".format(code)) data = packb(t, strict_types=True, use_bin_type=True, default=default) expected = unpackb(data, raw=False, ext_hook=convert) diff --git a/test/test_subtype.py b/test/test_subtype.py index 6807508..d91d455 100644 --- a/test/test_subtype.py +++ b/test/test_subtype.py @@ -4,16 +4,21 @@ from msgpack import packb, unpackb from collections import namedtuple + class MyList(list): pass + class MyDict(dict): pass + class MyTuple(tuple): pass -MyNamedTuple = namedtuple('MyNamedTuple', 'x y') + +MyNamedTuple = namedtuple("MyNamedTuple", "x y") + def test_types(): assert packb(MyDict()) == packb(dict()) diff --git a/test/test_timestamp.py b/test/test_timestamp.py index 55c2f6d..1348e69 100644 --- a/test/test_timestamp.py +++ b/test/test_timestamp.py @@ -4,34 +4,34 @@ from msgpack import Timestamp def test_timestamp(): # timestamp32 - ts = Timestamp(2**32 - 1) + ts = Timestamp(2 ** 32 - 1) assert ts.to_bytes() == b"\xff\xff\xff\xff" packed = msgpack.packb(ts) assert packed == b"\xd6\xff" + ts.to_bytes() unpacked = msgpack.unpackb(packed) assert ts == unpacked - assert ts.seconds == 2**32 - 1 and ts.nanoseconds == 0 + assert ts.seconds == 2 ** 32 - 1 and ts.nanoseconds == 0 # timestamp64 - ts = Timestamp(2**34 - 1, 999999999) + ts = Timestamp(2 ** 34 - 1, 999999999) assert ts.to_bytes() == b"\xee\x6b\x27\xff\xff\xff\xff\xff" packed = msgpack.packb(ts) assert packed == b"\xd7\xff" + ts.to_bytes() unpacked = msgpack.unpackb(packed) assert ts == unpacked - assert ts.seconds == 2**34 - 1 and ts.nanoseconds == 999999999 + assert ts.seconds == 2 ** 34 - 1 and ts.nanoseconds == 999999999 # timestamp96 - ts = Timestamp(2**63 - 1, 999999999) + ts = Timestamp(2 ** 63 - 1, 999999999) assert ts.to_bytes() == b"\x3b\x9a\xc9\xff\x7f\xff\xff\xff\xff\xff\xff\xff" packed = msgpack.packb(ts) assert packed == b"\xc7\x0c\xff" + ts.to_bytes() unpacked = msgpack.unpackb(packed) assert ts == unpacked - assert ts.seconds == 2**63 - 1 and ts.nanoseconds == 999999999 + assert ts.seconds == 2 ** 63 - 1 and ts.nanoseconds == 999999999 # negative fractional - ts = Timestamp(-2.3) #s: -3, ns: 700000000 + ts = Timestamp(-2.3) # s: -3, ns: 700000000 assert ts.to_bytes() == b"\x29\xb9\x27\x00\xff\xff\xff\xff\xff\xff\xff\xfd" packed = msgpack.packb(ts) assert packed == b"\xc7\x0c\xff" + ts.to_bytes() diff --git a/test/test_unpack.py b/test/test_unpack.py index 00a1061..bc74c4d 100644 --- a/test/test_unpack.py +++ b/test/test_unpack.py @@ -5,7 +5,7 @@ from pytest import raises, mark def test_unpack_array_header_from_file(): - f = BytesIO(packb([1,2,3,4])) + f = BytesIO(packb([1, 2, 3, 4])) unpacker = Unpacker(f) assert unpacker.read_array_header() == 4 assert unpacker.unpack() == 1 @@ -16,8 +16,10 @@ def test_unpack_array_header_from_file(): unpacker.unpack() -@mark.skipif("not hasattr(sys, 'getrefcount') == True", - reason='sys.getrefcount() is needed to pass this test') +@mark.skipif( + "not hasattr(sys, 'getrefcount') == True", + reason="sys.getrefcount() is needed to pass this test", +) def test_unpacker_hook_refcnt(): result = [] @@ -43,12 +45,9 @@ def test_unpacker_hook_refcnt(): def test_unpacker_ext_hook(): - class MyUnpacker(Unpacker): - def __init__(self): - super(MyUnpacker, self).__init__( - ext_hook=self._hook, raw=False) + super(MyUnpacker, self).__init__(ext_hook=self._hook, raw=False) def _hook(self, code, data): if code == 1: @@ -57,15 +56,15 @@ def test_unpacker_ext_hook(): return ExtType(code, data) unpacker = MyUnpacker() - unpacker.feed(packb({'a': 1})) - assert unpacker.unpack() == {'a': 1} - unpacker.feed(packb({'a': ExtType(1, b'123')})) - assert unpacker.unpack() == {'a': 123} - unpacker.feed(packb({'a': ExtType(2, b'321')})) - assert unpacker.unpack() == {'a': ExtType(2, b'321')} + unpacker.feed(packb({"a": 1})) + assert unpacker.unpack() == {"a": 1} + unpacker.feed(packb({"a": ExtType(1, b"123")})) + assert unpacker.unpack() == {"a": 123} + unpacker.feed(packb({"a": ExtType(2, b"321")})) + assert unpacker.unpack() == {"a": ExtType(2, b"321")} -if __name__ == '__main__': +if __name__ == "__main__": test_unpack_array_header_from_file() test_unpacker_hook_refcnt() test_unpacker_ext_hook() -- cgit v1.2.1