diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-18 19:22:48 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-18 20:02:25 +0900 |
commit | 268340a56d81ede1bc9377a23a38d426f33e4f1f (patch) | |
tree | 2e66c2b0be36c555647309d227b12f25847d39f5 | |
parent | 853c54eb5c2251adac80982451a2178b7b7ec4c1 (diff) | |
download | buildstream-268340a56d81ede1bc9377a23a38d426f33e4f1f.tar.gz |
_scheduler/scheduler.py: Adhere to policy on private symbols
Also added a Scheduler.jobs_suspended() context manager for
use in the App instead of exposing Scheduler.suspend_jobs()
and Scheduler.resume_jobs() along with signal callback connect
and disconnect APIs; this keeps things more private in the
scheduler.
This is part of issue #285
-rw-r--r-- | buildstream/_frontend/app.py | 14 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 355 |
2 files changed, 209 insertions, 160 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index db9c97b84..f4336956d 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -778,16 +778,12 @@ class App(): @contextmanager def _interrupted(self): - self.scheduler.disconnect_signals() - self._status.clear() - self.scheduler.suspend_jobs() - - yield - - self._maybe_render_status() - self.scheduler.resume_jobs() - self.scheduler.connect_signals() + try: + with self.scheduler.jobs_suspended(): + yield + finally: + self._maybe_render_status() # Some validation routines for project initialization # diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 0ab8ace7c..25e1e6790 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -24,6 +24,7 @@ import os import asyncio import signal import datetime +from contextlib import contextmanager # Local imports from .queue import QueueType @@ -65,30 +66,39 @@ class Scheduler(): ticker_callback=None, job_start_callback=None, job_complete_callback=None): - self.loop = None - self.interrupt_callback = interrupt_callback - self.ticker_callback = ticker_callback - self.job_start_callback = job_start_callback - self.job_complete_callback = job_complete_callback - self.context = context - self.queues = None - self.starttime = start_time - self.suspendtime = None + + # + # Public members + # + self.queues = None # Exposed for the frontend to print summaries + self.context = context # The Context object shared with Queues + self.terminated = False # Whether the scheduler was asked to terminate or has terminated + self.suspended = False # Whether the scheduler is currently suspended + + # These are shared with the Job, but should probably be removed or made private in some way. 
+ self.loop = None # Shared for Job access to observe the message queue + self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py + + # + # Private members + # + self._interrupt_callback = interrupt_callback + self._ticker_callback = ticker_callback + self._job_start_callback = job_start_callback + self._job_complete_callback = job_complete_callback + + self._starttime = start_time + self._suspendtime = None + self._queue_jobs = True # Whether we should continue to queue jobs # Initialize task tokens with the number allowed by # the user configuration - self.job_tokens = { + self._job_tokens = { QueueType.FETCH: context.sched_fetchers, QueueType.BUILD: context.sched_builders, QueueType.PUSH: context.sched_pushers } - # Some local state - self.queue_jobs = True # Whether we should continue to queue jobs - self.terminated = False # Hold on to whether we were terminated - self.suspended = False # Whether tasks are currently suspended - self.internal_stops = 0 # Amount of SIGSTP signals we've introduced (handle feedback) - # run() # # Args: @@ -113,11 +123,11 @@ class Scheduler(): asyncio.set_event_loop(self.loop) # Add timeouts - if self.ticker_callback: - self.loop.call_later(1, self.tick) + if self._ticker_callback: + self.loop.call_later(1, self._tick) # Handle unix signals while running - self.connect_signals() + self._connect_signals() # Run the queues self.sched() @@ -125,9 +135,9 @@ class Scheduler(): self.loop.close() # Stop handling unix signals - self.disconnect_signals() + self._disconnect_signals() - failed = self.failed_elements() + failed = any(any(queue.failed_elements) for queue in self.queues) self.loop = None if failed: @@ -157,36 +167,25 @@ class Scheduler(): # attribute to decide whether or not to print status info # etc and the following code block will trigger some callbacks. 
self.terminated = True - self.loop.call_soon(self.terminate_jobs_real) + self.loop.call_soon(self._terminate_jobs_real) # Block this until we're finished terminating jobs, # this will remain blocked forever. signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT]) - # suspend_jobs() + # jobs_suspended() # - # Suspend all ongoing jobs. + # A context manager for running with jobs suspended # - def suspend_jobs(self): - if not self.suspended: - self.suspendtime = datetime.datetime.now() - self.suspended = True - for queue in self.queues: - for job in queue.active_jobs: - job.suspend() + @contextmanager + def jobs_suspended(self): + self._disconnect_signals() + self._suspend_jobs() - # resume_jobs() - # - # Resume suspended jobs. - # - def resume_jobs(self): - if self.suspended: - for queue in self.queues: - for job in queue.active_jobs: - job.resume() - self.suspended = False - self.starttime += (datetime.datetime.now() - self.suspendtime) - self.suspendtime = None + yield + + self._resume_jobs() + self._connect_signals() # stop_queueing() # @@ -194,7 +193,7 @@ class Scheduler(): # to return once all currently processing jobs are finished. # def stop_queueing(self): - self.queue_jobs = False + self._queue_jobs = False # elapsed_time() # @@ -206,81 +205,60 @@ class Scheduler(): # def elapsed_time(self): timenow = datetime.datetime.now() - starttime = self.starttime + starttime = self._starttime if not starttime: starttime = timenow return timenow - starttime - ####################################################### - # Main Loop Events # - ####################################################### - def interrupt_event(self): - # Leave this to the frontend to decide, if no - # interrrupt callback was specified, then just terminate. 
- if self.interrupt_callback: - self.interrupt_callback() - else: - # Default without a frontend is just terminate - self.terminate_jobs() - - def terminate_event(self): - # Terminate gracefully if we receive SIGTERM - self.terminate_jobs() - - def suspend_event(self): - - # Ignore the feedback signals from Job.suspend() - if self.internal_stops: - self.internal_stops -= 1 - return + # sched() + # + # The main driving function of the scheduler, it will be called + # automatically when Scheduler.run() is called initially, and needs + # to be called whenever a job can potentially be scheduled, usually + # when a Queue completes handling of a job. + # + # This will process the Queues and pull elements through the Queues + # and process anything that is ready. + # + def sched(self): - # No need to care if jobs were suspended or not, we _only_ handle this - # while we know jobs are not suspended. - self.suspend_jobs() - os.kill(os.getpid(), signal.SIGSTOP) - self.resume_jobs() + process_queues = True - ####################################################### - # Internal methods # - ####################################################### - def connect_signals(self): - self.loop.add_signal_handler(signal.SIGINT, self.interrupt_event) - self.loop.add_signal_handler(signal.SIGTERM, self.terminate_event) - self.loop.add_signal_handler(signal.SIGTSTP, self.suspend_event) + while self._queue_jobs and process_queues: - def disconnect_signals(self): - self.loop.remove_signal_handler(signal.SIGINT) - self.loop.remove_signal_handler(signal.SIGTSTP) - self.loop.remove_signal_handler(signal.SIGTERM) + # Pull elements forward through queues + elements = [] + for queue in self.queues: + # Enqueue elements complete from the last queue + queue.enqueue(elements) - def failed_elements(self): - failed = False - for queue in self.queues: - if queue.failed_elements: - failed = True - break - return failed + # Dequeue processed elements for the next queue + elements = list(queue.dequeue()) + 
elements = list(elements) - def terminate_jobs_real(self): - # 20 seconds is a long time, it can take a while and sometimes - # we still fail, need to look deeper into this again. - wait_start = datetime.datetime.now() - wait_limit = 20.0 + # Kickoff whatever processes can be processed at this time + # + # We start by queuing from the last queue first, because we want to + # give priority to queues later in the scheduling process in the case + # that multiple queues share the same token type. + # + # This avoids starvation situations where we dont move on to fetch + # tasks for elements which failed to pull, and thus need all the pulls + # to complete before ever starting a build + for queue in reversed(self.queues): + queue.process_ready() - # First tell all jobs to terminate - for queue in self.queues: - for job in queue.active_jobs: - job.terminate() + # process_ready() may have skipped jobs, adding them to the done_queue. + # Pull these skipped elements forward to the next queue and process them. 
+ process_queues = any(q.dequeue_ready() for q in self.queues) - # Now wait for them to really terminate + # If nothings ticking, time to bail out + ticking = 0 for queue in self.queues: - for job in queue.active_jobs: - elapsed = datetime.datetime.now() - wait_start - timeout = max(wait_limit - elapsed.total_seconds(), 0.0) - if not job.terminate_wait(timeout): - job.kill() + ticking += len(queue.active_jobs) - self.loop.stop() + if ticking == 0: + self.loop.stop() # get_job_token(): # @@ -296,8 +274,8 @@ class Scheduler(): # (bool): Whether a token was handed out or not # def get_job_token(self, queue_type): - if self.job_tokens[queue_type] > 0: - self.job_tokens[queue_type] -= 1 + if self._job_tokens[queue_type] > 0: + self._job_tokens[queue_type] -= 1 return True return False @@ -311,60 +289,135 @@ class Scheduler(): # queue_type (QueueType): The type of token to obtain # def put_job_token(self, queue_type): - self.job_tokens[queue_type] += 1 + self._job_tokens[queue_type] += 1 - def sched(self): + # job_starting(): + # + # Called by the Queue when starting a Job + # + # Args: + # job (Job): The starting Job + # + def job_starting(self, job): + if self._job_start_callback: + self._job_start_callback(job.element, job.action_name) - process_queues = True + # job_completed(): + # + # Called by the Queue when a Job completes + # + # Args: + # queue (Queue): The Queue holding a complete job + # job (Job): The completed Job + # success (bool): Whether the Job completed with a success status + # + def job_completed(self, queue, job, success): + if self._job_complete_callback: + self._job_complete_callback(job.element, queue, job.action_name, success) - while self.queue_jobs and process_queues: + ####################################################### + # Local Private Methods # + ####################################################### - # Pull elements forward through queues - elements = [] + # _suspend_jobs() + # + # Suspend all ongoing jobs. 
+ # + def _suspend_jobs(self): + if not self.suspended: + self._suspendtime = datetime.datetime.now() + self.suspended = True for queue in self.queues: - # Enqueue elements complete from the last queue - queue.enqueue(elements) + for job in queue.active_jobs: + job.suspend() - # Dequeue processed elements for the next queue - elements = list(queue.dequeue()) - elements = list(elements) + # _resume_jobs() + # + # Resume suspended jobs. + # + def _resume_jobs(self): + if self.suspended: + for queue in self.queues: + for job in queue.active_jobs: + job.resume() + self.suspended = False + self._starttime += (datetime.datetime.now() - self._suspendtime) + self._suspendtime = None - # Kickoff whatever processes can be processed at this time - # - # We start by queuing from the last queue first, because we want to - # give priority to queues later in the scheduling process in the case - # that multiple queues share the same token type. - # - # This avoids starvation situations where we dont move on to fetch - # tasks for elements which failed to pull, and thus need all the pulls - # to complete before ever starting a build - for queue in reversed(self.queues): - queue.process_ready() + # _interrupt_event(): + # + # A loop registered event callback for keyboard interrupts + # + def _interrupt_event(self): + # Leave this to the frontend to decide, if no + # interrrupt callback was specified, then just terminate. + if self._interrupt_callback: + self._interrupt_callback() + else: + # Default without a frontend is just terminate + self.terminate_jobs() - # process_ready() may have skipped jobs, adding them to the done_queue. - # Pull these skipped elements forward to the next queue and process them. 
- process_queues = any(q.dequeue_ready() for q in self.queues) + # _terminate_event(): + # + # A loop registered event callback for SIGTERM + # + def _terminate_event(self): + self.terminate_jobs() - # If nothings ticking, time to bail out - ticking = 0 + # _suspend_event(): + # + # A loop registered event callback for SIGTSTP + # + def _suspend_event(self): + + # Ignore the feedback signals from Job.suspend() + if self.internal_stops: + self.internal_stops -= 1 + return + + # No need to care if jobs were suspended or not, we _only_ handle this + # while we know jobs are not suspended. + self._suspend_jobs() + os.kill(os.getpid(), signal.SIGSTOP) + self._resume_jobs() + + # _connect_signals(): + # + # Connects our signal handler event callbacks to the mainloop + # + def _connect_signals(self): + self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event) + self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event) + self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event) + + def _disconnect_signals(self): + self.loop.remove_signal_handler(signal.SIGINT) + self.loop.remove_signal_handler(signal.SIGTSTP) + self.loop.remove_signal_handler(signal.SIGTERM) + + def _terminate_jobs_real(self): + # 20 seconds is a long time, it can take a while and sometimes + # we still fail, need to look deeper into this again. 
+ wait_start = datetime.datetime.now() + wait_limit = 20.0 + + # First tell all jobs to terminate for queue in self.queues: - ticking += len(queue.active_jobs) + for job in queue.active_jobs: + job.terminate() - if ticking == 0: - self.loop.stop() + # Now wait for them to really terminate + for queue in self.queues: + for job in queue.active_jobs: + elapsed = datetime.datetime.now() - wait_start + timeout = max(wait_limit - elapsed.total_seconds(), 0.0) + if not job.terminate_wait(timeout): + job.kill() + + self.loop.stop() # Regular timeout for driving status in the UI - def tick(self): + def _tick(self): elapsed = self.elapsed_time() - self.ticker_callback(elapsed) - self.loop.call_later(1, self.tick) - - # Called by the Queue when starting a Job - def job_starting(self, job): - if self.job_start_callback: - self.job_start_callback(job.element, job.action_name) - - # Called by the Queue when a Job completed - def job_completed(self, queue, job, success): - if self.job_complete_callback: - self.job_complete_callback(job.element, queue, job.action_name, success) + self._ticker_callback(elapsed) + self.loop.call_later(1, self._tick) |