summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorNathaniel Case <this.is@nathanielca.se>2017-11-09 15:04:40 -0500
committerGitHub <noreply@github.com>2017-11-09 15:04:40 -0500
commit9c0275a879d96eb9aee5e66987b1f07a013f23db (patch)
tree22e5ade2339675e717ed7eb7d1478920e75d85ab /bin
parent897b31f2499b81367cf671053fdb88145cb3a5cb (diff)
downloadansible-9c0275a879d96eb9aee5e66987b1f07a013f23db.tar.gz
Connection plugins network_cli and netconf (#32521)
* implements jsonrpc message passing for ansible-connection * implements more generic mechanism for persistent connections * starts persistent connection in task_executor if enabled and supported * supports using network_cli as top level connection plugin * enhances logging for persistent connection to stdout * Update action plugins * Fix Python3 RPC * Fix Junos bytes<-->str issues * supports using netconf as top level connection plugin * Error message when running netconf on an unsupported platform * Update tests * Fix `authorize: yes` for `connection: local` * Handle potentially JSON data in terminal * Add clarifying detail if possible on ConnectionError
Diffstat (limited to 'bin')
-rwxr-xr-xbin/ansible-connection351
1 files changed, 138 insertions, 213 deletions
diff --git a/bin/ansible-connection b/bin/ansible-connection
index dd727743da..7fbceedbd9 100755
--- a/bin/ansible-connection
+++ b/bin/ansible-connection
@@ -1,23 +1,6 @@
#!/usr/bin/env python
-
-# (c) 2017, Ansible, Inc. <support@ansible.com>
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-########################################################
+# Copyright: (c) 2017, Ansible Project
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
@@ -36,91 +19,68 @@ import socket
import sys
import time
import traceback
-import datetime
import errno
+import json
from ansible import constants as C
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.six import PY3
from ansible.module_utils.six.moves import cPickle
from ansible.module_utils.connection import send_data, recv_data
+from ansible.module_utils.service import fork_process
from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import connection_loader
from ansible.utils.path import unfrackpath, makedirs_safe
-from ansible.errors import AnsibleConnectionFailure
+from ansible.errors import AnsibleError
from ansible.utils.display import Display
+from ansible.utils.jsonrpc import JsonRpcServer
-def do_fork():
+class ConnectionProcess(object):
'''
- Does the required double fork for a daemon process. Based on
- http://code.activestate.com/recipes/66012-fork-a-daemon-process-on-unix/
+ The connection process wraps around a Connection object that manages
+ the connection to a remote device that persists over the playbook
'''
- try:
- pid = os.fork()
- if pid > 0:
- return pid
- # This is done as a 'good practice' for daemons, but we need to keep the cwd
- # leaving it here as a note that we KNOW its good practice but are not doing it on purpose.
- # os.chdir("/")
- os.setsid()
- os.umask(0)
-
- try:
- pid = os.fork()
- if pid > 0:
- sys.exit(0)
-
- if C.DEFAULT_LOG_PATH != '':
- out_file = open(C.DEFAULT_LOG_PATH, 'ab+')
- err_file = open(C.DEFAULT_LOG_PATH, 'ab+', 0)
- else:
- out_file = open('/dev/null', 'ab+')
- err_file = open('/dev/null', 'ab+', 0)
-
- os.dup2(out_file.fileno(), sys.stdout.fileno())
- os.dup2(err_file.fileno(), sys.stderr.fileno())
- os.close(sys.stdin.fileno())
-
- return pid
- except OSError as e:
- sys.exit(1)
- except OSError as e:
- sys.exit(1)
-
-
-class Server():
-
- def __init__(self, socket_path, play_context):
- self.socket_path = socket_path
+ def __init__(self, fd, play_context, socket_path, original_path):
self.play_context = play_context
+ self.socket_path = socket_path
+ self.original_path = original_path
- display.display(
- 'creating new control socket for host %s:%s as user %s' %
- (play_context.remote_addr, play_context.port, play_context.remote_user),
- log_only=True
- )
-
- display.display('control socket path is %s' % socket_path, log_only=True)
- display.display('current working directory is %s' % os.getcwd(), log_only=True)
-
- self._start_time = datetime.datetime.now()
-
- display.display("using connection plugin %s" % self.play_context.connection, log_only=True)
-
- self.connection = connection_loader.get(play_context.connection, play_context, sys.stdin)
- self.connection._connect()
-
- if not self.connection.connected:
- raise AnsibleConnectionFailure('unable to connect to remote host %s' % self._play_context.remote_addr)
+ self.fd = fd
+ self.exception = None
- connection_time = datetime.datetime.now() - self._start_time
- display.display('connection established to %s in %s' % (play_context.remote_addr, connection_time), log_only=True)
+ self.srv = JsonRpcServer()
+ self.sock = None
- self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.socket.bind(self.socket_path)
- self.socket.listen(1)
- display.display('local socket is set to listening', log_only=True)
+ def start(self):
+ try:
+ messages = list()
+ result = {}
+
+ messages.append('control socket path is %s' % self.socket_path)
+
+ # If this is a relative path (~ gets expanded later) then plug the
+ # key's path on to the directory we originally came from, so we can
+ # find it now that our cwd is /
+ if self.play_context.private_key_file and self.play_context.private_key_file[0] not in '~/':
+ self.play_context.private_key_file = os.path.join(self.original_path, self.play_context.private_key_file)
+
+ self.connection = connection_loader.get(self.play_context.connection, self.play_context, '/dev/null')
+ self.connection._connect()
+ self.srv.register(self.connection)
+ messages.append('connection to remote device started successfully')
+
+ self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.sock.bind(self.socket_path)
+ self.sock.listen(1)
+ messages.append('local domain socket listeners started successfully')
+ except Exception as exc:
+ result['error'] = to_text(exc)
+ result['exception'] = traceback.format_exc()
+ finally:
+ result['messages'] = messages
+ self.fd.write(json.dumps(result))
+ self.fd.close()
def run(self):
try:
@@ -129,53 +89,36 @@ class Server():
signal.signal(signal.SIGTERM, self.handler)
signal.alarm(C.PERSISTENT_CONNECT_TIMEOUT)
- (s, addr) = self.socket.accept()
- display.display('incoming request accepted on persistent socket', log_only=True)
+ self.exception = None
+ (s, addr) = self.sock.accept()
signal.alarm(0)
+ signal.signal(signal.SIGALRM, self.command_timeout)
while True:
data = recv_data(s)
if not data:
break
- signal.signal(signal.SIGALRM, self.command_timeout)
- signal.alarm(self.play_context.timeout)
-
- op = to_text(data.split(b':')[0])
- display.display('socket operation is %s' % op, log_only=True)
-
- method = getattr(self, 'do_%s' % op, None)
-
- rc = 255
- stdout = stderr = ''
-
- if not method:
- stderr = 'Invalid action specified'
- else:
- rc, stdout, stderr = method(data)
-
+ signal.alarm(self.connection._play_context.timeout)
+ resp = self.srv.handle_request(data)
signal.alarm(0)
- display.display('socket operation completed with rc %s' % rc, log_only=True)
-
- send_data(s, to_bytes(rc))
- send_data(s, to_bytes(stdout))
- send_data(s, to_bytes(stderr))
+ send_data(s, to_bytes(resp))
s.close()
except Exception as e:
# socket.accept() will raise EINTR if the socket.close() is called
- if e.errno != errno.EINTR:
- display.display(traceback.format_exc(), log_only=True)
+ if hasattr(e, 'errno'):
+ if e.errno != errno.EINTR:
+ self.exception = traceback.format_exc()
+ else:
+ self.exception = traceback.format_exc()
finally:
# when done, close the connection properly and cleanup
# the socket file so it can be recreated
self.shutdown()
- end_time = datetime.datetime.now()
- delta = end_time - self._start_time
- display.display('shutdown local socket, connection was active for %s secs' % delta, log_only=True)
def connect_timeout(self, signum, frame):
display.display('persistent connection idle timeout triggered, timeout value is %s secs' % C.PERSISTENT_CONNECT_TIMEOUT, log_only=True)
@@ -190,25 +133,25 @@ class Server():
self.shutdown()
def shutdown(self):
- display.display('shutdown persistent connection requested', log_only=True)
-
+ """ Shuts down the local domain socket
+ """
if not os.path.exists(self.socket_path):
- display.display('persistent connection is not active', log_only=True)
return
try:
- if self.socket:
- display.display('closing local listener', log_only=True)
- self.socket.close()
+ if self.sock:
+ self.sock.close()
if self.connection:
- display.display('closing the connection', log_only=True)
self.connection.close()
+
except:
pass
+
finally:
if os.path.exists(self.socket_path):
- display.display('removing the local control socket', log_only=True)
os.remove(self.socket_path)
+ setattr(self.connection, '_socket_path', None)
+ setattr(self.connection, '_connected', False)
display.display('shutdown complete', log_only=True)
@@ -262,6 +205,13 @@ def communicate(sock, data):
def main():
+ """ Called to initiate the connect to the remote device
+ """
+ rc = 0
+ result = {}
+ messages = list()
+ socket_path = None
+
# Need stdin as a byte stream
if PY3:
stdin = sys.stdin.buffer
@@ -270,116 +220,91 @@ def main():
try:
# read the play context data via stdin, which means depickling it
- # FIXME: as noted above, we will probably need to deserialize the
- # connection loader here as well at some point, otherwise this
- # won't find role- or playbook-based connection plugins
cur_line = stdin.readline()
init_data = b''
+
while cur_line.strip() != b'#END_INIT#':
if cur_line == b'':
raise Exception("EOF found before init data was complete")
init_data += cur_line
cur_line = stdin.readline()
+
if PY3:
pc_data = cPickle.loads(init_data, encoding='bytes')
else:
pc_data = cPickle.loads(init_data)
- pc = PlayContext()
- pc.deserialize(pc_data)
+ play_context = PlayContext()
+ play_context.deserialize(pc_data)
except Exception as e:
- # FIXME: better error message/handling/logging
- sys.stderr.write(traceback.format_exc())
- sys.exit("FAIL: %s" % e)
-
- ssh = connection_loader.get('ssh', class_only=True)
- cp = ssh._create_control_path(pc.remote_addr, pc.port, pc.remote_user, pc.connection)
-
- # create the persistent connection dir if need be and create the paths
- # which we will be using later
- tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR)
- makedirs_safe(tmp_path)
- lock_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path)
- socket_path = unfrackpath(cp % dict(directory=tmp_path))
-
- # if the socket file doesn't exist, spin up the daemon process
- lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o600)
- fcntl.lockf(lock_fd, fcntl.LOCK_EX)
-
- if not os.path.exists(socket_path):
- pid = do_fork()
- if pid == 0:
- rc = 0
- try:
- server = Server(socket_path, pc)
- except AnsibleConnectionFailure as exc:
- display.display('connecting to host %s returned an error' % pc.remote_addr, log_only=True)
- display.display(str(exc), log_only=True)
- rc = 1
- except Exception as exc:
- display.display('failed to create control socket for host %s' % pc.remote_addr, log_only=True)
- display.display(traceback.format_exc(), log_only=True)
- rc = 1
- fcntl.lockf(lock_fd, fcntl.LOCK_UN)
- os.close(lock_fd)
- if rc == 0:
- server.run()
- sys.exit(rc)
- else:
- display.display('re-using existing socket for %s@%s:%s' % (pc.remote_user, pc.remote_addr, pc.port), log_only=True)
-
- fcntl.lockf(lock_fd, fcntl.LOCK_UN)
- os.close(lock_fd)
-
- timeout = pc.timeout
- while bool(timeout):
- if os.path.exists(socket_path):
- display.vvvv('connected to local socket in %s' % (pc.timeout - timeout), pc.remote_addr)
- break
- time.sleep(1)
- timeout -= 1
- else:
- raise AnsibleConnectionFailure('timeout waiting for local socket', pc.remote_addr)
-
- # now connect to the daemon process
- # FIXME: if the socket file existed but the daemonized process was killed,
- # the connection will timeout here. Need to make this more resilient.
- while True:
- data = stdin.readline()
- if data == b'':
- break
- if data.strip() == b'':
- continue
-
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-
- connect_retry_timeout = C.PERSISTENT_CONNECT_RETRY_TIMEOUT
- while bool(connect_retry_timeout):
- try:
- sock.connect(socket_path)
- break
- except socket.error:
- time.sleep(1)
- connect_retry_timeout -= 1
- else:
- display.display('connect retry timeout expired, unable to connect to control socket', pc.remote_addr, pc.remote_user, log_only=True)
- display.display('persistent_connect_retry_timeout is %s secs' % (C.PERSISTENT_CONNECT_RETRY_TIMEOUT), pc.remote_addr, pc.remote_user, log_only=True)
- sys.stderr.write('failed to connect to control socket')
- sys.exit(255)
+ rc = 1
+ result.update({
+ 'error': to_text(e),
+ 'exception': traceback.format_exc()
+ })
+
+ if rc == 0:
+ ssh = connection_loader.get('ssh', class_only=True)
+ cp = ssh._create_control_path(play_context.remote_addr, play_context.port, play_context.remote_user, play_context.connection)
+
+ # create the persistent connection dir if need be and create the paths
+ # which we will be using later
+ tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR)
+ makedirs_safe(tmp_path)
+
+ lock_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path)
+ socket_path = unfrackpath(cp % dict(directory=tmp_path))
+
+ # if the socket file doesn't exist, spin up the daemon process
+ lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o600)
+ fcntl.lockf(lock_fd, fcntl.LOCK_EX)
+
+ if not os.path.exists(socket_path):
+ messages.append('local domain socket does not exist, starting it')
+ original_path = os.getcwd()
+ r, w = os.pipe()
+ pid = fork_process()
+
+ if pid == 0:
+ try:
+ os.close(r)
+ wfd = os.fdopen(w, 'w')
+ process = ConnectionProcess(wfd, play_context, socket_path, original_path)
+ process.start()
+ except Exception as exc:
+ messages.append(traceback.format_exc())
+ rc = 1
+
+ fcntl.lockf(lock_fd, fcntl.LOCK_UN)
+ os.close(lock_fd)
+
+ if rc == 0:
+ process.run()
+
+ sys.exit(rc)
- # send the play_context back into the connection so the connection
- # can handle any privilege escalation activities
- pc_data = b'CONTEXT: %s' % init_data
- communicate(sock, pc_data)
+ else:
+ os.close(w)
+ rfd = os.fdopen(r, 'r')
+ data = json.loads(rfd.read())
+ messages.extend(data.pop('messages'))
+ result.update(data)
- rc, stdout, stderr = communicate(sock, data.strip())
+ else:
+ messages.append('found existing local domain socket, using it!')
- sys.stdout.write(to_native(stdout))
- sys.stderr.write(to_native(stderr))
+ result.update({
+ 'messages': messages,
+ 'socket_path': socket_path
+ })
- sock.close()
- break
+ if 'exception' in result:
+ rc = 1
+ sys.stderr.write(json.dumps(result))
+ else:
+ rc = 0
+ sys.stdout.write(json.dumps(result))
sys.exit(rc)