summaryrefslogtreecommitdiff
path: root/zephyr/zmake/tests/test_multiproc_logging.py
blob: 2eac9326d35438b2d9db50822dd8d23263748c05 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import io
import logging
import os
import threading
import unittest.mock as mock

import zmake.multiproc


def test_read_output_from_pipe():
    semaphore = threading.Semaphore(0)
    pipe = os.pipe()
    fd = io.TextIOWrapper(os.fdopen(pipe[0], "rb"), encoding="utf-8")
    logger = mock.Mock(spec=logging.Logger)
    logger.log.side_effect = lambda log_lvl, line: semaphore.release()
    zmake.multiproc.log_output(logger, logging.DEBUG, fd, job_id="")
    os.write(pipe[1], "Hello\n".encode("utf-8"))
    semaphore.acquire()
    logger.log.assert_called_with(logging.DEBUG, "Hello")


def test_read_output_change_log_level():
    semaphore = threading.Semaphore(0)
    pipe = os.pipe()
    fd = io.TextIOWrapper(os.fdopen(pipe[0], "rb"), encoding="utf-8")
    logger = mock.Mock(spec=logging.Logger)
    logger.log.side_effect = lambda log_lvl, line: semaphore.release()
    # This call will log output from fd (the file descriptor) to DEBUG, though
    # when the line starts with 'World', the logging level will be switched to
    # CRITICAL (see the content of the log_lvl_override_func).
    zmake.multiproc.log_output(
        logger=logger,
        log_level=logging.DEBUG,
        file_descriptor=fd,
        log_level_override_func=lambda line, lvl: logging.CRITICAL
        if line.startswith("World")
        else lvl,
        job_id="",
    )
    os.write(pipe[1], "Hello\n".encode("utf-8"))
    semaphore.acquire()
    os.write(pipe[1], "World\n".encode("utf-8"))
    semaphore.acquire()
    os.write(pipe[1], "Bye\n".encode("utf-8"))
    semaphore.acquire()
    logger.log.assert_has_calls(
        [
            mock.call(logging.DEBUG, "Hello"),
            mock.call(logging.CRITICAL, "World"),
            mock.call(logging.CRITICAL, "Bye"),
        ]
    )


def test_read_output_from_second_pipe():
    """Test that we can read from more than one pipe.

    This is particularly important since we will block on a read/select once we
    have a file descriptor. It is important that we break from the select and
    start it again with the updated list when a new one is added.
    """
    semaphore = threading.Semaphore(0)
    pipes = [os.pipe(), os.pipe()]
    fds = [
        io.TextIOWrapper(os.fdopen(pipes[0][0], "rb"), encoding="utf-8"),
        io.TextIOWrapper(os.fdopen(pipes[1][0], "rb"), encoding="utf-8"),
    ]

    logger = mock.Mock(spec=logging.Logger)
    logger.log.side_effect = lambda log_lvl, fmt, id, line: semaphore.release()

    zmake.multiproc.log_output(logger, logging.DEBUG, fds[0], job_id="0")
    zmake.multiproc.log_output(logger, logging.ERROR, fds[1], job_id="1")

    os.write(pipes[1][1], "Hello\n".encode("utf-8"))
    semaphore.acquire()
    logger.log.assert_called_with(logging.ERROR, "[%s]%s", "1", "Hello")


def test_read_output_after_another_pipe_closed():
    """Test processing output from a pipe after closing another.

    Since we don't want to complicate the API. File descriptors are
    automatically pruned away when closed. Make sure that the other descriptors
    remain functional when that happens.
    """
    semaphore = threading.Semaphore(0)
    pipes = [os.pipe(), os.pipe()]
    fds = [
        io.TextIOWrapper(os.fdopen(pipes[0][0], "rb"), encoding="utf-8"),
        io.TextIOWrapper(os.fdopen(pipes[1][0], "rb"), encoding="utf-8"),
    ]

    logger = mock.Mock(spec=logging.Logger)
    logger.log.side_effect = lambda log_lvl, fmt, id, line: semaphore.release()

    zmake.multiproc.log_output(logger, logging.DEBUG, fds[0], job_id="0")
    zmake.multiproc.log_output(logger, logging.ERROR, fds[1], job_id="1")

    fds[0].close()
    os.write(pipes[1][1], "Hello\n".encode("utf-8"))
    semaphore.acquire()
    logger.log.assert_called_with(logging.ERROR, "[%s]%s", "1", "Hello")