From 26ec2ecfce96cb3e7d8cb6c6b7db5b7fd9e55bfb Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Wed, 26 Oct 2016 12:38:08 -0500 Subject: Adding a persistent connection utility --- bin/ansible-connection | 302 +++++++++++++++++++++++++++++++++++++++++++++++++ bin/test-output | 27 +++++ 2 files changed, 329 insertions(+) create mode 100755 bin/ansible-connection create mode 100755 bin/test-output (limited to 'bin') diff --git a/bin/ansible-connection b/bin/ansible-connection new file mode 100755 index 0000000000..293c6469db --- /dev/null +++ b/bin/ansible-connection @@ -0,0 +1,302 @@ +#!/usr/bin/env python + +# (c) 2016, Ansible, Inc. +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +######################################################## +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +__requires__ = ['ansible'] +try: + import pkg_resources +except Exception: + pass + +import fcntl +import hashlib +import os +import shlex +import signal +import socket +import struct +import sys +import time +import traceback + +#import q + +from io import BytesIO + +from ansible import constants as C +from ansible.module_utils._text import to_bytes, to_native +from ansible.module_utils.six.moves import cPickle, StringIO +from ansible.playbook.play_context import PlayContext +from ansible.plugins import connection_loader +from ansible.utils.path import unfrackpath, makedirs_safe + +def do_fork(): + ''' + Does the required double fork for a daemon process. Based on + http://code.activestate.com/recipes/66012-fork-a-daemon-process-on-unix/ + ''' + try: + pid = os.fork() + if pid > 0: + return pid + + os.chdir("/") + os.setsid() + os.umask(0) + + try: + pid = os.fork() + if pid > 0: + sys.exit(0) + + os.close(sys.stdin.fileno()) + os.close(sys.stdout.fileno()) + os.close(sys.stderr.fileno()) + + return pid + except OSError as e: + sys.exit(1) + except OSError as e: + sys.exit(1) + +def send_data(s, data): + packed_len = struct.pack('!Q',len(data)) + return s.sendall(packed_len + data) + +def recv_data(s): + header_len = 8 # size of a packed unsigned long long + data = b"" + while len(data) < header_len: + d = s.recv(header_len - len(data)) + if not d: + return None + data += d + data_len = struct.unpack('!Q',data[:header_len])[0] + data = data[header_len:] + while len(data) < data_len: + d = s.recv(data_len - len(data)) + if not d: + return None + data += d + return data + +class Server(): + def __init__(self, path, play_context): + self.path = path + self.play_context = play_context + + # FIXME: the connection loader here is created brand new, + # so it will not see any custom paths loaded (ie. via + # roles), so we will need to serialize the connection + # loader and send it over as we do the PlayContext + # in the main() method below. + self.conn = connection_loader.get(play_context.connection, play_context, sys.stdin) + self.conn._connect() + + #q.q("done setting up connection and connected") + + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.socket.bind(path) + self.socket.listen(1) + + signal.signal(signal.SIGALRM, self.alarm_handler) + + def alarm_handler(self, signum, frame): + ''' + Alarm handler + ''' + # FIXME: this should also set internal flags for other + # areas of code to check, so they can terminate + # earlier than the socket going back to the accept + # call and failing there. + self.socket.close() + + def run(self): + try: + while True: + # set the alarm, if we don't get an accept before it + # goes off we exit (via an exception caused by the socket + # getting closed while waiting on accept()) + # FIXME: is this the best way to exit? as noted above in the + # handler we should probably be setting a flag to check + # here and in other parts of the code + signal.alarm(C.PERSISTENT_CONNECT_TIMEOUT) + try: + (s, addr) = self.socket.accept() + # clear the alarm + # FIXME: potential race condition here between the accept and + # time to this call. + signal.alarm(0) + except: + break + + while True: + data = recv_data(s) + if not data: + break + + rc = 255 + try: + if data.startswith(b'EXEC: '): + cmd = data.split(b'EXEC: ')[1] + (rc, stdout, stderr) = self.conn.exec_command(cmd) + elif data.startswith(b'PUT: ') or data.startswith(b'FETCH: '): + (op, src, dst) = shlex.split(to_native(data)) + stdout = stderr = '' + try: + if op == 'FETCH:': + self.conn.fetch_file(src, dst) + elif op == 'PUT:': + self.conn.put_file(src, dst) + rc = 0 + except: + pass + else: + stdout = '' + stderr = 'Invalid action specified' + except: + stdout = '' + stderr = traceback.format_exc() + + send_data(s, to_bytes(str(rc))) + send_data(s, to_bytes(stdout)) + send_data(s, to_bytes(stderr)) + s.close() + except Exception as e: + # FIXME: proper logging and error handling here + print("run exception: %s" % e) + print(traceback.format_exc()) + finally: + # when done, close the connection properly and cleanup + # the socket file so it can be recreated + try: + self.conn.close() + except Exception as e: + pass + os.remove(self.path) + +def main(): + try: + # read the play context data via stdin, which means depickling it + # FIXME: as noted above, we will probably need to deserialize the + # connection loader here as well at some point, otherwise this + # won't find role- or playbook-based connection plugins + cur_line = sys.stdin.readline() + init_data = '' + while cur_line.strip() != '#END_INIT#': + if cur_line == '': + raise Exception("EOL found before init data was complete") + init_data += cur_line + cur_line = sys.stdin.readline() + src = BytesIO(to_bytes(init_data)) + pc_data = cPickle.load(src) + src.close() + + pc = PlayContext() + pc.deserialize(pc_data) + except Exception as e: + # FIXME: better error message/handling/logging + print("FAIL: %s" % e) + print(traceback.format_exc()) + sys.exit(1) + #q.q("done reading in and parsing PlayContext") + + # here we create a hash to use later when creating the socket file, + # so we can hide the info about the target host/user/etc. + m = hashlib.sha256() + for attr in ('connection', 'remote_addr', 'port', 'remote_user'): + val = getattr(pc, attr, None) + if val: + m.update(to_bytes(val)) + + # create the persistent connection dir if need be and create the paths + # which we will be using later + tmp_path = unfrackpath("$HOME/.ansible/pc") + makedirs_safe(tmp_path) + lk_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path) + sf_path = unfrackpath("%s/conn-%s" % (tmp_path, m.hexdigest()[0:12])) + + # if the socket file doesn't exist, spin up the daemon process + lock_fd = os.open(lk_path, os.O_RDWR|os.O_CREAT, 0o600) + fcntl.lockf(lock_fd, fcntl.LOCK_EX) + if not os.path.exists(sf_path): + #q.q("creating daemonized connection fork") + pid = do_fork() + if pid == 0: + server = Server(sf_path, pc) + fcntl.lockf(lock_fd, fcntl.LOCK_UN) + os.close(lock_fd) + #q.q("fork done, running server") + server.run() + #q.q("server run complete, exiting") + sys.exit(0) + fcntl.lockf(lock_fd, fcntl.LOCK_UN) + os.close(lock_fd) + + # now connect to the daemon process + # FIXME: if the socket file existed but the daemonized process was killed, + # the connection will timeout here. Need to make this more resilient. + rc = 0 + while rc == 0: + #q.q("waiting for input") + data = sys.stdin.readline() + if data == '': + #q.q("data was empty, aborting") + break + if data.strip() == '': + #q.q("data was empty line, skipping") + continue + sf = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + attempts = 1 + while True: + try: + sf.connect(sf_path) + break + except socket.error: + # FIXME: better error handling/logging/message here + # FIXME: make # of retries configurable? + time.sleep(0.1) + attempts += 1 + if attempts > 10: + sys.stderr.write("failed to connect to the host, connection timeout\n") + sys.exit(255) + + #q.q("sending data to pipe") + send_data(sf, to_bytes(data.strip())) + #q.q("getting data back") + rc = int(recv_data(sf), 10) + #q.q(rc) + stdout = recv_data(sf) + #q.q(stdout) + stderr = recv_data(sf) + #q.q(stderr) + sys.stdout.write(to_native(stdout)) + sys.stderr.write(to_native(stderr)) + #sys.stdout.flush() + #sys.stderr.flush() + + sf.close() + break + sys.exit(rc) + +if __name__ == '__main__': + main() diff --git a/bin/test-output b/bin/test-output new file mode 100755 index 0000000000..0a86ad5208 --- /dev/null +++ b/bin/test-output @@ -0,0 +1,27 @@ +#!/usr/bin/env python + +import cPickle +import sys +from cStringIO import StringIO +from ansible.playbook.play_context import PlayContext + +p = PlayContext() +p.connection = 'paramiko_ssh' +p.remote_addr = '192.168.122.100' +p.port = 22 +p.remote_user = 'root' +p.password = '' + +src = StringIO() +cPickle.dump(p.serialize(), src) +sys.stdout.write(src.getvalue()) +sys.stdout.write('\n#END_INIT#\n') +sys.stdout.flush() + +while True: + data = sys.stdin.readline() + if data == '': + break + if data.strip() != '': + sys.stdout.write(data) + sys.stdout.flush() -- cgit v1.2.1