path: root/src/buildstream/_scheduler/jobs/job.py
blob: c7e2624e61fa25a0f0e02f5985424ee54acba071
#
#  Copyright (C) 2018 Codethink Limited
#  Copyright (C) 2019 Bloomberg Finance LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
#        Jürg Billeter <juerg.billeter@codethink.co.uk>
#        Tristan Maat <tristan.maat@codethink.co.uk>

# System imports
import asyncio
import contextlib
import datetime
import itertools
import multiprocessing
import threading
import traceback

# BuildStream toplevel imports
from ... import utils
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
from ._job import terminate_thread
from ..._signals import TerminateException


# Return code values for the shutdown of job handling child processes
#
class _ReturnCode(FastEnum):
    OK = 0
    FAIL = 1
    PERM_FAIL = 2
    SKIPPED = 3
    TERMINATED = 4


# JobStatus:
#
# The job completion status, passed back through the
# complete callbacks.
#
class JobStatus(FastEnum):
    # Job succeeded
    OK = 0

    # A temporary BstError was raised
    FAIL = 1

    # A SkipJob was raised
    SKIPPED = 3


# Job()
#
# The Job object represents a task that will run in parallel to the main
# process. It has some methods that are not implemented - they are meant for
# you to implement in a subclass.
#
# It has a close relationship with the ChildJob class, and it can be considered
# a two part solution:
#
# 1. A Job instance, which will create a ChildJob instance and arrange for
#    childjob.child_process() to be executed in another process.
# 2. The created ChildJob instance, which does the actual work.
#
# This split makes it clear what data is passed to the other process and what
# is executed in which process.
#
# To set up a minimal new kind of Job, e.g. YourJob (a sketch follows these steps):
#
# 1. Create a YourJob class, inheriting from Job.
# 2. Create a YourChildJob class, inheriting from ChildJob.
# 3. Implement YourJob.create_child_job() and YourJob.parent_complete().
# 4. Implement YourChildJob.child_process().
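#
# A minimal sketch of such a pair (illustrative only; YourJob and
# YourChildJob are hypothetical names, not part of this module):
#
#     class YourChildJob(ChildJob):
#         def child_process(self):
#             # Do the actual work here and return something pickle-able;
#             # this value becomes the `result` passed to parent_complete().
#             return {"fetched": True}
#
#     class YourJob(Job):
#         def create_child_job(self, *args, **kwargs):
#             # Job.start() forwards the standard construction arguments here
#             return YourChildJob(*args, **kwargs)
#
#         def parent_complete(self, status, result):
#             # Runs in the main process once the job has finished
#             if status == JobStatus.OK:
#                 pass  # hand `result` to whoever scheduled the job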
#
# Args:
#    scheduler (Scheduler): The scheduler
#    action_name (str): The queue action name
#    logfile (str): A template string that points to the logfile
#                   that should be used - should contain {pid}.
#    max_retries (int): The maximum number of retries
#
class Job:
    # Unique id generator for jobs
    #
    # This is used to identify tasks in the `State` class
    _id_generator = itertools.count(1)

    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):

        #
        # Public members
        #
        self.id = "{}-{}".format(action_name, next(Job._id_generator))
        self.name = None  # The name of the job, set by the job's subclass
        self.action_name = action_name  # The action name for the Queue

        #
        # Private members
        #
        self._scheduler = scheduler  # The scheduler
        self._messenger = self._scheduler.context.messenger
        self._pipe_r = None  # The read end of a pipe for message passing
        self._listening = False  # Whether the parent is currently listening
        self._suspended = False  # Whether this job is currently suspended
        self._max_retries = max_retries  # Maximum number of automatic retries
        self._result = None  # Return value of child action in the parent
        self._tries = 0  # Try count, for retryable jobs
        self._terminated = False  # Whether this job has been explicitly terminated

        self._logfile = logfile
        self._message_element_name = None  # The task-wide element name
        self._message_element_key = None  # The task-wide element cache key
        self._element = None  # The Element() passed to the Job() constructor, if applicable

        self._task = None  # The task that is run
        self._child = None

    # set_name()
    #
    # Sets the name of this job
    def set_name(self, name):
        self.name = name

    # start()
    #
    # Starts the job.
    #
    def start(self):

        assert not self._terminated, "Attempted to start process which was already terminated"

        # FIXME: remove this, this is not necessary when using asyncio
        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)

        self._tries += 1
        self._parent_start_listening()

        # FIXME: remove the parent/child separation, it's not needed anymore.
        self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
            self.action_name,
            self._messenger,
            self._scheduler.context.logdir,
            self._logfile,
            self._max_retries,
            self._tries,
            self._message_element_name,
            self._message_element_key,
        )

        loop = asyncio.get_event_loop()

        async def execute():
            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
            await self._parent_child_completed(ret_code)

        self._task = loop.create_task(execute())

    # terminate()
    #
    # Politely request that an ongoing job terminate soon.
    #
    # This will raise an exception in the child to ask it to exit.
    #
    def terminate(self):
        self.message(MessageType.STATUS, "{} terminating".format(self.action_name))

        # Make sure there is no garbage on the pipe
        self._parent_stop_listening()

        if self._task:
            self._child.terminate()

        self._terminated = True

    # get_terminated()
    #
    # Check if a job has been terminated.
    #
    # Returns:
    #     (bool): True in the main process if Job.terminate() was called.
    #
    def get_terminated(self):
        return self._terminated

    # set_message_element_name()
    #
    # This is called by Job subclasses to set the plugin instance element
    # name issuing the message (if an element is related to the Job).
    #
    # Args:
    #     element_name (str): The element_name to be supplied to the Message() constructor
    #
    def set_message_element_name(self, element_name):
        self._message_element_name = element_name

    # set_message_element_key()
    #
    # This is called by Job subclasses to set the element
    # key for the issuing message (if an element is related to the Job).
    #
    # Args:
    #     element_key (_DisplayKey): The element_key tuple to be supplied to the Message() constructor
    #
    def set_message_element_key(self, element_key):
        self._message_element_key = element_key

    # message():
    #
    # Logs a message; it will be recorded in the task's logfile and
    # conditionally also sent to the frontend.
    #
    # Args:
    #    message_type (MessageType): The type of message to send
    #    message (str): The message
    #    kwargs: Remaining Message() constructor arguments, note that you can
    #            override 'element_name' and 'element_key' this way.
    #
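    # For example (an illustrative call; the values shown are hypothetical):
    #
    #     job.message(MessageType.STATUS, "Starting fetch", logfile="/path/to/task.log")
    #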
    def message(self, message_type, message, **kwargs):
        kwargs["scheduler"] = True

        # Allow callers to override 'element_name' and 'element_key' via kwargs,
        # as documented above, without raising a duplicate keyword argument error.
        kwargs.setdefault("element_name", self._message_element_name)
        kwargs.setdefault("element_key", self._message_element_key)

        message = Message(message_type, message, **kwargs)
        self._messenger.message(message)

    # get_element()
    #
    # Get the Element() related to the job, if applicable to the job type
    # (i.e. ElementJob); default is None.
    #
    # Returns:
    #     (Element): The Element() instance pertaining to the Job, else None.
    #
    def get_element(self):
        return self._element

    #######################################################
    #                  Abstract Methods                   #
    #######################################################

    # parent_complete()
    #
    # This will be executed in the main process after the job finishes, and is
    # expected to pass the result to the main thread.
    #
    # Args:
    #    status (JobStatus): The job exit status
    #    result (any): The result returned by child_process().
    #
    def parent_complete(self, status, result):
        raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__))

    # create_child_job()
    #
    # Called by a Job instance to create a child job.
    #
    # The child job object is an instance of a subclass of ChildJob.
    #
    # The child job object's child_process() method will be executed in another
    # process, so that work is done in parallel. See the documentation for the
    # Job class for more information on this relationship.
    #
    # This method must be overridden by Job subclasses.
    #
    # Returns:
    #    (ChildJob): An instance of a subclass of ChildJob.
    #
    def create_child_job(self, *args, **kwargs):
        raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__))

    #######################################################
    #                  Local Private Methods              #
    #######################################################

    # _parent_shutdown()
    #
    # Shuts down the Job on the parent side by reading any remaining
    # messages on the message pipe and cleaning up any resources.
    #
    def _parent_shutdown(self):
        # Make sure we've read everything we need and then stop listening
        self._parent_process_pipe()
        self._parent_stop_listening()

    # _parent_child_completed()
    #
    # Called in the main process once the child action has completed,
    # awaited from the asyncio task created in start().
    #
    # Args:
    #    returncode (int): The return code of the child process
    #
    async def _parent_child_completed(self, returncode):
        self._parent_shutdown()

        try:
            returncode = _ReturnCode(returncode)
        except ValueError:
            # An unexpected return code was returned; fail permanently and report
            self.message(
                MessageType.ERROR,
                "Internal job process unexpectedly died with exit code {}".format(returncode),
                logfile=self._logfile,
            )
            returncode = _ReturnCode.PERM_FAIL

        # We don't want to retry if we got OK or a permanent fail.
        retry_flag = returncode == _ReturnCode.FAIL

        if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
            self.start()
            return

        # Resolve the outward facing overall job completion status
        #
        if returncode == _ReturnCode.OK:
            status = JobStatus.OK
        elif returncode == _ReturnCode.SKIPPED:
            status = JobStatus.SKIPPED
        elif returncode in (_ReturnCode.FAIL, _ReturnCode.PERM_FAIL):
            status = JobStatus.FAIL
        elif returncode == _ReturnCode.TERMINATED:
            if self._terminated:
                self.message(MessageType.INFO, "Job terminated")
            else:
                self.message(MessageType.ERROR, "Job was terminated unexpectedly")

            status = JobStatus.FAIL
        else:
            status = JobStatus.FAIL

        self.parent_complete(status, self._result)
        self._scheduler.job_completed(self, status)

        # Force the deletion of the pipe and process objects to try and clean up FDs
        self._pipe_r.close()
        self._pipe_r = self._task = None

    # _parent_process_pipe()
    #
    # Reads back message envelopes from the message pipe
    # in the parent process.
    #
    def _parent_process_pipe(self):
        while self._pipe_r.poll():
            try:
                message = self._pipe_r.recv()
            except EOFError:
                self._parent_stop_listening()
                break

            self._messenger.message(message)

    # _parent_recv()
    #
    # A callback to handle I/O events from the message
    # pipe file descriptor in the main process message loop
    #
    def _parent_recv(self, *args):
        self._parent_process_pipe()

    # _parent_start_listening()
    #
    # Starts listening on the message pipe
    #
    def _parent_start_listening(self):
        if not self._listening:
            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
            self._listening = True

    # _parent_stop_listening()
    #
    # Stops listening on the message pipe
    #
    def _parent_stop_listening(self):
        if self._listening:
            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
            self._listening = False


# ChildJob()
#
# The ChildJob object represents the part of a parallel task that will run in a
# separate process. It has a close relationship with the parent Job that
# created it.
#
# See the documentation of the Job class for more on their relationship, and
# how to set up a (Job, ChildJob) pair.
#
# The args below are passed from the parent Job to the ChildJob.
#
# Args:
#    scheduler (Scheduler): The scheduler.
#    action_name (str): The queue action name.
#    logfile (str): A template string that points to the logfile
#                   that should be used - should contain {pid}.
#    max_retries (int): The maximum number of retries.
#    tries (int): The number of retries so far.
#    message_element_name (str): None, or the plugin instance element name
#                                to be supplied to the Message() constructor.
#    message_element_key (tuple): None, or the element display key tuple
#                                to be supplied to the Message() constructor.
#
class ChildJob:
    def __init__(
        self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key
    ):

        self.action_name = action_name

        self._messenger = messenger
        self._logdir = logdir
        self._logfile = logfile
        self._max_retries = max_retries
        self._tries = tries
        self._message_element_name = message_element_name
        self._message_element_key = message_element_key

        self._pipe_w = None  # The write end of a pipe for message passing
        self._thread_id = None  # Thread in which the child executes its action
        self._should_terminate = False
        self._terminate_lock = threading.Lock()

    # message():
    #
    # Logs a message; it will be recorded in the task's logfile and
    # conditionally also sent to the frontend.
    #
    # Args:
    #    message_type (MessageType): The type of message to send
    #    message (str): The message
    #    kwargs: Remaining Message() constructor arguments, note
    #            element_key is set in _child_message_handler
    #            for front end display if not already set or explicitly
    #            overridden here.
    #
    def message(self, message_type, message, **kwargs):
        kwargs["scheduler"] = True

        # Allow 'element_name' and 'element_key' to be overridden via kwargs,
        # as documented above, without raising a duplicate keyword argument error.
        kwargs.setdefault("element_name", self._message_element_name)
        kwargs.setdefault("element_key", self._message_element_key)

        self._messenger.message(Message(message_type, message, **kwargs))

    #######################################################
    #                  Abstract Methods                   #
    #######################################################

    # child_process()
    #
    # This will be executed after starting the child process, and is intended
    # to perform the job's task.
    #
    # Returns:
    #    (any): A simple object (must be pickle-able, i.e. strings, lists,
    #           dicts, numbers, but not Element instances). It is returned to
    #           the parent Job running in the main process. This is taken as
    #           the result of the Job.
    #
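    # For example, an implementation might end with (illustrative values only):
    #
    #     return {"cache-hit": False, "bytes-downloaded": 1024}
    #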
    def child_process(self):
        raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__))

    # child_action()
    #
    # Perform the action in the child process; this calls child_process().
    #
    # Args:
    #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
    #
    def child_action(self, pipe_w):
        # Assign the pipe we passed across the process boundaries
        #
        # Set the global message handler in this child
        # process to forward messages to the parent process
        self._pipe_w = pipe_w
        self._messenger.set_message_handler(self._child_message_handler)

        # FIXME
        silence = self.action_name == "Cache-query"

        # Time, log and run the action function
        #
        if silence:
            record_cm = contextlib.suppress()
        else:
            record_cm = self._messenger.recorded_messages(self._logfile, self._logdir)
        with self._messenger.timed_suspendable() as timeinfo, record_cm as filename:
            try:
                if not silence:
                    self.message(MessageType.START, self.action_name, logfile=filename)

                with self._terminate_lock:
                    self._thread_id = threading.current_thread().ident
                    if self._should_terminate:
                        return _ReturnCode.TERMINATED, None

                try:
                    # Try the task action
                    result = self.child_process()  # pylint: disable=assignment-from-no-return
                except SkipJob as e:
                    elapsed = datetime.datetime.now() - timeinfo.start_time
                    if not silence:
                        self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)

                    # Alert parent of skip by return code
                    return _ReturnCode.SKIPPED, None
                except BstError as e:
                    elapsed = datetime.datetime.now() - timeinfo.start_time
                    retry_flag = e.temporary

                    if retry_flag and (self._tries <= self._max_retries):
                        self.message(
                            MessageType.FAIL,
                            "Try #{} failed, retrying".format(self._tries),
                            elapsed=elapsed,
                            logfile=filename,
                        )
                    else:
                        self.message(
                            MessageType.FAIL,
                            str(e),
                            elapsed=elapsed,
                            detail=e.detail,
                            logfile=filename,
                            sandbox=e.sandbox,
                        )

                    # Report the exception to the parent (for internal testing purposes)
                    set_last_task_error(e.domain, e.reason)

                    # Set return code based on whether or not the error was temporary.
                    #
                    return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL, None
                except Exception:  # pylint: disable=broad-except

                    # If an unhandled exception (one not normalized to a BstError) occurs,
                    # that's a bug; send the traceback and formatted exception back to the
                    # frontend and print it to the log file.
                    #
                    elapsed = datetime.datetime.now() - timeinfo.start_time
                    detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())

                    self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
                    # Unhandled exceptions should permanently fail
                    return _ReturnCode.PERM_FAIL, None

                else:
                    # No exception occurred in the action
                    elapsed = datetime.datetime.now() - timeinfo.start_time
                    if not silence:
                        self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)

                    # Shutdown needs to stay outside of the above context manager,
                    # make sure we don't try to handle SIGTERM while the process
                    # is already busy in sys.exit()
                    return _ReturnCode.OK, result
                finally:
                    self._thread_id = None
            except TerminateException:
                self._thread_id = None
                return _ReturnCode.TERMINATED, None
            finally:
                self._pipe_w.close()

    # terminate()
    #
    # Ask the current child thread to terminate
    #
    # This should only ever be called from the main thread.
    #
    def terminate(self):
        assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler"

        if self._should_terminate:
            return

        with self._terminate_lock:
            self._should_terminate = True
            if self._thread_id is None:
                return

        terminate_thread(self._thread_id)

    #######################################################
    #                  Local Private Methods              #
    #######################################################

    # _child_message_handler()
    #
    # A Context delegate for handling messages; it replaces the
    # frontend's main message handler in the context of a child task
    # and performs local logging to the task's log file before sending
    # the message back to the parent process for further propagation.
    # For an element ChildJob, the related element display key is added
    # to the message for widget rendering if it is not already set.
    #
    # Args:
    #    message     (Message): The message to log
    #    is_silenced (bool)   : Whether messages are silenced
    #
    def _child_message_handler(self, message, is_silenced):

        message.action_name = self.action_name
        message.task_element_name = self._message_element_name
        message.task_element_key = self._message_element_key

        # Send to frontend if appropriate
        if is_silenced and (message.message_type not in unconditional_messages):
            return

        # Don't bother propagating these to the frontend
        if message.message_type == MessageType.LOG:
            return

        self._pipe_w.send(message)