#!/usr/bin/python
#
# Copyright (C) 2014 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import numbers
import os
import subprocess
import sys
import tempfile
from email.utils import formatdate
from optparse import OptionParser
from smtplib import SMTP
from socket import gethostname
from sys import stderr, stdout, exit, argv
from xml.sax.saxutils import escape

import yaml
# Debug chatter switch; main() turns it on when OVERSEER_DEBUG is set.
verbose = False


def utter(msg):
    """Write msg plus a newline to stdout and flush, but only in verbose mode."""
    if not verbose:
        return
    stdout.write(msg + "\n")
    stdout.flush()
class html_output:
    """Write test results as rows of an HTML status table.

    The page is built in a temporary file that finish() renames over the
    target, so a reader of the target never sees a half-written page.
    """

    # NOTE(review): the tag layout was reconstructed from the column
    # headings; confirm it against whatever stylesheet the deployed page
    # is expected to use.
    # The host name and timestamp are filled in once, when the class is
    # defined.
    html_head = """<html>
<head>
<title>Trove Health Monitor status for %s</title>
</head>
<body>
<h1>Trove Health Monitor</h1>
<p>Tests begin at %s</p>
<table border="1">
<tr>
<th>Test name</th>
<th>Description</th>
<th>Threshold</th>
<th>Current value</th>
<th>Result</th>
</tr>
""" % (gethostname(), formatdate())

    def __init__(self, filename):
        """Open a temporary file for the page and write the page header.

        filename is the final location of the page; it only appears there
        once finish() has been called.
        """
        self.filename = filename
        # Create the temporary file in the target's own directory: a
        # relative filename would otherwise land in the system temp dir and
        # the rename in finish() could fail across filesystems.
        target = os.path.abspath(filename)
        # NOTE(review): NamedTemporaryFile creates the file owner-only;
        # confirm the web server can still read the renamed page.
        self.tempfile = tempfile.NamedTemporaryFile(
            mode="w",
            dir=os.path.dirname(target),
            prefix=os.path.basename(target),
            delete=False)
        self.fh = self.tempfile.file
        utter("HTML temporary filename is %s" % self.tempfile.name)
        self.fh.write(self.html_head)

    @staticmethod
    def _cell(value):
        """Return value as text that is safe to place inside a table cell."""
        return escape("%s" % (value,))

    def result(self, testname, testparam, result, success, msg):
        """Append one table row describing the outcome of a test.

        testparam must contain 'name'; 'threshold' and 'type' are optional.
        result is the value shown in the "Current value" column, success
        selects the green/OK or red/FAILURE cell, and msg is shown under
        FAILURE.
        """
        cell = self._cell
        if 'threshold' in testparam:
            # Threshold tests default to "high" unless a type is given.
            threshold = "%s %s" % (testparam['threshold'],
                                   testparam.get('type', "high"))
        else:
            threshold = "Pass/Fail"

        if success:
            colour = "green"
            verdict = "OK"
        else:
            colour = "red"
            verdict = "FAILURE<br />%s" % cell(msg)

        self.fh.write(
            "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td>"
            "<td style=\"background-color: %s\">%s</td></tr>\n" % (
                cell(testname),
                cell(testparam['name']),
                cell(threshold),
                cell(result),
                colour,
                verdict))

    def finish(self):
        """Close the table and page, then move the file to its final name."""
        self.fh.write("</table>\n</body>\n</html>\n")
        self.tempfile.close()
        os.rename(self.tempfile.name, self.filename)
