#!/usr/bin/python
#
# Copyright (C) 2014 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Run the health tests listed in overseer.yaml and report the results.

Each test is an external command.  Results are optionally written to an
HTML status page (path given as the first command-line argument) and
failures are optionally mailed out (when the configuration has an
``email`` section).
"""

import os
import subprocess
import sys
import tempfile
from email.utils import formatdate
from optparse import OptionParser
from smtplib import SMTP
from socket import gethostname
from sys import argv, exit, stderr, stdout
from xml.sax.saxutils import escape

try:
    import yaml
except ImportError:
    # Deferred failure: the helpers below stay importable without PyYAML;
    # main() reports the missing dependency before it needs the parser.
    yaml = None

verbose = False

# Report sinks.  main() replaces these with html_output / email_output
# instances when the corresponding report is enabled; the defaults make
# report_failure() and run_test() safe to call before main() has run.
html_report = False
email_report = False

# Binary multipliers understood by human_to_bytes().
_SUFFIX_MULTIPLIERS = {
    "K": 1024,
    "M": 1024 ** 2,
    "G": 1024 ** 3,
    "T": 1024 ** 4,
}


def utter(msg):
    """Write msg to stdout, but only when verbose mode is enabled."""
    if verbose:
        stdout.write(msg + "\n")
        stdout.flush()


def _html(value):
    """Return value formatted as text and escaped for use in HTML."""
    return escape("%s" % (value,))


class html_output:
    """Writes the test results as an HTML table to a status page.

    The page is built in a temporary file and renamed over the target in
    finish(), so readers never see a half-written page.
    """

    # NOTE(review): only the text content of this markup is certain; the
    # exact tags and attributes should be confirmed against the deployed
    # status page.
    html_head = """<html>
