summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> 2018-04-18 19:22:48 +0900
committer Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> 2018-04-18 20:02:25 +0900
commit 268340a56d81ede1bc9377a23a38d426f33e4f1f (patch)
tree 2e66c2b0be36c555647309d227b12f25847d39f5
parent 853c54eb5c2251adac80982451a2178b7b7ec4c1 (diff)
download buildstream-268340a56d81ede1bc9377a23a38d426f33e4f1f.tar.gz
_scheduler/scheduler.py: Adhere to policy on private symbols
Also added a Scheduler.jobs_suspended() context manager for use in the App, instead of exposing Scheduler.suspend_jobs() and Scheduler.resume_jobs() along with the signal callback connect and disconnect APIs; this keeps things more private in the scheduler. This is a part of issue #285.
-rw-r--r-- buildstream/_frontend/app.py 14
-rw-r--r-- buildstream/_scheduler/scheduler.py 355
2 files changed, 209 insertions, 160 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index db9c97b84..f4336956d 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -778,16 +778,12 @@ class App():
@contextmanager
def _interrupted(self):
- self.scheduler.disconnect_signals()
-
self._status.clear()
- self.scheduler.suspend_jobs()
-
- yield
-
- self._maybe_render_status()
- self.scheduler.resume_jobs()
- self.scheduler.connect_signals()
+ try:
+ with self.scheduler.jobs_suspended():
+ yield
+ finally:
+ self._maybe_render_status()
# Some validation routines for project initialization
#
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 0ab8ace7c..25e1e6790 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -24,6 +24,7 @@ import os
import asyncio
import signal
import datetime
+from contextlib import contextmanager
# Local imports
from .queue import QueueType
@@ -65,30 +66,39 @@ class Scheduler():
ticker_callback=None,
job_start_callback=None,
job_complete_callback=None):
- self.loop = None
- self.interrupt_callback = interrupt_callback
- self.ticker_callback = ticker_callback
- self.job_start_callback = job_start_callback
- self.job_complete_callback = job_complete_callback
- self.context = context
- self.queues = None
- self.starttime = start_time
- self.suspendtime = None
+
+ #
+ # Public members
+ #
+ self.queues = None # Exposed for the frontend to print summaries
+ self.context = context # The Context object shared with Queues
+ self.terminated = False # Whether the scheduler was asked to terminate or has terminated
+ self.suspended = False # Whether the scheduler is currently suspended
+
+ # These are shared with the Job, but should probably be removed or made private in some way.
+ self.loop = None # Shared for Job access to observe the message queue
+ self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
+
+ #
+ # Private members
+ #
+ self._interrupt_callback = interrupt_callback
+ self._ticker_callback = ticker_callback
+ self._job_start_callback = job_start_callback
+ self._job_complete_callback = job_complete_callback
+
+ self._starttime = start_time
+ self._suspendtime = None
+ self._queue_jobs = True # Whether we should continue to queue jobs
# Initialize task tokens with the number allowed by
# the user configuration
- self.job_tokens = {
+ self._job_tokens = {
QueueType.FETCH: context.sched_fetchers,
QueueType.BUILD: context.sched_builders,
QueueType.PUSH: context.sched_pushers
}
- # Some local state
- self.queue_jobs = True # Whether we should continue to queue jobs
- self.terminated = False # Hold on to whether we were terminated
- self.suspended = False # Whether tasks are currently suspended
- self.internal_stops = 0 # Amount of SIGSTP signals we've introduced (handle feedback)
-
# run()
#
# Args:
@@ -113,11 +123,11 @@ class Scheduler():
asyncio.set_event_loop(self.loop)
# Add timeouts
- if self.ticker_callback:
- self.loop.call_later(1, self.tick)
+ if self._ticker_callback:
+ self.loop.call_later(1, self._tick)
# Handle unix signals while running
- self.connect_signals()
+ self._connect_signals()
# Run the queues
self.sched()
@@ -125,9 +135,9 @@ class Scheduler():
self.loop.close()
# Stop handling unix signals
- self.disconnect_signals()
+ self._disconnect_signals()
- failed = self.failed_elements()
+ failed = any(any(queue.failed_elements) for queue in self.queues)
self.loop = None
if failed:
@@ -157,36 +167,25 @@ class Scheduler():
# attribute to decide whether or not to print status info
# etc and the following code block will trigger some callbacks.
self.terminated = True
- self.loop.call_soon(self.terminate_jobs_real)
+ self.loop.call_soon(self._terminate_jobs_real)
# Block this until we're finished terminating jobs,
# this will remain blocked forever.
signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
- # suspend_jobs()
+ # jobs_suspended()
#
- # Suspend all ongoing jobs.
+ # A context manager for running with jobs suspended
#
- def suspend_jobs(self):
- if not self.suspended:
- self.suspendtime = datetime.datetime.now()
- self.suspended = True
- for queue in self.queues:
- for job in queue.active_jobs:
- job.suspend()
+ @contextmanager
+ def jobs_suspended(self):
+ self._disconnect_signals()
+ self._suspend_jobs()
- # resume_jobs()
- #
- # Resume suspended jobs.
- #
- def resume_jobs(self):
- if self.suspended:
- for queue in self.queues:
- for job in queue.active_jobs:
- job.resume()
- self.suspended = False
- self.starttime += (datetime.datetime.now() - self.suspendtime)
- self.suspendtime = None
+ yield
+
+ self._resume_jobs()
+ self._connect_signals()
# stop_queueing()
#
@@ -194,7 +193,7 @@ class Scheduler():
# to return once all currently processing jobs are finished.
#
def stop_queueing(self):
- self.queue_jobs = False
+ self._queue_jobs = False
# elapsed_time()
#
@@ -206,81 +205,60 @@ class Scheduler():
#
def elapsed_time(self):
timenow = datetime.datetime.now()
- starttime = self.starttime
+ starttime = self._starttime
if not starttime:
starttime = timenow
return timenow - starttime
- #######################################################
- # Main Loop Events #
- #######################################################
- def interrupt_event(self):
- # Leave this to the frontend to decide, if no
- # interrrupt callback was specified, then just terminate.
- if self.interrupt_callback:
- self.interrupt_callback()
- else:
- # Default without a frontend is just terminate
- self.terminate_jobs()
-
- def terminate_event(self):
- # Terminate gracefully if we receive SIGTERM
- self.terminate_jobs()
-
- def suspend_event(self):
-
- # Ignore the feedback signals from Job.suspend()
- if self.internal_stops:
- self.internal_stops -= 1
- return
+ # sched()
+ #
+ # The main driving function of the scheduler, it will be called
+ # automatically when Scheduler.run() is called initially, and needs
+ # to be called whenever a job can potentially be scheduled, usually
+ # when a Queue completes handling of a job.
+ #
+ # This will process the Queues and pull elements through the Queues
+ # and process anything that is ready.
+ #
+ def sched(self):
- # No need to care if jobs were suspended or not, we _only_ handle this
- # while we know jobs are not suspended.
- self.suspend_jobs()
- os.kill(os.getpid(), signal.SIGSTOP)
- self.resume_jobs()
+ process_queues = True
- #######################################################
- # Internal methods #
- #######################################################
- def connect_signals(self):
- self.loop.add_signal_handler(signal.SIGINT, self.interrupt_event)
- self.loop.add_signal_handler(signal.SIGTERM, self.terminate_event)
- self.loop.add_signal_handler(signal.SIGTSTP, self.suspend_event)
+ while self._queue_jobs and process_queues:
- def disconnect_signals(self):
- self.loop.remove_signal_handler(signal.SIGINT)
- self.loop.remove_signal_handler(signal.SIGTSTP)
- self.loop.remove_signal_handler(signal.SIGTERM)
+ # Pull elements forward through queues
+ elements = []
+ for queue in self.queues:
+ # Enqueue elements complete from the last queue
+ queue.enqueue(elements)
- def failed_elements(self):
- failed = False
- for queue in self.queues:
- if queue.failed_elements:
- failed = True
- break
- return failed
+ # Dequeue processed elements for the next queue
+ elements = list(queue.dequeue())
+ elements = list(elements)
- def terminate_jobs_real(self):
- # 20 seconds is a long time, it can take a while and sometimes
- # we still fail, need to look deeper into this again.
- wait_start = datetime.datetime.now()
- wait_limit = 20.0
+ # Kickoff whatever processes can be processed at this time
+ #
+ # We start by queuing from the last queue first, because we want to
+ # give priority to queues later in the scheduling process in the case
+ # that multiple queues share the same token type.
+ #
+ # This avoids starvation situations where we dont move on to fetch
+ # tasks for elements which failed to pull, and thus need all the pulls
+ # to complete before ever starting a build
+ for queue in reversed(self.queues):
+ queue.process_ready()
- # First tell all jobs to terminate
- for queue in self.queues:
- for job in queue.active_jobs:
- job.terminate()
+ # process_ready() may have skipped jobs, adding them to the done_queue.
+ # Pull these skipped elements forward to the next queue and process them.
+ process_queues = any(q.dequeue_ready() for q in self.queues)
- # Now wait for them to really terminate
+ # If nothings ticking, time to bail out
+ ticking = 0
for queue in self.queues:
- for job in queue.active_jobs:
- elapsed = datetime.datetime.now() - wait_start
- timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
- if not job.terminate_wait(timeout):
- job.kill()
+ ticking += len(queue.active_jobs)
- self.loop.stop()
+ if ticking == 0:
+ self.loop.stop()
# get_job_token():
#
@@ -296,8 +274,8 @@ class Scheduler():
# (bool): Whether a token was handed out or not
#
def get_job_token(self, queue_type):
- if self.job_tokens[queue_type] > 0:
- self.job_tokens[queue_type] -= 1
+ if self._job_tokens[queue_type] > 0:
+ self._job_tokens[queue_type] -= 1
return True
return False
@@ -311,60 +289,135 @@ class Scheduler():
# queue_type (QueueType): The type of token to obtain
#
def put_job_token(self, queue_type):
- self.job_tokens[queue_type] += 1
+ self._job_tokens[queue_type] += 1
- def sched(self):
+ # job_starting():
+ #
+ # Called by the Queue when starting a Job
+ #
+ # Args:
+ # job (Job): The starting Job
+ #
+ def job_starting(self, job):
+ if self._job_start_callback:
+ self._job_start_callback(job.element, job.action_name)
- process_queues = True
+ # job_completed():
+ #
+ # Called by the Queue when a Job completes
+ #
+ # Args:
+ # queue (Queue): The Queue holding a complete job
+ # job (Job): The completed Job
+ # success (bool): Whether the Job completed with a success status
+ #
+ def job_completed(self, queue, job, success):
+ if self._job_complete_callback:
+ self._job_complete_callback(job.element, queue, job.action_name, success)
- while self.queue_jobs and process_queues:
+ #######################################################
+ # Local Private Methods #
+ #######################################################
- # Pull elements forward through queues
- elements = []
+ # _suspend_jobs()
+ #
+ # Suspend all ongoing jobs.
+ #
+ def _suspend_jobs(self):
+ if not self.suspended:
+ self._suspendtime = datetime.datetime.now()
+ self.suspended = True
for queue in self.queues:
- # Enqueue elements complete from the last queue
- queue.enqueue(elements)
+ for job in queue.active_jobs:
+ job.suspend()
- # Dequeue processed elements for the next queue
- elements = list(queue.dequeue())
- elements = list(elements)
+ # _resume_jobs()
+ #
+ # Resume suspended jobs.
+ #
+ def _resume_jobs(self):
+ if self.suspended:
+ for queue in self.queues:
+ for job in queue.active_jobs:
+ job.resume()
+ self.suspended = False
+ self._starttime += (datetime.datetime.now() - self._suspendtime)
+ self._suspendtime = None
- # Kickoff whatever processes can be processed at this time
- #
- # We start by queuing from the last queue first, because we want to
- # give priority to queues later in the scheduling process in the case
- # that multiple queues share the same token type.
- #
- # This avoids starvation situations where we dont move on to fetch
- # tasks for elements which failed to pull, and thus need all the pulls
- # to complete before ever starting a build
- for queue in reversed(self.queues):
- queue.process_ready()
+ # _interrupt_event():
+ #
+ # A loop registered event callback for keyboard interrupts
+ #
+ def _interrupt_event(self):
+ # Leave this to the frontend to decide, if no
+ # interrrupt callback was specified, then just terminate.
+ if self._interrupt_callback:
+ self._interrupt_callback()
+ else:
+ # Default without a frontend is just terminate
+ self.terminate_jobs()
- # process_ready() may have skipped jobs, adding them to the done_queue.
- # Pull these skipped elements forward to the next queue and process them.
- process_queues = any(q.dequeue_ready() for q in self.queues)
+ # _terminate_event():
+ #
+ # A loop registered event callback for SIGTERM
+ #
+ def _terminate_event(self):
+ self.terminate_jobs()
- # If nothings ticking, time to bail out
- ticking = 0
+ # _suspend_event():
+ #
+ # A loop registered event callback for SIGTSTP
+ #
+ def _suspend_event(self):
+
+ # Ignore the feedback signals from Job.suspend()
+ if self.internal_stops:
+ self.internal_stops -= 1
+ return
+
+ # No need to care if jobs were suspended or not, we _only_ handle this
+ # while we know jobs are not suspended.
+ self._suspend_jobs()
+ os.kill(os.getpid(), signal.SIGSTOP)
+ self._resume_jobs()
+
+ # _connect_signals():
+ #
+ # Connects our signal handler event callbacks to the mainloop
+ #
+ def _connect_signals(self):
+ self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
+ self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
+ self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
+
+ def _disconnect_signals(self):
+ self.loop.remove_signal_handler(signal.SIGINT)
+ self.loop.remove_signal_handler(signal.SIGTSTP)
+ self.loop.remove_signal_handler(signal.SIGTERM)
+
+ def _terminate_jobs_real(self):
+ # 20 seconds is a long time, it can take a while and sometimes
+ # we still fail, need to look deeper into this again.
+ wait_start = datetime.datetime.now()
+ wait_limit = 20.0
+
+ # First tell all jobs to terminate
for queue in self.queues:
- ticking += len(queue.active_jobs)
+ for job in queue.active_jobs:
+ job.terminate()
- if ticking == 0:
- self.loop.stop()
+ # Now wait for them to really terminate
+ for queue in self.queues:
+ for job in queue.active_jobs:
+ elapsed = datetime.datetime.now() - wait_start
+ timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+ if not job.terminate_wait(timeout):
+ job.kill()
+
+ self.loop.stop()
# Regular timeout for driving status in the UI
- def tick(self):
+ def _tick(self):
elapsed = self.elapsed_time()
- self.ticker_callback(elapsed)
- self.loop.call_later(1, self.tick)
-
- # Called by the Queue when starting a Job
- def job_starting(self, job):
- if self.job_start_callback:
- self.job_start_callback(job.element, job.action_name)
-
- # Called by the Queue when a Job completed
- def job_completed(self, queue, job, success):
- if self.job_complete_callback:
- self.job_complete_callback(job.element, queue, job.action_name, success)
+ self._ticker_callback(elapsed)
+ self.loop.call_later(1, self._tick)