class email_output:
    """Collect failed test results and mail them as one plain-text report.

    Successful results are ignored; finish() sends nothing unless at least
    one failure was recorded.
    """

    # Class-level defaults. Instances shadow both attributes as soon as
    # they assign to them.
    body = "Subject: Trove Overseer failure report from %s\n" % (gethostname())
    failures = False

    def __init__(self, config):
        """Start the message with From, To and Date headers.

        config is a mapping with 'mailfrom' and 'rcptto'; 'server' is read
        later by finish().
        """
        self.config = config
        self.body += "From: %s\nTo: %s\n" % (config['mailfrom'],
                                           config['rcptto'])
        # The blank line after Date separates the headers from the body.
        self.body += "Date: %s\n\n" % formatdate()

    def result(self, testname, testparam, result, success, msg):
        """Append a paragraph for a failed test; do nothing on success."""
        if success:
            return
        self.failures = True
        self.body += ("\nTest %s (%s) failed:" % (testparam['name'], testname))
        if 'threshold' in testparam:
            self.body += ("\n\tThreshold: %s ") % str(testparam['threshold'])
            # Threshold tests default to "high" unless a type is given.
            self.body += testparam.get('type', "high")
        self.body += ("\n\tResult: %s" % str(result))
        if msg:
            self.body += ("\n\t%s") % msg
        self.body += "\n\n"

    def finish(self):
        """Send the report through config['server'] if anything failed."""
        if not self.failures:
            return
        config = self.config
        utter("SMTP server: %s" % config['server'])
        conn = SMTP(host=config['server'])
        try:
            if verbose:
                conn.set_debuglevel(256)
            conn.sendmail(config['mailfrom'], config['rcptto'], self.body)
            conn.quit()
        finally:
            # Release the socket even when sending fails; close() is
            # harmless after a successful quit().
            conn.close()
def human_to_bytes(threshold):
    """Convert a threshold such as "10K", "1.5G" or 4096 to a number.

    Numeric input is returned unchanged. String input may end in K, M, G
    or T (case-insensitive, powers of 1024) and is returned as a float. A
    string without a suffix is parsed as a plain number. Any other trailing
    letter is reported through utter() before float() is attempted on the
    whole string.

    Raises ValueError when the text cannot be parsed as a number.
    """
    # numbers.Real also covers Python 2 longs, which an exact type()
    # comparison against int misses.
    if isinstance(threshold, numbers.Real):
        return threshold

    multipliers = {
        "K": 1024,
        "M": 1024 ** 2,
        "G": 1024 ** 3,
        "T": 1024 ** 4,
    }
    text = threshold.strip()
    suffix = text[-1:].upper()
    if suffix in multipliers:
        return float(text[:-1]) * multipliers[suffix]

    # A trailing digit or decimal point just means "no suffix"; only warn
    # about characters that look like an unsupported unit.
    if not (suffix.isdigit() or suffix == "."):
        utter("Unknown power suffix %s!" % suffix)
    return float(text)
def bytes_to_human(num):
    """Format num as a rounded value followed by a power-of-1024 suffix.

    The suffix is '' (none), K, M, G, T or P. Values too large for T are
    expressed in P rather than falling off the end of the unit list.
    Negative values are scaled by magnitude and keep their sign.
    """
    for unit in ('', 'K', 'M', 'G', 'T'):
        if abs(num) < 1024.0:
            return "%3.0f%s" % (num, unit)
        num /= 1024.0
    # Everything still >= 1024 T is reported in P.
    return "%3.0f%s" % (num, 'P')
def report_failure(testname, testparam, msg, result = 0):
    """Record a failed test in every active report.

    The failure is echoed through utter(), result is rendered with
    bytes_to_human(), and the outcome is handed to the HTML and email
    reports when they are enabled.
    """
    utter("Test '%s' %s failed: %s" % (testname, testparam, msg))
    outcome = (testname, testparam, bytes_to_human(result), False, msg)
    if html_report:
        html_report.result(*outcome)
    if email_report:
        email_report.result(*outcome)
def run_command(command):
    """Start command through the shell and return its Popen object.

    The child's stdout is a pipe and its stderr is merged into that pipe.
    """
    popen_options = {
        "shell": True,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.STDOUT,
    }
    return subprocess.Popen(command, **popen_options)
