summaryrefslogtreecommitdiff
path: root/test/test_extension.py
blob: 6b3657511223d19e9c11b20300fd4eff1c67ea4e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from __future__ import print_function
import array
import msgpack
from msgpack import ExtType


def test_pack_ext_type():
    def p(s):
        packer = msgpack.Packer()
        packer.pack_ext_type(0x42, s)
        return packer.bytes()

    assert p(b"A") == b"\xd4\x42A"  # fixext 1
    assert p(b"AB") == b"\xd5\x42AB"  # fixext 2
    assert p(b"ABCD") == b"\xd6\x42ABCD"  # fixext 4
    assert p(b"ABCDEFGH") == b"\xd7\x42ABCDEFGH"  # fixext 8
    assert p(b"A" * 16) == b"\xd8\x42" + b"A" * 16  # fixext 16
    assert p(b"ABC") == b"\xc7\x03\x42ABC"  # ext 8
    assert p(b"A" * 0x0123) == b"\xc8\x01\x23\x42" + b"A" * 0x0123  # ext 16
    assert (
        p(b"A" * 0x00012345) == b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345
    )  # ext 32


def test_unpack_ext_type():
    def check(b, expected):
        assert msgpack.unpackb(b) == expected

    check(b"\xd4\x42A", ExtType(0x42, b"A"))  # fixext 1
    check(b"\xd5\x42AB", ExtType(0x42, b"AB"))  # fixext 2
    check(b"\xd6\x42ABCD", ExtType(0x42, b"ABCD"))  # fixext 4
    check(b"\xd7\x42ABCDEFGH", ExtType(0x42, b"ABCDEFGH"))  # fixext 8
    check(b"\xd8\x42" + b"A" * 16, ExtType(0x42, b"A" * 16))  # fixext 16
    check(b"\xc7\x03\x42ABC", ExtType(0x42, b"ABC"))  # ext 8
    check(b"\xc8\x01\x23\x42" + b"A" * 0x0123, ExtType(0x42, b"A" * 0x0123))  # ext 16
    check(
        b"\xc9\x00\x01\x23\x45\x42" + b"A" * 0x00012345,
        ExtType(0x42, b"A" * 0x00012345),
    )  # ext 32


def test_extension_type():
    def default(obj):
        print("default called", obj)
        if isinstance(obj, array.array):
            typecode = 123  # application specific typecode
            try:
                data = obj.tobytes()
            except AttributeError:
                data = obj.tostring()
            return ExtType(typecode, data)
        raise TypeError("Unknown type object %r" % (obj,))

    def ext_hook(code, data):
        print("ext_hook called", code, data)
        assert code == 123
        obj = array.array("d")
        try:
            obj.frombytes(data)
        except AttributeError:  # PY2
            obj.fromstring(data)
        return obj

    obj = [42, b"hello", array.array("d", [1.1, 2.2, 3.3])]
    s = msgpack.packb(obj, default=default)
    obj2 = msgpack.unpackb(s, ext_hook=ext_hook)
    assert obj == obj2


import sys

if sys.version > "3":
    long = int


def test_overriding_hooks():
    def default(obj):
        if isinstance(obj, long):
            return {"__type__": "long", "__data__": str(obj)}
        else:
            return obj

    obj = {"testval": long(1823746192837461928374619)}
    refobj = {"testval": default(obj["testval"])}
    refout = msgpack.packb(refobj)
    assert isinstance(refout, (str, bytes))
    testout = msgpack.packb(obj, default=default)

    assert refout == testout