summaryrefslogtreecommitdiff
path: root/zephyr/zmake/zmake/jobserver.py
blob: 69199a2dc8f0b33757125120f1879c38ecafc110 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for job counters, limiting the amount of concurrent executions."""

import logging
import multiprocessing
import os
import re
import select
import subprocess

import zmake


class JobHandle:
    """Context manager representing one claimed job.

    Entering the context does nothing; leaving it (normally or through an
    exception) invokes the stored release callback with the stored
    arguments.  Exceptions raised inside the context are never suppressed.
    """

    def __init__(self, release_func, *args, **kwargs):
        """Remember how to release the job.

        Args:
            release_func: Callable invoked when the context exits.
            *args: Positional arguments forwarded to release_func.
            **kwargs: Keyword arguments forwarded to release_func.
        """
        self.release_func, self.args, self.kwargs = release_func, args, kwargs

    def __enter__(self):
        # Nothing to acquire here: the job is already held by the time the
        # handle exists, so the "with" target is simply None.
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        release, positional, named = self.release_func, self.args, self.kwargs
        release(*positional, **named)
        # Explicitly falsy so any in-flight exception keeps propagating,
        # whatever the release callback returned.
        return None


class JobClient:
    """Abstract base class for all job clients.

    Subclasses implement get_job() to claim a job and may override env()
    to advertise the job server to the processes they start.
    """

    def get_job(self):
        """Claim a job.

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        raise NotImplementedError("Abstract method not implemented")

    def env(self):
        """Get the environment variables necessary to share the job server."""
        return {}

    def _child_env(self, base_env=None):
        """Build the environment for a child process.

        A fresh dict is always returned so that neither os.environ nor a
        mapping supplied by the caller is modified as a side effect of
        starting a process.

        Args:
            base_env: Mapping to start from, or None to start from a copy
                of os.environ.

        Returns:
            A new dict with the variables from env() layered on top.
        """
        merged = dict(os.environ if base_env is None else base_env)
        merged.update(self.env())
        return merged

    def popen(self, *args, **kwargs):
        """Start a process using subprocess.Popen

        The variables from env() are added to the child's environment
        (the "env" keyword argument if given, otherwise os.environ); the
        mapping the caller passed in is left unmodified.

        All other arguments are passed to subprocess.Popen.

        Returns:
            A Popen object.
        """
        kwargs["env"] = self._child_env(kwargs.get("env"))

        logger = logging.getLogger(self.__class__.__name__)
        logger.debug("Running %s", zmake.util.repr_command(*args))
        return subprocess.Popen(*args, **kwargs)

    def run(self, *args, claim_job=True, **kwargs):
        """Run a process using subprocess.run, optionally claiming a job.

        The variables from env() are added to the child's environment
        (the "env" keyword argument if given, otherwise os.environ); the
        mapping the caller passed in is left unmodified.

        Args:
            claim_job: True if a job should be claimed.

        All other arguments are passed to subprocess.run.

        Returns:
            A CompletedProcess object.
        """
        if claim_job:
            # Hold the job for the whole lifetime of the child, then
            # re-enter without claiming a second one.
            with self.get_job():
                return self.run(*args, claim_job=False, **kwargs)

        kwargs["env"] = self._child_env(kwargs.get("env"))

        return subprocess.run(*args, **kwargs)


class JobServer(JobClient):
    """Abstract Job Server.

    Concrete job servers must provide their own __init__; the one defined
    here only signals that it has not been overridden.
    """

    def __init__(self, jobs=0):
        """Reject direct instantiation.

        Args:
            jobs: Unused here; present so that subclasses share the
                signature.

        Raises:
            NotImplementedError: Always.
        """
        message = "Abstract method not implemented"
        raise NotImplementedError(message)


