author     bst-marge-bot <marge-bot@buildstream.build>  2019-04-14 05:58:31 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2019-04-14 05:58:31 +0000
commit     5dce02bbd68be31b67b080ac6480af359f826609 (patch)
tree       799b0046cd977c8bfffa4615e99c85e3dea9bad1
parent     5cb2076daddc464ddfb6d7c39b138fc174bc567f (diff)
parent     28eecdf97545353fa9f00845305e6b4879b4d438 (diff)
Merge branch 'abderrahim/backport-scheduler-fixes' into 'bst-1.2'
Backport scheduler fixes to 1.2

See merge request BuildStream/buildstream!1286
-rw-r--r--  buildstream/_artifactcache/artifactcache.py |   2
-rw-r--r--  buildstream/_frontend/app.py                 |   6
-rw-r--r--  buildstream/_scheduler/__init__.py           |   2
-rw-r--r--  buildstream/_scheduler/jobs/__init__.py      |   1
-rw-r--r--  buildstream/_scheduler/jobs/cachesizejob.py  |  10
-rw-r--r--  buildstream/_scheduler/jobs/cleanupjob.py    |  13
-rw-r--r--  buildstream/_scheduler/jobs/elementjob.py    |   6
-rw-r--r--  buildstream/_scheduler/jobs/job.py           |  88
-rw-r--r--  buildstream/_scheduler/queues/buildqueue.py  |   5
-rw-r--r--  buildstream/_scheduler/queues/fetchqueue.py  |   5
-rw-r--r--  buildstream/_scheduler/queues/pullqueue.py   |   8
-rw-r--r--  buildstream/_scheduler/queues/queue.py       | 146
-rw-r--r--  buildstream/_scheduler/queues/trackqueue.py  |   5
-rw-r--r--  buildstream/_scheduler/resources.py          | 113
-rw-r--r--  buildstream/_scheduler/scheduler.py          | 262
-rw-r--r--  buildstream/utils.py                         |   5
16 files changed, 382 insertions(+), 295 deletions(-)
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 9b5df1d26..965d6e132 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -273,7 +273,7 @@ class ArtifactCache():
# FIXME: Asking the user what to do may be neater
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
'buildstream.conf')
- detail = ("There is not enough space to build the given element.\n"
+ detail = ("There is not enough space to complete the build.\n"
"Please increase the cache-quota in {}."
.format(self.context.config_origin or default_conf))
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 53a342899..312f11d0c 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -40,7 +40,7 @@ from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
from .._versions import BST_FORMAT_VERSION
from .. import _yaml
-from .._scheduler import ElementJob
+from .._scheduler import ElementJob, JobStatus
# Import frontend assets
from . import Profile, LogLine, Status
@@ -518,13 +518,13 @@ class App():
self._status.add_job(job)
self._maybe_render_status()
- def _job_completed(self, job, success):
+ def _job_completed(self, job, status):
self._status.remove_job(job)
self._maybe_render_status()
# Don't attempt to handle a failure if the user has already opted to
# terminate
- if not success and not self.stream.terminated:
+ if status == JobStatus.FAIL and not self.stream.terminated:
if isinstance(job, ElementJob):
element = job.element
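
The frontend callback now receives a tri-state JobStatus instead of a boolean, so a skipped job is no longer conflated with a successful one. A minimal sketch of that migration, with illustrative names rather than BuildStream code:

    # Illustrative sketch of the callback migration, not BuildStream code.
    class JobStatus:
        OK = 0
        FAIL = 1
        SKIPPED = 3

    # Before: one boolean, where a skipped job also counted as success.
    def job_completed_old(job, success):
        if not success:
            print("failed:", job)

    # After: the enum lets the frontend tell real failures from skips.
    def job_completed_new(job, status):
        if status == JobStatus.FAIL:
            print("failed:", job)
        elif status == JobStatus.SKIPPED:
            print("skipped:", job)
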
diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index b6e3eeb94..470859864 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
from .queues.pullqueue import PullQueue
from .scheduler import Scheduler, SchedStatus
-from .jobs import ElementJob
+from .jobs import ElementJob, JobStatus
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 185d8258a..f8fbd6d68 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1,3 +1,4 @@
from .elementjob import ElementJob
from .cachesizejob import CacheSizeJob
from .cleanupjob import CleanupJob
+from .job import JobStatus
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index 68cd91331..fb56ca016 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job
+from .job import Job, JobStatus
from ..._platform import Platform
@@ -31,12 +31,12 @@ class CacheSizeJob(Job):
def child_process(self):
return self._artifacts.compute_cache_size()
- def parent_complete(self, success, result):
- if success:
+ def parent_complete(self, status, result):
+ if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
- if self._complete_cb:
- self._complete_cb(result)
+ if self._complete_cb:
+ self._complete_cb(status, result)
def child_process_data(self):
return {}
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index c22ce3b98..97b45901f 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job
+from .job import Job, JobStatus
from ..._platform import Platform
@@ -31,12 +31,9 @@ class CleanupJob(Job):
def child_process(self):
return self._artifacts.clean()
- def parent_complete(self, success, result):
- if success:
+ def parent_complete(self, status, result):
+ if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
- if self._complete_cb:
- self._complete_cb()
-
- def child_process_data(self):
- return {}
+ if self._complete_cb:
+ self._complete_cb(status, result)
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 4d53a9d3d..fb5d38e11 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -60,7 +60,7 @@ from .job import Job
# Args:
# job (Job): The job object which completed
# element (Element): The element passed to the Job() constructor
-# success (bool): True if the action_cb did not raise an exception
+# status (JobStatus): The final status of the job
# result (object): The deserialized object returned by the `action_cb`, or None
# if `status` is JobStatus.FAIL
#
@@ -93,8 +93,8 @@ class ElementJob(Job):
# Run the action
return self._action_cb(self._element)
- def parent_complete(self, success, result):
- self._complete_cb(self, self._element, success, self._result)
+ def parent_complete(self, status, result):
+ self._complete_cb(self, self._element, status, self._result)
def message(self, message_type, message, **kwargs):
args = dict(kwargs)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index d2f5f6536..348204750 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -28,8 +28,6 @@ import traceback
import asyncio
import multiprocessing
-import psutil
-
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
@@ -43,6 +41,22 @@ RC_PERM_FAIL = 2
RC_SKIPPED = 3
+# JobStatus:
+#
+# The job completion status, passed back through the
+# complete callbacks.
+#
+class JobStatus():
+ # Job succeeded
+ OK = 0
+
+ # A temporary BstError was raised
+ FAIL = 1
+
+ # A SkipJob was raised
+ SKIPPED = 3
+
+
# Used to distinguish between status messages and return values
class Envelope():
def __init__(self, message_type, message):
@@ -71,28 +85,11 @@ class Process(multiprocessing.Process):
# action_name (str): The queue action name
# logfile (str): A template string that points to the logfile
# that should be used - should contain {pid}.
-# resources (iter(ResourceType)) - A set of resources this job
-# wants to use.
-# exclusive_resources (iter(ResourceType)) - A set of resources
-# this job wants to use
-# exclusively.
# max_retries (int): The maximum number of retries
#
class Job():
- def __init__(self, scheduler, action_name, logfile, *,
- resources=None, exclusive_resources=None, max_retries=0):
-
- if resources is None:
- resources = set()
- else:
- resources = set(resources)
- if exclusive_resources is None:
- exclusive_resources = set()
- else:
- exclusive_resources = set(resources)
-
- assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
+ def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
#
# Public members
@@ -100,12 +97,6 @@ class Job():
self.action_name = action_name # The action name for the Queue
self.child_data = None # Data to be sent to the main process
- # The resources this job wants to access
- self.resources = resources
- # Resources this job needs to access exclusively, i.e., no
- # other job should be allowed to access them
- self.exclusive_resources = exclusive_resources
-
#
# Private members
#
@@ -118,7 +109,6 @@ class Job():
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
- self._skipped_flag = False # Indicate whether the job was skipped.
self._terminated = False # Whether this job has been explicitly terminated
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
@@ -213,17 +203,10 @@ class Job():
# Forcefully kill the process, and any children it might have.
#
def kill(self):
-
# Force kill
self.message(MessageType.WARN,
"{} did not terminate gracefully, killing".format(self.action_name))
-
- try:
- utils._kill_process_tree(self._process.pid)
- # This can happen if the process died of its own accord before
- # we try to kill it
- except psutil.NoSuchProcess:
- return
+ utils._kill_process_tree(self._process.pid)
# suspend()
#
@@ -280,18 +263,6 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
- # skipped
- #
- # This will evaluate to True if the job was skipped
- # during processing, or if it was forcefully terminated.
- #
- # Returns:
- # (bool): Whether the job should appear as skipped
- #
- @property
- def skipped(self):
- return self._skipped_flag or self._terminated
-
#######################################################
# Abstract Methods #
#######################################################
@@ -302,10 +273,10 @@ class Job():
# pass the result to the main thread.
#
# Args:
- # success (bool): Whether the job was successful.
+ # status (JobStatus): The job exit status
# result (any): The result returned by child_process().
#
- def parent_complete(self, success, result):
+ def parent_complete(self, status, result):
raise ImplError("Job '{kind}' does not implement parent_complete()"
.format(kind=type(self).__name__))
@@ -569,16 +540,23 @@ class Job():
#
self._retry_flag = returncode == RC_FAIL
- # Set the flag to alert Queue that this job skipped.
- self._skipped_flag = returncode == RC_SKIPPED
-
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.spawn()
return
- success = returncode in (RC_OK, RC_SKIPPED)
- self.parent_complete(success, self._result)
- self._scheduler.job_completed(self, success)
+ # Resolve the outward facing overall job completion status
+ #
+ if returncode == RC_OK:
+ status = JobStatus.OK
+ elif returncode == RC_SKIPPED:
+ status = JobStatus.SKIPPED
+ elif returncode in (RC_FAIL, RC_PERM_FAIL):
+ status = JobStatus.FAIL
+ else:
+ status = JobStatus.FAIL
+
+ self.parent_complete(status, self._result)
+ self._scheduler.job_completed(self, status)
# _parent_process_envelope()
#
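
The tail of this job.py change collapses the four RC_* exit codes into the three JobStatus values. A standalone restatement of that mapping; the RC_* values here mirror the constants defined in jobs/job.py:

    # Standalone restatement of the returncode -> JobStatus resolution
    # above; the RC_* values mirror the constants in jobs/job.py.
    RC_OK, RC_FAIL, RC_PERM_FAIL, RC_SKIPPED = 0, 1, 2, 3

    class JobStatus:
        OK = 0
        FAIL = 1
        SKIPPED = 3

    def resolve_status(returncode):
        if returncode == RC_OK:
            return JobStatus.OK
        if returncode == RC_SKIPPED:
            return JobStatus.SKIPPED
        # RC_FAIL, RC_PERM_FAIL and any unexpected exit code all
        # resolve to FAIL; retry eligibility was already decided
        # separately via the retry flag.
        return JobStatus.FAIL

    assert resolve_status(RC_PERM_FAIL) == JobStatus.FAIL
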
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 05e6f7a8b..df8364552 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -19,6 +19,7 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
from . import Queue, QueueStatus
+from ..jobs import JobStatus
from ..resources import ResourceType
from ..._platform import Platform
@@ -65,9 +66,9 @@ class BuildQueue(Queue):
if artifacts.has_quota_exceeded():
self._scheduler.check_cache_size()
- def done(self, job, element, result, success):
+ def done(self, job, element, result, status):
- if success:
+ if status == JobStatus.OK:
# Inform element in main process that assembly is done
element._assemble_done()
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index c5595e14a..423b620de 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -24,6 +24,7 @@ from ... import Consistency
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..jobs import JobStatus
# A queue which fetches element sources
@@ -64,9 +65,9 @@ class FetchQueue(Queue):
return QueueStatus.READY
- def done(self, _, element, result, success):
+ def done(self, _, element, result, status):
- if not success:
+ if status == JobStatus.FAIL:
return
element._fetch_done()
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index e4868953e..333e92b2b 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..jobs import JobStatus
from ..._exceptions import SkipJob
@@ -51,9 +52,9 @@ class PullQueue(Queue):
else:
return QueueStatus.SKIP
- def done(self, _, element, result, success):
+ def done(self, _, element, result, status):
- if not success:
+ if status == JobStatus.FAIL:
return False
element._pull_done()
@@ -61,4 +62,5 @@ class PullQueue(Queue):
# Build jobs will check the "approximate" size first. Since we
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
- self._scheduler.check_cache_size()
+ if status == JobStatus.OK:
+ self._scheduler.check_cache_size()
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index ec1f1405b..ec1e81350 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -25,7 +25,7 @@ from enum import Enum
import traceback
# Local imports
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
@@ -72,8 +72,9 @@ class Queue():
# Private members
#
self._scheduler = scheduler
- self._wait_queue = deque()
- self._done_queue = deque()
+ self._resources = scheduler.resources # Shared resource pool
+ self._wait_queue = deque() # Ready / Waiting elements
+ self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
# Assert the subclass has setup class data
@@ -115,16 +116,6 @@ class Queue():
def status(self, element):
return QueueStatus.READY
- # prepare()
- #
- # Abstract method for handling job preparation in the main process.
- #
- # Args:
- # element (Element): The element which is scheduled
- #
- def prepare(self, element):
- pass
-
# done()
#
# Abstract method for handling a successful job completion.
@@ -133,10 +124,9 @@ class Queue():
# job (Job): The job which completed processing
# element (Element): The element which completed processing
# result (any): The return value of the process() implementation
- # success (bool): True if the process() implementation did not
- # raise any exception
+ # status (JobStatus): The return status of the Job
#
- def done(self, job, element, result, success):
+ def done(self, job, element, result, status):
pass
#####################################################
@@ -154,26 +144,18 @@ class Queue():
if not elts:
return
- # Note: The internal lists work with jobs. This is not
- # reflected in any external methods (except
- # pop/peek_ready_jobs).
- def create_job(element):
- logfile = self._element_log_path(element)
- return ElementJob(self._scheduler, self.action_name,
- logfile, element=element, queue=self,
- resources=self.resources,
- action_cb=self.process,
- complete_cb=self._job_done,
- max_retries=self._max_retries)
-
- # Place skipped elements directly on the done queue
- jobs = [create_job(elt) for elt in elts]
- skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
- wait = [job for job in jobs if job not in skip]
-
- self._wait_queue.extend(wait)
- self._done_queue.extend(skip)
- self.skipped_elements.extend(skip)
+ # Place skipped elements on the done queue right away.
+ #
+ # The remaining ready and waiting elements must remain in the
+ # same queue, and ready status must be determined at the moment
+ the scheduler asks for the next job.
+ #
+ skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
+ wait = [elt for elt in elts if elt not in skip]
+
+ self.skipped_elements.extend(skip)  # Public record of skipped elements
+ self._done_queue.extend(skip)       # Skipped elements are already done
+ self._wait_queue.extend(wait)       # Elements waiting to be processed
# dequeue()
#
@@ -185,69 +167,59 @@ class Queue():
#
def dequeue(self):
while self._done_queue:
- yield self._done_queue.popleft().element
+ yield self._done_queue.popleft()
# dequeue_ready()
#
- # Reports whether there are any elements to dequeue
+ # Reports whether any elements can be promoted to other queues
#
# Returns:
- # (bool): Whether there are elements to dequeue
+ # (bool): Whether there are elements ready
#
def dequeue_ready(self):
return any(self._done_queue)
- # pop_ready_jobs()
- #
- # Returns:
- # ([Job]): A list of jobs to run
+ # harvest_jobs()
#
# Process elements in the queue, moving elements which were enqueued
- # into the dequeue pool, and processing them if necessary.
- #
- # This will have different results for elements depending
- # on the Queue.status() implementation.
- #
- # o Elements which are QueueStatus.WAIT will not be effected
+ into the dequeue pool, creating as many jobs as resources
+ can be reserved for.
#
- # o Elements which are QueueStatus.SKIP will move directly
- # to the dequeue pool
- #
- # o For Elements which are QueueStatus.READY a Job will be
- # created and returned to the caller, given that the scheduler
- # allows the Queue enough resources for the given job
+ # Returns:
+ # ([Job]): A list of jobs which can be run now
#
- def pop_ready_jobs(self):
+ def harvest_jobs(self):
unready = []
ready = []
while self._wait_queue:
- job = self._wait_queue.popleft()
- element = job.element
+ if not self._resources.reserve(self.resources, peek=True):
+ break
+ element = self._wait_queue.popleft()
status = self.status(element)
+
if status == QueueStatus.WAIT:
- unready.append(job)
- continue
+ unready.append(element)
elif status == QueueStatus.SKIP:
- self._done_queue.append(job)
+ self._done_queue.append(element)
self.skipped_elements.append(element)
- continue
-
- self.prepare(element)
- ready.append(job)
+ else:
+ reserved = self._resources.reserve(self.resources)
+ assert reserved
+ ready.append(element)
- # These were not ready but were in the beginning, give em
- # first priority again next time around
self._wait_queue.extendleft(unready)
- return ready
-
- def peek_ready_jobs(self):
- def ready(job):
- return self.status(job.element) == QueueStatus.READY
-
- yield from (job for job in self._wait_queue if ready(job))
+ return [
+ ElementJob(self._scheduler, self.action_name,
+ self._element_log_path(element),
+ element=element, queue=self,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries)
+ for element in ready
+ ]
#####################################################
# Private Methods #
@@ -291,7 +263,11 @@ class Queue():
#
# See the Job object for an explanation of the call signature
#
- def _job_done(self, job, element, success, result):
+ def _job_done(self, job, element, status, result):
+
+ # Now release the resources we reserved
+ #
+ self._resources.release(self.resources)
# Update values that need to be synchronized in the main task
# before calling any queue implementation
@@ -301,7 +277,7 @@ class Queue():
# and determine if it should be considered as processed
# or skipped.
try:
- self.done(job, element, result, success)
+ self.done(job, element, result, status)
except BstError as e:
# Report error and mark as failed
@@ -325,21 +301,13 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
- #
- # No exception occured in post processing
- #
-
- # Only place in the output done queue if the job
- # was considered successful
- if success:
- self._done_queue.append(job)
+ # All elements get placed on the done queue for later processing.
+ self._done_queue.append(element)
- # A Job can be skipped whether or not it has failed,
- # we want to only bookkeep them as processed or failed
- # if they are not skipped.
- if job.skipped:
+ # These lists are for bookkeeping purposes for the UI and logging.
+ if status == JobStatus.SKIPPED:
self.skipped_elements.append(element)
- elif success:
+ elif status == JobStatus.OK:
self.processed_elements.append(element)
else:
self.failed_elements.append(element)
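
harvest_jobs() introduces a peek-then-reserve idiom: peek=True asks the shared pool whether a job of this queue's type could run at all, without side effects, and the reservation is only committed once an element turns out to be READY. A self-contained toy model of that loop; the Resources class here is a simplified stand-in for _scheduler/resources.py:

    from collections import deque

    class Resources:
        def __init__(self, limit):
            self.limit = limit
            self.used = 0

        def reserve(self, *, peek=False):
            # Refuse when the pool is exhausted; peek=True never commits.
            if self.used >= self.limit:
                return False
            if not peek:
                self.used += 1
            return True

    def harvest(wait_queue, resources, is_ready):
        ready, unready = [], []
        while wait_queue:
            # Bail out early if no job of this type could run anyway.
            if not resources.reserve(peek=True):
                break
            element = wait_queue.popleft()
            if not is_ready(element):
                unready.append(element)
            else:
                reserved = resources.reserve()  # commit the reservation
                assert reserved
                ready.append(element)
        # Elements that were not ready regain priority next round.
        wait_queue.extendleft(unready)
        return ready

    queue = deque(["a", "b", "c"])
    print(harvest(queue, Resources(limit=2), lambda e: e != "b"))  # ['a', 'c']
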
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index c9011edb9..d7e6546f3 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -24,6 +24,7 @@ from ...plugin import Plugin
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..jobs import JobStatus
# A queue which tracks sources
@@ -47,9 +48,9 @@ class TrackQueue(Queue):
return QueueStatus.READY
- def done(self, _, element, result, success):
+ def done(self, _, element, result, status):
- if not success:
+ if status == JobStatus.FAIL:
return
# Set the new refs in the main process one by one as they complete
diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py
index fcf10d7bd..f19d66b44 100644
--- a/buildstream/_scheduler/resources.py
+++ b/buildstream/_scheduler/resources.py
@@ -34,28 +34,25 @@ class Resources():
ResourceType.UPLOAD: set()
}
- def clear_job_resources(self, job):
- for resource in job.exclusive_resources:
- self._exclusive_resources[resource].remove(hash(job))
+ # reserve()
+ #
+ # Reserves a set of resources
+ #
+ # Args:
+ # resources (set): A set of ResourceTypes
+ # exclusive (set): Another set of ResourceTypes
+ # peek (bool): Whether to only peek at whether the resource is available
+ #
+ # Returns:
+ # (bool): True if the resources could be reserved
+ #
+ def reserve(self, resources, exclusive=None, *, peek=False):
+ if exclusive is None:
+ exclusive = set()
- for resource in job.resources:
- self._used_resources[resource] -= 1
-
- def reserve_exclusive_resources(self, job):
- exclusive = job.exclusive_resources
-
- # The very first thing we do is to register any exclusive
- # resources this job may want. Even if the job is not yet
- # allowed to run (because another job is holding the resource
- # it wants), we can still set this - it just means that any
- # job *currently* using these resources has to finish first,
- # and no new jobs wanting these can be launched (except other
- # exclusive-access jobs).
- #
- for resource in exclusive:
- self._exclusive_resources[resource].add(hash(job))
+ resources = set(resources)
+ exclusive = set(exclusive)
- def reserve_job_resources(self, job):
# First, we check if the job wants to access a resource that
# another job wants exclusive access to. If so, it cannot be
# scheduled.
@@ -68,7 +65,8 @@ class Resources():
# is currently not possible, but may be worth thinking
# about.
#
- for resource in job.resources - job.exclusive_resources:
+ for resource in resources - exclusive:
+
# If our job wants this resource exclusively, we never
# check this, so we can get away with not (temporarily)
# removing it from the set.
@@ -84,14 +82,14 @@ class Resources():
# at a time, despite being allowed to be part of the exclusive
# set.
#
- for exclusive in job.exclusive_resources:
- if self._used_resources[exclusive] != 0:
+ for resource in exclusive:
+ if self._used_resources[resource] != 0:
return False
# Finally, we check if we have enough of each resource
# available. If we don't have enough, the job cannot be
# scheduled.
- for resource in job.resources:
+ for resource in resources:
if (self._max_resources[resource] > 0 and
self._used_resources[resource] >= self._max_resources[resource]):
return False
@@ -99,7 +97,70 @@ class Resources():
# Now we register the fact that our job is using the resources
# it asked for, and tell the scheduler that it is allowed to
# continue.
- for resource in job.resources:
- self._used_resources[resource] += 1
+ if not peek:
+ for resource in resources:
+ self._used_resources[resource] += 1
return True
+
+ # release()
+ #
+ # Release resources previously reserved with Resources.reserve()
+ #
+ # Args:
+ # resources (set): A set of resources to release
+ #
+ def release(self, resources):
+ for resource in resources:
+ assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
+ self._used_resources[resource] -= 1
+
+ # register_exclusive_interest()
+ #
+ # Inform the resources pool that `source` has an interest in
+ # reserving this resource exclusively.
+ #
+ # The source parameter identifies the caller; it must
+ # remain unique for as long as the interest is
+ # registered.
+ #
+ # This function may be called multiple times, and subsequent
+ # calls will simply have no effect until clear_exclusive_interest()
+ # is used to clear the interest.
+ #
+ # This must be called in advance of reserve()
+ #
+ # Args:
+ # resources (set): Set of resources to reserve exclusively
+ # source (any): Source identifier, to be used again when unregistering
+ # the interest.
+ #
+ def register_exclusive_interest(self, resources, source):
+
+ # The very first thing we do is to register any exclusive
+ # resources this job may want. Even if the job is not yet
+ # allowed to run (because another job is holding the resource
+ # it wants), we can still set this - it just means that any
+ # job *currently* using these resources has to finish first,
+ # and no new jobs wanting these can be launched (except other
+ # exclusive-access jobs).
+ #
+ for resource in resources:
+ self._exclusive_resources[resource].add(source)
+
+ # unregister_exclusive_interest()
+ #
+ # Clear the exclusive interest in these resources.
+ #
+ # This should be called by the given source which registered
+ # an exclusive interest.
+ #
+ # Args:
+ # resources (set): Set of resources to clear the exclusive interest on
+ # source (str): The identifier which was used when registering
+ # the interest.
+ #
+ def unregister_exclusive_interest(self, resources, source):
+
+ for resource in resources:
+ self._exclusive_resources[resource].remove(source)
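
Taken together, the reworked Resources class defines a three-step protocol: register an exclusive interest (jobs already holding the resource finish, new shared reservations are refused), reserve with the exclusive set once the resource drains, then release and clear the interest. A hedged usage sketch, assuming this internal module is importable from a BuildStream 1.2 tree; the pool sizes are arbitrary:

    # Hedged usage sketch of the reworked Resources API.
    from buildstream._scheduler.resources import Resources, ResourceType

    resources = Resources(4, 10, 4)   # builders, fetchers, pushers

    # 1. Register an exclusive interest: jobs already holding CACHE
    #    finish normally, but no new shared reservations are granted.
    resources.register_exclusive_interest([ResourceType.CACHE], 'cache-cleanup')

    # 2. Reserve once the resource drains; until then reserve() is False.
    if resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
                         [ResourceType.CACHE]):
        try:
            pass  # ... the exclusive work would happen here ...
        finally:
            # 3. Release the reservation, then clear the interest.
            resources.release([ResourceType.CACHE, ResourceType.PROCESS])
            resources.unregister_exclusive_interest(
                [ResourceType.CACHE], 'cache-cleanup')
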
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 2975f9758..30e70a4fc 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
-from .jobs import CacheSizeJob, CleanupJob
+from .jobs import JobStatus, CacheSizeJob, CleanupJob
from .._platform import Platform
@@ -39,6 +39,12 @@ class SchedStatus():
TERMINATED = 1
+# Some action names for the internal jobs we launch
+#
+_ACTION_NAME_CLEANUP = 'cleanup'
+_ACTION_NAME_CACHE_SIZE = 'cache_size'
+
+
# Scheduler()
#
# The scheduler operates on a list of queues, each of which is meant to accomplish
@@ -72,8 +78,6 @@ class Scheduler():
#
# Public members
#
- self.active_jobs = [] # Jobs currently being run in the scheduler
- self.waiting_jobs = [] # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -86,18 +90,35 @@ class Scheduler():
#
# Private members
#
+ self._active_jobs = [] # Jobs currently being run in the scheduler
+ self._starttime = start_time # Initial application start time
+ self._suspendtime = None # Session time compensation for suspended state
+ self._queue_jobs = True # Whether we should continue to queue jobs
+
+ # State of cache management related jobs
+ self._cache_size_scheduled = False # Whether we have a cache size job scheduled
+ self._cache_size_running = None # A running CacheSizeJob, or None
+ self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
+ self._cleanup_running = None # A running CleanupJob, or None
+
+ # Callbacks to report back to the Scheduler owner
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
self._job_start_callback = job_start_callback
self._job_complete_callback = job_complete_callback
- self._starttime = start_time
- self._suspendtime = None
- self._queue_jobs = True # Whether we should continue to queue jobs
+ # Whether our exclusive jobs, like 'cleanup', are already
+ # waiting or active.
+ #
+ # This is just a bit quicker than scanning the wait queue and active
+ # queue and comparing job action names.
+ #
+ self._exclusive_waiting = set()
+ self._exclusive_active = set()
- self._resources = Resources(context.sched_builders,
- context.sched_fetchers,
- context.sched_pushers)
+ self.resources = Resources(context.sched_builders,
+ context.sched_fetchers,
+ context.sched_pushers)
# run()
#
@@ -130,7 +151,7 @@ class Scheduler():
self._connect_signals()
# Run the queues
- self._schedule_queue_jobs()
+ self._sched()
self.loop.run_forever()
self.loop.close()
@@ -210,19 +231,6 @@ class Scheduler():
starttime = timenow
return timenow - starttime
- # schedule_jobs()
- #
- # Args:
- # jobs ([Job]): A list of jobs to schedule
- #
- # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
- # run as soon any other queueing jobs finish, provided sufficient
- # resources are available for them to run
- #
- def schedule_jobs(self, jobs):
- for job in jobs:
- self.waiting_jobs.append(job)
-
# job_completed():
#
# Called when a Job completes
@@ -230,13 +238,17 @@ class Scheduler():
# Args:
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
- # success (bool): Whether the Job completed with a success status
+ # status (JobStatus): The status of the completed job
#
- def job_completed(self, job, success):
- self._resources.clear_job_resources(job)
- self.active_jobs.remove(job)
- self._job_complete_callback(job, success)
- self._schedule_queue_jobs()
+ def job_completed(self, job, status):
+
+ # Remove from the active jobs list
+ self._active_jobs.remove(job)
+
+ # Scheduler owner facing callback
+ self._job_complete_callback(job, status)
+
+ # Now check for more jobs
self._sched()
# check_cache_size():
@@ -245,48 +257,105 @@ class Scheduler():
# size is calculated, a cleanup job will be run automatically
# if needed.
#
- # FIXME: This should ensure that only one cache size job
- # is ever pending at a given time. If a cache size
- # job is already running, it is correct to queue
- # a new one, it is incorrect to have more than one
- # of these jobs pending at a given time, though.
- #
def check_cache_size(self):
- job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
- resources=[ResourceType.CACHE,
- ResourceType.PROCESS],
- complete_cb=self._run_cleanup)
- self.schedule_jobs([job])
+
+ # Here we assume we are called in response to a job
+ # completion callback, or before entering the scheduler.
+ #
+ # As such there is no need to call `_sched()` from here,
+ # and we prefer to run it once at the last moment.
+ #
+ self._cache_size_scheduled = True
#######################################################
# Local Private Methods #
#######################################################
- # _sched()
+ # _spawn_job()
#
- # The main driving function of the scheduler, it will be called
- # automatically when Scheduler.run() is called initially,
+ # Spawns a job
#
- def _sched(self):
- for job in self.waiting_jobs:
- self._resources.reserve_exclusive_resources(job)
+ # Args:
+ # job (Job): The job to spawn
+ #
+ def _spawn_job(self, job):
+ job.spawn()
+ self._active_jobs.append(job)
+ if self._job_start_callback:
+ self._job_start_callback(job)
- for job in self.waiting_jobs:
- if not self._resources.reserve_job_resources(job):
- continue
+ # Callback for the cache size job
+ def _cache_size_job_complete(self, status, cache_size):
- job.spawn()
- self.waiting_jobs.remove(job)
- self.active_jobs.append(job)
+ # Deallocate cache size job resources
+ self._cache_size_running = None
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
- if self._job_start_callback:
- self._job_start_callback(job)
+ # Schedule a cleanup job if we've hit the threshold
+ if status != JobStatus.OK:
+ return
- # If nothings ticking, time to bail out
- if not self.active_jobs and not self.waiting_jobs:
- self.loop.stop()
+ platform = Platform.get_platform()
+ artifacts = platform.artifactcache
+
+ if artifacts.has_quota_exceeded():
+ self._cleanup_scheduled = True
+
+ # Callback for the cleanup job
+ def _cleanup_job_complete(self, status, cache_size):
- # _schedule_queue_jobs()
+ # Deallocate cleanup job resources
+ self._cleanup_running = None
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
+
+ # Unregister the exclusive interest when we're done with it
+ if not self._cleanup_scheduled:
+ self.resources.unregister_exclusive_interest(
+ [ResourceType.CACHE], 'cache-cleanup'
+ )
+
+ # _sched_cleanup_job()
+ #
+ # Runs a cleanup job if one is scheduled to run now and
+ # sufficient resources are available.
+ #
+ def _sched_cleanup_job(self):
+
+ if self._cleanup_scheduled and self._cleanup_running is None:
+
+ # Ensure we have an exclusive interest in the resources
+ self.resources.register_exclusive_interest(
+ [ResourceType.CACHE], 'cache-cleanup'
+ )
+
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
+ [ResourceType.CACHE]):
+
+ # Update state and launch
+ self._cleanup_scheduled = False
+ self._cleanup_running = \
+ CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
+ complete_cb=self._cleanup_job_complete)
+ self._spawn_job(self._cleanup_running)
+
+ # _sched_cache_size_job()
+ #
+ # Runs a cache size job if one is scheduled to run now and
+ # sufficient resources are available.
+ #
+ def _sched_cache_size_job(self):
+
+ if self._cache_size_scheduled and not self._cache_size_running:
+
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
+ self._cache_size_scheduled = False
+ self._cache_size_running = \
+ CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
+ 'cache_size/cache_size',
+ complete_cb=self._cache_size_job_complete)
+ self._spawn_job(self._cache_size_running)
+
+ # _sched_queue_jobs()
#
# Ask the queues what jobs they want to schedule and schedule
# them. This is done here so we can ask for new jobs when jobs
@@ -295,7 +364,7 @@ class Scheduler():
# This will process the Queues, pull elements through the Queues
# and process anything that is ready.
#
- def _schedule_queue_jobs(self):
+ def _sched_queue_jobs(self):
ready = []
process_queues = True
@@ -304,10 +373,7 @@ class Scheduler():
# Pull elements forward through queues
elements = []
for queue in self.queues:
- # Enqueue elements complete from the last queue
queue.enqueue(elements)
-
- # Dequeue processed elements for the next queue
elements = list(queue.dequeue())
# Kickoff whatever processes can be processed at this time
@@ -322,42 +388,51 @@ class Scheduler():
# thus need all the pulls to complete before ever starting
# a build
ready.extend(chain.from_iterable(
- queue.pop_ready_jobs() for queue in reversed(self.queues)
+ q.harvest_jobs() for q in reversed(self.queues)
))
- # pop_ready_jobs() may have skipped jobs, adding them to
- # the done_queue. Pull these skipped elements forward to
- # the next queue and process them.
+ # harvest_jobs() may have decided to skip some jobs, making
+ # them eligible for promotion to the next queue as a side effect.
+ #
+ # If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
- self.schedule_jobs(ready)
- self._sched()
+ # Spawn the jobs
+ #
+ for job in ready:
+ self._spawn_job(job)
- # _run_cleanup()
+ # _sched()
#
- # Schedules the cache cleanup job if the passed size
- # exceeds the cache quota.
+ # Run any jobs which are ready to run, or quit the main loop
+ # when nothing is running or is ready to run.
#
- # Args:
- # cache_size (int): The calculated cache size (ignored)
+ # This is the main driving function of the scheduler, it is called
+ # initially when we enter Scheduler.run(), and again whenever a job
+ # completes, after any business logic has occurred and before going
+ # back to sleep.
#
- # NOTE: This runs in response to completion of the cache size
- # calculation job lauched by Scheduler.check_cache_size(),
- # which will report the calculated cache size.
- #
- def _run_cleanup(self, cache_size):
- platform = Platform.get_platform()
- artifacts = platform.artifactcache
+ def _sched(self):
- if not artifacts.has_quota_exceeded():
- return
+ if not self.terminated:
- job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
- resources=[ResourceType.CACHE,
- ResourceType.PROCESS],
- exclusive_resources=[ResourceType.CACHE],
- complete_cb=None)
- self.schedule_jobs([job])
+ #
+ # Try the cache management jobs
+ #
+ self._sched_cleanup_job()
+ self._sched_cache_size_job()
+
+ #
+ # Run as many jobs as the queues can handle for the
+ # available resources
+ #
+ self._sched_queue_jobs()
+
+ #
+ # If nothing is ticking then bail out
+ #
+ if not self._active_jobs:
+ self.loop.stop()
# _suspend_jobs()
#
@@ -367,7 +442,7 @@ class Scheduler():
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- for job in self.active_jobs:
+ for job in self._active_jobs:
job.suspend()
# _resume_jobs()
@@ -376,7 +451,7 @@ class Scheduler():
#
def _resume_jobs(self):
if self.suspended:
- for job in self.active_jobs:
+ for job in self._active_jobs:
job.resume()
self.suspended = False
self._starttime += (datetime.datetime.now() - self._suspendtime)
@@ -449,19 +524,16 @@ class Scheduler():
wait_limit = 20.0
# First tell all jobs to terminate
- for job in self.active_jobs:
+ for job in self._active_jobs:
job.terminate()
# Now wait for them to really terminate
- for job in self.active_jobs:
+ for job in self._active_jobs:
elapsed = datetime.datetime.now() - wait_start
timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
if not job.terminate_wait(timeout):
job.kill()
- # Clear out the waiting jobs
- self.waiting_jobs = []
-
# Regular timeout for driving status in the UI
def _tick(self):
elapsed = self.elapsed_time()
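
The scheduler rewrite leaves a single driving function: on every wakeup it tries the cache management jobs, fills any remaining capacity from the queues, and stops the event loop once nothing is active. A toy model of that control flow; every name here is a stand-in, not the BuildStream API:

    # Toy model of the new scheduling flow, not BuildStream code.
    class ToyScheduler:
        def __init__(self):
            self._active_jobs = []
            self.terminated = False

        def job_completed(self, job, status):
            self._active_jobs.remove(job)
            # ... owner-facing callback and bookkeeping happen here ...
            self._sched()                  # always re-drive the scheduler last

        def _sched(self):
            if not self.terminated:
                self._sched_cleanup_job()      # cache management jobs first
                self._sched_cache_size_job()
                self._sched_queue_jobs()       # then fill remaining capacity
            if not self._active_jobs:
                self.stop_loop()               # nothing ticking: leave run()

        def _sched_cleanup_job(self): pass
        def _sched_cache_size_job(self): pass
        def _sched_queue_jobs(self): pass
        def stop_loop(self): pass
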
diff --git a/buildstream/utils.py b/buildstream/utils.py
index 992218b5e..e3e9dc8c0 100644
--- a/buildstream/utils.py
+++ b/buildstream/utils.py
@@ -998,6 +998,11 @@ def _kill_process_tree(pid):
# Ignore this error, it can happen with
# some setuid bwrap processes.
pass
+ except psutil.NoSuchProcess:
+ # This process has already been sent SIGTERM, so there
+ # is a window in which it may have exited before we
+ # get here.
+ pass
# Bloody Murder
for child in children:
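
With the NoSuchProcess handling moved into _kill_process_tree(), each psutil call is guarded individually, since any process in the tree may exit between being listed and being signalled. A hedged standalone sketch of that defensive pattern; kill_tree is illustrative, not the BuildStream helper:

    # Defensive process-tree kill; illustrative, not buildstream.utils code.
    import psutil

    def kill_tree(pid):
        try:
            proc = psutil.Process(pid)
            children = proc.children(recursive=True)
        except psutil.NoSuchProcess:
            return  # already gone before we could look at it

        for p in children + [proc]:
            try:
                p.kill()
            except psutil.AccessDenied:
                pass  # e.g. some setuid bwrap processes
            except psutil.NoSuchProcess:
                pass  # exited between listing and killing
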