summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/source/installation.rst11
-rw-r--r--tests/etc/config.yaml13
-rw-r--r--tests/etc/default-config.yaml9
-rw-r--r--tests/test_common.py8
-rw-r--r--tests/test_shell_task.py2
-rw-r--r--tests/test_worker_manager.py149
-rw-r--r--turbo_hipster/lib/models.py37
-rw-r--r--turbo_hipster/lib/utils.py6
-rw-r--r--turbo_hipster/task_plugins/real_db_upgrade/task.py11
-rw-r--r--turbo_hipster/worker_server.py77
10 files changed, 221 insertions, 102 deletions
diff --git a/doc/source/installation.rst b/doc/source/installation.rst
index eb01021..e748d3e 100644
--- a/doc/source/installation.rst
+++ b/doc/source/installation.rst
@@ -56,7 +56,16 @@ for your environment::
**pip_download_cache**
Some of turbo-hipsters task plugins download requirements
for projects. This is the cache directory used by pip.
- **plugins**
+ **jobs**
+ A list of registered jobs.
+ **name**
+ The name of the job to register. This is the function name
+ for zuul's job. eg build:some_job.
+ **plugin** (optional)
+ The plugin to use. Defaults to shell_task.
+ Any other variables the plugin may require for the job.
+ **plugins** (depreciated)
+ This is depreciated in favour of jobs (above).
A list of enabled plugins and their settings in a dictionary.
The only required parameters are *name*, which should be the
same as the folder containing the plugin module, and
diff --git a/tests/etc/config.yaml b/tests/etc/config.yaml
index a80f6f5..463405e 100644
--- a/tests/etc/config.yaml
+++ b/tests/etc/config.yaml
@@ -8,10 +8,19 @@ debug_log: /home/josh/var/log/turbo-hipster/debug.log
jobs_working_dir: /home/josh/var/lib/turbo-hipster/jobs
git_working_dir: /home/josh/var/lib/turbo-hipster/git
pip_download_cache: /home/josh/var/cache/pip
+
+jobs:
+ - name: build:real-db-upgrade_nova_mysql
+ datasets_dir: /home/josh/var/lib/turbo-hipster/datasets
+ plugin: real_db_upgrade
+ - name: build:some_shell_job
+ shell_script: /dev/null
+
+# Legacy job definition as plugins
plugins:
- name: real_db_upgrade
- datasets_dir": /home/josh/var/lib/turbo-hipster/datasets
- job: real-db-upgrade_nova_mysql
+ datasets_dir: /var/lib/turbo-hipster/datasets_user_001
+ function: build:real-db-upgrade_nova_mysql_user_001
publish_logs:
type: local
diff --git a/tests/etc/default-config.yaml b/tests/etc/default-config.yaml
index 994d57f..da34c46 100644
--- a/tests/etc/default-config.yaml
+++ b/tests/etc/default-config.yaml
@@ -9,6 +9,11 @@ jobs_working_dir: /var/lib/turbo-hipster/jobs
git_working_dir: /var/lib/turbo-hipster/git
pip_download_cache: /var/cache/pip
+jobs:
+ - name: build:do_something_shelly
+ shell_script: 'ls -lah && echo'
+
+# Legacy job definition as plugins
plugins:
- name: real_db_upgrade
datasets_dir: /var/lib/turbo-hipster/datasets_devstack_131007
@@ -18,10 +23,6 @@ plugins:
datasets_dir: /var/lib/turbo-hipster/datasets_user_001
function: build:real-db-upgrade_nova_mysql_user_001
- - name: shell_script
- function: build:do_something_shelly
- shell_script: 'ls -lah && echo'
-
publish_logs:
type: local
path: /tmp/turbo-hipster/var/www/results/
diff --git a/tests/test_common.py b/tests/test_common.py
index 796257c..493cc8e 100644
--- a/tests/test_common.py
+++ b/tests/test_common.py
@@ -25,9 +25,9 @@ from turbo_hipster.lib import models
class TestTaskStep(testtools.TestCase):
def test_task_step_decorator(self):
class FakeTask(models.Task):
- def __init__(self, global_config, plugin_config, job_name):
- super(FakeTask, self).__init__(global_config, plugin_config,
- job_name)
+ def __init__(self, global_config, job_name, job_config):
+ super(FakeTask, self).__init__(global_config, job_name,
+ job_config)
# Define the number of steps we will do to determine our
# progress.
self.total_steps = 2
@@ -43,7 +43,7 @@ class TestTaskStep(testtools.TestCase):
def do_something_more(self):
pass
- task = FakeTask({}, {}, 'job_name')
+ task = FakeTask({}, 'build:function', {})
task.job = fakes.FakeJob()
self.assertEqual(0, task.current_step)
diff --git a/tests/test_shell_task.py b/tests/test_shell_task.py
index 780d0d7..9a4e0ff 100644
--- a/tests/test_shell_task.py
+++ b/tests/test_shell_task.py
@@ -93,7 +93,7 @@ class TestTaskRunner(base.TestWithGearman):
# Modify the job to fail. The git_path, job_working_dir and unqiue_id
# are all passed to the shell script. If we 'ls unique_id' it'll fail
# since it doesn't exist.
- self.config['plugins'][2]['shell_script'] = 'ls -lah'
+ self.config['jobs'][0]['shell_script'] = 'ls -lah'
zuul.submit_job('build:do_something_shelly', data_req)
zuul.wait_for_completion()
diff --git a/tests/test_worker_manager.py b/tests/test_worker_manager.py
index f1517f5..cebf8dc 100644
--- a/tests/test_worker_manager.py
+++ b/tests/test_worker_manager.py
@@ -20,50 +20,125 @@ import fakes
class TestWorkerServer(base.TestWithGearman):
- def test_plugins_load(self):
- "Test the configured plugins are loaded"
+ def test_jobs_load_from_legacy_plugins(self):
+ "Test the configured plugins are loaded from legacy config.yaml layout"
self.start_server()
self.assertFalse(self.worker_server.stopped())
- self.assertEqual(3, len(self.worker_server.plugins))
-
- plugin0_config = {
- "name": "real_db_upgrade",
- "datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007",
- "function": "build:real-db-upgrade_nova_mysql_devstack_131007"
- }
- plugin1_config = {
- "name": "real_db_upgrade",
- "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
- "function": "build:real-db-upgrade_nova_mysql_user_001"
+ self.assertEqual(3, len(self.worker_server.jobs))
+
+ expected_jobs = {
+ 'build:real-db-upgrade_nova_mysql_devstack_131007': {
+ "name": "build:real-db-upgrade_nova_mysql_devstack_131007",
+ "plugin": "real_db_upgrade",
+ "runner_module_name": "turbo_hipster.task_plugins."
+ "real_db_upgrade.task",
+ "plugin_config": {
+ "name": "real_db_upgrade",
+ "datasets_dir": "/var/lib/turbo-hipster/"
+ "datasets_devstack_131007",
+ "function": "build:real-db-upgrade_nova_mysql_devstack_"
+ "131007"
+ },
+ },
+ 'build:real-db-upgrade_nova_mysql_user_001': {
+ "name": "build:real-db-upgrade_nova_mysql_user_001",
+ "plugin": "real_db_upgrade",
+ "runner_module_name": "turbo_hipster.task_plugins."
+ "real_db_upgrade.task",
+ "plugin_config": {
+ "name": "real_db_upgrade",
+ "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
+ "function": "build:real-db-upgrade_nova_mysql_user_001"
+ },
+ },
+ 'build:do_something_shelly': {
+ "name": "build:do_something_shelly",
+ "plugin": "shell_script",
+ "runner_module_name": "turbo_hipster.task_plugins."
+ "shell_script.task",
+ "job_config": {
+ "name": "build:do_something_shelly",
+ "shell_script": "ls -lah && echo",
+ },
+ },
}
- plugin2_config = {
- "name": "shell_script",
- "function": "build:do_something_shelly",
- "shell_script": "ls -lah && echo",
+
+ for job_name, job in self.worker_server.jobs.items():
+ self.assertEqual(expected_jobs[job_name]['name'],
+ job['name'])
+ self.assertEqual(expected_jobs[job_name]['plugin'],
+ job['plugin'])
+ if 'plugin_config' in job:
+ self.assertEqual(expected_jobs[job_name]['plugin_config'],
+ job['plugin_config'])
+ if 'job_config' in job:
+ self.assertEqual(expected_jobs[job_name]['job_config'],
+ job['job_config'])
+ self.assertEqual(
+ expected_jobs[job_name]['runner_module_name'],
+ job['runner'].__module__
+ )
+
+ def test_job_configuration(self):
+ "Test config.yaml job layout"
+ self._load_config_fixture('config.yaml')
+ self.start_server()
+
+ self.assertFalse(self.worker_server.stopped())
+ self.assertEqual(3, len(self.worker_server.jobs))
+
+ expected_jobs = {
+ 'build:real-db-upgrade_nova_mysql': {
+ "name": "build:real-db-upgrade_nova_mysql",
+ "plugin": "real_db_upgrade",
+ "runner_module_name": "turbo_hipster.task_plugins."
+ "real_db_upgrade.task",
+ "job_config": {
+ "name": "build:real-db-upgrade_nova_mysql",
+ "plugin": "real_db_upgrade",
+ "datasets_dir": "/home/josh/var/lib/turbo-hipster/datasets"
+ },
+ },
+ 'build:real-db-upgrade_nova_mysql_user_001': {
+ "name": "build:real-db-upgrade_nova_mysql_user_001",
+ "plugin": "real_db_upgrade",
+ "runner_module_name": "turbo_hipster.task_plugins."
+ "real_db_upgrade.task",
+ "plugin_config": {
+ "name": "real_db_upgrade",
+ "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
+ "function": "build:real-db-upgrade_nova_mysql_user_001",
+ },
+ },
+ 'build:some_shell_job': {
+ "name": "build:some_shell_job",
+ "plugin": "shell_script",
+ "runner_module_name": "turbo_hipster.task_plugins."
+ "shell_script.task",
+ "job_config": {
+ "name": "build:some_shell_job",
+ "shell_script": "/dev/null",
+ },
+ },
}
- self.assertEqual(plugin0_config,
- self.worker_server.plugins[0]['plugin_config'])
- self.assertEqual(
- 'turbo_hipster.task_plugins.real_db_upgrade.task',
- self.worker_server.plugins[0]['module'].__name__
- )
-
- self.assertEqual(plugin1_config,
- self.worker_server.plugins[1]['plugin_config'])
- self.assertEqual(
- 'turbo_hipster.task_plugins.real_db_upgrade.task',
- self.worker_server.plugins[1]['module'].__name__
- )
-
- self.assertEqual(plugin2_config,
- self.worker_server.plugins[2]['plugin_config'])
- self.assertEqual(
- 'turbo_hipster.task_plugins.shell_script.task',
- self.worker_server.plugins[2]['module'].__name__
- )
+ for job_name, job in self.worker_server.jobs.items():
+ self.assertEqual(expected_jobs[job_name]['name'],
+ job['name'])
+ self.assertEqual(expected_jobs[job_name]['plugin'],
+ job['plugin'])
+ if 'plugin_config' in job:
+ self.assertEqual(expected_jobs[job_name]['plugin_config'],
+ job['plugin_config'])
+ if 'job_config' in job:
+ self.assertEqual(expected_jobs[job_name]['job_config'],
+ job['job_config'])
+ self.assertEqual(
+ expected_jobs[job_name]['runner_module_name'],
+ job['runner'].__module__
+ )
def test_zuul_client_started(self):
"Test the zuul client has been started"
diff --git a/turbo_hipster/lib/models.py b/turbo_hipster/lib/models.py
index 3599766..403a7f1 100644
--- a/turbo_hipster/lib/models.py
+++ b/turbo_hipster/lib/models.py
@@ -19,6 +19,7 @@ import logging
import os
import pkg_resources
import socket
+import uuid
from turbo_hipster.lib import common
from turbo_hipster.lib import utils
@@ -28,9 +29,13 @@ class Task(object):
""" A base object for running a job (aka Task) """
log = logging.getLogger("task")
- def __init__(self, worker_server, plugin_config, job_name):
+ def __init__(self, worker_server, job_name, job_config):
+ # TODO(jhesketh): remove the need for worker_server here
self.worker_server = worker_server
- self.plugin_config = plugin_config
+ # NOTE(jhesketh): job_config may be in the old format where name
+ # refers to the plugin and function is the job name. Thus these should
+ # never be used in a job, instead use the provided job_name.
+ self.job_config = job_config
self.job_name = job_name
self._reset()
@@ -52,16 +57,16 @@ class Task(object):
self.messages = []
self.current_step = 0
self.log_handler = None
+ self.th_uuid = str(uuid.uuid4())[-12:]
def _prep_working_dir(self):
- self.job_identifier = utils.determine_job_identifier(
- self.job_arguments,
- self.plugin_config['function'],
- self.job.unique
- )
+ # Use the th_uuid so that if the same job is somehow taken twice from
+ # zuul we won't re-use zuul's uuid. This shouldn't happen but if it
+ # does it prevents overwriting previous results
self.job_working_dir = os.path.join(
self.worker_server.config['jobs_working_dir'],
- self.job_identifier
+ self.th_uuid,
+ self.job_arguments['LOG_PATH']
)
self.job_results_dir = os.path.join(
self.job_working_dir,
@@ -221,7 +226,7 @@ class Task(object):
if 'publish_logs' in self.worker_server.config:
index_url = utils.push_file(
- self.job_identifier, self.job_results_dir,
+ self.job_arguments['LOG_PATH'], self.job_results_dir,
self.worker_server.config['publish_logs'])
self.log.debug("Index URL found at %s" % index_url)
self.work_data['url'] = index_url
@@ -229,14 +234,14 @@ class Task(object):
if 'ZUUL_EXTRA_SWIFT_URL' in self.job_arguments:
# Upload to zuul's url as instructed
utils.zuul_swift_upload(self.job_working_dir, self.job_arguments)
- self.work_data['url'] = self.job_identifier
+ self.work_data['url'] = self.job_arguments['LOG_PATH']
class ShellTask(Task):
log = logging.getLogger("task.shell_task")
- def __init__(self, worker_server, plugin_config, job_name):
- super(ShellTask, self).__init__(worker_server, plugin_config, job_name)
+ def __init__(self, worker_server, job_name, job_config):
+ super(ShellTask, self).__init__(worker_server, job_name, job_config)
# Define the number of steps we will do to determine our progress.
self.total_steps = 5
@@ -285,7 +290,7 @@ class ShellTask(Task):
self.log.debug("Grab the patchset we want to test against")
local_path = os.path.join(self.worker_server.config['git_working_dir'],
- self.job_name, job_args['ZUUL_PROJECT'])
+ self.th_uuid, job_args['ZUUL_PROJECT'])
if not os.path.exists(local_path):
os.makedirs(local_path)
@@ -305,7 +310,7 @@ class ShellTask(Task):
@common.task_step
def _execute_script(self):
# Run script
- cmd = self.plugin_config['shell_script']
+ cmd = self.job_config['shell_script']
cmd += (
(' %(git_path)s %(job_working_dir)s %(unique_id)s')
% {
@@ -339,8 +344,8 @@ class ShellTask(Task):
def _handle_cleanup(self):
"""Handle and cleanup functions. Shutdown if requested to so that no
further jobs are ran if the environment is dirty."""
- if ('shutdown-th' in self.plugin_config and
- self.plugin_config['shutdown-th']):
+ if ('shutdown-th' in self.job_config and
+ self.job_config['shutdown-th']):
self.worker_server.shutdown_gracefully()
@common.task_step
diff --git a/turbo_hipster/lib/utils.py b/turbo_hipster/lib/utils.py
index aee31e7..3a8fc2c 100644
--- a/turbo_hipster/lib/utils.py
+++ b/turbo_hipster/lib/utils.py
@@ -263,12 +263,6 @@ def scp_push_file(results_set_name, file_path, local_config):
pass
-def determine_job_identifier(zuul_arguments, job, unique):
- # use new determined path from zuul
- path = zuul_arguments['LOG_PATH']
- return path
-
-
def zuul_swift_upload(file_path, job_arguments):
"""Upload working_dir to swift as per zuul's instructions"""
# NOTE(jhesketh): Zuul specifies an object prefix in the destination so
diff --git a/turbo_hipster/task_plugins/real_db_upgrade/task.py b/turbo_hipster/task_plugins/real_db_upgrade/task.py
index 635566c..25bd5ba 100644
--- a/turbo_hipster/task_plugins/real_db_upgrade/task.py
+++ b/turbo_hipster/task_plugins/real_db_upgrade/task.py
@@ -40,8 +40,8 @@ class Runner(models.ShellTask):
log = logging.getLogger("task.real_db_upgrade")
- def __init__(self, worker_server, plugin_config, job_name):
- super(Runner, self).__init__(worker_server, plugin_config, job_name)
+ def __init__(self, worker_server, job_name, job_config):
+ super(Runner, self).__init__(worker_server, job_name, job_config)
# Set up the runner worker
self.datasets = []
@@ -69,10 +69,7 @@ class Runner(models.ShellTask):
if (self.job_arguments['ZUUL_PROJECT'] ==
dataset['config']['project'] and
self._get_project_command(dataset['config']['type'])):
- dataset['determined_path'] = utils.determine_job_identifier(
- self.job_arguments, self.plugin_config['function'],
- self.job.unique
- )
+ dataset['determined_path'] = self.job_arguments['LOG_PATH']
dataset['job_log_file_path'] = os.path.join(
self.worker_server.config['jobs_working_dir'],
dataset['determined_path'],
@@ -129,7 +126,7 @@ class Runner(models.ShellTask):
if len(self.datasets) > 0:
return self.datasets
- datasets_path = self.plugin_config['datasets_dir']
+ datasets_path = self.job_config['datasets_dir']
for ent in os.listdir(datasets_path):
dataset_dir = os.path.join(datasets_path, ent)
if (os.path.isdir(dataset_dir) and os.path.isfile(
diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py
index fae193d..5a2ff00 100644
--- a/turbo_hipster/worker_server.py
+++ b/turbo_hipster/worker_server.py
@@ -47,15 +47,14 @@ class Server(threading.Thread):
# Config init
self.zuul_manager = None
self.zuul_client = None
- self.plugins = []
self.services_started = False
# TODO: Make me unique (random?) and we should be able to run multiple
# instances of turbo-hipster on the one host
self.worker_name = os.uname()[1]
- self.tasks = {}
- self.load_plugins()
+ self.jobs = {}
+ self.load_jobs()
def load_extra_configuration(self):
if isdir(self.config["conf_d"]):
@@ -84,41 +83,71 @@ class Server(threading.Thread):
filename=log_file,
level=logging.DEBUG)
+ def load_jobs(self):
+ # Legacy, load the plugins first
+ self.load_plugins()
+
+ self.log.debug("Loading jobs")
+ if 'jobs' in self.config:
+ for job in self.config['jobs']:
+ try:
+ plugin = 'shell_script'
+ if 'plugin' in job:
+ plugin = job['plugin']
+
+ module = __import__('turbo_hipster.task_plugins.' +
+ plugin + '.task',
+ fromlist='turbo_hipster.task_plugins' +
+ plugin)
+
+ self.jobs[job['name']] = {
+ 'name': job['name'],
+ 'plugin': plugin,
+ 'job_config': job,
+ 'runner': module.Runner(self, job['name'], job),
+ }
+ self.log.debug('Job %s loaded' % job['name'])
+ except Exception as e:
+ self.log.exception("Failure loading job")
+ self.log.exception(e)
+
def load_plugins(self):
""" Load the available plugins from task_plugins """
self.log.debug('Loading plugins')
# Load plugins
- for plugin in self.config['plugins']:
- self.plugins.append({
- 'module': __import__('turbo_hipster.task_plugins.' +
- plugin['name'] + '.task',
- fromlist='turbo_hipster.task_plugins' +
- plugin['name']),
- 'plugin_config': plugin
- })
- self.log.debug('Plugin %s loaded' % plugin['name'])
+ if 'plugins' in self.config:
+ for plugin in self.config['plugins']:
+ try:
+ module = __import__('turbo_hipster.task_plugins.' +
+ plugin['name'] + '.task',
+ fromlist='turbo_hipster.task_plugins' +
+ plugin['name'])
+
+ self.jobs[plugin['function']] = {
+ 'name': plugin['function'],
+ 'plugin': plugin['name'],
+ 'plugin_config': plugin,
+ 'runner': module.Runner(
+ self, plugin['function'], plugin
+ ),
+ }
+ self.log.debug('Job %s loaded' % plugin['function'])
+ except Exception as e:
+ self.log.exception("Failure loading plugin")
+ self.log.exception(e)
def start_zuul_client(self):
""" Run the tasks """
self.log.debug('Starting zuul client')
self.zuul_client = worker_manager.ZuulClient(self)
- for task_number, plugin in enumerate(self.plugins):
- module = plugin['module']
- job_name = '%s-%s-%s' % (plugin['plugin_config']['name'],
- self.worker_name, task_number)
- self.tasks[job_name] = module.Runner(
- self,
- plugin['plugin_config'],
- job_name
- )
- self.zuul_client.add_function(plugin['plugin_config']['function'],
- self.tasks[job_name])
+ for job in self.jobs.values():
+ self.zuul_client.add_function(job['name'], job['runner'])
self.zuul_client.start()
def start_zuul_manager(self):
- self.zuul_manager = worker_manager.ZuulManager(self, self.tasks)
+ self.zuul_manager = worker_manager.ZuulManager(self, self.jobs)
self.zuul_manager.start()
def shutdown_gracefully(self):