class GNUMakeJobClient(JobClient):
    """Job client which claims jobs from a GNU Make job server pipe.

    A job is claimed by reading one byte from the read end of the pipe and
    released by writing that byte back to the write end.
    """

    def __init__(self, read_fd, write_fd):
        """Wrap an existing job server pipe.

        Args:
            read_fd: File descriptor of the read end of the pipe.
            write_fd: File descriptor of the write end of the pipe.
        """
        self._pipe = [read_fd, write_fd]

    @classmethod
    def from_environ(cls, env=None):
        """Create a job client from an environment with the MAKEFLAGS variable.

        If we are started under a GNU Make Job Server, we can search
        the environment for a string "--jobserver-auth=R,W", where R
        and W will be the read and write file descriptors to the pipe
        respectively.  If we don't find this environment variable (or
        the string inside of it), this will raise an OSError.

        Args:
            env: Optionally, the environment to search.

        Returns:
            A GNUMakeJobClient configured appropriately.

        Raises:
            OSError: MAKEFLAGS is unset/empty or has no jobserver flags.
        """
        if env is None:
            env = os.environ
        makeflags = env.get("MAKEFLAGS")
        if not makeflags:
            raise OSError("MAKEFLAGS is not set in the environment")
        match = re.search(r"--jobserver-auth=(\d+),(\d+)", makeflags)
        if not match:
            raise OSError("MAKEFLAGS did not contain jobserver flags")
        read_fd, write_fd = map(int, match.groups())
        return cls(read_fd, write_fd)

    def get_job(self):
        """Claim a job.

        Reads a single byte from the read end of the pipe; the returned
        handle writes that same byte back when its context exits.

        Returns:
            A JobHandle object.
        """
        byte = os.read(self._pipe[0], 1)
        return JobHandle(lambda: os.write(self._pipe[1], byte))

    def env(self):
        """Get the environment variables necessary to share the job server."""
        return {"MAKEFLAGS": "--jobserver-auth={},{}".format(*self._pipe)}

    def _with_jobserver_fds(self, pass_fds):
        """Return pass_fds extended with both ends of the job server pipe.

        Args:
            pass_fds: The caller's pass_fds sequence, or None.

        Returns:
            A new list containing the caller's descriptors followed by any
            pipe descriptors not already present.
        """
        fds = list(pass_fds or ())
        fds.extend(fd for fd in self._pipe if fd not in fds)
        return fds

    def popen(self, *args, **kwargs):
        """Start a process that can reach the job server pipe.

        subprocess closes every descriptor above 2 in the child unless it
        is listed in pass_fds, so the descriptors advertised by env() are
        added to pass_fds here; otherwise the child would receive
        descriptor numbers that are not open.

        All arguments are passed on to JobClient.popen.

        Returns:
            A Popen object.
        """
        kwargs["pass_fds"] = self._with_jobserver_fds(kwargs.get("pass_fds"))
        return super().popen(*args, **kwargs)

    def run(self, *args, claim_job=True, **kwargs):
        """Run a process that can reach the job server pipe.

        The pipe descriptors are added to pass_fds for the same reason as
        in popen().

        Args:
            claim_job: True if a job should be claimed.

        All other arguments are passed on to JobClient.run.

        Returns:
            A CompletedProcess object.
        """
        # JobClient.run re-enters self.run() after claiming a job; the
        # helper de-duplicates, so the second pass leaves pass_fds as is.
        kwargs["pass_fds"] = self._with_jobserver_fds(kwargs.get("pass_fds"))
        return super().run(*args, claim_job=claim_job, **kwargs)


class GNUMakeJobServer(JobServer, GNUMakeJobClient):
    """Implements a GNU Make POSIX Job Server.

    See https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
    for specification.
    """

    def __init__(self, jobs=0):
        """Create the job server pipe and preload it with job tokens.

        Args:
            jobs: Number of jobs to make available.  A falsy value selects
                multiprocessing.cpu_count(); values above select.PIPE_BUF
                are clamped to select.PIPE_BUF.
        """
        if jobs:
            token_count = min(jobs, select.PIPE_BUF)
        else:
            token_count = multiprocessing.cpu_count()

        read_end, write_end = os.pipe()
        self._pipe = (read_end, write_end)
        # One "+" byte per available job; get_job() takes them one by one.
        os.write(write_end, b"+" * token_count)