From 0e6454ba2652eaaeea8b1e8a9f93aa2803247395 Mon Sep 17 00:00:00 2001 From: Angelos Evripiotis Date: Wed, 5 Jun 2019 14:37:33 +0100 Subject: _scheduler/jobs: refactor, defensive send_message Simplify the custom 'handle_message' / 'send_message' protocol by not requiring a message_type. These message types share a namespace with the base Job implementation, which could cause trouble. Introduce a new private '_send_message' to implement the old functionality. Subclasses are free to pack a message type into their own messages, this isn't necessary at present and simplifies existing subclass code. --- src/buildstream/_scheduler/jobs/cleanupjob.py | 11 ++---- src/buildstream/_scheduler/jobs/job.py | 55 ++++++++++++++++----------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py index 672e784bc..327d687d3 100644 --- a/src/buildstream/_scheduler/jobs/cleanupjob.py +++ b/src/buildstream/_scheduler/jobs/cleanupjob.py @@ -27,14 +27,10 @@ class CleanupJob(Job): context = self._scheduler.context self._casquota = context.get_casquota() - def handle_message(self, message_type, message): + def handle_message(self, message): # Update the cache size in the main process as we go, # this provides better feedback in the UI. - if message_type == 'update-cache-size': - self._casquota.set_cache_size(message, write_to_disk=False) - return True - - return False + self._casquota.set_cache_size(message, write_to_disk=False) def parent_complete(self, status, result): if status == JobStatus.OK: @@ -54,6 +50,5 @@ class ChildCleanupJob(ChildJob): def child_process(self): def progress(): - self.send_message('update-cache-size', - self._casquota.get_cache_size()) + self.send_message(self._casquota.get_cache_size()) return self._casquota.clean(progress) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 2fb0788dd..0dccadf54 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -344,19 +344,19 @@ class Job(): # handle_message() # # Handle a custom message. This will be called in the main process in - # response to any messages sent to the main proces using the - # Job.send_message() API from inside a Job.child_process() implementation + # response to any messages sent to the main process using the + # Job.send_message() API from inside a Job.child_process() implementation. + # + # There is no need to implement this function if no custom messages are + # expected. # # Args: - # message_type (str): A string to identify the message type # message (any): A simple object (must be pickle-able, i.e. strings, # lists, dicts, numbers, but not Element instances). # - # Returns: - # (bool): Should return a truthy value if message_type is handled. - # - def handle_message(self, message_type, message): - return False + def handle_message(self, message): + raise ImplError("Job '{kind}' does not implement handle_message()" + .format(kind=type(self).__name__)) # parent_complete() # @@ -470,12 +470,11 @@ class Job(): elif envelope.message_type == 'child_data': # If we retry a job, we assign a new value to this self.child_data = envelope.message - - # Try Job subclass specific messages now - elif not self.handle_message(envelope.message_type, - envelope.message): - assert 0, "Unhandled message type '{}': {}" \ - .format(envelope.message_type, envelope.message) + elif envelope.message_type == 'subclass_custom_message': + self.handle_message(envelope.message) + else: + assert False, "Unhandled message type '{}': {}".format( + envelope.message_type, envelope.message) # _parent_process_queue() # @@ -595,13 +594,12 @@ class ChildJob(): # 'message_type's. # # Args: - # message_type (str): The type of message to send. # message_data (any): A simple object (must be pickle-able, i.e. # strings, lists, dicts, numbers, but not Element # instances). This is sent to the parent Job. # - def send_message(self, message_type, message_data): - self._queue.put(_Envelope(message_type, message_data)) + def send_message(self, message_data): + self._send_message('subclass_custom_message', message_data) ####################################################### # Abstract Methods # @@ -706,7 +704,7 @@ class ChildJob(): elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox) - self.send_message('child_data', self.child_process_data()) + self._send_message('child_data', self.child_process_data()) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -732,7 +730,7 @@ class ChildJob(): else: # No exception occurred in the action - self.send_message('child_data', self.child_process_data()) + self._send_message('child_data', self.child_process_data()) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime @@ -748,6 +746,19 @@ class ChildJob(): # Local Private Methods # ####################################################### + # _send_message() + # + # Send data in a message to the parent Job, running in the main process. + # + # Args: + # message_type (str): The type of message to send. + # message_data (any): A simple object (must be pickle-able, i.e. + # strings, lists, dicts, numbers, but not Element + # instances). This is sent to the parent Job. + # + def _send_message(self, message_type, message_data): + self._queue.put(_Envelope(message_type, message_data)) + # _child_send_error() # # Sends an error to the main process through the message queue @@ -763,7 +774,7 @@ class ChildJob(): domain = e.domain reason = e.reason - self.send_message('error', { + self._send_message('error', { 'domain': domain, 'reason': reason }) @@ -782,7 +793,7 @@ class ChildJob(): # def _child_send_result(self, result): if result is not None: - self.send_message('result', result) + self._send_message('result', result) # _child_shutdown() # @@ -818,4 +829,4 @@ class ChildJob(): if message.message_type == MessageType.LOG: return - self.send_message('message', message) + self._send_message('message', message) -- cgit v1.2.1