summaryrefslogtreecommitdiff
path: root/test/test_unpack.py
blob: aa4c01f80ce60f44b91dc0d598382e8f3769c06b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from io import BytesIO
import sys
from msgpack import Unpacker, packb, OutOfData, ExtType
from pytest import raises, mark

# Python 2 only: rebind zip to the lazy itertools.izip so that zipping over an
# Unpacker pulls one object per loop iteration (the tell() tests below check
# the stream offset after each item). On Python 3 the import fails and the
# built-in zip, which is already lazy, is kept.
try:
    from itertools import izip as zip
except ImportError:
    pass


def test_unpack_array_header_from_file():
    """Read an array header from a file-backed Unpacker, then its elements.

    Once the header has reported four elements, each one is unpacked on its
    own; a further unpack on the drained stream must raise ``OutOfData``.
    """
    stream = BytesIO(packb([1, 2, 3, 4]))
    unpacker = Unpacker(stream)

    assert unpacker.read_array_header() == 4
    for expected in (1, 2, 3, 4):
        assert unpacker.unpack() == expected

    # All four elements have been consumed, so nothing is left to unpack.
    with raises(OutOfData):
        unpacker.unpack()


@mark.skipif(
    not hasattr(sys, "getrefcount"),
    reason="sys.getrefcount() is needed to pass this test",
)
def test_unpacker_hook_refcnt():
    """Check that an Unpacker takes and later releases references to its hooks.

    The same callable is registered as both ``object_hook`` and ``list_hook``.
    Its reference count is sampled before the Unpacker exists, while it is
    alive, and after it has been deleted. The test is skipped on interpreters
    that do not provide ``sys.getrefcount``.
    """
    result = []

    def hook(x):
        # Record every object handed to the hook and pass it through unchanged.
        result.append(x)
        return x

    basecnt = sys.getrefcount(hook)

    up = Unpacker(object_hook=hook, list_hook=hook)

    # The hook is registered twice, so at least two extra references are expected.
    assert sys.getrefcount(hook) >= basecnt + 2

    up.feed(packb([{}]))
    up.feed(packb([{}]))
    assert up.unpack() == [{}]
    assert up.unpack() == [{}]
    # For each message the hook sees the inner map first, then the enclosing list.
    assert result == [{}, [{}], {}, [{}]]

    del up

    # Deleting the unpacker must give back every reference it was holding.
    assert sys.getrefcount(hook) == basecnt


def test_unpacker_ext_hook():
    """An ``ext_hook`` bound to an Unpacker subclass decodes extension types.

    Extension code 1 is turned into an ``int`` by the hook; any other code is
    handed back as an ``ExtType``. Plain values do not go through the hook.
    """

    class MyUnpacker(Unpacker):
        def __init__(self):
            super(MyUnpacker, self).__init__(ext_hook=self._hook, raw=False)

        def _hook(self, code, data):
            # Only extension code 1 gets special treatment.
            if code != 1:
                return ExtType(code, data)
            return int(data)

    unpacker = MyUnpacker()

    # (object to pack and feed, object expected back from unpack())
    cases = (
        ({"a": 1}, {"a": 1}),
        ({"a": ExtType(1, b"123")}, {"a": 123}),
        ({"a": ExtType(2, b"321")}, {"a": ExtType(2, b"321")}),
    )
    for payload, expected in cases:
        unpacker.feed(packb(payload))
        assert unpacker.unpack() == expected


def test_unpacker_tell():
    """``tell()`` reports the stream offset right after each unpacked object."""
    packed = b"\x01\x02\xa3abc\xa3def\xa3ghi"
    # (expected object, expected offset once that object has been unpacked)
    expectations = (
        (1, 1),
        (2, 2),
        (u"abc", 6),
        (u"def", 10),
        (u"ghi", 14),
    )
    unpacker = Unpacker(BytesIO(packed))
    # zip pulls from the unpacker lazily, one object per iteration, so tell()
    # is sampled between objects rather than after the whole stream is read.
    for (obj, pos), unp in zip(expectations, unpacker):
        assert obj == unp
        assert pos == unpacker.tell()


def test_unpacker_tell_read_bytes():
    """``tell()`` stays accurate when ``read_bytes()`` is mixed with unpacking."""
    packed = b"\x01\x02\xa3abc\xa3def\xa3ghi"
    # Each step: object expected from the iterator, offset right after it was
    # unpacked, number of raw bytes to request next, and the raw bytes expected
    # back. The final step asks for more bytes than remain and expects b"".
    steps = (
        (1, 1, 1, b"\x02"),
        (u"abc", 6, 4, b"\xa3def"),
        (u"ghi", 14, 999, b""),
    )
    unpacker = Unpacker(BytesIO(packed))
    for (obj, pos, n, raw), unp in zip(steps, unpacker):
        assert obj == unp
        assert pos == unpacker.tell()
        assert unpacker.read_bytes(n) == raw