From 6215922889c11b57eaa5bbd6f46c50d7a9d742a3 Mon Sep 17 00:00:00 2001 From: Ganesh Nalawade Date: Tue, 6 Jun 2017 13:56:25 +0530 Subject: Add support for cliconf and netconf plugin (#25093) * ansible-connection refactor and action plugin changes * Add cliconf plugin for eos, ios, iosxr, junos, nxos, vyos * Add netconf plugin for junos * Add jsonrpc support * Modify network_cli and netconf connection plugin * Fix py3 unit test failure * Fix review comment * Minor fixes * Fix ansible-connection review comments * Fix CI issue * platform_agnostic related changes --- bin/ansible-connection | 298 ++++++++++++++++++++++++++----------------------- 1 file changed, 156 insertions(+), 142 deletions(-) (limited to 'bin/ansible-connection') diff --git a/bin/ansible-connection b/bin/ansible-connection index c49354a30d..de118aecb8 100755 --- a/bin/ansible-connection +++ b/bin/ansible-connection @@ -1,6 +1,6 @@ #!/usr/bin/env python -# (c) 2016, Ansible, Inc. +# (c) 2017, Ansible, Inc. # # This file is part of Ansible # @@ -33,18 +33,17 @@ import os import shlex import signal import socket -import struct import sys import time import traceback -import syslog import datetime -import logging +import errno from ansible import constants as C from ansible.module_utils._text import to_bytes, to_native from ansible.module_utils.six import PY3 from ansible.module_utils.six.moves import cPickle +from ansible.module_utils.connection import send_data, recv_data from ansible.playbook.play_context import PlayContext from ansible.plugins import connection_loader from ansible.utils.path import unfrackpath, makedirs_safe @@ -88,33 +87,11 @@ def do_fork(): except OSError as e: sys.exit(1) -def send_data(s, data): - packed_len = struct.pack('!Q', len(data)) - return s.sendall(packed_len + data) - -def recv_data(s): - header_len = 8 # size of a packed unsigned long long - data = b"" - while len(data) < header_len: - d = s.recv(header_len - len(data)) - if not d: - return None - data += d - data_len = struct.unpack('!Q', data[:header_len])[0] - data = data[header_len:] - while len(data) < data_len: - d = s.recv(data_len - len(data)) - if not d: - return None - data += d - return data - class Server(): - def __init__(self, path, play_context): - - self.path = path + def __init__(self, socket_path, play_context): + self.socket_path = socket_path self.play_context = play_context display.display( @@ -123,135 +100,163 @@ class Server(): log_only=True ) - display.display('control socket path is %s' % path, log_only=True) + display.display('control socket path is %s' % socket_path, log_only=True) display.display('current working directory is %s' % os.getcwd(), log_only=True) self._start_time = datetime.datetime.now() display.display("using connection plugin %s" % self.play_context.connection, log_only=True) - self.conn = connection_loader.get(play_context.connection, play_context, sys.stdin) - self.conn._connect() - if not self.conn.connected: + self.connection = connection_loader.get(play_context.connection, play_context, sys.stdin) + self.connection._connect() + + if not self.connection.connected: raise AnsibleConnectionFailure('unable to connect to remote host %s' % self._play_context.remote_addr) connection_time = datetime.datetime.now() - self._start_time display.display('connection established to %s in %s' % (play_context.remote_addr, connection_time), log_only=True) self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket.bind(path) + self.socket.bind(self.socket_path) self.socket.listen(1) - - signal.signal(signal.SIGALRM, self.alarm_handler) - - def dispatch(self, obj, name, *args, **kwargs): - meth = getattr(obj, name, None) - if meth: - return meth(*args, **kwargs) - - def alarm_handler(self, signum, frame): - ''' - Alarm handler - ''' - # FIXME: this should also set internal flags for other - # areas of code to check, so they can terminate - # earlier than the socket going back to the accept - # call and failing there. - # - # hooks the connection plugin to handle any cleanup - self.dispatch(self.conn, 'alarm_handler', signum, frame) - self.socket.close() + display.display('local socket is set to listening', log_only=True) def run(self): try: while True: - # set the alarm, if we don't get an accept before it - # goes off we exit (via an exception caused by the socket - # getting closed while waiting on accept()) - # FIXME: is this the best way to exit? as noted above in the - # handler we should probably be setting a flag to check - # here and in other parts of the code + signal.signal(signal.SIGALRM, self.connect_timeout) + signal.signal(signal.SIGTERM, self.handler) signal.alarm(C.PERSISTENT_CONNECT_TIMEOUT) - try: - (s, addr) = self.socket.accept() - display.display('incoming request accepted on persistent socket', log_only=True) - # clear the alarm - # FIXME: potential race condition here between the accept and - # time to this call. - signal.alarm(0) - except: - break + + (s, addr) = self.socket.accept() + display.display('incoming request accepted on persistent socket', log_only=True) + signal.alarm(0) while True: data = recv_data(s) if not data: break + signal.signal(signal.SIGALRM, self.command_timeout) signal.alarm(self.play_context.timeout) + op = data.split(':')[0] + display.display('socket operation is %s' % op, log_only=True) + + method = getattr(self, 'do_%s' % op, None) + rc = 255 - try: - if data.startswith(b'EXEC: '): - display.display("socket operation is EXEC", log_only=True) - cmd = data.split(b'EXEC: ')[1] - (rc, stdout, stderr) = self.conn.exec_command(cmd) - elif data.startswith(b'PUT: ') or data.startswith(b'FETCH: '): - (op, src, dst) = shlex.split(to_native(data)) - stdout = stderr = '' - try: - if op == 'FETCH:': - display.display("socket operation is FETCH", log_only=True) - self.conn.fetch_file(src, dst) - elif op == 'PUT:': - display.display("socket operation is PUT", log_only=True) - self.conn.put_file(src, dst) - rc = 0 - except: - pass - elif data.startswith(b'CONTEXT: '): - display.display("socket operation is CONTEXT", log_only=True) - pc_data = data.split(b'CONTEXT: ', 1)[1] - - if PY3: - pc_data = cPickle.loads(pc_data, encoding='bytes') - else: - pc_data = cPickle.loads(pc_data) - - pc = PlayContext() - pc.deserialize(pc_data) - - self.dispatch(self.conn, 'update_play_context', pc) - continue - else: - display.display("socket operation is UNKNOWN", log_only=True) - stdout = '' - stderr = 'Invalid action specified' - except: - stdout = '' - stderr = traceback.format_exc() + stdout = stderr = '' + + if not method: + stderr = 'Invalid action specified' + else: + rc, stdout, stderr = method(data) signal.alarm(0) - display.display("socket operation completed with rc %s" % rc, log_only=True) + display.display('socket operation completed with rc %s' % rc, log_only=True) send_data(s, to_bytes(rc)) send_data(s, to_bytes(stdout)) send_data(s, to_bytes(stderr)) + s.close() + except Exception as e: - display.display(traceback.format_exc(), log_only=True) + # socket.accept() will raise EINTR if the socket.close() is called + if e.errno != errno.EINTR: + display.display(traceback.format_exc(), log_only=True) + finally: # when done, close the connection properly and cleanup # the socket file so it can be recreated + self.shutdown() end_time = datetime.datetime.now() delta = end_time - self._start_time - display.display('shutting down control socket, connection was active for %s secs' % delta, log_only=True) - try: - self.conn.close() + display.display('shutdown local socket, connection was active for %s secs' % delta, log_only=True) + + def connect_timeout(self, signum, frame): + display.display('connect timeout triggered, timeout value is %s secs' % C.PERSISTENT_CONNECT_TIMEOUT, log_only=True) + self.shutdown() + + def command_timeout(self, signum, frame): + display.display('commnad timeout triggered, timeout value is %s secs' % self.play_context.timeout, log_only=True) + self.shutdown() + + def handler(self, signum, frame): + display.display('signal handler called with signal %s' % signum, log_only=True) + self.shutdown() + + def shutdown(self): + display.display('shutdown persistent connection requested', log_only=True) + + if not os.path.exists(self.socket_path): + display.display('persistent connection is not active', log_only=True) + return + + try: + if self.socket: + display.display('closing local listener', log_only=True) self.socket.close() - except Exception as e: - pass - os.remove(self.path) + if self.connection: + display.display('closing the connection', log_only=True) + self.close() + except: + pass + finally: + if os.path.exists(self.socket_path): + display.display('removing the local control socket', log_only=True) + os.remove(self.socket_path) + + display.display('shutdown complete', log_only=True) + + def do_EXEC(self, data): + cmd = data.split(b'EXEC: ')[1] + return self.connection.exec_command(cmd) + + def do_PUT(self, data): + (op, src, dst) = shlex.split(to_native(data)) + return self.connection.fetch_file(src, dst) + + def do_FETCH(self, data): + (op, src, dst) = shlex.split(to_native(data)) + return self.connection.put_file(src, dst) + + def do_CONTEXT(self, data): + pc_data = data.split(b'CONTEXT: ', 1)[1] + + if PY3: + pc_data = cPickle.loads(pc_data, encoding='bytes') + else: + pc_data = cPickle.loads(pc_data) + + pc = PlayContext() + pc.deserialize(pc_data) + + try: + self.connection.update_play_context(pc) + except AttributeError: + pass + + return (0, 'ok', '') + + def do_RUN(self, data): + timeout = self.play_context.timeout + while bool(timeout): + if os.path.exists(self.socket_path): + break + time.sleep(1) + timeout -= 1 + return (0, self.socket_path, '') + + +def communicate(sock, data): + send_data(sock, data) + rc = int(recv_data(sock), 10) + stdout = recv_data(sock) + stderr = recv_data(sock) + return (rc, stdout, stderr) def main(): # Need stdin as a byte stream @@ -279,30 +284,32 @@ def main(): pc = PlayContext() pc.deserialize(pc_data) + except Exception as e: # FIXME: better error message/handling/logging sys.stderr.write(traceback.format_exc()) sys.exit("FAIL: %s" % e) ssh = connection_loader.get('ssh', class_only=True) - m = ssh._create_control_path(pc.remote_addr, pc.port, pc.remote_user) + cp = ssh._create_control_path(pc.remote_addr, pc.connection, pc.remote_user) # create the persistent connection dir if need be and create the paths # which we will be using later - tmp_path = unfrackpath("$HOME/.ansible/pc") + tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR) makedirs_safe(tmp_path) - lk_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path) - sf_path = unfrackpath(m % dict(directory=tmp_path)) + lock_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path) + socket_path = unfrackpath(cp % dict(directory=tmp_path)) # if the socket file doesn't exist, spin up the daemon process - lock_fd = os.open(lk_path, os.O_RDWR|os.O_CREAT, 0o600) + lock_fd = os.open(lock_path, os.O_RDWR|os.O_CREAT, 0o600) fcntl.lockf(lock_fd, fcntl.LOCK_EX) - if not os.path.exists(sf_path): + + if not os.path.exists(socket_path): pid = do_fork() if pid == 0: rc = 0 try: - server = Server(sf_path, pc) + server = Server(socket_path, pc) except AnsibleConnectionFailure as exc: display.display('connecting to host %s returned an error' % pc.remote_addr, log_only=True) display.display(str(exc), log_only=True) @@ -318,50 +325,57 @@ def main(): sys.exit(rc) else: display.display('re-using existing socket for %s@%s:%s' % (pc.remote_user, pc.remote_addr, pc.port), log_only=True) + fcntl.lockf(lock_fd, fcntl.LOCK_UN) os.close(lock_fd) + timeout = pc.timeout + while bool(timeout): + if os.path.exists(socket_path): + display.vvvv('connected to local socket in %s' % (pc.timeout - timeout), pc.remote_addr) + break + time.sleep(1) + timeout -= 1 + else: + raise AnsibleConnectionFailure('timeout waiting for local socket', pc.remote_addr) + # now connect to the daemon process # FIXME: if the socket file existed but the daemonized process was killed, # the connection will timeout here. Need to make this more resilient. - rc = 0 - while rc == 0: + while True: data = stdin.readline() if data == b'': break if data.strip() == b'': continue - sf = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - attempts = 1 - while True: + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + attempts = C.PERSISTENT_CONNECT_RETRIES + while bool(attempts): try: - sf.connect(sf_path) + sock.connect(socket_path) break except socket.error: - # FIXME: better error handling/logging/message here time.sleep(C.PERSISTENT_CONNECT_INTERVAL) - attempts += 1 - if attempts > C.PERSISTENT_CONNECT_RETRIES: - display.display('number of connection attempts exceeded, unable to connect to control socket', pc.remote_addr, pc.remote_user, log_only=True) - display.display('persistent_connect_interval=%s, persistent_connect_retries=%s' % (C.PERSISTENT_CONNECT_INTERVAL, C.PERSISTENT_CONNECT_RETRIES), pc.remote_addr, pc.remote_user, log_only=True) - sys.stderr.write('failed to connect to control socket') - sys.exit(255) + attempts -= 1 + else: + display.display('number of connection attempts exceeded, unable to connect to control socket', pc.remote_addr, pc.remote_user, log_only=True) + display.display('persistent_connect_interval=%s, persistent_connect_retries=%s' % (C.PERSISTENT_CONNECT_INTERVAL, C.PERSISTENT_CONNECT_RETRIES), pc.remote_addr, pc.remote_user, log_only=True) + sys.stderr.write('failed to connect to control socket') + sys.exit(255) # send the play_context back into the connection so the connection # can handle any privilege escalation activities pc_data = b'CONTEXT: %s' % init_data - send_data(sf, pc_data) - - send_data(sf, data.strip()) + communicate(sock, pc_data) - rc = int(recv_data(sf), 10) - stdout = recv_data(sf) - stderr = recv_data(sf) + rc, stdout, stderr = communicate(sock, data.strip()) sys.stdout.write(to_native(stdout)) sys.stderr.write(to_native(stderr)) - sf.close() + sock.close() break sys.exit(rc) -- cgit v1.2.1