summaryrefslogtreecommitdiff
path: root/turbo_hipster
diff options
context:
space:
mode:
authorJoshua Hesketh <josh@nitrotech.org>2014-10-29 17:42:59 +1100
committerJoshua Hesketh <josh@nitrotech.org>2014-12-02 13:23:23 +1100
commitd5d7a21ed0b147f160bd3c15cfdf23ebb7eca38d (patch)
treeda915f72a2e9b8750b7d2e935622e745c333b8c7 /turbo_hipster
parent3e3deef7b81baf88cd1ebc744f5e7342ab97b72c (diff)
downloadturbo-hipster-d5d7a21ed0b147f160bd3c15cfdf23ebb7eca38d.tar.gz
Improve how jobs log
Improve the logging for jobs in turbo-hipster so that if something fails in a plugin (for example the db migration checks) it is logged and uploaded appropriately. This causes multiple logs per job. Change-Id: I01e73ea418defbc0c1abd3b2b4357a816ddf99c3
Diffstat (limited to 'turbo_hipster')
-rw-r--r--turbo_hipster/lib/models.py160
-rw-r--r--turbo_hipster/lib/utils.py24
-rw-r--r--turbo_hipster/task_plugins/real_db_upgrade/task.py2
3 files changed, 135 insertions, 51 deletions
diff --git a/turbo_hipster/lib/models.py b/turbo_hipster/lib/models.py
index 4da226b..3599766 100644
--- a/turbo_hipster/lib/models.py
+++ b/turbo_hipster/lib/models.py
@@ -26,7 +26,7 @@ from turbo_hipster.lib import utils
class Task(object):
""" A base object for running a job (aka Task) """
- log = logging.getLogger("lib.models.Task")
+ log = logging.getLogger("task")
def __init__(self, worker_server, plugin_config, job_name):
self.worker_server = worker_server
@@ -37,6 +37,12 @@ class Task(object):
# Define the number of steps we will do to determine our progress.
self.total_steps = 0
+ def _cleanup(self):
+ if self.log_handler:
+ self.log.removeHandler(self.log_handler)
+ self.log_handler.flush()
+ self.log_handler.close()
+
def _reset(self):
self.job = None
self.job_arguments = None
@@ -45,6 +51,36 @@ class Task(object):
self.success = True
self.messages = []
self.current_step = 0
+ self.log_handler = None
+
+ def _prep_working_dir(self):
+ self.job_identifier = utils.determine_job_identifier(
+ self.job_arguments,
+ self.plugin_config['function'],
+ self.job.unique
+ )
+ self.job_working_dir = os.path.join(
+ self.worker_server.config['jobs_working_dir'],
+ self.job_identifier
+ )
+ self.job_results_dir = os.path.join(
+ self.job_working_dir,
+ 'results'
+ )
+ self.task_output_log = os.path.join(
+ self.job_results_dir,
+ 'task_output.log'
+ )
+
+ if not os.path.isdir(os.path.dirname(self.task_output_log)):
+ os.makedirs(os.path.dirname(self.task_output_log))
+
+ def _setup_task_logging(self):
+ self.log_handler = logging.FileHandler(self.task_output_log)
+ log_formatter = logging.Formatter('%(asctime)s %(message)s')
+ self.log_handler.setFormatter(log_formatter)
+ self.log.addHandler(self.log_handler)
+ self.log.setLevel(logging.DEBUG)
def start_job(self, job):
self._reset()
@@ -59,16 +95,54 @@ class Task(object):
# Send an initial WORK_DATA and WORK_STATUS packets
self._send_work_data()
+ # Prep working dirs
+ self._prep_working_dir()
+
+ # Now we have working dirs we can log the job details to a file
+ self._setup_task_logging()
+
+ except Exception as e:
+ # If something failed during this section we have been unable
+ # to log to file. As such raise an exception to gearman
+ self.log.exception("Failure during setup")
+ self.log.exception(e)
+ if not self.cancelled:
+ self.success = False
+ self.messages.append('FAILURE during the job setup')
+ self.messages.append('Exception: %s' % e)
+ self._send_work_data()
+ self.job.sendWorkException(str(e).encode('utf-8'))
+ # No point trying the job, lets return here
+ return
+
+ # From here we can log exceptions
+ try:
# Execute the job_steps
self.do_job_steps()
+ except Exception as e:
+ # Log the problem
+ if not self.cancelled:
+ self.success = False
+ self.log.exception('Something failed running the job!')
+ self.messages.append('FAILURE running the job')
+ self.messages.append('Exception: %s' % e)
+ # Don't return from here as we can continue uploading the
+ # logs
+ try:
+ self._cleanup()
+ self._upload_results()
# Finally, send updated work data and completed packets
self._send_final_results()
-
except Exception as e:
- self.log.exception('Exception handling log event.')
+ # If something failed during this section we have been unable
+ # to upload the log. As such raise an exception to gearman
+ self.log.exception("Failure during cleanup and upload")
+ self.log.exception(e)
if not self.cancelled:
self.success = False
+ self.messages.append('FAILURE during cleanup and log '
+ 'upload')
self.messages.append('Exception: %s' % e)
self._send_work_data()
self.job.sendWorkException(str(e).encode('utf-8'))
@@ -139,24 +213,43 @@ class Task(object):
self.current_step += 1
self.job.sendWorkStatus(self.current_step, self.total_steps)
+ def _upload_results(self):
+ """Upload the contents of the working dir either using the instructions
+ provided by zuul and/or our configuration"""
+
+ self.log.debug("Process the resulting files (upload/push)")
+
+ if 'publish_logs' in self.worker_server.config:
+ index_url = utils.push_file(
+ self.job_identifier, self.job_results_dir,
+ self.worker_server.config['publish_logs'])
+ self.log.debug("Index URL found at %s" % index_url)
+ self.work_data['url'] = index_url
+
+ if 'ZUUL_EXTRA_SWIFT_URL' in self.job_arguments:
+ # Upload to zuul's url as instructed
+ utils.zuul_swift_upload(self.job_working_dir, self.job_arguments)
+ self.work_data['url'] = self.job_identifier
+
class ShellTask(Task):
- log = logging.getLogger("lib.models.ShellTask")
+ log = logging.getLogger("task.shell_task")
def __init__(self, worker_server, plugin_config, job_name):
super(ShellTask, self).__init__(worker_server, plugin_config, job_name)
# Define the number of steps we will do to determine our progress.
- self.total_steps = 6
+ self.total_steps = 5
def _reset(self):
super(ShellTask, self)._reset()
self.git_path = None
self.job_working_dir = None
self.shell_output_log = None
+ self.git_prep_log = None
def do_job_steps(self):
- self.log.info('Step 1: Prep job working dir')
- self._prep_working_dir()
+ self.log.info('Step 1: Setup environment')
+ self._setup_environment()
self.log.info('Step 2: Checkout updates from git')
self._grab_patchset(self.job_arguments)
@@ -167,35 +260,29 @@ class ShellTask(Task):
self.log.info('Step 4: Analyse logs for errors')
self._parse_and_check_results()
- self.log.info('Step 5: handle the results (and upload etc)')
+ self.log.info('Step 5: handle the results')
self._handle_results()
self.log.info('Step 6: Handle extra actions such as shutting down')
self._handle_cleanup()
@common.task_step
- def _prep_working_dir(self):
- self.job_identifier = utils.determine_job_identifier(
- self.job_arguments,
- self.plugin_config['function'],
- self.job.unique
- )
- self.job_working_dir = os.path.join(
- self.worker_server.config['jobs_working_dir'],
- self.job_identifier
+ def _setup_environment(self):
+ self.git_prep_log = os.path.join(
+ self.job_results_dir,
+ 'git_prep.log'
)
self.shell_output_log = os.path.join(
- self.job_working_dir,
+ self.job_results_dir,
'shell_output.log'
)
- if not os.path.isdir(os.path.dirname(self.shell_output_log)):
- os.makedirs(os.path.dirname(self.shell_output_log))
-
@common.task_step
def _grab_patchset(self, job_args):
""" Checkout the reference into config['git_working_dir'] """
+ # TODO(jhesketh): Use the zuul cloner stuff instead :-)
+
self.log.debug("Grab the patchset we want to test against")
local_path = os.path.join(self.worker_server.config['git_working_dir'],
self.job_name, job_args['ZUUL_PROJECT'])
@@ -204,11 +291,13 @@ class ShellTask(Task):
git_args = copy.deepcopy(job_args)
- cmd = os.path.join(os.path.join(os.path.dirname(__file__),
- 'gerrit-git-prep.sh'))
+ cmd = os.path.join(
+ os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ 'gerrit-git-prep.sh')
+ )
cmd += ' ' + self.worker_server.config['zuul_server']['gerrit_site']
cmd += ' ' + self.worker_server.config['zuul_server']['git_origin']
- utils.execute_to_log(cmd, self.shell_output_log, env=git_args,
+ utils.execute_to_log(cmd, self.git_prep_log, env=git_args,
cwd=local_path)
self.git_path = local_path
return local_path
@@ -247,28 +336,13 @@ class ShellTask(Task):
'(%d)' % self.script_return_code)
@common.task_step
- def _handle_results(self):
- """Upload the contents of the working dir either using the instructions
- provided by zuul and/or our configuration"""
-
- self.log.debug("Process the resulting files (upload/push)")
-
- if 'publish_logs' in self.worker_server.config:
- index_url = utils.push_file(
- self.job_identifier, self.shell_output_log,
- self.worker_server.config['publish_logs'])
- self.log.debug("Index URL found at %s" % index_url)
- self.work_data['url'] = index_url
-
- if 'ZUUL_EXTRA_SWIFT_URL' in self.job_arguments:
- # Upload to zuul's url as instructed
- utils.zuul_swift_upload(self.job_working_dir, self.job_arguments)
- self.work_data['url'] = self.job_identifier
-
- @common.task_step
def _handle_cleanup(self):
"""Handle and cleanup functions. Shutdown if requested to so that no
further jobs are ran if the environment is dirty."""
if ('shutdown-th' in self.plugin_config and
self.plugin_config['shutdown-th']):
self.worker_server.shutdown_gracefully()
+
+ @common.task_step
+ def _handle_results(self):
+ pass
diff --git a/turbo_hipster/lib/utils.py b/turbo_hipster/lib/utils.py
index 5487765..aee31e7 100644
--- a/turbo_hipster/lib/utils.py
+++ b/turbo_hipster/lib/utils.py
@@ -232,17 +232,27 @@ def swift_push_file(results_set_name, file_path, swift_config):
def local_push_file(results_set_name, file_path, local_config):
""" Copy the file locally somewhere sensible """
- dest_dir = os.path.join(local_config['path'], results_set_name)
- dest_filename = os.path.basename(file_path)
- if not os.path.isdir(dest_dir):
- os.makedirs(dest_dir)
+ def _push_file_or_dir(results_set_name, file_path, local_config):
+ dest_dir = os.path.join(local_config['path'], results_set_name)
+ dest_filename = os.path.basename(file_path)
+ if not os.path.isdir(dest_dir):
+ os.makedirs(dest_dir)
+
+ dest_file = os.path.join(dest_dir, dest_filename)
- dest_file = os.path.join(dest_dir, dest_filename)
+ if os.path.isfile(file_path):
+ shutil.copyfile(file_path, dest_file)
+ elif os.path.isdir(file_path):
+ shutil.copytree(file_path, dest_file)
if os.path.isfile(file_path):
- shutil.copyfile(file_path, dest_file)
+ _push_file_or_dir(results_set_name, file_path, local_config)
elif os.path.isdir(file_path):
- shutil.copytree(file_path, dest_file)
+ for f in os.listdir(file_path):
+ f_path = os.path.join(file_path, f)
+ _push_file_or_dir(results_set_name, f_path, local_config)
+
+ dest_filename = os.path.basename(file_path)
return local_config['prepend_url'] + os.path.join(results_set_name,
dest_filename)
diff --git a/turbo_hipster/task_plugins/real_db_upgrade/task.py b/turbo_hipster/task_plugins/real_db_upgrade/task.py
index 252160a..635566c 100644
--- a/turbo_hipster/task_plugins/real_db_upgrade/task.py
+++ b/turbo_hipster/task_plugins/real_db_upgrade/task.py
@@ -38,7 +38,7 @@ class Runner(models.ShellTask):
It pulls in a gearman job from the build:real-db-upgrade
queue and runs it through _handle_patchset"""
- log = logging.getLogger("task_plugins.real_db_upgrade.task.Runner")
+ log = logging.getLogger("task.real_db_upgrade")
def __init__(self, worker_server, plugin_config, job_name):
super(Runner, self).__init__(worker_server, plugin_config, job_name)