summaryrefslogtreecommitdiff
path: root/lib/ansible/executor/process
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/executor/process')
-rw-r--r--lib/ansible/executor/process/__init__.py21
-rw-r--r--lib/ansible/executor/process/result.py176
-rw-r--r--lib/ansible/executor/process/worker.py155
3 files changed, 352 insertions, 0 deletions
diff --git a/lib/ansible/executor/process/__init__.py b/lib/ansible/executor/process/__init__.py
new file mode 100644
index 0000000000..785fc45992
--- /dev/null
+++ b/lib/ansible/executor/process/__init__.py
@@ -0,0 +1,21 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py
new file mode 100644
index 0000000000..f0416db852
--- /dev/null
+++ b/lib/ansible/executor/process/result.py
@@ -0,0 +1,176 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from six.moves import queue
+import multiprocessing
+import os
+import signal
+import sys
+import time
+import traceback
+
+HAS_ATFORK=True
+try:
+ from Crypto.Random import atfork
+except ImportError:
+ HAS_ATFORK=False
+
+from ansible.playbook.handler import Handler
+from ansible.playbook.task import Task
+
+from ansible.utils.debug import debug
+
+__all__ = ['ResultProcess']
+
+
+class ResultProcess(multiprocessing.Process):
+ '''
+ The result worker thread, which reads results from the results
+ queue and fires off callbacks/etc. as necessary.
+ '''
+
+ def __init__(self, final_q, workers):
+
+ # takes a task queue manager as the sole param:
+ self._final_q = final_q
+ self._workers = workers
+ self._cur_worker = 0
+ self._terminated = False
+
+ super(ResultProcess, self).__init__()
+
+ def _send_result(self, result):
+ debug("sending result: %s" % (result,))
+ self._final_q.put(result, block=False)
+ debug("done sending result")
+
+ def _read_worker_result(self):
+ result = None
+ starting_point = self._cur_worker
+ while True:
+ (worker_prc, main_q, rslt_q) = self._workers[self._cur_worker]
+ self._cur_worker += 1
+ if self._cur_worker >= len(self._workers):
+ self._cur_worker = 0
+
+ try:
+ if not rslt_q.empty():
+ debug("worker %d has data to read" % self._cur_worker)
+ result = rslt_q.get(block=False)
+ debug("got a result from worker %d: %s" % (self._cur_worker, result))
+ break
+ except queue.Empty:
+ pass
+
+ if self._cur_worker == starting_point:
+ break
+
+ return result
+
+ def terminate(self):
+ self._terminated = True
+ super(ResultProcess, self).terminate()
+
+ def run(self):
+ '''
+ The main thread execution, which reads from the results queue
+ indefinitely and sends callbacks/etc. when results are received.
+ '''
+
+ if HAS_ATFORK:
+ atfork()
+
+ while True:
+ try:
+ result = self._read_worker_result()
+ if result is None:
+ time.sleep(0.1)
+ continue
+
+ host_name = result._host.get_name()
+
+ # send callbacks, execute other options based on the result status
+ # FIXME: this should all be cleaned up and probably moved to a sub-function.
+ # the fact that this sometimes sends a TaskResult and other times
+ # sends a raw dictionary back may be confusing, but the result vs.
+ # results implementation for tasks with loops should be cleaned up
+ # better than this
+ if result.is_unreachable():
+ self._send_result(('host_unreachable', result))
+ elif result.is_failed():
+ self._send_result(('host_task_failed', result))
+ elif result.is_skipped():
+ self._send_result(('host_task_skipped', result))
+ else:
+ # if this task is notifying a handler, do it now
+ if result._task.notify:
+ # The shared dictionary for notified handlers is a proxy, which
+ # does not detect when sub-objects within the proxy are modified.
+ # So, per the docs, we reassign the list so the proxy picks up and
+ # notifies all other threads
+ for notify in result._task.notify:
+ self._send_result(('notify_handler', result._host, notify))
+
+ if result._task.loop:
+ # this task had a loop, and has more than one result, so
+ # loop over all of them instead of a single result
+ result_items = result._result['results']
+ else:
+ result_items = [ result._result ]
+
+ for result_item in result_items:
+ #if 'include' in result_item:
+ # include_variables = result_item.get('include_variables', dict())
+ # if 'item' in result_item:
+ # include_variables['item'] = result_item['item']
+ # self._send_result(('include', result._host, result._task, result_item['include'], include_variables))
+ #elif 'add_host' in result_item:
+ if 'add_host' in result_item:
+ # this task added a new host (add_host module)
+ self._send_result(('add_host', result_item))
+ elif 'add_group' in result_item:
+ # this task added a new group (group_by module)
+ self._send_result(('add_group', result._host, result_item))
+ elif 'ansible_facts' in result_item:
+ # if this task is registering facts, do that now
+ if result._task.action in ('set_fact', 'include_vars'):
+ for (key, value) in result_item['ansible_facts'].iteritems():
+ self._send_result(('set_host_var', result._host, key, value))
+ else:
+ self._send_result(('set_host_facts', result._host, result_item['ansible_facts']))
+
+ # finally, send the ok for this task
+ self._send_result(('host_task_ok', result))
+
+ # if this task is registering a result, do it now
+ if result._task.register:
+ self._send_result(('set_host_var', result._host, result._task.register, result._result))
+
+ except queue.Empty:
+ pass
+ except (KeyboardInterrupt, IOError, EOFError):
+ break
+ except:
+ # FIXME: we should probably send a proper callback here instead of
+ # simply dumping a stack trace on the screen
+ traceback.print_exc()
+ break
+
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
new file mode 100644
index 0000000000..d8e8960fe4
--- /dev/null
+++ b/lib/ansible/executor/process/worker.py
@@ -0,0 +1,155 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from six.moves import queue
+import multiprocessing
+import os
+import signal
+import sys
+import time
+import traceback
+
+HAS_ATFORK=True
+try:
+ from Crypto.Random import atfork
+except ImportError:
+ HAS_ATFORK=False
+
+from ansible.errors import AnsibleError, AnsibleConnectionFailure
+from ansible.executor.task_executor import TaskExecutor
+from ansible.executor.task_result import TaskResult
+from ansible.playbook.handler import Handler
+from ansible.playbook.task import Task
+
+from ansible.utils.debug import debug
+
+__all__ = ['WorkerProcess']
+
+
+class WorkerProcess(multiprocessing.Process):
+ '''
+ The worker thread class, which uses TaskExecutor to run tasks
+ read from a job queue and pushes results into a results queue
+ for reading later.
+ '''
+
+ def __init__(self, tqm, main_q, rslt_q, loader):
+
+ # takes a task queue manager as the sole param:
+ self._main_q = main_q
+ self._rslt_q = rslt_q
+ self._loader = loader
+
+ # dupe stdin, if we have one
+ self._new_stdin = sys.stdin
+ try:
+ fileno = sys.stdin.fileno()
+ if fileno is not None:
+ try:
+ self._new_stdin = os.fdopen(os.dup(fileno))
+ except OSError, e:
+ # couldn't dupe stdin, most likely because it's
+ # not a valid file descriptor, so we just rely on
+ # using the one that was passed in
+ pass
+ except ValueError:
+ # couldn't get stdin's fileno, so we just carry on
+ pass
+
+ super(WorkerProcess, self).__init__()
+
+ def run(self):
+ '''
+ Called when the process is started, and loops indefinitely
+ until an error is encountered (typically an IOerror from the
+ queue pipe being disconnected). During the loop, we attempt
+ to pull tasks off the job queue and run them, pushing the result
+ onto the results queue. We also remove the host from the blocked
+ hosts list, to signify that they are ready for their next task.
+ '''
+
+ if HAS_ATFORK:
+ atfork()
+
+ while True:
+ task = None
+ try:
+ if not self._main_q.empty():
+ debug("there's work to be done!")
+ (host, task, basedir, job_vars, connection_info, shared_loader_obj) = self._main_q.get(block=False)
+ debug("got a task/handler to work on: %s" % task)
+
+ # because the task queue manager starts workers (forks) before the
+ # playbook is loaded, set the basedir of the loader inherted by
+ # this fork now so that we can find files correctly
+ self._loader.set_basedir(basedir)
+
+ # Serializing/deserializing tasks does not preserve the loader attribute,
+ # since it is passed to the worker during the forking of the process and
+ # would be wasteful to serialize. So we set it here on the task now, and
+ # the task handles updating parent/child objects as needed.
+ task.set_loader(self._loader)
+
+ # apply the given task's information to the connection info,
+ # which may override some fields already set by the play or
+ # the options specified on the command line
+ new_connection_info = connection_info.set_task_override(task)
+
+ # execute the task and build a TaskResult from the result
+ debug("running TaskExecutor() for %s/%s" % (host, task))
+ executor_result = TaskExecutor(host, task, job_vars, new_connection_info, self._new_stdin, self._loader, shared_loader_obj).run()
+ debug("done running TaskExecutor() for %s/%s" % (host, task))
+ task_result = TaskResult(host, task, executor_result)
+
+ # put the result on the result queue
+ debug("sending task result")
+ self._rslt_q.put(task_result, block=False)
+ debug("done sending task result")
+
+ else:
+ time.sleep(0.1)
+
+ except queue.Empty:
+ pass
+ except (IOError, EOFError, KeyboardInterrupt):
+ break
+ except AnsibleConnectionFailure:
+ try:
+ if task:
+ task_result = TaskResult(host, task, dict(unreachable=True))
+ self._rslt_q.put(task_result, block=False)
+ except:
+ # FIXME: most likely an abort, catch those kinds of errors specifically
+ break
+ except Exception, e:
+ debug("WORKER EXCEPTION: %s" % e)
+ debug("WORKER EXCEPTION: %s" % traceback.format_exc())
+ try:
+ if task:
+ task_result = TaskResult(host, task, dict(failed=True, exception=traceback.format_exc(), stdout=''))
+ self._rslt_q.put(task_result, block=False)
+ except:
+ # FIXME: most likely an abort, catch those kinds of errors specifically
+ break
+
+ debug("WORKER PROCESS EXITING")
+
+