summaryrefslogtreecommitdiff
path: root/tests/unit/utils_json_stream_test.py
blob: f7aefd0f187530973e95a2e96c8c3201defb40b0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# encoding: utf-8
from __future__ import absolute_import
from __future__ import unicode_literals

from docker.utils.json_stream import json_splitter, stream_as_text, json_stream


class TestJsonSplitter(object):

    def test_json_splitter_no_object(self):
        data = '{"foo": "bar'
        assert json_splitter(data) is None

    def test_json_splitter_with_object(self):
        data = '{"foo": "bar"}\n  \n{"next": "obj"}'
        assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')

    def test_json_splitter_leading_whitespace(self):
        data = '\n   \r{"foo": "bar"}\n\n   {"next": "obj"}'
        assert json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')


class TestStreamAsText(object):

    def test_stream_with_non_utf_unicode_character(self):
        stream = [b'\xed\xf3\xf3']
        output, = stream_as_text(stream)
        assert output == '���'

    def test_stream_with_utf_character(self):
        stream = ['ěĝ'.encode('utf-8')]
        output, = stream_as_text(stream)
        assert output == 'ěĝ'


class TestJsonStream(object):

    def test_with_falsy_entries(self):
        stream = [
            '{"one": "two"}\n{}\n',
            "[1, 2, 3]\n[]\n",
        ]
        output = list(json_stream(stream))
        assert output == [
            {'one': 'two'},
            {},
            [1, 2, 3],
            [],
        ]

    def test_with_leading_whitespace(self):
        stream = [
            '\n  \r\n  {"one": "two"}{"x": 1}',
            '  {"three": "four"}\t\t{"x": 2}'
        ]
        output = list(json_stream(stream))
        assert output == [
            {'one': 'two'},
            {'x': 1},
            {'three': 'four'},
            {'x': 2}
        ]