summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Hesketh <josh@nitrotech.org>2014-03-25 16:26:45 +1100
committerJoshua Hesketh <josh@nitrotech.org>2014-04-03 10:42:49 +1100
commit96adb287f8755f089dd68eaa7ac7ee290841f977 (patch)
tree0a6782c6af9585390072b3ccddc1a6818e069867
parent5cd8ea3ce3a115f27424cfe3eb00cb0fbe4e3fca (diff)
downloadturbo-hipster-96adb287f8755f089dd68eaa7ac7ee290841f977.tar.gz
Allow jobs to shutdown turbo-hipster
This lets jobs turn off and exit turbo-hipster once they are done. This is useful for when using nodepool or when a job leaves the environment dirty and we can't run more jobs on this worker. Change-Id: I823be4196a5bf9ca92a14d9caf26163398a9434c
-rw-r--r--tests/etc/default-config.yaml33
-rw-r--r--tests/etc/shutdown-config.yaml25
-rw-r--r--tests/fakes.py45
-rw-r--r--tests/fixtures/default-config.json35
-rw-r--r--tests/test_worker_manager.py68
-rw-r--r--turbo_hipster/cmd/server.py4
-rw-r--r--turbo_hipster/lib/models.py47
-rw-r--r--turbo_hipster/lib/utils.py13
-rw-r--r--turbo_hipster/task_plugins/gate_real_db_upgrade/task.py33
-rw-r--r--turbo_hipster/worker_manager.py50
-rw-r--r--[-rwxr-xr-x]turbo_hipster/worker_server.py25
11 files changed, 268 insertions, 110 deletions
diff --git a/tests/etc/default-config.yaml b/tests/etc/default-config.yaml
new file mode 100644
index 0000000..0412e01
--- /dev/null
+++ b/tests/etc/default-config.yaml
@@ -0,0 +1,33 @@
+zuul_server:
+ gerrit_site: http://review.openstack.org
+ zuul_site: http://119.9.13.90
+ git_origin: git://git.openstack.org/
+ gearman_host: localhost
+ gearman_port: 0
+
+debug_log: /var/log/turbo-hipster/debug.log
+jobs_working_dir: /var/lib/turbo-hipster/jobs
+git_working_dir: /var/lib/turbo-hipster/git
+pip_download_cache: /var/cache/pip
+
+plugins:
+ - name: gate_real_db_upgrade
+ datasets_dir: /var/lib/turbo-hipster/datasets_devstack_131007
+ function: build:gate-real-db-upgrade_nova_mysql_devstack_131007
+
+ - name: gate_real_db_upgrade
+ datasets_dir: /var/lib/turbo-hipster/datasets_user_001
+ function: build:gate-real-db-upgrade_nova_mysql_user_001
+
+ - name: shell_script
+ function: build:do_something_shelly
+
+publish_logs:
+ type: swift
+ authurl: https://identity.api.rackspacecloud.com/v2.0/
+ tenant: XXXX
+ user: XXXXXX
+ password: XXXXXX
+ container: XXXXXX
+ region: SYD
+ prepend_url: http://www.rcbops.com/turbo_hipster/results/
diff --git a/tests/etc/shutdown-config.yaml b/tests/etc/shutdown-config.yaml
new file mode 100644
index 0000000..9175873
--- /dev/null
+++ b/tests/etc/shutdown-config.yaml
@@ -0,0 +1,25 @@
+zuul_server:
+ gerrit_site: http://review.openstack.org
+ zuul_site: http://119.9.13.90
+ git_origin: git://git.openstack.org/
+ gearman_host: localhost
+ gearman_port: 0
+
+debug_log: /var/log/turbo-hipster/debug.log
+jobs_working_dir: /var/lib/turbo-hipster/jobs
+git_working_dir: /var/lib/turbo-hipster/git
+pip_download_cache: /var/cache/pip
+
+plugins:
+ - name: shell_script
+ function: build:demo_job_clean
+ shell_script: /dev/null
+ - name: shell_script
+ function: build:demo_job_dirty
+ shell_script: /dev/null
+ shutdown-th: true
+
+publish_logs:
+ type: local
+ path: /var/lib/turbo_hipster/logs
+ prepend_url: http://mylogserver/ \ No newline at end of file
diff --git a/tests/fakes.py b/tests/fakes.py
index 5f78fbf..1b377cd 100644
--- a/tests/fakes.py
+++ b/tests/fakes.py
@@ -14,6 +14,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+import gear
+import json
+import time
+import uuid
+
class FakeJob(object):
def __init__(self):
@@ -21,3 +26,43 @@ class FakeJob(object):
def sendWorkStatus(self, *args, **kwargs):
pass
+
+
+class FakeZuul(object):
+ """A fake zuul/gearman client to request work from gearman and check
+ results"""
+ def __init__(self, server, port):
+ self.gearman = gear.Client('FakeZuul')
+ self.gearman.addServer(server, port)
+ self.gearman.waitForServer()
+ self.job = None
+
+ def make_zuul_data(self, data={}):
+ defaults = {
+ 'ZUUL_UUID': str(uuid.uuid1()),
+ 'ZUUL_REF': 'a',
+ 'ZUUL_COMMIT': 'a',
+ 'ZUUL_PROJECT': 'a',
+ 'ZUUL_PIPELINE': 'a',
+ 'ZUUL_URL': 'http://localhost',
+ 'BASE_LOG_PATH': '56/123456/8',
+ 'LOG_PATH': '56/123456/8/check/job_name/uuid123'
+ }
+ defaults.update(data)
+ return defaults
+
+ def submit_job(self, name, data):
+ if not self.job:
+ self.job = gear.Job(name,
+ json.dumps(data),
+ unique=str(time.time()))
+ self.gearman.submitJob(self.job)
+ else:
+ raise Exception('A job already exists in self.job')
+
+ return self.job
+
+ def wait_for_completion(self):
+ if self.job:
+ while not self.job.complete:
+ time.sleep(0.1)
diff --git a/tests/fixtures/default-config.json b/tests/fixtures/default-config.json
deleted file mode 100644
index e33ea42..0000000
--- a/tests/fixtures/default-config.json
+++ /dev/null
@@ -1,35 +0,0 @@
-{
- "zuul_server": {
- "gerrit_site": "http://review.openstack.org",
- "zuul_site": "http://localhost",
- "git_origin": "git://git.openstack.org/",
- "gearman_host": "localhost",
- "gearman_port": 0
- },
- "debug_log": "/home/josh/var/log/turbo-hipster/debug.log",
- "jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs",
- "git_working_dir": "/home/josh/var/lib/turbo-hipster/git",
- "pip_download_cache": "/home/josh/var/cache/pip",
- "plugins": [
- {
- "name": "gate_real_db_upgrade",
- "datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007",
- "function": "build:gate-real-db-upgrade_nova_mysql_devstack_131007"
- },
- {
- "name": "gate_real_db_upgrade",
- "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
- "function": "build:gate-real-db-upgrade_nova_mysql_user_001"
- },
- {
- "name": "shell_script",
- "function": "build:do_something_shelly"
- }
- ],
- "publish_logs":
- {
- "type": "local",
- "path": "/home/josh/var/www/results/",
- "prepend_url": "http://localhost/results/"
- }
-}
diff --git a/tests/test_worker_manager.py b/tests/test_worker_manager.py
index 1f3de5e..ff0733f 100644
--- a/tests/test_worker_manager.py
+++ b/tests/test_worker_manager.py
@@ -15,6 +15,7 @@
# under the License.
+import fixtures
import gear
import logging
import os
@@ -25,6 +26,8 @@ import yaml
import turbo_hipster.task_plugins.gate_real_db_upgrade.task
import turbo_hipster.worker_server
+import fakes
+
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s')
@@ -36,12 +39,13 @@ class TestWithGearman(testtools.TestCase):
def setUp(self):
super(TestWithGearman, self).setUp()
-
- self.config = []
- self._load_config_fixture()
-
+ self.config = None
+ self.worker_server = None
self.gearman_server = gear.Server(0)
+ def start_server(self):
+ if not self.config:
+ self._load_config_fixture()
# Grab the port so the clients can connect to it
self.config['zuul_server']['gearman_port'] = self.gearman_server.port
@@ -57,20 +61,39 @@ class TestWithGearman(testtools.TestCase):
self.fail("Failed to start worker_service services")
def tearDown(self):
- self.worker_server.stop()
+ if self.worker_server and not self.worker_server.stopped():
+ self.worker_server.shutdown()
self.gearman_server.shutdown()
super(TestWithGearman, self).tearDown()
- def _load_config_fixture(self, config_name='default-config.json'):
- config_dir = os.path.join(os.path.dirname(__file__), 'fixtures')
+ def _load_config_fixture(self, config_name='default-config.yaml'):
+ config_dir = os.path.join(os.path.dirname(__file__), 'etc')
with open(os.path.join(config_dir, config_name), 'r') as config_stream:
self.config = yaml.safe_load(config_stream)
+ # Set all of the working dirs etc to a writeable temp dir
+ temp_path = self.useFixture(fixtures.TempDir()).path
+ for config_dir in ['debug_log', 'jobs_working_dir', 'git_working_dir',
+ 'pip_download_cache']:
+ if config_dir in self.config:
+ if self.config[config_dir][0] == '/':
+ self.config[config_dir] = self.config[config_dir][1:]
+ self.config[config_dir] = os.path.join(temp_path,
+ self.config[config_dir])
+ if self.config['publish_logs']['type'] == 'local':
+ if self.config['publish_logs']['path'][0] == '/':
+ self.config['publish_logs']['path'] = \
+ self.config['publish_logs']['path'][1:]
+ self.config['publish_logs']['path'] = os.path.join(
+ temp_path, self.config[config_dir])
+
class TestWorkerServer(TestWithGearman):
def test_plugins_load(self):
"Test the configured plugins are loaded"
+ self.start_server()
+
self.assertFalse(self.worker_server.stopped())
self.assertEqual(3, len(self.worker_server.plugins))
@@ -112,10 +135,12 @@ class TestWorkerServer(TestWithGearman):
def test_zuul_client_started(self):
"Test the zuul client has been started"
+ self.start_server()
self.assertFalse(self.worker_server.zuul_client.stopped())
def test_zuul_manager_started(self):
"Test the zuul manager has been started"
+ self.start_server()
self.assertFalse(self.worker_server.zuul_manager.stopped())
@@ -126,6 +151,9 @@ class TestZuulClient(TestWithGearman):
def test_registered_functions(self):
"Test the correct functions are registered with gearman"
+
+ self.start_server()
+
# The client should have all of the functions defined in the config
# registered with gearman
@@ -160,10 +188,36 @@ class TestZuulClient(TestWithGearman):
"Test sending a stop signal to the client exists correctly"
pass
+ def test_job_can_shutdown_th(self):
+ self._load_config_fixture('shutdown-config.yaml')
+ self.start_server()
+ zuul = fakes.FakeZuul(self.config['zuul_server']['gearman_host'],
+ self.config['zuul_server']['gearman_port'])
+
+ # First check we can run a job that /doesn't/ shut down turbo-hipster
+ data_req = zuul.make_zuul_data()
+ zuul.submit_job('build:demo_job_clean', data_req)
+ zuul.wait_for_completion()
+ self.assertTrue(zuul.job.complete)
+ self.assertFalse(self.worker_server.stopped())
+
+ # Now run a job that leaves the environment dirty and /should/ shut
+ # down turbo-hipster
+ zuul.job = None
+ zuul.submit_job('build:demo_job_dirty', data_req)
+ zuul.wait_for_completion()
+ self.assertTrue(zuul.job.complete)
+ # Give the server a second to shutdown
+ time.sleep(1)
+ self.assertTrue(self.worker_server.stopped())
+
class TestZuulManager(TestWithGearman):
def test_registered_functions(self):
"Test the correct functions are registered with gearman"
+
+ self.start_server()
+
# We need to wait for all the functions to register with the server..
# We'll give it up to 10seconds to do so
t0 = time.time()
diff --git a/turbo_hipster/cmd/server.py b/turbo_hipster/cmd/server.py
index 6dc5e37..2e4df47 100644
--- a/turbo_hipster/cmd/server.py
+++ b/turbo_hipster/cmd/server.py
@@ -44,7 +44,7 @@ def main(args):
server.setup_logging(config['debug_log'])
def term_handler(signum, frame):
- server.stop()
+ server.shutdown()
signal.signal(signal.SIGTERM, term_handler)
if args.background:
@@ -56,7 +56,7 @@ def main(args):
signal.pause()
except KeyboardInterrupt:
print "Ctrl + C: asking tasks to exit nicely...\n"
- server.stop()
+ server.shutdown()
if __name__ == '__main__':
diff --git a/turbo_hipster/lib/models.py b/turbo_hipster/lib/models.py
index cd73a73..f140b6c 100644
--- a/turbo_hipster/lib/models.py
+++ b/turbo_hipster/lib/models.py
@@ -26,8 +26,8 @@ class Task(object):
""" A base object for running a job (aka Task) """
log = logging.getLogger("lib.models.Task")
- def __init__(self, global_config, plugin_config, job_name):
- self.global_config = global_config
+ def __init__(self, worker_server, plugin_config, job_name):
+ self.worker_server = worker_server
self.plugin_config = plugin_config
self.job_name = job_name
self._reset()
@@ -125,10 +125,10 @@ class Task(object):
class ShellTask(Task):
log = logging.getLogger("lib.models.ShellTask")
- def __init__(self, global_config, plugin_config, job_name):
- super(ShellTask, self).__init__(global_config, plugin_config, job_name)
+ def __init__(self, worker_server, plugin_config, job_name):
+ super(ShellTask, self).__init__(worker_server, plugin_config, job_name)
# Define the number of steps we will do to determine our progress.
- self.total_steps = 5
+ self.total_steps = 6
def _reset(self):
super(ShellTask, self)._reset()
@@ -137,21 +137,24 @@ class ShellTask(Task):
self.shell_output_log = None
def do_job_steps(self):
- # Step 1: Prep job working dir
+ self.log.info('Step 1: Prep job working dir')
self._prep_working_dir()
- # Step 2: Checkout updates from git
+ self.log.info('Step 2: Checkout updates from git')
self._grab_patchset(self.job_arguments)
- # Step 3: Run shell script
+ self.log.info('Step 3: Run shell script')
self._execute_script()
- # Step 4: Analyse logs for errors
+ self.log.info('Step 4: Analyse logs for errors')
self._parse_and_check_results()
- # Step 5: handle the results (and upload etc)
+ self.log.info('Step 5: handle the results (and upload etc)')
self._handle_results()
+ self.log.info('Step 6: Handle extra actions such as shutting down')
+ self._handle_cleanup()
+
@common.task_step
def _prep_working_dir(self):
self.job_identifier = utils.determine_job_identifier(
@@ -160,7 +163,7 @@ class ShellTask(Task):
self.job.unique
)
self.job_working_dir = os.path.join(
- self.global_config['jobs_working_dir'],
+ self.worker_server.config['jobs_working_dir'],
self.job_identifier
)
self.shell_output_log = os.path.join(
@@ -176,7 +179,7 @@ class ShellTask(Task):
""" Checkout the reference into config['git_working_dir'] """
self.log.debug("Grab the patchset we want to test against")
- local_path = os.path.join(self.global_config['git_working_dir'],
+ local_path = os.path.join(self.worker_server.config['git_working_dir'],
self.job_name, job_args['ZUUL_PROJECT'])
if not os.path.exists(local_path):
os.makedirs(local_path)
@@ -185,8 +188,8 @@ class ShellTask(Task):
cmd = os.path.join(os.path.join(os.path.dirname(__file__),
'gerrit-git-prep.sh'))
- cmd += ' ' + self.global_config['zuul_server']['gerrit_site']
- cmd += ' ' + self.global_config['zuul_server']['zuul_site']
+ cmd += ' ' + self.worker_server.config['zuul_server']['gerrit_site']
+ cmd += ' ' + self.worker_server.config['zuul_server']['zuul_site']
utils.execute_to_log(cmd, self.shell_output_log, env=git_args,
cwd=local_path)
self.git_path = local_path
@@ -223,10 +226,10 @@ class ShellTask(Task):
self.log.debug("Process the resulting files (upload/push)")
- if 'publish_logs' in self.global_config:
- index_url = utils.push_file(self.job_identifier,
- self.shell_output_log,
- self.global_config['publish_logs'])
+ if 'publish_logs' in self.worker_server.config:
+ index_url = utils.push_file(
+ self.job_identifier, self.shell_output_log,
+ self.worker_server.config['publish_logs'])
self.log.debug("Index URL found at %s" % index_url)
self.work_data['url'] = index_url
@@ -234,3 +237,11 @@ class ShellTask(Task):
# Upload to zuul's url as instructed
utils.zuul_swift_upload(self.job_working_dir, self.job_arguments)
self.work_data['url'] = self.job_identifier
+
+ @common.task_step
+ def _handle_cleanup(self):
+ """Handle and cleanup functions. Shutdown if requested to so that no
+ further jobs are ran if the environment is dirty."""
+ if ('shutdown-th' in self.plugin_config and
+ self.plugin_config['shutdown-th']):
+ self.worker_server.shutdown_gracefully()
diff --git a/turbo_hipster/lib/utils.py b/turbo_hipster/lib/utils.py
index 85e2183..fa295af 100644
--- a/turbo_hipster/lib/utils.py
+++ b/turbo_hipster/lib/utils.py
@@ -256,17 +256,8 @@ def scp_push_file(results_set_name, file_path, local_config):
def determine_job_identifier(zuul_arguments, job, unique):
- if 'build:' in job:
- job = job.split('build:')[1]
-
- path = os.path.join(zuul_arguments['ZUUL_CHANGE'][:2],
- zuul_arguments['ZUUL_CHANGE'],
- zuul_arguments['ZUUL_PATCHSET'],
- zuul_arguments['ZUUL_PIPELINE'],
- job,
- unique[:7])
- log.info('Converted args: %s, job: %s and unique: %s to %s'
- % (zuul_arguments, job, unique, path))
+ # use new determined path from zuul
+ path = zuul_arguments['LOG_PATH']
return path
diff --git a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py
index 668ed88..21946da 100644
--- a/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py
+++ b/turbo_hipster/task_plugins/gate_real_db_upgrade/task.py
@@ -40,15 +40,15 @@ class Runner(models.ShellTask):
log = logging.getLogger("task_plugins.gate_real_db_upgrade.task.Runner")
- def __init__(self, global_config, plugin_config, job_name):
- super(Runner, self).__init__(global_config, plugin_config, job_name)
+ def __init__(self, worker_server, plugin_config, job_name):
+ super(Runner, self).__init__(worker_server, plugin_config, job_name)
# Set up the runner worker
self.datasets = []
self.job_datasets = []
# Define the number of steps we will do to determine our progress.
- self.total_steps = 6
+ self.total_steps += 1
def do_job_steps(self):
# Step 1: Figure out which datasets to run
@@ -74,7 +74,7 @@ class Runner(models.ShellTask):
self.job.unique
)
dataset['job_log_file_path'] = os.path.join(
- self.global_config['jobs_working_dir'],
+ self.worker_server.config['jobs_working_dir'],
dataset['determined_path'],
dataset['name'] + '.log'
)
@@ -102,7 +102,7 @@ class Runner(models.ShellTask):
self.log.debug("Process the resulting files (upload/push)")
index_url = handle_results.generate_push_results(
self.job_datasets,
- self.global_config['publish_logs']
+ self.worker_server.config['publish_logs']
)
self.log.debug("Index URL found at %s" % index_url)
self.work_data['url'] = index_url
@@ -162,8 +162,8 @@ class Runner(models.ShellTask):
for dataset in self.job_datasets:
cmd = os.path.join(os.path.join(os.path.dirname(__file__),
- (self.global_config['baseline_command']
- % self.global_config['flavor'])))
+ (self.worker_server.config['baseline_command']
+ % self.worker_server.config['flavor'])))
rc = utils.execute_to_log(
cmd,
dataset['job_log_file_path'],
@@ -187,7 +187,7 @@ class Runner(models.ShellTask):
% {
'unique_id': self.job.unique,
'job_working_dir': os.path.join(
- self.global_config['jobs_working_dir'],
+ self.worker_server.config['jobs_working_dir'],
dataset['determined_path']
),
'git_path': self.git_path,
@@ -202,7 +202,8 @@ class Runner(models.ShellTask):
dataset['dataset_dir'],
dataset['config']['logging_conf']
),
- 'pip_cache_dir': self.global_config['pip_download_cache']
+ 'pip_cache_dir':
+ self.worker_server.config['pip_download_cache']
}
)
@@ -210,13 +211,13 @@ class Runner(models.ShellTask):
syslog = '/var/log/syslog'
sqlslo = '/var/log/mysql/slow-queries.log'
sqlerr = '/var/log/mysql/error.log'
- if 'logs' in self.global_config:
- if 'syslog' in self.global_config['logs']:
- syslog = self.global_config['logs']['syslog']
- if 'sqlslo' in self.global_config['logs']:
- sqlslo = self.global_config['logs']['sqlslo']
- if 'sqlerr' in self.global_config['logs']:
- sqlerr = self.global_config['logs']['sqlerr']
+ if 'logs' in self.worker_server.config:
+ if 'syslog' in self.worker_server.config['logs']:
+ syslog = self.worker_server.config['logs']['syslog']
+ if 'sqlslo' in self.worker_server.config['logs']:
+ sqlslo = self.worker_server.config['logs']['sqlslo']
+ if 'sqlerr' in self.worker_server.config['logs']:
+ sqlerr = self.worker_server.config['logs']['sqlerr']
rc = utils.execute_to_log(
cmd,
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py
index a220425..40a1f35 100644
--- a/turbo_hipster/worker_manager.py
+++ b/turbo_hipster/worker_manager.py
@@ -18,6 +18,7 @@ import json
import logging
import os
import threading
+import time
class ZuulManager(threading.Thread):
@@ -30,10 +31,13 @@ class ZuulManager(threading.Thread):
log = logging.getLogger("worker_manager.ZuulManager")
- def __init__(self, config, tasks):
+ def __init__(self, worker_server, tasks):
super(ZuulManager, self).__init__()
self._stop = threading.Event()
- self.config = config
+ self.stopping = False
+ self.running = False
+
+ self.worker_server = worker_server
self.tasks = tasks
self.gearman_worker = None
@@ -44,8 +48,8 @@ class ZuulManager(threading.Thread):
self.gearman_worker = gear.Worker('turbo-hipster-manager-%s'
% hostname)
self.gearman_worker.addServer(
- self.config['zuul_server']['gearman_host'],
- self.config['zuul_server']['gearman_port']
+ self.worker_server.config['zuul_server']['gearman_host'],
+ self.worker_server.config['zuul_server']['gearman_port']
)
def register_functions(self):
@@ -53,6 +57,15 @@ class ZuulManager(threading.Thread):
self.gearman_worker.registerFunction(
'stop:turbo-hipster-manager-%s' % hostname)
+ def stop_gracefully(self):
+ self.stopping = True
+ self.gearman_worker.stopWaitingForJobs()
+ while self.running:
+ self.log.debug('waiting to finish')
+ time.sleep(0.1)
+ self._stop.set()
+ self.gearman_worker.shutdown()
+
def stop(self):
self._stop.set()
# Unblock gearman
@@ -64,7 +77,8 @@ class ZuulManager(threading.Thread):
return self._stop.isSet()
def run(self):
- while not self.stopped():
+ while not self.stopped() and not self.stopping:
+ self.running = True
try:
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
@@ -81,6 +95,7 @@ class ZuulManager(threading.Thread):
self.log.debug('We were asked to stop waiting for jobs')
except:
self.log.exception('Unknown exception waiting for job.')
+ self.running = False
self.log.debug("Finished manager thread")
def _handle_job(self, job):
@@ -101,12 +116,13 @@ class ZuulClient(threading.Thread):
log = logging.getLogger("worker_manager.ZuulClient")
- def __init__(self, global_config, worker_name):
+ def __init__(self, worker_server):
super(ZuulClient, self).__init__()
self._stop = threading.Event()
- self.global_config = global_config
+ self.stopping = False
+ self.running = False
- self.worker_name = worker_name
+ self.worker_server = worker_server
# Set up the runner worker
self.gearman_worker = None
@@ -118,10 +134,10 @@ class ZuulClient(threading.Thread):
def setup_gearman(self):
self.log.debug("Set up gearman worker")
- self.gearman_worker = gear.Worker(self.worker_name)
+ self.gearman_worker = gear.Worker(self.worker_server.worker_name)
self.gearman_worker.addServer(
- self.global_config['zuul_server']['gearman_host'],
- self.global_config['zuul_server']['gearman_port']
+ self.worker_server.config['zuul_server']['gearman_host'],
+ self.worker_server.config['zuul_server']['gearman_port']
)
def register_functions(self):
@@ -143,11 +159,20 @@ class ZuulClient(threading.Thread):
self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
+ def stop_gracefully(self):
+ self.stopping = True
+ self.gearman_worker.stopWaitingForJobs()
+ while self.running:
+ time.sleep(0.1)
+ self._stop.set()
+ self.gearman_worker.shutdown()
+
def stopped(self):
return self._stop.isSet()
def run(self):
- while not self.stopped():
+ while not self.stopped() and not self.stopping:
+ self.running = True
try:
# gearman_worker.getJob() blocks until a job is available
self.log.debug("Waiting for server")
@@ -163,6 +188,7 @@ class ZuulClient(threading.Thread):
self.log.debug('We were asked to stop waiting for jobs')
except:
self.log.exception('Unknown exception waiting for job.')
+ self.running = False
self.log.debug("Finished client thread")
def _handle_job(self):
diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py
index 66225ff..35c7882 100755..100644
--- a/turbo_hipster/worker_server.py
+++ b/turbo_hipster/worker_server.py
@@ -15,9 +15,6 @@
# under the License.
-""" worker_server.py is an executable worker server that loads and runs
-task_plugins. """
-
import logging
import os
import threading
@@ -104,15 +101,14 @@ class Server(threading.Thread):
def start_zuul_client(self):
""" Run the tasks """
self.log.debug('Starting zuul client')
- self.zuul_client = worker_manager.ZuulClient(self.config,
- self.worker_name)
+ self.zuul_client = worker_manager.ZuulClient(self)
for task_number, plugin in enumerate(self.plugins):
module = plugin['module']
job_name = '%s-%s-%s' % (plugin['plugin_config']['name'],
self.worker_name, task_number)
self.tasks[job_name] = module.Runner(
- self.config,
+ self,
plugin['plugin_config'],
job_name
)
@@ -122,14 +118,25 @@ class Server(threading.Thread):
self.zuul_client.start()
def start_zuul_manager(self):
- self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
+ self.zuul_manager = worker_manager.ZuulManager(self, self.tasks)
self.zuul_manager.start()
- def stop(self):
+ def shutdown_gracefully(self):
+ """ Shutdown while no work is currently happening """
+ self.log.debug('Graceful shutdown once jobs are complete...')
+ thread = threading.Thread(target=self._shutdown_gracefully)
+ thread.start()
+
+ def _shutdown_gracefully(self):
+ self.zuul_client.stop_gracefully()
+ self.zuul_manager.stop_gracefully()
self._stop.set()
- self.log.debug('Exiting...')
+
+ def shutdown(self):
+ self.log.debug('Shutting down now!...')
self.zuul_client.stop()
self.zuul_manager.stop()
+ self._stop.set()
def stopped(self):
return self._stop.isSet()