summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAbhijit Menon-Sen <ams@2ndQuadrant.com>2015-08-01 09:26:07 +0530
committerAbhijit Menon-Sen <ams@2ndQuadrant.com>2015-09-03 08:43:21 +0530
commit5887e96b270b2f0d19d3e34a09e16e21824c4887 (patch)
tree75cd01edaa824173098d174f7507fa307bad5588
parent88d3751c28d0504f47f8223dc0119fb23767c6d2 (diff)
downloadansible-5887e96b270b2f0d19d3e34a09e16e21824c4887.tar.gz
Introduce a connection locking infrastructure
The lock file is (a temporary file) opened in the parent process, whose open fd is inherited by the workers after fork, and passed down through the PlayContext. Connection grows lock/unlock methods which can be used by individual connection plugins.
-rw-r--r--lib/ansible/executor/task_queue_manager.py5
-rw-r--r--lib/ansible/playbook/play_context.py6
-rw-r--r--lib/ansible/plugins/connections/__init__.py10
3 files changed, 21 insertions, 0 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 59f48142b1..1189e35f91 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -23,6 +23,7 @@ import multiprocessing
import os
import socket
import sys
+import tempfile
from ansible import constants as C
from ansible.errors import AnsibleError
@@ -78,6 +79,10 @@ class TaskQueueManager:
self._failed_hosts = dict()
self._unreachable_hosts = dict()
+ # A temporary file (opened pre-fork) used by connection plugins for
+ # inter-process locking.
+ self._options.connection_lockfile = tempfile.TemporaryFile()
+
self._final_q = multiprocessing.Queue()
# create the pool of worker threads, based on the number of forks specified
diff --git a/lib/ansible/playbook/play_context.py b/lib/ansible/playbook/play_context.py
index 355efbaf26..d4a184fa61 100644
--- a/lib/ansible/playbook/play_context.py
+++ b/lib/ansible/playbook/play_context.py
@@ -161,6 +161,7 @@ class PlayContext(Base):
_private_key_file = FieldAttribute(isa='string', default=C.DEFAULT_PRIVATE_KEY_FILE)
_timeout = FieldAttribute(isa='int', default=C.DEFAULT_TIMEOUT)
_shell = FieldAttribute(isa='string')
+ _connection_lockfd= FieldAttribute(isa='int', default=None)
# privilege escalation fields
_become = FieldAttribute(isa='bool')
@@ -244,6 +245,11 @@ class PlayContext(Base):
if options.connection:
self.connection = options.connection
+ # The lock file is opened in the parent process, and the workers will
+ # inherit the open file, so we just need to help them find it.
+ if options.connection_lockfile:
+ self.connection_lockfd = options.connection_lockfile.fileno()
+
self.remote_user = options.remote_user
self.private_key_file = options.private_key_file
diff --git a/lib/ansible/plugins/connections/__init__.py b/lib/ansible/plugins/connections/__init__.py
index 1ad2876381..5dfcf4c344 100644
--- a/lib/ansible/plugins/connections/__init__.py
+++ b/lib/ansible/plugins/connections/__init__.py
@@ -155,3 +155,13 @@ class ConnectionBase(with_metaclass(ABCMeta, object)):
if incorrect_password in output:
raise AnsibleError('Incorrect %s password' % self._play_context.become_method)
+ def lock_connection(self):
+ f = self._play_context.connection_lockfd
+ self._display.vvvv('CONNECTION: pid %d waiting for lock on %d' % (os.getpid(), f))
+ fcntl.lockf(f, fcntl.LOCK_EX)
+ self._display.vvvv('CONNECTION: pid %d acquired lock on %d' % (os.getpid(), f))
+
+ def unlock_connection(self):
+ f = self._play_context.connection_lockfd
+ fcntl.lockf(f, fcntl.LOCK_UN)
+ self._display.vvvv('CONNECTION: pid %d released lock on %d' % (os.getpid(), f))