diff options
Diffstat (limited to 'lib/ansible/executor/process')
-rw-r--r--  lib/ansible/executor/process/__init__.py |  21 +
-rw-r--r--  lib/ansible/executor/process/result.py   | 176 +
-rw-r--r--  lib/ansible/executor/process/worker.py   | 155 +
3 files changed, 352 insertions, 0 deletions
diff --git a/lib/ansible/executor/process/__init__.py b/lib/ansible/executor/process/__init__.py new file mode 100644 index 0000000000..785fc45992 --- /dev/null +++ b/lib/ansible/executor/process/__init__.py @@ -0,0 +1,21 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py new file mode 100644 index 0000000000..f0416db852 --- /dev/null +++ b/lib/ansible/executor/process/result.py @@ -0,0 +1,176 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. 
class ResultProcess(multiprocessing.Process):
    '''
    The result worker process, which round-robins over the per-worker
    result queues and translates each TaskResult into one or more typed
    tuples pushed onto the final queue, where the task queue manager
    dispatches them (callbacks, handler notifications, fact/variable
    registration, etc.).
    '''

    def __init__(self, final_q, workers):
        '''
        :param final_q: queue the task queue manager reads dispatch
                        tuples from
        :param workers: list of (worker_process, main_q, rslt_q) triples,
                        one per WorkerProcess
        '''
        self._final_q = final_q
        self._workers = workers
        self._cur_worker = 0        # round-robin cursor into self._workers
        self._terminated = False

        super(ResultProcess, self).__init__()

    def _send_result(self, result):
        '''Push one dispatch tuple onto the final queue without blocking.'''
        debug("sending result: %s" % (result,))
        self._final_q.put(result, block=False)
        debug("done sending result")

    def _read_worker_result(self):
        '''
        Poll each worker's result queue once, starting from the current
        round-robin position. Returns the first available result, or
        None if every queue was empty after one full pass.
        '''
        result = None
        starting_point = self._cur_worker
        while True:
            # remember which worker we are polling BEFORE advancing the
            # cursor, so the debug output names the correct worker (the
            # original logged the already-incremented index, off by one)
            worker_idx = self._cur_worker
            (worker_prc, main_q, rslt_q) = self._workers[worker_idx]
            self._cur_worker += 1
            if self._cur_worker >= len(self._workers):
                self._cur_worker = 0

            try:
                if not rslt_q.empty():
                    debug("worker %d has data to read" % worker_idx)
                    result = rslt_q.get(block=False)
                    debug("got a result from worker %d: %s" % (worker_idx, result))
                    break
            except queue.Empty:
                # raced with another reader; just move on to the next queue
                pass

            if self._cur_worker == starting_point:
                break

        return result

    def terminate(self):
        '''Mark the loop as terminated and kill the process.'''
        self._terminated = True
        super(ResultProcess, self).terminate()

    def run(self):
        '''
        The main process loop, which reads from the worker result queues
        indefinitely and sends dispatch tuples when results are received.
        Exits on interrupt/broken pipe or any unexpected error.
        '''

        # re-seed PyCrypto's RNG after the fork, if it is available
        if HAS_ATFORK:
            atfork()

        while True:
            try:
                result = self._read_worker_result()
                if result is None:
                    # no worker had anything for us; don't spin hot
                    time.sleep(0.1)
                    continue

                # send callbacks, execute other options based on the result status
                # FIXME: this should all be cleaned up and probably moved to a sub-function.
                #        the fact that this sometimes sends a TaskResult and other times
                #        sends a raw dictionary back may be confusing, but the result vs.
                #        results implementation for tasks with loops should be cleaned up
                #        better than this
                if result.is_unreachable():
                    self._send_result(('host_unreachable', result))
                elif result.is_failed():
                    self._send_result(('host_task_failed', result))
                elif result.is_skipped():
                    self._send_result(('host_task_skipped', result))
                else:
                    # if this task is notifying a handler, do it now
                    if result._task.notify:
                        # The shared dictionary for notified handlers is a proxy, which
                        # does not detect when sub-objects within the proxy are modified.
                        # So, per the docs, we reassign the list so the proxy picks up and
                        # notifies all other threads
                        for notify in result._task.notify:
                            self._send_result(('notify_handler', result._host, notify))

                    if result._task.loop:
                        # this task had a loop, and has more than one result, so
                        # loop over all of them instead of a single result
                        result_items = result._result['results']
                    else:
                        result_items = [ result._result ]

                    for result_item in result_items:
                        if 'add_host' in result_item:
                            # this task added a new host (add_host module)
                            self._send_result(('add_host', result_item))
                        elif 'add_group' in result_item:
                            # this task added a new group (group_by module)
                            self._send_result(('add_group', result._host, result_item))
                        elif 'ansible_facts' in result_item:
                            # if this task is registering facts, do that now;
                            # NOTE: items() here instead of the Python-2-only
                            # iteritems(), matching the python3-ish intent of
                            # the module's __future__ imports
                            if result._task.action in ('set_fact', 'include_vars'):
                                for (key, value) in result_item['ansible_facts'].items():
                                    self._send_result(('set_host_var', result._host, key, value))
                            else:
                                self._send_result(('set_host_facts', result._host, result_item['ansible_facts']))

                    # finally, send the ok for this task
                    self._send_result(('host_task_ok', result))

                    # if this task is registering a result, do it now
                    if result._task.register:
                        self._send_result(('set_host_var', result._host, result._task.register, result._result))

            except queue.Empty:
                pass
            except (KeyboardInterrupt, IOError, EOFError):
                # interrupt or queue pipe disconnect: shut down quietly
                break
            except:
                # FIXME: we should probably send a proper callback here instead of
                #        simply dumping a stack trace on the screen
                traceback.print_exc()
                break
