diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-04-14 05:58:31 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-04-14 05:58:31 +0000 |
commit | 5dce02bbd68be31b67b080ac6480af359f826609 (patch) | |
tree | 799b0046cd977c8bfffa4615e99c85e3dea9bad1 | |
parent | 5cb2076daddc464ddfb6d7c39b138fc174bc567f (diff) | |
parent | 28eecdf97545353fa9f00845305e6b4879b4d438 (diff) | |
download | buildstream-5dce02bbd68be31b67b080ac6480af359f826609.tar.gz |
Merge branch 'abderrahim/backport-scheduler-fixes' into 'bst-1.2'
Backport scheduler fixes to 1.2
See merge request BuildStream/buildstream!1286
-rw-r--r-- | buildstream/_artifactcache/artifactcache.py | 2 | ||||
-rw-r--r-- | buildstream/_frontend/app.py | 6 | ||||
-rw-r--r-- | buildstream/_scheduler/__init__.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/__init__.py | 1 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cachesizejob.py | 10 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cleanupjob.py | 13 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/elementjob.py | 6 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 88 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 146 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/resources.py | 113 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 262 | ||||
-rw-r--r-- | buildstream/utils.py | 5 |
16 files changed, 382 insertions, 295 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 9b5df1d26..965d6e132 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -273,7 +273,7 @@ class ArtifactCache(): # FIXME: Asking the user what to do may be neater default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], 'buildstream.conf') - detail = ("There is not enough space to build the given element.\n" + detail = ("There is not enough space to complete the build.\n" "Please increase the cache-quota in {}." .format(self.context.config_origin or default_conf)) diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index 53a342899..312f11d0c 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -40,7 +40,7 @@ from .._message import Message, MessageType, unconditional_messages from .._stream import Stream from .._versions import BST_FORMAT_VERSION from .. import _yaml -from .._scheduler import ElementJob +from .._scheduler import ElementJob, JobStatus # Import frontend assets from . 
import Profile, LogLine, Status @@ -518,13 +518,13 @@ class App(): self._status.add_job(job) self._maybe_render_status() - def _job_completed(self, job, success): + def _job_completed(self, job, status): self._status.remove_job(job) self._maybe_render_status() # Dont attempt to handle a failure if the user has already opted to # terminate - if not success and not self.stream.terminated: + if status == JobStatus.FAIL and not self.stream.terminated: if isinstance(job, ElementJob): element = job.element diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py index b6e3eeb94..470859864 100644 --- a/buildstream/_scheduler/__init__.py +++ b/buildstream/_scheduler/__init__.py @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue from .queues.pullqueue import PullQueue from .scheduler import Scheduler, SchedStatus -from .jobs import ElementJob +from .jobs import ElementJob, JobStatus diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py index 185d8258a..f8fbd6d68 100644 --- a/buildstream/_scheduler/jobs/__init__.py +++ b/buildstream/_scheduler/jobs/__init__.py @@ -1,3 +1,4 @@ from .elementjob import ElementJob from .cachesizejob import CacheSizeJob from .cleanupjob import CleanupJob +from .job import JobStatus diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py index 68cd91331..fb56ca016 100644 --- a/buildstream/_scheduler/jobs/cachesizejob.py +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -from .job import Job +from .job import Job, JobStatus from ..._platform import Platform @@ -31,12 +31,12 @@ class CacheSizeJob(Job): def child_process(self): return self._artifacts.compute_cache_size() - def parent_complete(self, success, result): - if success: + def parent_complete(self, status, result): + if status == JobStatus.OK: self._artifacts.set_cache_size(result) - if 
self._complete_cb: - self._complete_cb(result) + if self._complete_cb: + self._complete_cb(status, result) def child_process_data(self): return {} diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index c22ce3b98..97b45901f 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -from .job import Job +from .job import Job, JobStatus from ..._platform import Platform @@ -31,12 +31,9 @@ class CleanupJob(Job): def child_process(self): return self._artifacts.clean() - def parent_complete(self, success, result): - if success: + def parent_complete(self, status, result): + if status == JobStatus.OK: self._artifacts.set_cache_size(result) - if self._complete_cb: - self._complete_cb() - - def child_process_data(self): - return {} + if self._complete_cb: + self._complete_cb(status, result) diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py index 4d53a9d3d..fb5d38e11 100644 --- a/buildstream/_scheduler/jobs/elementjob.py +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -60,7 +60,7 @@ from .job import Job # Args: # job (Job): The job object which completed # element (Element): The element passed to the Job() constructor -# success (bool): True if the action_cb did not raise an exception +# status (JobStatus): The status of whether the workload raised an exception # result (object): The deserialized object returned by the `action_cb`, or None # if `success` is False # @@ -93,8 +93,8 @@ class ElementJob(Job): # Run the action return self._action_cb(self._element) - def parent_complete(self, success, result): - self._complete_cb(self, self._element, success, self._result) + def parent_complete(self, status, result): + self._complete_cb(self, self._element, status, self._result) def message(self, message_type, message, **kwargs): args = dict(kwargs) diff 
--git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index d2f5f6536..348204750 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -28,8 +28,6 @@ import traceback import asyncio import multiprocessing -import psutil - # BuildStream toplevel imports from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages @@ -43,6 +41,22 @@ RC_PERM_FAIL = 2 RC_SKIPPED = 3 +# JobStatus: +# +# The job completion status, passed back through the +# complete callbacks. +# +class JobStatus(): + # Job succeeded + OK = 0 + + # A temporary BstError was raised + FAIL = 1 + + # A SkipJob was raised + SKIPPED = 3 + + # Used to distinguish between status messages and return values class Envelope(): def __init__(self, message_type, message): @@ -71,28 +85,11 @@ class Process(multiprocessing.Process): # action_name (str): The queue action name # logfile (str): A template string that points to the logfile # that should be used - should contain {pid}. -# resources (iter(ResourceType)) - A set of resources this job -# wants to use. -# exclusive_resources (iter(ResourceType)) - A set of resources -# this job wants to use -# exclusively. # max_retries (int): The maximum number of retries # class Job(): - def __init__(self, scheduler, action_name, logfile, *, - resources=None, exclusive_resources=None, max_retries=0): - - if resources is None: - resources = set() - else: - resources = set(resources) - if exclusive_resources is None: - exclusive_resources = set() - else: - exclusive_resources = set(resources) - - assert exclusive_resources <= resources, "All exclusive resources must also be resources!" 
+ def __init__(self, scheduler, action_name, logfile, *, max_retries=0): # # Public members @@ -100,12 +97,6 @@ class Job(): self.action_name = action_name # The action name for the Queue self.child_data = None # Data to be sent to the main process - # The resources this job wants to access - self.resources = resources - # Resources this job needs to access exclusively, i.e., no - # other job should be allowed to access them - self.exclusive_resources = exclusive_resources - # # Private members # @@ -118,7 +109,6 @@ class Job(): self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs - self._skipped_flag = False # Indicate whether the job was skipped. self._terminated = False # Whether this job has been explicitly terminated # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. @@ -213,17 +203,10 @@ class Job(): # Forcefully kill the process, and any children it might have. # def kill(self): - # Force kill self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name)) - - try: - utils._kill_process_tree(self._process.pid) - # This can happen if the process died of its own accord before - # we try to kill it - except psutil.NoSuchProcess: - return + utils._kill_process_tree(self._process.pid) # suspend() # @@ -280,18 +263,6 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id - # skipped - # - # This will evaluate to True if the job was skipped - # during processing, or if it was forcefully terminated. 
- # - # Returns: - # (bool): Whether the job should appear as skipped - # - @property - def skipped(self): - return self._skipped_flag or self._terminated - ####################################################### # Abstract Methods # ####################################################### @@ -302,10 +273,10 @@ class Job(): # pass the result to the main thread. # # Args: - # success (bool): Whether the job was successful. + # status (JobStatus): The job exit status # result (any): The result returned by child_process(). # - def parent_complete(self, success, result): + def parent_complete(self, status, result): raise ImplError("Job '{kind}' does not implement parent_complete()" .format(kind=type(self).__name__)) @@ -569,16 +540,23 @@ class Job(): # self._retry_flag = returncode == RC_FAIL - # Set the flag to alert Queue that this job skipped. - self._skipped_flag = returncode == RC_SKIPPED - if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.spawn() return - success = returncode in (RC_OK, RC_SKIPPED) - self.parent_complete(success, self._result) - self._scheduler.job_completed(self, success) + # Resolve the outward facing overall job completion status + # + if returncode == RC_OK: + status = JobStatus.OK + elif returncode == RC_SKIPPED: + status = JobStatus.SKIPPED + elif returncode in (RC_FAIL, RC_PERM_FAIL): + status = JobStatus.FAIL + else: + status = JobStatus.FAIL + + self.parent_complete(status, self._result) + self._scheduler.job_completed(self, status) # _parent_process_envelope() # diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 05e6f7a8b..df8364552 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -19,6 +19,7 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> from . 
import Queue, QueueStatus +from ..jobs import JobStatus from ..resources import ResourceType from ..._platform import Platform @@ -65,9 +66,9 @@ class BuildQueue(Queue): if artifacts.has_quota_exceeded(): self._scheduler.check_cache_size() - def done(self, job, element, result, success): + def done(self, job, element, result, status): - if success: + if status == JobStatus.OK: # Inform element in main process that assembly is done element._assemble_done() diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index c5595e14a..423b620de 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -24,6 +24,7 @@ from ... import Consistency # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus # A queue which fetches element sources @@ -64,9 +65,9 @@ class FetchQueue(Queue): return QueueStatus.READY - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return element._fetch_done() diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index e4868953e..333e92b2b 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus from ..._exceptions import SkipJob @@ -51,9 +52,9 @@ class PullQueue(Queue): else: return QueueStatus.SKIP - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return False element._pull_done() @@ -61,4 +62,5 @@ class PullQueue(Queue): # Build jobs will check the "approximate" size first. Since we # do not get an artifact size from pull jobs, we have to # actually check the cache size. 
- self._scheduler.check_cache_size() + if status == JobStatus.OK: + self._scheduler.check_cache_size() diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index ec1f1405b..ec1e81350 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -25,7 +25,7 @@ from enum import Enum import traceback # Local imports -from ..jobs import ElementJob +from ..jobs import ElementJob, JobStatus from ..resources import ResourceType # BuildStream toplevel imports @@ -72,8 +72,9 @@ class Queue(): # Private members # self._scheduler = scheduler - self._wait_queue = deque() - self._done_queue = deque() + self._resources = scheduler.resources # Shared resource pool + self._wait_queue = deque() # Ready / Waiting elements + self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 # Assert the subclass has setup class data @@ -115,16 +116,6 @@ class Queue(): def status(self, element): return QueueStatus.READY - # prepare() - # - # Abstract method for handling job preparation in the main process. - # - # Args: - # element (Element): The element which is scheduled - # - def prepare(self, element): - pass - # done() # # Abstract method for handling a successful job completion. @@ -133,10 +124,9 @@ class Queue(): # job (Job): The job which completed processing # element (Element): The element which completed processing # result (any): The return value of the process() implementation - # success (bool): True if the process() implementation did not - # raise any exception + # status (JobStatus): The return status of the Job # - def done(self, job, element, result, success): + def done(self, job, element, result, status): pass ##################################################### @@ -154,26 +144,18 @@ class Queue(): if not elts: return - # Note: The internal lists work with jobs. This is not - # reflected in any external methods (except - # pop/peek_ready_jobs). 
- def create_job(element): - logfile = self._element_log_path(element) - return ElementJob(self._scheduler, self.action_name, - logfile, element=element, queue=self, - resources=self.resources, - action_cb=self.process, - complete_cb=self._job_done, - max_retries=self._max_retries) - - # Place skipped elements directly on the done queue - jobs = [create_job(elt) for elt in elts] - skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP] - wait = [job for job in jobs if job not in skip] - - self._wait_queue.extend(wait) - self._done_queue.extend(skip) - self.skipped_elements.extend(skip) + # Place skipped elements on the done queue right away. + # + # The remaining ready and waiting elements must remain in the + # same queue, and ready status must be determined at the moment + # which the scheduler is asking for the next job. + # + skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] + wait = [elt for elt in elts if elt not in skip] + + self.skipped_elements.extend(skip) # Public record of skipped elements + self._done_queue.extend(skip) # Elements to be processed + self._wait_queue.extend(wait) # Elements eligible to be dequeued # dequeue() # @@ -185,69 +167,59 @@ class Queue(): # def dequeue(self): while self._done_queue: - yield self._done_queue.popleft().element + yield self._done_queue.popleft() # dequeue_ready() # - # Reports whether there are any elements to dequeue + # Reports whether any elements can be promoted to other queues # # Returns: - # (bool): Whether there are elements to dequeue + # (bool): Whether there are elements ready # def dequeue_ready(self): return any(self._done_queue) - # pop_ready_jobs() - # - # Returns: - # ([Job]): A list of jobs to run + # harvest_jobs() # # Process elements in the queue, moving elements which were enqueued - # into the dequeue pool, and processing them if necessary. - # - # This will have different results for elements depending - # on the Queue.status() implementation. 
- # - # o Elements which are QueueStatus.WAIT will not be effected + # into the dequeue pool, and creating as many jobs for which resources + # can be reserved. # - # o Elements which are QueueStatus.SKIP will move directly - # to the dequeue pool - # - # o For Elements which are QueueStatus.READY a Job will be - # created and returned to the caller, given that the scheduler - # allows the Queue enough resources for the given job + # Returns: + # ([Job]): A list of jobs which can be run now # - def pop_ready_jobs(self): + def harvest_jobs(self): unready = [] ready = [] while self._wait_queue: - job = self._wait_queue.popleft() - element = job.element + if not self._resources.reserve(self.resources, peek=True): + break + element = self._wait_queue.popleft() status = self.status(element) + if status == QueueStatus.WAIT: - unready.append(job) - continue + unready.append(element) elif status == QueueStatus.SKIP: - self._done_queue.append(job) + self._done_queue.append(element) self.skipped_elements.append(element) - continue - - self.prepare(element) - ready.append(job) + else: + reserved = self._resources.reserve(self.resources) + assert reserved + ready.append(element) - # These were not ready but were in the beginning, give em - # first priority again next time around self._wait_queue.extendleft(unready) - return ready - - def peek_ready_jobs(self): - def ready(job): - return self.status(job.element) == QueueStatus.READY - - yield from (job for job in self._wait_queue if ready(job)) + return [ + ElementJob(self._scheduler, self.action_name, + self._element_log_path(element), + element=element, queue=self, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries) + for element in ready + ] ##################################################### # Private Methods # @@ -291,7 +263,11 @@ class Queue(): # # See the Job object for an explanation of the call signature # - def _job_done(self, job, element, success, result): + def _job_done(self, 
job, element, status, result): + + # Now release the resources we reserved + # + self._resources.release(self.resources) # Update values that need to be synchronized in the main task # before calling any queue implementation @@ -301,7 +277,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. try: - self.done(job, element, result, success) + self.done(job, element, result, status) except BstError as e: # Report error and mark as failed @@ -325,21 +301,13 @@ class Queue(): detail=traceback.format_exc()) self.failed_elements.append(element) else: - # - # No exception occured in post processing - # - - # Only place in the output done queue if the job - # was considered successful - if success: - self._done_queue.append(job) + # All elements get placed on the done queue for later processing. + self._done_queue.append(element) - # A Job can be skipped whether or not it has failed, - # we want to only bookkeep them as processed or failed - # if they are not skipped. - if job.skipped: + # These lists are for bookkeeping purposes for the UI and logging. + if status == JobStatus.SKIPPED: self.skipped_elements.append(element) - elif success: + elif status == JobStatus.OK: self.processed_elements.append(element) else: self.failed_elements.append(element) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index c9011edb9..d7e6546f3 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -24,6 +24,7 @@ from ...plugin import Plugin # Local imports from . 
import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus # A queue which tracks sources @@ -47,9 +48,9 @@ class TrackQueue(Queue): return QueueStatus.READY - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return # Set the new refs in the main process one by one as they complete diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py index fcf10d7bd..f19d66b44 100644 --- a/buildstream/_scheduler/resources.py +++ b/buildstream/_scheduler/resources.py @@ -34,28 +34,25 @@ class Resources(): ResourceType.UPLOAD: set() } - def clear_job_resources(self, job): - for resource in job.exclusive_resources: - self._exclusive_resources[resource].remove(hash(job)) + # reserve() + # + # Reserves a set of resources + # + # Args: + # resources (set): A set of ResourceTypes + # exclusive (set): Another set of ResourceTypes + # peek (bool): Whether to only peek at whether the resource is available + # + # Returns: + # (bool): True if the resources could be reserved + # + def reserve(self, resources, exclusive=None, *, peek=False): + if exclusive is None: + exclusive = set() - for resource in job.resources: - self._used_resources[resource] -= 1 - - def reserve_exclusive_resources(self, job): - exclusive = job.exclusive_resources - - # The very first thing we do is to register any exclusive - # resources this job may want. Even if the job is not yet - # allowed to run (because another job is holding the resource - # it wants), we can still set this - it just means that any - # job *currently* using these resources has to finish first, - # and no new jobs wanting these can be launched (except other - # exclusive-access jobs). 
- # - for resource in exclusive: - self._exclusive_resources[resource].add(hash(job)) + resources = set(resources) + exclusive = set(exclusive) - def reserve_job_resources(self, job): # First, we check if the job wants to access a resource that # another job wants exclusive access to. If so, it cannot be # scheduled. @@ -68,7 +65,8 @@ class Resources(): # is currently not possible, but may be worth thinking # about. # - for resource in job.resources - job.exclusive_resources: + for resource in resources - exclusive: + # If our job wants this resource exclusively, we never # check this, so we can get away with not (temporarily) # removing it from the set. @@ -84,14 +82,14 @@ class Resources(): # at a time, despite being allowed to be part of the exclusive # set. # - for exclusive in job.exclusive_resources: - if self._used_resources[exclusive] != 0: + for resource in exclusive: + if self._used_resources[resource] != 0: return False # Finally, we check if we have enough of each resource # available. If we don't have enough, the job cannot be # scheduled. - for resource in job.resources: + for resource in resources: if (self._max_resources[resource] > 0 and self._used_resources[resource] >= self._max_resources[resource]): return False @@ -99,7 +97,70 @@ class Resources(): # Now we register the fact that our job is using the resources # it asked for, and tell the scheduler that it is allowed to # continue. 
- for resource in job.resources: - self._used_resources[resource] += 1 + if not peek: + for resource in resources: + self._used_resources[resource] += 1 return True + + # release() + # + # Release resources previously reserved with Resources.reserve() + # + # Args: + # resources (set): A set of resources to release + # + def release(self, resources): + for resource in resources: + assert self._used_resources[resource] > 0, "Scheduler resource imbalance" + self._used_resources[resource] -= 1 + + # register_exclusive_interest() + # + # Inform the resources pool that `source` has an interest in + # reserving this resource exclusively. + # + # The source parameter is used to identify the caller, it + # must be ensured to be unique for the time that the + # interest is registered. + # + # This function may be called multiple times, and subsequent + # calls will simply have no effect until unregister_exclusive_interest() + # is used to clear the interest. + # + # This must be called in advance of reserve() + # + # Args: + # resources (set): Set of resources to reserve exclusively + # source (any): Source identifier, to be used again when unregistering + # the interest. + # + def register_exclusive_interest(self, resources, source): + + # The very first thing we do is to register any exclusive + # resources this job may want. Even if the job is not yet + # allowed to run (because another job is holding the resource + # it wants), we can still set this - it just means that any + # job *currently* using these resources has to finish first, + # and no new jobs wanting these can be launched (except other + # exclusive-access jobs). + # + for resource in resources: + self._exclusive_resources[resource].add(source) + + # unregister_exclusive_interest() + # + # Clear the exclusive interest in these resources. + # + # This should be called by the given source which registered + # an exclusive interest. 
+ # + # Args: + # resources (set): Set of resources to clear the exclusive interest in + # source (any): Source identifier which was used when registering + # the interest. + # + def unregister_exclusive_interest(self, resources, source): + + for resource in resources: + self._exclusive_resources[resource].remove(source) diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 2975f9758..30e70a4fc 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -28,7 +28,7 @@ from contextlib import contextmanager # Local imports from .resources import Resources, ResourceType -from .jobs import CacheSizeJob, CleanupJob +from .jobs import JobStatus, CacheSizeJob, CleanupJob from .._platform import Platform @@ -39,6 +39,12 @@ class SchedStatus(): TERMINATED = 1 +# Some action names for the internal jobs we launch +# +_ACTION_NAME_CLEANUP = 'cleanup' +_ACTION_NAME_CACHE_SIZE = 'cache_size' + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -72,8 +78,6 @@ class Scheduler(): # # Public members # - self.active_jobs = [] # Jobs currently being run in the scheduler - self.waiting_jobs = [] # Jobs waiting for resources self.queues = None # Exposed for the frontend to print summaries self.context = context # The Context object shared with Queues self.terminated = False # Whether the scheduler was asked to terminate or has terminated @@ -86,18 +90,35 @@ class Scheduler(): # # Private members # + self._active_jobs = [] # Jobs currently being run in the scheduler + self._starttime = start_time # Initial application start time + self._suspendtime = None # Session time compensation for suspended state + self._queue_jobs = True # Whether we should continue to queue jobs + + # State of cache management related jobs + self._cache_size_scheduled = False # Whether we have a cache size job scheduled + self._cache_size_running = None # A running CacheSizeJob, or None + 
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled + self._cleanup_running = None # A running CleanupJob, or None + + # Callbacks to report back to the Scheduler owner self._interrupt_callback = interrupt_callback self._ticker_callback = ticker_callback self._job_start_callback = job_start_callback self._job_complete_callback = job_complete_callback - self._starttime = start_time - self._suspendtime = None - self._queue_jobs = True # Whether we should continue to queue jobs + # Whether our exclusive jobs, like 'cleanup' are currently already + # waiting or active. + # + # This is just a bit quicker than scanning the wait queue and active + # queue and comparing job action names. + # + self._exclusive_waiting = set() + self._exclusive_active = set() - self._resources = Resources(context.sched_builders, - context.sched_fetchers, - context.sched_pushers) + self.resources = Resources(context.sched_builders, + context.sched_fetchers, + context.sched_pushers) # run() # @@ -130,7 +151,7 @@ class Scheduler(): self._connect_signals() # Run the queues - self._schedule_queue_jobs() + self._sched() self.loop.run_forever() self.loop.close() @@ -210,19 +231,6 @@ class Scheduler(): starttime = timenow return timenow - starttime - # schedule_jobs() - # - # Args: - # jobs ([Job]): A list of jobs to schedule - # - # Schedule 'Job's for the scheduler to run. 
Jobs scheduled will be - # run as soon any other queueing jobs finish, provided sufficient - # resources are available for them to run - # - def schedule_jobs(self, jobs): - for job in jobs: - self.waiting_jobs.append(job) - # job_completed(): # # Called when a Job completes @@ -230,13 +238,17 @@ class Scheduler(): # Args: # queue (Queue): The Queue holding a complete job # job (Job): The completed Job - # success (bool): Whether the Job completed with a success status + # status (JobStatus): The status of the completed job # - def job_completed(self, job, success): - self._resources.clear_job_resources(job) - self.active_jobs.remove(job) - self._job_complete_callback(job, success) - self._schedule_queue_jobs() + def job_completed(self, job, status): + + # Remove from the active jobs list + self._active_jobs.remove(job) + + # Scheduler owner facing callback + self._job_complete_callback(job, status) + + # Now check for more jobs self._sched() # check_cache_size(): @@ -245,48 +257,105 @@ class Scheduler(): # size is calculated, a cleanup job will be run automatically # if needed. # - # FIXME: This should ensure that only one cache size job - # is ever pending at a given time. If a cache size - # job is already running, it is correct to queue - # a new one, it is incorrect to have more than one - # of these jobs pending at a given time, though. - # def check_cache_size(self): - job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size', - resources=[ResourceType.CACHE, - ResourceType.PROCESS], - complete_cb=self._run_cleanup) - self.schedule_jobs([job]) + + # Here we assume we are called in response to a job + # completion callback, or before entering the scheduler. + # + # As such there is no need to call `_sched()` from here, + # and we prefer to run it once at the last moment. 
+ # + self._cache_size_scheduled = True ####################################################### # Local Private Methods # ####################################################### - # _sched() + # _spawn_job() # - # The main driving function of the scheduler, it will be called - # automatically when Scheduler.run() is called initially, + # Spawns a job # - def _sched(self): - for job in self.waiting_jobs: - self._resources.reserve_exclusive_resources(job) + # Args: + # job (Job): The job to spawn + # + def _spawn_job(self, job): + job.spawn() + self._active_jobs.append(job) + if self._job_start_callback: + self._job_start_callback(job) - for job in self.waiting_jobs: - if not self._resources.reserve_job_resources(job): - continue + # Callback for the cache size job + def _cache_size_job_complete(self, status, cache_size): - job.spawn() - self.waiting_jobs.remove(job) - self.active_jobs.append(job) + # Deallocate cache size job resources + self._cache_size_running = None + self.resources.release([ResourceType.CACHE, ResourceType.PROCESS]) - if self._job_start_callback: - self._job_start_callback(job) + # Schedule a cleanup job if we've hit the threshold + if status != JobStatus.OK: + return - # If nothings ticking, time to bail out - if not self.active_jobs and not self.waiting_jobs: - self.loop.stop() + platform = Platform.get_platform() + artifacts = platform.artifactcache + + if artifacts.has_quota_exceeded(): + self._cleanup_scheduled = True + + # Callback for the cleanup job + def _cleanup_job_complete(self, status, cache_size): - # _schedule_queue_jobs() + # Deallocate cleanup job resources + self._cleanup_running = None + self.resources.release([ResourceType.CACHE, ResourceType.PROCESS]) + + # Unregister the exclusive interest when we're done with it + if not self._cleanup_scheduled: + self.resources.unregister_exclusive_interest( + [ResourceType.CACHE], 'cache-cleanup' + ) + + # _sched_cleanup_job() + # + # Runs a cleanup job if one is scheduled to run now and 
+ # sufficient resources are available. + # + def _sched_cleanup_job(self): + + if self._cleanup_scheduled and self._cleanup_running is None: + + # Ensure we have an exclusive interest in the resources + self.resources.register_exclusive_interest( + [ResourceType.CACHE], 'cache-cleanup' + ) + + if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS], + [ResourceType.CACHE]): + + # Update state and launch + self._cleanup_scheduled = False + self._cleanup_running = \ + CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup', + complete_cb=self._cleanup_job_complete) + self._spawn_job(self._cleanup_running) + + # _sched_cache_size_job() + # + # Runs a cache size job if one is scheduled to run now and + # sufficient resources are available. + # + def _sched_cache_size_job(self): + + if self._cache_size_scheduled and not self._cache_size_running: + + if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]): + self._cache_size_scheduled = False + self._cache_size_running = \ + CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE, + 'cache_size/cache_size', + complete_cb=self._cache_size_job_complete) + self._spawn_job(self._cache_size_running) + + # _sched_queue_jobs() # # Ask the queues what jobs they want to schedule and schedule # them. This is done here so we can ask for new jobs when jobs @@ -295,7 +364,7 @@ class Scheduler(): # # This will process the Queues, pull elements through the Queues # and process anything that is ready. 
# - def _schedule_queue_jobs(self): + def _sched_queue_jobs(self): ready = [] process_queues = True @@ -304,10 +373,7 @@ class Scheduler(): # Pull elements forward through queues elements = [] for queue in self.queues: - # Enqueue elements complete from the last queue queue.enqueue(elements) - - # Dequeue processed elements for the next queue elements = list(queue.dequeue()) # Kickoff whatever processes can be processed at this time @@ -322,42 +388,51 @@ class Scheduler(): # thus need all the pulls to complete before ever starting # a build ready.extend(chain.from_iterable( - queue.pop_ready_jobs() for queue in reversed(self.queues) + q.harvest_jobs() for q in reversed(self.queues) )) - # pop_ready_jobs() may have skipped jobs, adding them to - # the done_queue. Pull these skipped elements forward to - # the next queue and process them. + # harvest_jobs() may have decided to skip some jobs, making + # them eligible for promotion to the next queue as a side effect. + # + # If that happens, do another round. process_queues = any(q.dequeue_ready() for q in self.queues) - self.schedule_jobs(ready) - self._sched() + # Spawn the jobs + # + for job in ready: + self._spawn_job(job) - # _run_cleanup() + # _sched() # - # Schedules the cache cleanup job if the passed size - # exceeds the cache quota. + # Run any jobs which are ready to run, or quit the main loop + # when nothing is running or is ready to run. # - # Args: - # cache_size (int): The calculated cache size (ignored) + # This is the main driving function of the scheduler, it is called + # initially when we enter Scheduler.run(), and at the end of whenever + # any job completes, after any business logic has occurred and before + # going back to sleep. # - # NOTE: This runs in response to completion of the cache size - # calculation job lauched by Scheduler.check_cache_size(), - # which will report the calculated cache size. 
- # - def _run_cleanup(self, cache_size): - platform = Platform.get_platform() - artifacts = platform.artifactcache + def _sched(self): - if not artifacts.has_quota_exceeded(): - return + if not self.terminated: - job = CleanupJob(self, 'cleanup', 'cleanup/cleanup', - resources=[ResourceType.CACHE, - ResourceType.PROCESS], - exclusive_resources=[ResourceType.CACHE], - complete_cb=None) - self.schedule_jobs([job]) + # + # Try the cache management jobs + # + self._sched_cleanup_job() + self._sched_cache_size_job() + + # + # Run as many jobs as the queues can handle for the + # available resources + # + self._sched_queue_jobs() + + # + # If nothing is ticking then bail out + # + if not self._active_jobs: + self.loop.stop() # _suspend_jobs() # @@ -367,7 +442,7 @@ class Scheduler(): if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True - for job in self.active_jobs: + for job in self._active_jobs: job.suspend() # _resume_jobs() @@ -376,7 +451,7 @@ class Scheduler(): # def _resume_jobs(self): if self.suspended: - for job in self.active_jobs: + for job in self._active_jobs: job.resume() self.suspended = False self._starttime += (datetime.datetime.now() - self._suspendtime) @@ -449,19 +524,16 @@ class Scheduler(): wait_limit = 20.0 # First tell all jobs to terminate - for job in self.active_jobs: + for job in self._active_jobs: job.terminate() # Now wait for them to really terminate - for job in self.active_jobs: + for job in self._active_jobs: elapsed = datetime.datetime.now() - wait_start timeout = max(wait_limit - elapsed.total_seconds(), 0.0) if not job.terminate_wait(timeout): job.kill() - # Clear out the waiting jobs - self.waiting_jobs = [] - # Regular timeout for driving status in the UI def _tick(self): elapsed = self.elapsed_time() diff --git a/buildstream/utils.py b/buildstream/utils.py index 992218b5e..e3e9dc8c0 100644 --- a/buildstream/utils.py +++ b/buildstream/utils.py @@ -998,6 +998,11 @@ def _kill_process_tree(pid): # 
Ignore this error, it can happen with # some setuid bwrap processes. pass + except psutil.NoSuchProcess: + # It is certain that this has already been sent + # SIGTERM, so there is a window where the process + # could have exited already. + pass # Bloody Murder for child in children: |