summaryrefslogtreecommitdiff
path: root/test/test_sequnpack.py
blob: 59718f5635e803e42c46eebe5dbda0c691f61aad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python
# coding: utf-8

import io
from msgpack import Unpacker, BufferFull
from msgpack import pack
from msgpack.exceptions import OutOfData
from pytest import raises


def test_partialdata():
    unpacker = Unpacker()
    unpacker.feed(b'\xa5')
    with raises(StopIteration): next(iter(unpacker))
    unpacker.feed(b'h')
    with raises(StopIteration): next(iter(unpacker))
    unpacker.feed(b'a')
    with raises(StopIteration): next(iter(unpacker))
    unpacker.feed(b'l')
    with raises(StopIteration): next(iter(unpacker))
    unpacker.feed(b'l')
    with raises(StopIteration): next(iter(unpacker))
    unpacker.feed(b'o')
    assert next(iter(unpacker)) == b'hallo'

def test_foobar():
    unpacker = Unpacker(read_size=3, use_list=1)
    unpacker.feed(b'foobar')
    assert unpacker.unpack() == ord(b'f')
    assert unpacker.unpack() == ord(b'o')
    assert unpacker.unpack() == ord(b'o')
    assert unpacker.unpack() == ord(b'b')
    assert unpacker.unpack() == ord(b'a')
    assert unpacker.unpack() == ord(b'r')
    with raises(OutOfData):
        unpacker.unpack()

    unpacker.feed(b'foo')
    unpacker.feed(b'bar')

    k = 0
    for o, e in zip(unpacker, 'foobarbaz'):
        assert o == ord(e)
        k += 1
    assert k == len(b'foobar')

def test_foobar_skip():
    unpacker = Unpacker(read_size=3, use_list=1)
    unpacker.feed(b'foobar')
    assert unpacker.unpack() == ord(b'f')
    unpacker.skip()
    assert unpacker.unpack() == ord(b'o')
    unpacker.skip()
    assert unpacker.unpack() == ord(b'a')
    unpacker.skip()
    with raises(OutOfData):
        unpacker.unpack()

def test_maxbuffersize():
    with raises(ValueError):
        Unpacker(read_size=5, max_buffer_size=3)
    unpacker = Unpacker(read_size=3, max_buffer_size=3, use_list=1)
    unpacker.feed(b'fo')
    with raises(BufferFull):
        unpacker.feed(b'ob')
    unpacker.feed(b'o')
    assert ord('f') == next(unpacker)
    unpacker.feed(b'b')
    assert ord('o') == next(unpacker)
    assert ord('o') == next(unpacker)
    assert ord('b') == next(unpacker)


def test_readbytes():
    unpacker = Unpacker(read_size=3)
    unpacker.feed(b'foobar')
    assert unpacker.unpack() == ord(b'f')
    assert unpacker.read_bytes(3) == b'oob'
    assert unpacker.unpack() == ord(b'a')
    assert unpacker.unpack() == ord(b'r')

    # Test buffer refill
    unpacker = Unpacker(io.BytesIO(b'foobar'), read_size=3)
    assert unpacker.unpack() == ord(b'f')
    assert unpacker.read_bytes(3) == b'oob'
    assert unpacker.unpack() == ord(b'a')
    assert unpacker.unpack() == ord(b'r')

def test_issue124():
    unpacker = Unpacker()
    unpacker.feed(b'\xa1?\xa1!')
    assert tuple(unpacker) == (b'?', b'!')
    assert tuple(unpacker) == ()
    unpacker.feed(b"\xa1?\xa1")
    assert tuple(unpacker) == (b'?',)
    assert tuple(unpacker) == ()
    unpacker.feed(b"!")
    assert tuple(unpacker) == (b'!',)
    assert tuple(unpacker) == ()


def test_unpack_tell():
    stream = io.BytesIO()
    messages = [2**i-1 for i in range(65)]
    messages += [-(2**i) for i in range(1, 64)]
    messages += [b'hello', b'hello'*1000, list(range(20)),
                 {i: bytes(i)*i for i in range(10)},
                 {i: bytes(i)*i for i in range(32)}]
    offsets = []
    for m in messages:
        pack(m, stream)
        offsets.append(stream.tell())
    stream.seek(0)
    unpacker = Unpacker(stream)
    for m, o in zip(messages, offsets):
        m2 = next(unpacker)
        assert m == m2
        assert o == unpacker.tell()