class WorkerProcess(multiprocessing.Process):
    '''
    The worker process class, which uses TaskExecutor to run tasks
    read from a job queue and pushes results into a results queue
    for reading later.
    '''

    def __init__(self, tqm, main_q, rslt_q, loader):
        '''
        :param tqm:    the task queue manager (accepted for interface
                       compatibility; not stored — the queues and loader
                       are all this process needs after forking)
        :param main_q: queue this worker pulls (host, task, ...) jobs from
        :param rslt_q: queue this worker pushes TaskResult objects onto
        :param loader: the DataLoader inherited by this fork
        '''
        self._main_q = main_q
        self._rslt_q = rslt_q
        self._loader = loader

        # dupe stdin, if we have one, so the forked worker gets its own
        # descriptor (connection plugins may want to prompt on it)
        self._new_stdin = sys.stdin
        try:
            fileno = sys.stdin.fileno()
            if fileno is not None:
                try:
                    self._new_stdin = os.fdopen(os.dup(fileno))
                # NOTE: 'except OSError as e' replaces the Python-2-only
                # 'except OSError, e' comma syntax, which is a SyntaxError
                # under Python 3 despite this module's python3-ish intent;
                # the bound name was unused, so it is dropped entirely
                except OSError:
                    # couldn't dupe stdin, most likely because it's
                    # not a valid file descriptor, so we just rely on
                    # using the one that was passed in
                    pass
        except ValueError:
            # couldn't get stdin's fileno, so we just carry on
            pass

        super(WorkerProcess, self).__init__()

    def run(self):
        '''
        Called when the process is started, and loops indefinitely
        until an error is encountered (typically an IOError from the
        queue pipe being disconnected). During the loop, we attempt
        to pull tasks off the job queue and run them, pushing the result
        onto the results queue. We also remove the host from the blocked
        hosts list, to signify that they are ready for their next task.
        '''

        # re-seed PyCrypto's RNG after the fork, if it is available
        if HAS_ATFORK:
            atfork()

        while True:
            task = None
            try:
                if not self._main_q.empty():
                    debug("there's work to be done!")
                    (host, task, basedir, job_vars, connection_info, shared_loader_obj) = self._main_q.get(block=False)
                    debug("got a task/handler to work on: %s" % task)

                    # because the task queue manager starts workers (forks) before the
                    # playbook is loaded, set the basedir of the loader inherted by
                    # this fork now so that we can find files correctly
                    self._loader.set_basedir(basedir)

                    # Serializing/deserializing tasks does not preserve the loader attribute,
                    # since it is passed to the worker during the forking of the process and
                    # would be wasteful to serialize. So we set it here on the task now, and
                    # the task handles updating parent/child objects as needed.
                    task.set_loader(self._loader)

                    # apply the given task's information to the connection info,
                    # which may override some fields already set by the play or
                    # the options specified on the command line
                    new_connection_info = connection_info.set_task_override(task)

                    # execute the task and build a TaskResult from the result
                    debug("running TaskExecutor() for %s/%s" % (host, task))
                    executor_result = TaskExecutor(host, task, job_vars, new_connection_info, self._new_stdin, self._loader, shared_loader_obj).run()
                    debug("done running TaskExecutor() for %s/%s" % (host, task))
                    task_result = TaskResult(host, task, executor_result)

                    # put the result on the result queue
                    debug("sending task result")
                    self._rslt_q.put(task_result, block=False)
                    debug("done sending task result")

                else:
                    # no work available; don't spin hot on the queue
                    time.sleep(0.1)

            except queue.Empty:
                pass
            except (IOError, EOFError, KeyboardInterrupt):
                # interrupt or queue pipe disconnect: shut down quietly
                break
            except AnsibleConnectionFailure:
                # the host could not be reached; report it as unreachable
                try:
                    if task:
                        task_result = TaskResult(host, task, dict(unreachable=True))
                        self._rslt_q.put(task_result, block=False)
                except:
                    # FIXME: most likely an abort, catch those kinds of errors specifically
                    break
            # 'except Exception as e' replaces the Python-2-only comma syntax
            except Exception as e:
                debug("WORKER EXCEPTION: %s" % e)
                debug("WORKER EXCEPTION: %s" % traceback.format_exc())
                try:
                    if task:
                        task_result = TaskResult(host, task, dict(failed=True, exception=traceback.format_exc(), stdout=''))
                        self._rslt_q.put(task_result, block=False)
                except:
                    # FIXME: most likely an abort, catch those kinds of errors specifically
                    break

        debug("WORKER PROCESS EXITING")