def run_test(testname, testparam):
    """Run one plugin and report its outcome.

    The command is testparam['path'] if given, otherwise the plugin named
    testname under /usr/share/trove-overseer/plugins/, followed by
    testparam['parameters'] when present.

    A non-zero exit status is a failure. Without a 'threshold' the test is
    pass/fail. Otherwise the first line of output is read as a number and
    compared with the threshold: above it fails by default, below it fails
    when testparam['type'] is "low". Successes go to the HTML report only;
    failures go through report_failure().
    """
    utter("processing test '%s'" % testname)

    if 'path' in testparam:
        test_cmd = testparam["path"]
    else:
        test_cmd = "/usr/share/trove-overseer/plugins/" + testname
    if 'parameters' in testparam:
        test_cmd = test_cmd + (" %s" % testparam['parameters'])

    pipe = run_command(test_cmd)
    # communicate() drains the pipe while waiting; reading a single line
    # and then wait()ing can deadlock once the plugin fills the pipe.
    output, _ = pipe.communicate()
    exitcode = pipe.returncode
    if not isinstance(output, str):
        # Python 3 hands back bytes; decode so messages stay readable.
        output = output.decode("utf-8", "replace")

    if exitcode != 0:
        report_failure(testname, testparam,
                       "Test failed with non-zero result: %d, %s" %
                       (exitcode, output), result = exitcode)
        return

    if 'threshold' not in testparam:
        # this is a pass/fail test: assume success
        if html_report:
            html_report.result(testname, testparam, 1, True, "")
        return

    lines = output.splitlines()
    first_line = lines[0] if lines else ""
    try:
        value = float(first_line)
    except ValueError:
        # A plugin printing garbage must not abort the remaining tests.
        report_failure(testname, testparam,
                       "Test output is not a number: %r" % first_line)
        return

    try:
        threshold = human_to_bytes(testparam['threshold'])
    except ValueError:
        report_failure(testname, testparam,
                       "Invalid threshold: %r" % (testparam['threshold'],),
                       result = value)
        return

    hightest = testparam.get('type') != "low"
    # %s rather than %d so fractional values are not truncated.
    utter("test returned %s, threshold is %s\n" % (value, threshold))

    if hightest and value > threshold:
        report_failure(testname, testparam,
                       "Result above threshold of %s" % testparam['threshold'],
                       result = value)
        return
    if not hightest and value < threshold:
        report_failure(testname, testparam,
                       "Result below threshold of %s" % testparam['threshold'],
                       result = value)
        return

    if html_report:
        html_report.result(testname, testparam, value, True, "")
def main():
    """Refresh the checkout, load overseer.yaml and run every configured test.

    Setting OVERSEER_DEBUG in the environment enables verbose output. The
    optional first command-line argument is the path of the HTML status
    page; without it no page is written. An email report is produced when
    the configuration has an 'email' section. Exits with status 1 when
    git pull fails or the configuration cannot be read or parsed.
    """
    global verbose, html_report, email_report

    if os.getenv("OVERSEER_DEBUG") is not None:
        verbose = True

    webpage = argv[1] if len(argv) > 1 else None

    if os.system("git pull -q") != 0:
        stderr.write("Unable to execute git pull\n")
        exit(1)

    try:
        config_file = open("overseer.yaml", "r")
    except IOError as e:
        stderr.write("Unable to open overseer.yaml: %s\n" % e.strerror)
        exit(1)

    try:
        # safe_load builds plain data only; yaml.load without an explicit
        # Loader can construct arbitrary objects and is rejected by recent
        # PyYAML releases.
        config_data = yaml.safe_load(config_file)
    except yaml.scanner.ScannerError as e:
        stderr.write("YAML scanner error: %s\n" % e)
        exit(1)
    except yaml.parser.ParserError as e:
        stderr.write("YAML parser error: %s\n" % e)
        exit(1)
    finally:
        config_file.close()

    # An empty file parses to None, so check the type before looking inside.
    if not isinstance(config_data, dict) or 'overseer' not in config_data:
        stderr.write("Configuration contains no overseer data.\n")
        exit(1)
    overseer = config_data['overseer']

    if webpage is not None:
        html_report = html_output(webpage)
    else:
        html_report = False

    if 'email' in overseer:
        utter("Creating email report")
        email_report = email_output(overseer['email'])
    else:
        utter("Skipping email report")
        email_report = False

    # Each entry of the tests list is a single-key mapping {name: parameters}.
    for test in overseer.get('tests') or []:
        (testname, testparam) = next(iter(test.items()))
        run_test(testname, testparam)

    if html_report:
        html_report.finish()
    if email_report:
        email_report.finish()


if __name__ == '__main__':
    main()