<head><title>Trove Health Monitor status for %s</title></head>
<body>
<h1>Trove Heath Monitor</h1>
<p>Tests begin at %s</p>
<table>
<tr><th>Test name</th><th>Description</th><th>Threshold</th><th>Current value</th><th>Result</th></tr>
""" % (gethostname(), formatdate())

    def __init__(self, filename):
        """Open a temporary file for the page that will become filename."""
        self.filename = filename
        target = os.path.abspath(filename)
        # Create the temporary file next to the target so the final
        # os.rename() never has to cross a filesystem boundary, and open
        # it in text mode so str writes work on Python 2 and 3 alike.
        self.tempfile = tempfile.NamedTemporaryFile(
            mode="w",
            dir=os.path.dirname(target),
            prefix=os.path.basename(target),
            delete=False)
        self.fh = self.tempfile.file
        utter("HTML temporary filename is %s" % self.tempfile.name)
        self.fh.write(self.html_head)

    def result(self, testname, testparam, result, success, msg):
        """Append one table row describing the outcome of a test.

        testparam must contain 'name'; 'threshold' and 'type' are
        optional.  msg is only shown for failed tests.
        """
        fh = self.fh
        fh.write("<tr><td>%s</td><td>%s</td>"
                 % (_html(testname), _html(testparam['name'])))
        if 'threshold' in testparam:
            # Same rule as email_output: a threshold is a "high" limit
            # unless the test declares a 'type'.
            kind = testparam['type'] if 'type' in testparam else "high"
            fh.write("<td>%s %s</td>"
                     % (_html(testparam['threshold']), _html(kind)))
        else:
            fh.write("<td>Pass/Fail</td>")
        fh.write("<td>%s</td>" % _html(result))
        if success:
            colour, verdict = "green", "OK"
        else:
            colour, verdict = "red", "FAILURE<br />%s" % _html(msg)
        fh.write("<td style=\"background-color: %s\">%s</td>"
                 % (colour, verdict))
        fh.write("</tr>\n")

    def finish(self):
        """Close the table and move the finished page into place."""
        fh = self.fh
        fh.write("</table>\n</body>\n</html>\n")
        fh.close()
        os.rename(self.tempfile.name, self.filename)


class email_output:
    """Collects failed tests and mails them as a single plain-text report.

    config must provide 'mailfrom', 'rcptto' and 'server'.
    """

    # Class-level defaults; each instance rebinds its own copy on first
    # modification, so instances do not share state.
    body = "Subject: Trove Overseer failure report from %s\n" % (gethostname())
    failures = False

    def __init__(self, config):
        """Start the message with its From, To and Date headers."""
        self.config = config
        self.body += "From: %s\nTo: %s\n" % (config['mailfrom'],
                                             config['rcptto'])
        self.body += "Date: %s\n\n" % formatdate()

    def result(self, testname, testparam, result, success, msg):
        """Record a failed test in the message body; ignore successes."""
        if success:
            return
        self.failures = True
        self.body += ("\nTest %s (%s) failed:" % (testparam['name'], testname))
        if 'threshold' in testparam:
            self.body += ("\n\tThreshold: %s ") % str(testparam['threshold'])
            self.body += testparam['type'] if 'type' in testparam else "high"
        self.body += ("\n\tResult: %s" % str(result))
        if msg:
            self.body += ("\n\t%s") % msg
        self.body += "\n\n"

    def finish(self):
        """Send the report, unless no test failed."""
        if not self.failures:
            return
        config = self.config
        utter("SMTP server: %s" % config['server'])
        conn = SMTP(host=config['server'])
        try:
            if verbose:
                conn.set_debuglevel(256)
            conn.sendmail(config['mailfrom'], config['rcptto'], self.body)
            conn.quit()
        finally:
            # Release the socket even when sending fails; close() is
            # harmless after a successful quit().
            conn.close()


def human_to_bytes(threshold):
    """Convert a threshold such as "10G" or "1.5M" to a number.

    Numbers are returned unchanged.  Strings may end in K, M, G or T
    (case-insensitive, powers of 1024); strings without a suffix are
    parsed as plain floats.  Raises ValueError when the numeric part
    cannot be parsed.
    """
    if isinstance(threshold, (int, float)):
        return threshold
    text = str(threshold).strip()
    suffix = text[-1:].upper()
    if suffix in _SUFFIX_MULTIPLIERS:
        return float(text[:-1]) * _SUFFIX_MULTIPLIERS[suffix]
    # A trailing digit or dot simply means "no suffix"; anything else is
    # worth mentioning before float() rejects the value.
    if suffix and not (suffix.isdigit() or suffix == "."):
        utter("Unknown power suffix %s!" % suffix)
    return float(text)


def bytes_to_human(num):
    """Format num with the largest fitting K/M/G/T suffix (powers of 1024).

    Values of 1024T and above are still expressed in T.
    """
    for unit in ['', 'K', 'M', 'G']:
        if num < 1024.0:
            return "%3.0f%s" % (num, unit)
        num /= 1024.0
    return "%3.0f%s" % (num, 'T')


def report_failure(testname, testparam, msg, result=0):
    """Pass a failed test on to every enabled report."""
    utter("Test '%s' %s failed: %s" % (testname, testparam, msg))
    result = bytes_to_human(result)
    if html_report:
        html_report.result(testname, testparam, result, False, msg)
    if email_report:
        email_report.result(testname, testparam, result, False, msg)


def run_command(command):
    """Start command through the shell and return the Popen object.

    stderr is merged into stdout and the stream is opened in text mode.
    The command string comes from the overseer configuration and is
    handed to the shell as-is, so that file must be trusted.
    """
    p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, universal_newlines=True)
    return p


def run_test(testname, testparam):
    """Run one test and report its outcome.

    The command is testparam['path'] if present, otherwise the plugin
    named testname, followed by testparam['parameters'] if present.  A
    non-zero exit status is a failure.  Without a 'threshold' the test
    is pass/fail; with one, the first line of output is compared against
    it ('type' of "low" means the value must not drop below it, anything
    else means it must not exceed it).
    """
    utter("processing test '%s'" % testname)
    if 'path' in testparam:
        test_cmd = testparam["path"]
    else:
        test_cmd = "/usr/share/trove-overseer/plugins/" + testname
    if 'parameters' in testparam:
        test_cmd = test_cmd + (" %s" % testparam['parameters'])

    pipe = run_command(test_cmd)
    # communicate() drains the pipe while waiting, so a chatty test
    # cannot block on a full pipe buffer.
    output, _ = pipe.communicate()
    exitcode = pipe.returncode
    if exitcode != 0:
        report_failure(testname, testparam,
                       "Test failed with non-zero result: %d, %s"
                       % (exitcode, output),
                       result=exitcode)
        return

    if 'threshold' not in testparam:
        # this is a pass/fail test: assume success
        if html_report:
            html_report.result(testname, testparam, 1, True, "")
        return

    lines = output.splitlines()
    first_line = lines[0] if lines else ""
    try:
        result = float(first_line)
    except ValueError:
        # A misbehaving plugin must not abort the remaining tests.
        report_failure(testname, testparam,
                       "Test produced non-numeric output: %r" % first_line)
        return

    hightest = testparam.get('type') != "low"
    threshold = human_to_bytes(testparam['threshold'])
    utter("test returned %d, threshold is %d\n" % (result, threshold))

    if hightest and result > threshold:
        report_failure(testname, testparam,
                       "Result above threshold of %s" % testparam['threshold'],
                       result=result)
        return
    if not hightest and result < threshold:
        report_failure(testname, testparam,
                       "Result below threshold of %s" % testparam['threshold'],
                       result=result)
        return

    if html_report:
        html_report.result(testname, testparam, result, True, "")


def main():
    """Update the checkout, load overseer.yaml, run the tests and report."""
    global verbose, html_report, email_report

    if os.getenv("OVERSEER_DEBUG") is not None:
        verbose = True

    webpage = argv[1] if len(argv) > 1 else None

    if yaml is None:
        stderr.write("PyYAML is required to read overseer.yaml\n")
        exit(1)

    if os.system("git pull -q") != 0:
        stderr.write("Unable to execute git pull\n")
        exit(1)

    try:
        config_file = open("overseer.yaml", "r")
    except IOError as e:
        stderr.write("Unable to open overseer.yaml: %s\n" % e.strerror)
        exit(1)

    with config_file:
        try:
            # safe_load replaces the original yaml.load: the configuration
            # is plain data, and safe_load refuses to construct arbitrary
            # Python objects from it.
            config_data = yaml.safe_load(config_file)
        except yaml.scanner.ScannerError as e:
            stderr.write("YAML scanner error: %s\n" % e)
            exit(1)
        except yaml.parser.ParserError as e:
            stderr.write("YAML parser error: %s\n" % e)
            exit(1)

    # An empty file parses to None, which has no 'overseer' key either.
    if not isinstance(config_data, dict) or 'overseer' not in config_data:
        stderr.write("Configuration contains no overseer data.\n")
        exit(1)
    overseer = config_data['overseer'] or {}

    if webpage is not None:
        html_report = html_output(webpage)
    else:
        html_report = False

    if 'email' in overseer:
        utter("Creating email report")
        email_report = email_output(overseer['email'])
    else:
        utter("Skipping email report")
        email_report = False

    for test in overseer.get('tests') or []:
        # Each entry is a single-key mapping: {testname: parameters}.
        (testname, testparam) = next(iter(test.items()))
        run_test(testname, testparam)

    if html_report:
        html_report.finish()
    if email_report:
        email_report.finish()


if __name__ == '__main__':
    main()