From dc863015320f1ff9b749a678a2a1ce4b83c006e0 Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Tue, 16 Jul 2019 16:48:38 +0100 Subject: Add workarounds for queue querying in main process --- src/buildstream/_frontend/app.py | 3 ++- src/buildstream/_scheduler/scheduler.py | 29 ++++++++++++++++++----------- src/buildstream/_stream.py | 13 ++++++------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 90070af48..7fd10b4b6 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -570,8 +570,9 @@ class App(): if not self.stream.terminated: if element_job: # look-up queue + # Issue with pickling a queue object, so for now only pass action names for q in self.stream.queues: - if q.action_name == action_name: + if q == action_name: queue = q assert queue, "Job action {} does not have a corresponding queue".format(action_name) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 664986534..44ebeef5f 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -26,6 +26,7 @@ from itertools import chain import signal import datetime from contextlib import contextmanager +import queue # Local imports from .resources import Resources, ResourceType @@ -55,6 +56,7 @@ class NotificationType(enum.Enum): EXCEPTION = "exception" TASK_ERROR = "task_error" SCHED_TERMINATE = "sched_terminate" + QUEUES = "queues" class Notification: @@ -70,7 +72,8 @@ class Notification: element=None, exception=None, domain=None, - reason=None): + reason=None, + queues=None): self.notification_type = notification_type self.full_name = full_name @@ -82,6 +85,7 @@ class Notification: self.exception = exception self.domain = domain self.reason = reason + self.queues = queues # Scheduler() @@ -175,6 +179,14 @@ class Scheduler(): # Hold on to the queues to process self.queues = queues + # Report to the main process which queues are in session, + # for now a list of action_names as pickling queues is + # causing errors. Will need actual queue object or bidirectional + # notification queue for error handling later. + queue_list = [q.action_name for q in self.queues] + notifcation = Notification(NotificationType.QUEUES, queues=queue_list) + self._notify(notifcation) + # Ensure that we have a fresh new event loop, in case we want # to run another test in this thread. self.loop = asyncio.new_event_loop() @@ -294,20 +306,17 @@ class Scheduler(): # queue (Queue): The Queue holding a complete job # job (Job): The completed Job # status (JobStatus): The status of the completed job - # process_jobs (bool): If the scheduler should also process the - # job, else just generate the notification # - def job_completed(self, job, status, process_jobs=True): + def job_completed(self, job, status): - if process_jobs: - # Remove from the active jobs list - self._active_jobs.remove(job) + self._active_jobs.remove(job) + element = None if status == JobStatus.FAIL: # If it's an elementjob, we want to compare against the failure messages # and send the Element() instance if interactive failure handling. Note # this may change if the frontend is run in a separate process for pickling - element = job._element if (job.element_job and self._interactive_failure) else None + element = job._element if (job.element_job and self._interactive_failure) else element notification = Notification(NotificationType.JOB_COMPLETE, full_name=job.name, @@ -317,9 +326,7 @@ class Scheduler(): element=element) self._notify(notification) - if process_jobs: - # Now check for more jobs - self._sched() + self._sched() # check_cache_size(): # diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 4de975e9b..e0f0842af 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -38,7 +38,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ - SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus + SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus, Notification from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER from ._state import State @@ -114,7 +114,6 @@ class Stream(): try: func(*args, **kwargs) except Exception as e: - from ._scheduler.scheduler import Notification, NotificationType queue.put(Notification(NotificationType.EXCEPTION, exception=e)) def run_in_subprocess(self, func, *args, **kwargs): @@ -367,6 +366,7 @@ class Stream(): if track_elements: self._enqueue_plan(track_elements, queue=track_queue) self._enqueue_plan(elements) + self._run() # fetch() @@ -1646,15 +1646,14 @@ class Stream(): elif notification.notification_type == NotificationType.JOB_COMPLETE: self._state.remove_task(notification.job_action, notification.full_name) if notification.job_status == JobStatus.FAIL: - if notification.failed_element: - unique_id = notification.full_name - else: - unique_id = None - self._state.fail_task(notification.job_action, notification.full_name, unique_id) + self._state.fail_task(notification.job_action, notification.full_name, + notification.failed_element, notification.element) elif notification.notification_type == NotificationType.EXCEPTION: raise notification.exception elif notification.notification_type == NotificationType.TASK_ERROR: set_last_task_error(notification.domain, notification.reason) + elif notification.notification_type == NotificationType.QUEUES: + self.queues = notification.queues else: raise StreamError("Unreccognised notification type recieved") -- cgit v1.2.1