summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Hesketh <josh@nitrotech.org>2014-03-27 15:28:00 +1100
committerJoshua Hesketh <josh@nitrotech.org>2014-03-28 15:53:57 +1100
commit5cd8ea3ce3a115f27424cfe3eb00cb0fbe4e3fca (patch)
tree936a20579f5ace69f98e91ed0b5585aefb1a8b75
parent49c906314778ee588dc0036593bdbf92ac899b25 (diff)
downloadturbo-hipster-5cd8ea3ce3a115f27424cfe3eb00cb0fbe4e3fca.tar.gz
Add in tests for ZuulManager and ZuulClient
Also tidy up unnecessary fakes. Change-Id: I165667faf61faa7fc06c10925db462e4650bda6a
-rw-r--r--.testr.conf1
-rw-r--r--requirements.txt4
-rw-r--r--tests/fakes.py286
-rw-r--r--tests/fixtures/default-config.json35
-rw-r--r--tests/test_worker_manager.py183
-rw-r--r--tox.ini5
-rw-r--r--turbo_hipster/cmd/server.py6
-rw-r--r--turbo_hipster/worker_manager.py10
-rwxr-xr-xturbo_hipster/worker_server.py15
9 files changed, 207 insertions, 338 deletions
diff --git a/.testr.conf b/.testr.conf
index 3aa8cae..b951a05 100644
--- a/.testr.conf
+++ b/.testr.conf
@@ -6,3 +6,4 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
test_id_option=--load-list $IDFILE
test_list_option=--list
+test_run_concurrency=echo 1
diff --git a/requirements.txt b/requirements.txt
index d64b484..26e13b2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
pbr>=0.5.21,<0.6
-gear
+gear>=0.5.4,<1.0.0
python-swiftclient
python-keystoneclient
@@ -13,4 +13,4 @@ sphinxcontrib-seqdiag
mysql-python
requests
-PyYAML
+PyYAML>=3.1.0,<4.0.0
diff --git a/tests/fakes.py b/tests/fakes.py
index 07bac13..5f78fbf 100644
--- a/tests/fakes.py
+++ b/tests/fakes.py
@@ -14,292 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import gear
-import json
-import threading
-import os
-import re
-import time
-
-from turbo_hipster.worker_manager import ZuulManager
-from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\
- as RealDbUpgradeRunner
-
-
-class FakeZuulManager(ZuulManager):
- def __init__(self, config, tasks, test):
- self.test = test
- super(FakeZuulManager, self).__init__(config, tasks)
-
- def setup_gearman(self):
- hostname = os.uname()[1]
- self.gearman_worker = FakeWorker('turbo-hipster-manager-%s'
- % hostname, self.test)
- self.gearman_worker.addServer(
- self.config['zuul_server']['gearman_host'],
- self.config['zuul_server']['gearman_port']
- )
- self.gearman_worker.registerFunction(
- 'stop:turbo-hipster-manager-%s' % hostname)
-
-
-class FakeWorker(gear.Worker):
- def __init__(self, worker_id, test):
- super(FakeWorker, self).__init__(worker_id)
- self.gearman_jobs = {}
- self.build_history = []
- self.running_builds = []
- self.build_counter = 0
- self.fail_tests = {}
- self.test = test
-
- self.hold_jobs_in_build = False
- self.lock = threading.Lock()
- self.__work_thread = threading.Thread(target=self.work)
- self.__work_thread.daemon = True
- self.__work_thread.start()
-
- def handleJob(self, job):
- parts = job.name.split(":")
- cmd = parts[0]
- name = parts[1]
- if len(parts) > 2:
- node = parts[2]
- else:
- node = None
- if cmd == 'build':
- self.handleBuild(job, name, node)
- elif cmd == 'stop':
- self.handleStop(job, name)
- elif cmd == 'set_description':
- self.handleSetDescription(job, name)
-
- def handleBuild(self, job, name, node):
- build = FakeBuild(self, job, self.build_counter, node)
- job.build = build
- self.gearman_jobs[job.unique] = job
- self.build_counter += 1
-
- self.running_builds.append(build)
- build.start()
-
- def handleStop(self, job, name):
- self.log.debug("handle stop")
- parameters = json.loads(job.arguments)
- name = parameters['name']
- number = parameters['number']
- for build in self.running_builds:
- if build.name == name and build.number == number:
- build.aborted = True
- build.release()
- job.sendWorkComplete()
- return
- job.sendWorkFail()
-
- def handleSetDescription(self, job, name):
- self.log.debug("handle set description")
- parameters = json.loads(job.arguments)
- name = parameters['name']
- number = parameters['number']
- descr = parameters['html_description']
- for build in self.running_builds:
- if build.name == name and build.number == number:
- build.description = descr
- job.sendWorkComplete()
- return
- for build in self.build_history:
- if build.name == name and build.number == number:
- build.description = descr
- job.sendWorkComplete()
- return
- job.sendWorkFail()
-
- def work(self):
- while self.running:
- try:
- job = self.getJob()
- except gear.InterruptedError:
- continue
- try:
- self.handleJob(job)
- except:
- self.log.exception("Worker exception:")
-
- def addFailTest(self, name, change):
- l = self.fail_tests.get(name, [])
- l.append(change)
- self.fail_tests[name] = l
-
- def shouldFailTest(self, name, ref):
- l = self.fail_tests.get(name, [])
- for change in l:
- if self.test.ref_has_change(ref, change):
- return True
- return False
-
- def release(self, regex=None):
- builds = self.running_builds[:]
- self.log.debug("releasing build %s (%s)" % (regex,
- len(self.running_builds)))
- for build in builds:
- if not regex or re.match(regex, build.name):
- self.log.debug("releasing build %s" %
- (build.parameters['ZUUL_UUID']))
- build.release()
- else:
- self.log.debug("not releasing build %s" %
- (build.parameters['ZUUL_UUID']))
- self.log.debug("done releasing builds %s (%s)" %
- (regex, len(self.running_builds)))
-
-
-class FakeRealDbUpgradeRunner(RealDbUpgradeRunner):
- def __init__(self, global_config, plugin_config, worker_name, test):
- self.test = test
- super(FakeRealDbUpgradeRunner, self).__init__(global_config,
- plugin_config,
- worker_name)
-
-
-class BuildHistory(object):
- def __init__(self, **kw):
- self.__dict__.update(kw)
-
- def __repr__(self):
- return ("<Completed build, result: %s name: %s #%s changes: %s>" %
- (self.result, self.name, self.number, self.changes))
-
-
-class FakeBuild(threading.Thread):
- def __init__(self, worker, job, number, node):
- threading.Thread.__init__(self)
- self.daemon = True
- self.worker = worker
- self.job = job
- self.name = job.name.split(':')[1]
- self.number = number
- self.node = node
- self.parameters = json.loads(job.arguments)
- self.unique = self.parameters['ZUUL_UUID']
- self.wait_condition = threading.Condition()
- self.waiting = False
- self.aborted = False
- self.created = time.time()
- self.description = ''
-
- def release(self):
- self.wait_condition.acquire()
- self.wait_condition.notify()
- self.waiting = False
- self.log.debug("Build %s released" % self.unique)
- self.wait_condition.release()
-
- def isWaiting(self):
- self.wait_condition.acquire()
- if self.waiting:
- ret = True
- else:
- ret = False
- self.wait_condition.release()
- return ret
-
- def _wait(self):
- self.wait_condition.acquire()
- self.waiting = True
- self.log.debug("Build %s waiting" % self.unique)
- self.wait_condition.wait()
- self.wait_condition.release()
-
- def run(self):
- data = {
- 'url': 'https://server/job/%s/%s/' % (self.name, self.number),
- 'name': self.name,
- 'number': self.number,
- 'manager': self.worker.worker_id,
- }
-
- self.job.sendWorkData(json.dumps(data))
- self.job.sendWorkStatus(0, 100)
-
- if self.worker.hold_jobs_in_build:
- self._wait()
- self.log.debug("Build %s continuing" % self.unique)
-
- self.worker.lock.acquire()
-
- result = 'SUCCESS'
- if (('ZUUL_REF' in self.parameters) and
- self.worker.shouldFailTest(self.name,
- self.parameters['ZUUL_REF'])):
- result = 'FAILURE'
- if self.aborted:
- result = 'ABORTED'
-
- data = {'result': result}
- changes = None
- if 'ZUUL_CHANGE_IDS' in self.parameters:
- changes = self.parameters['ZUUL_CHANGE_IDS']
-
- self.worker.build_history.append(
- BuildHistory(name=self.name, number=self.number,
- result=result, changes=changes, node=self.node,
- uuid=self.unique, description=self.description,
- pipeline=self.parameters['ZUUL_PIPELINE'])
- )
-
- self.job.sendWorkComplete(json.dumps(data))
- del self.worker.gearman_jobs[self.job.unique]
- self.worker.running_builds.remove(self)
- self.worker.lock.release()
-
-
-class FakeGearmanServer(gear.Server):
- def __init__(self, port=4730):
- self.hold_jobs_in_queue = False
- super(FakeGearmanServer, self).__init__(port)
-
- def getJobForConnection(self, connection, peek=False):
- for queue in [self.high_queue, self.normal_queue, self.low_queue]:
- for job in queue:
- if not hasattr(job, 'waiting'):
- if job.name.startswith('build:'):
- job.waiting = self.hold_jobs_in_queue
- else:
- job.waiting = False
- if job.waiting:
- continue
- if job.name in connection.functions:
- if not peek:
- queue.remove(job)
- connection.related_jobs[job.handle] = job
- job.worker_connection = connection
- job.running = True
- return job
- return None
-
- def release(self, regex=None):
- released = False
- qlen = (len(self.high_queue) + len(self.normal_queue) +
- len(self.low_queue))
- self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
- for job in self.getQueue():
- cmd, name = job.name.split(':')
- if cmd != 'build':
- continue
- if not regex or re.match(regex, name):
- self.log.debug("releasing queued job %s" %
- job.unique)
- job.waiting = False
- released = True
- else:
- self.log.debug("not releasing queued job %s" %
- job.unique)
- if released:
- self.wakeConnections()
- qlen = (len(self.high_queue) + len(self.normal_queue) +
- len(self.low_queue))
- self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
-
class FakeJob(object):
def __init__(self):
diff --git a/tests/fixtures/default-config.json b/tests/fixtures/default-config.json
new file mode 100644
index 0000000..e33ea42
--- /dev/null
+++ b/tests/fixtures/default-config.json
@@ -0,0 +1,35 @@
+{
+ "zuul_server": {
+ "gerrit_site": "http://review.openstack.org",
+ "zuul_site": "http://localhost",
+ "git_origin": "git://git.openstack.org/",
+ "gearman_host": "localhost",
+ "gearman_port": 0
+ },
+ "debug_log": "/home/josh/var/log/turbo-hipster/debug.log",
+ "jobs_working_dir": "/home/josh/var/lib/turbo-hipster/jobs",
+ "git_working_dir": "/home/josh/var/lib/turbo-hipster/git",
+ "pip_download_cache": "/home/josh/var/cache/pip",
+ "plugins": [
+ {
+ "name": "gate_real_db_upgrade",
+ "datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007",
+ "function": "build:gate-real-db-upgrade_nova_mysql_devstack_131007"
+ },
+ {
+ "name": "gate_real_db_upgrade",
+ "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
+ "function": "build:gate-real-db-upgrade_nova_mysql_user_001"
+ },
+ {
+ "name": "shell_script",
+ "function": "build:do_something_shelly"
+ }
+ ],
+ "publish_logs":
+ {
+ "type": "local",
+ "path": "/home/josh/var/www/results/",
+ "prepend_url": "http://localhost/results/"
+ }
+}
diff --git a/tests/test_worker_manager.py b/tests/test_worker_manager.py
index ae2d5b5..1f3de5e 100644
--- a/tests/test_worker_manager.py
+++ b/tests/test_worker_manager.py
@@ -15,58 +15,171 @@
# under the License.
+import gear
+import logging
import os
import testtools
import time
import yaml
-from fakes import FakeZuulManager, FakeGearmanServer,\
- FakeRealDbUpgradeRunner
-CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'etc')
-with open(os.path.join(CONFIG_DIR, 'config.yaml'), 'r') as config_stream:
- CONFIG = yaml.safe_load(config_stream)
+import turbo_hipster.task_plugins.gate_real_db_upgrade.task
+import turbo_hipster.worker_server
+logging.basicConfig(level=logging.DEBUG,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s')
+
+
+class TestWithGearman(testtools.TestCase):
+
+ log = logging.getLogger("TestWithGearman")
-class TestZuulManager(testtools.TestCase):
def setUp(self):
- super(TestZuulManager, self).setUp()
- self.config = CONFIG
- self.gearman_server = FakeGearmanServer(
- self.config['zuul_server']['gearman_port'])
- self.config['zuul_server']['gearman_port'] = self.gearman_server.port
+ super(TestWithGearman, self).setUp()
+
+ self.config = []
+ self._load_config_fixture()
- self.task = FakeRealDbUpgradeRunner(self.config,
- self.config['plugins'][0],
- 'test-worker-1', self)
- self.tasks = dict(FakeRealDbUpgradeRunner_worker=self.task)
+ self.gearman_server = gear.Server(0)
- self.gearman_manager = FakeZuulManager(self.config, self.tasks, self)
+ # Grab the port so the clients can connect to it
+ self.config['zuul_server']['gearman_port'] = self.gearman_server.port
+
+ self.worker_server = turbo_hipster.worker_server.Server(self.config)
+ self.worker_server.setup_logging()
+ self.worker_server.start()
+ t0 = time.time()
+ while time.time() - t0 < 10:
+ if self.worker_server.services_started:
+ break
+ time.sleep(0.01)
+ if not self.worker_server.services_started:
+ self.fail("Failed to start worker_service services")
def tearDown(self):
- super(TestZuulManager, self).tearDown()
+ self.worker_server.stop()
self.gearman_server.shutdown()
+ super(TestWithGearman, self).tearDown()
- def test_manager_function_registered(self):
- """ Check the manager is set up correctly and registered with the
- gearman server with an appropriate function """
+ def _load_config_fixture(self, config_name='default-config.json'):
+ config_dir = os.path.join(os.path.dirname(__file__), 'fixtures')
+ with open(os.path.join(config_dir, config_name), 'r') as config_stream:
+ self.config = yaml.safe_load(config_stream)
- # Give the gearman server up to 5 seconds to register the function
- for x in range(500):
- time.sleep(0.01)
- if len(self.gearman_server.functions) > 0:
- break
- hostname = os.uname()[1]
- function_name = 'stop:turbo-hipster-manager-%s' % hostname
+class TestWorkerServer(TestWithGearman):
+ def test_plugins_load(self):
+ "Test the configured plugins are loaded"
+
+ self.assertFalse(self.worker_server.stopped())
+ self.assertEqual(3, len(self.worker_server.plugins))
+
+ plugin0_config = {
+ "name": "gate_real_db_upgrade",
+ "datasets_dir": "/var/lib/turbo-hipster/datasets_devstack_131007",
+ "function": "build:gate-real-db-upgrade_nova_mysql_devstack_131007"
+ }
+ plugin1_config = {
+ "name": "gate_real_db_upgrade",
+ "datasets_dir": "/var/lib/turbo-hipster/datasets_user_001",
+ "function": "build:gate-real-db-upgrade_nova_mysql_user_001"
+ }
+ plugin2_config = {
+ "name": "shell_script",
+ "function": "build:do_something_shelly"
+ }
+
+ self.assertEqual(plugin0_config,
+ self.worker_server.plugins[0]['plugin_config'])
+ self.assertEqual(
+ 'turbo_hipster.task_plugins.gate_real_db_upgrade.task',
+ self.worker_server.plugins[0]['module'].__name__
+ )
+
+ self.assertEqual(plugin1_config,
+ self.worker_server.plugins[1]['plugin_config'])
+ self.assertEqual(
+ 'turbo_hipster.task_plugins.gate_real_db_upgrade.task',
+ self.worker_server.plugins[1]['module'].__name__
+ )
+
+ self.assertEqual(plugin2_config,
+ self.worker_server.plugins[2]['plugin_config'])
+ self.assertEqual(
+ 'turbo_hipster.task_plugins.shell_script.task',
+ self.worker_server.plugins[2]['module'].__name__
+ )
- self.assertIn(function_name, self.gearman_server.functions)
+ def test_zuul_client_started(self):
+ "Test the zuul client has been started"
+ self.assertFalse(self.worker_server.zuul_client.stopped())
- def test_task_registered_with_manager(self):
- """ Check the FakeRealDbUpgradeRunner_worker task is registered """
- self.assertIn('FakeRealDbUpgradeRunner_worker',
- self.gearman_manager.tasks.keys())
+ def test_zuul_manager_started(self):
+ "Test the zuul manager has been started"
+ self.assertFalse(self.worker_server.zuul_manager.stopped())
- def test_stop_task(self):
- """ Check that the manager successfully stops a task when requested
- """
+
+class TestZuulClient(TestWithGearman):
+ def test_setup_gearman_worker(self):
+ "Make sure the client is registered as a worker with gearman"
+ pass
+
+ def test_registered_functions(self):
+ "Test the correct functions are registered with gearman"
+ # The client should have all of the functions defined in the config
+ # registered with gearman
+
+ # We need to wait for all the functions to register with the server..
+ # We'll give it up to 10seconds to do so
+ t0 = time.time()
+ failed = True
+ while time.time() - t0 < 10:
+ # There should be 4 functions. 1 for each plugin + 1 for the
+ # manager
+ if len(self.gearman_server.functions) == 4:
+ failed = False
+ break
+ time.sleep(0.01)
+ if failed:
+ self.log.debug(self.gearman_server.functions)
+ self.fail("The correct number of functions haven't registered with"
+ " gearman")
+
+ self.assertIn('build:gate-real-db-upgrade_nova_mysql_devstack_131007',
+ self.gearman_server.functions)
+ self.assertIn('build:gate-real-db-upgrade_nova_mysql_user_001',
+ self.gearman_server.functions)
+ self.assertIn('build:do_something_shelly',
+ self.gearman_server.functions)
+
+ def test_waiting_for_job(self):
+ "Make sure the client waits for jobs as expected"
pass
+
+ def test_stop(self):
+ "Test sending a stop signal to the client exists correctly"
+ pass
+
+
+class TestZuulManager(TestWithGearman):
+ def test_registered_functions(self):
+ "Test the correct functions are registered with gearman"
+ # We need to wait for all the functions to register with the server..
+ # We'll give it up to 10seconds to do so
+ t0 = time.time()
+ failed = True
+ while time.time() - t0 < 10:
+ # There should be 4 functions. 1 for each plugin + 1 for the
+ # manager
+ if len(self.gearman_server.functions) == 4:
+ failed = False
+ break
+ time.sleep(0.01)
+ if failed:
+ self.log.debug(self.gearman_server.functions)
+ self.fail("The correct number of functions haven't registered with"
+ " gearman")
+
+ hostname = os.uname()[1]
+ self.assertIn('stop:turbo-hipster-manager-%s' % hostname,
+ self.gearman_server.functions)
diff --git a/tox.ini b/tox.ini
index f0e38f0..99af832 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,7 +4,7 @@
# and then run "tox" from this directory.
[tox]
-envlist = pep8, pyflakes, py27
+envlist = pep8, py27
[testenv]
setenv = VIRTUAL_ENV={envdir}
@@ -28,9 +28,6 @@ commands = flake8
commands =
python setup.py testr --coverage
-[testenv:pyflakes]
-commands = pyflakes turbo_hipster setup.py
-
[testenv:venv]
commands = {posargs}
diff --git a/turbo_hipster/cmd/server.py b/turbo_hipster/cmd/server.py
index 9cbf4e3..6dc5e37 100644
--- a/turbo_hipster/cmd/server.py
+++ b/turbo_hipster/cmd/server.py
@@ -35,7 +35,13 @@ def main(args):
with open(args.config, 'r') as config_stream:
config = yaml.safe_load(config_stream)
+ if not config['debug_log']:
+ # NOTE(mikal): debug logging _must_ be enabled for the log writing
+ # in lib.utils.execute_to_log to work correctly.
+ raise Exception('Debug log not configured')
+
server = worker_server.Server(config)
+ server.setup_logging(config['debug_log'])
def term_handler(signum, frame):
server.stop()
diff --git a/turbo_hipster/worker_manager.py b/turbo_hipster/worker_manager.py
index 3923d35..a220425 100644
--- a/turbo_hipster/worker_manager.py
+++ b/turbo_hipster/worker_manager.py
@@ -57,6 +57,7 @@ class ZuulManager(threading.Thread):
self._stop.set()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
+ self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
@@ -76,8 +77,10 @@ class ZuulManager(threading.Thread):
self.current_step = 0
job = self.gearman_worker.getJob()
self._handle_job(job)
+ except gear.InterruptedError:
+ self.log.debug('We were asked to stop waiting for jobs')
except:
- logging.exception('Exception retrieving log event.')
+ self.log.exception('Unknown exception waiting for job.')
self.log.debug("Finished manager thread")
def _handle_job(self, job):
@@ -137,6 +140,7 @@ class ZuulClient(threading.Thread):
task.stop_working()
# Unblock gearman
self.log.debug("Telling gearman to stop waiting for jobs")
+ self.gearman_worker.stopWaitingForJobs()
self.gearman_worker.shutdown()
def stopped(self):
@@ -155,8 +159,10 @@ class ZuulClient(threading.Thread):
self.log.debug("Waiting for job")
self.job = self.gearman_worker.getJob()
self._handle_job()
+ except gear.InterruptedError:
+ self.log.debug('We were asked to stop waiting for jobs')
except:
- self.log.exception('Exception waiting for job.')
+ self.log.exception('Unknown exception waiting for job.')
self.log.debug("Finished client thread")
def _handle_job(self):
diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py
index a0f3cf2..66225ff 100755
--- a/turbo_hipster/worker_server.py
+++ b/turbo_hipster/worker_server.py
@@ -78,17 +78,14 @@ class Server(threading.Thread):
self.log.warn("conf_d parameter '%s' isn't a directory" %
(self.config["conf_d"]))
- def setup_logging(self):
- if not self.debug_log:
- raise Exception('Debug log not configured')
-
- # NOTE(mikal): debug logging _must_ be enabled for the log writing
- # in lib.utils.execute_to_log to work correctly.
- if not os.path.isdir(os.path.dirname(self.debug_log)):
- os.makedirs(os.path.dirname(self.debug_log))
+ def setup_logging(self, log_file=None):
+ if log_file:
+ if not os.path.isdir(os.path.dirname(log_file)):
+ os.makedirs(os.path.dirname(log_file))
logging.basicConfig(format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s',
- filename=self.debug_log, level=logging.DEBUG)
+ filename=log_file,
+ level=logging.DEBUG)
def load_plugins(self):
""" Load the available plugins from task_plugins """