summaryrefslogtreecommitdiff
path: root/Tools/Scripts/webkitpy/common/message_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'Tools/Scripts/webkitpy/common/message_pool.py')
-rw-r--r--Tools/Scripts/webkitpy/common/message_pool.py324
1 files changed, 0 insertions, 324 deletions
diff --git a/Tools/Scripts/webkitpy/common/message_pool.py b/Tools/Scripts/webkitpy/common/message_pool.py
deleted file mode 100644
index 03056cf64..000000000
--- a/Tools/Scripts/webkitpy/common/message_pool.py
+++ /dev/null
@@ -1,324 +0,0 @@
-# Copyright (C) 2011 Google Inc. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Module for handling messages and concurrency for run-webkit-tests
-and test-webkitpy. This module follows the design for multiprocessing.Pool
-and concurrency.futures.ProcessPoolExecutor, with the following differences:
-
-* Tasks are executed in stateful subprocesses via objects that implement the
- Worker interface - this allows the workers to share state across tasks.
-* The pool provides an asynchronous event-handling interface so the caller
- may receive events as tasks are processed.
-
-If you don't need these features, use multiprocessing.Pool or concurrency.futures
-intead.
-
-"""
-
-import cPickle
-import logging
-import multiprocessing
-import Queue
-import sys
-import time
-import traceback
-
-
-from webkitpy.common.host import Host
-from webkitpy.common.system import stack_utils
-
-
-_log = logging.getLogger(__name__)
-
-
-def get(caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
- """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel."""
- return _MessagePool(caller, worker_factory, num_workers, worker_startup_delay_secs, host)
-
-
-class _MessagePool(object):
- def __init__(self, caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
- self._caller = caller
- self._worker_factory = worker_factory
- self._num_workers = num_workers
- self._worker_startup_delay_secs = worker_startup_delay_secs
- self._workers = []
- self._workers_stopped = set()
- self._host = host
- self._name = 'manager'
- self._running_inline = (self._num_workers == 1)
- if self._running_inline:
- self._messages_to_worker = Queue.Queue()
- self._messages_to_manager = Queue.Queue()
- else:
- self._messages_to_worker = multiprocessing.Queue()
- self._messages_to_manager = multiprocessing.Queue()
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, exc_traceback):
- self._close()
- return False
-
- def run(self, shards):
- """Posts a list of messages to the pool and waits for them to complete."""
- for message in shards:
- self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=()))
-
- for _ in xrange(self._num_workers):
- self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=()))
-
- self.wait()
-
- def _start_workers(self):
- assert not self._workers
- self._workers_stopped = set()
- host = None
- if self._running_inline or self._can_pickle(self._host):
- host = self._host
-
- for worker_number in xrange(self._num_workers):
- worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None, self._worker_log_level())
- self._workers.append(worker)
- worker.start()
- if self._worker_startup_delay_secs:
- time.sleep(self._worker_startup_delay_secs)
-
- def _worker_log_level(self):
- log_level = logging.NOTSET
- for handler in logging.root.handlers:
- if handler.level != logging.NOTSET:
- if log_level == logging.NOTSET:
- log_level = handler.level
- else:
- log_level = min(log_level, handler.level)
- return log_level
-
- def wait(self):
- try:
- self._start_workers()
- if self._running_inline:
- self._workers[0].run()
- self._loop(block=False)
- else:
- self._loop(block=True)
- finally:
- self._close()
-
- def _close(self):
- for worker in self._workers:
- if worker.is_alive():
- worker.terminate()
- worker.join()
- self._workers = []
- if not self._running_inline:
- # FIXME: This is a hack to get multiprocessing to not log tracebacks during shutdown :(.
- multiprocessing.util._exiting = True
- if self._messages_to_worker:
- self._messages_to_worker.close()
- self._messages_to_worker = None
- if self._messages_to_manager:
- self._messages_to_manager.close()
- self._messages_to_manager = None
-
- def _log_messages(self, messages):
- for message in messages:
- logging.root.handle(message)
-
- def _handle_done(self, source):
- self._workers_stopped.add(source)
-
- @staticmethod
- def _handle_worker_exception(source, exception_type, exception_value, _):
- if exception_type == KeyboardInterrupt:
- raise exception_type(exception_value)
- raise WorkerException(str(exception_value))
-
- def _can_pickle(self, host):
- try:
- cPickle.dumps(host)
- return True
- except TypeError:
- return False
-
- def _loop(self, block):
- try:
- while True:
- if len(self._workers_stopped) == len(self._workers):
- block = False
- message = self._messages_to_manager.get(block)
- self._log_messages(message.logs)
- if message.from_user:
- self._caller.handle(message.name, message.src, *message.args)
- continue
- method = getattr(self, '_handle_' + message.name)
- assert method, 'bad message %s' % repr(message)
- method(message.src, *message.args)
- except Queue.Empty:
- pass
-
-
-class WorkerException(BaseException):
- """Raised when we receive an unexpected/unknown exception from a worker."""
- pass
-
-
-class _Message(object):
- def __init__(self, src, message_name, message_args, from_user, logs):
- self.src = src
- self.name = message_name
- self.args = message_args
- self.from_user = from_user
- self.logs = logs
-
- def __repr__(self):
- return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs)
-
-
-class _Worker(multiprocessing.Process):
- def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager, log_level):
- super(_Worker, self).__init__()
- self.host = host
- self.worker_number = worker_number
- self.name = 'worker/%d' % worker_number
- self.log_messages = []
- self.log_level = log_level
- self._running_inline = running_inline
- self._manager = manager
-
- self._messages_to_manager = messages_to_manager
- self._messages_to_worker = messages_to_worker
- self._worker = worker_factory(self)
- self._logger = None
- self._log_handler = None
-
- def terminate(self):
- if self._worker:
- if hasattr(self._worker, 'stop'):
- self._worker.stop()
- self._worker = None
- if self.is_alive():
- super(_Worker, self).terminate()
-
- def _close(self):
- if self._log_handler and self._logger:
- self._logger.removeHandler(self._log_handler)
- self._log_handler = None
- self._logger = None
-
- def start(self):
- if not self._running_inline:
- super(_Worker, self).start()
-
- def run(self):
- if not self.host:
- self.host = Host()
- if not self._running_inline:
- self._set_up_logging()
-
- worker = self._worker
- exception_msg = ""
- _log.debug("%s starting" % self.name)
-
- try:
- if hasattr(worker, 'start'):
- worker.start()
- while True:
- message = self._messages_to_worker.get()
- if message.from_user:
- worker.handle(message.name, message.src, *message.args)
- self._yield_to_manager()
- else:
- assert message.name == 'stop', 'bad message %s' % repr(message)
- break
-
- _log.debug("%s exiting" % self.name)
- except Queue.Empty:
- assert False, '%s: ran out of messages in worker queue.' % self.name
- except KeyboardInterrupt, e:
- self._raise(sys.exc_info())
- except Exception, e:
- self._raise(sys.exc_info())
- finally:
- try:
- if hasattr(worker, 'stop'):
- worker.stop()
- finally:
- self._post(name='done', args=(), from_user=False)
- self._close()
-
- def post(self, name, *args):
- self._post(name, args, from_user=True)
- self._yield_to_manager()
-
- def _yield_to_manager(self):
- if self._running_inline:
- self._manager._loop(block=False)
-
- def _post(self, name, args, from_user):
- log_messages = self.log_messages
- self.log_messages = []
- self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages))
-
- def _raise(self, exc_info):
- exception_type, exception_value, exception_traceback = exc_info
- if self._running_inline:
- raise exception_type, exception_value, exception_traceback
-
- if exception_type == KeyboardInterrupt:
- _log.debug("%s: interrupted, exiting" % self.name)
- stack_utils.log_traceback(_log.debug, exception_traceback)
- else:
- _log.error("%s: %s('%s') raised:" % (self.name, exception_value.__class__.__name__, str(exception_value)))
- stack_utils.log_traceback(_log.error, exception_traceback)
- # Since tracebacks aren't picklable, send the extracted stack instead.
- stack = traceback.extract_tb(exception_traceback)
- self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False)
-
- def _set_up_logging(self):
- self._logger = logging.getLogger()
-
- # The unix multiprocessing implementation clones any log handlers into the child process,
- # so we remove them to avoid duplicate logging.
- for h in self._logger.handlers:
- self._logger.removeHandler(h)
-
- self._log_handler = _WorkerLogHandler(self)
- self._logger.addHandler(self._log_handler)
- self._logger.setLevel(self.log_level)
-
-
-class _WorkerLogHandler(logging.Handler):
- def __init__(self, worker):
- logging.Handler.__init__(self)
- self._worker = worker
- self.setLevel(worker.log_level)
-
- def emit(self, record):
- self._worker.log_messages.append(record)