summaryrefslogtreecommitdiff
path: root/tests/fakes.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/fakes.py')
-rw-r--r--tests/fakes.py286
1 files changed, 0 insertions, 286 deletions
diff --git a/tests/fakes.py b/tests/fakes.py
index 07bac13..5f78fbf 100644
--- a/tests/fakes.py
+++ b/tests/fakes.py
@@ -14,292 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import gear
-import json
-import threading
-import os
-import re
-import time
-
-from turbo_hipster.worker_manager import ZuulManager
-from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\
- as RealDbUpgradeRunner
-
-
-class FakeZuulManager(ZuulManager):
- def __init__(self, config, tasks, test):
- self.test = test
- super(FakeZuulManager, self).__init__(config, tasks)
-
- def setup_gearman(self):
- hostname = os.uname()[1]
- self.gearman_worker = FakeWorker('turbo-hipster-manager-%s'
- % hostname, self.test)
- self.gearman_worker.addServer(
- self.config['zuul_server']['gearman_host'],
- self.config['zuul_server']['gearman_port']
- )
- self.gearman_worker.registerFunction(
- 'stop:turbo-hipster-manager-%s' % hostname)
-
-
-class FakeWorker(gear.Worker):
- def __init__(self, worker_id, test):
- super(FakeWorker, self).__init__(worker_id)
- self.gearman_jobs = {}
- self.build_history = []
- self.running_builds = []
- self.build_counter = 0
- self.fail_tests = {}
- self.test = test
-
- self.hold_jobs_in_build = False
- self.lock = threading.Lock()
- self.__work_thread = threading.Thread(target=self.work)
- self.__work_thread.daemon = True
- self.__work_thread.start()
-
- def handleJob(self, job):
- parts = job.name.split(":")
- cmd = parts[0]
- name = parts[1]
- if len(parts) > 2:
- node = parts[2]
- else:
- node = None
- if cmd == 'build':
- self.handleBuild(job, name, node)
- elif cmd == 'stop':
- self.handleStop(job, name)
- elif cmd == 'set_description':
- self.handleSetDescription(job, name)
-
- def handleBuild(self, job, name, node):
- build = FakeBuild(self, job, self.build_counter, node)
- job.build = build
- self.gearman_jobs[job.unique] = job
- self.build_counter += 1
-
- self.running_builds.append(build)
- build.start()
-
- def handleStop(self, job, name):
- self.log.debug("handle stop")
- parameters = json.loads(job.arguments)
- name = parameters['name']
- number = parameters['number']
- for build in self.running_builds:
- if build.name == name and build.number == number:
- build.aborted = True
- build.release()
- job.sendWorkComplete()
- return
- job.sendWorkFail()
-
- def handleSetDescription(self, job, name):
- self.log.debug("handle set description")
- parameters = json.loads(job.arguments)
- name = parameters['name']
- number = parameters['number']
- descr = parameters['html_description']
- for build in self.running_builds:
- if build.name == name and build.number == number:
- build.description = descr
- job.sendWorkComplete()
- return
- for build in self.build_history:
- if build.name == name and build.number == number:
- build.description = descr
- job.sendWorkComplete()
- return
- job.sendWorkFail()
-
- def work(self):
- while self.running:
- try:
- job = self.getJob()
- except gear.InterruptedError:
- continue
- try:
- self.handleJob(job)
- except:
- self.log.exception("Worker exception:")
-
- def addFailTest(self, name, change):
- l = self.fail_tests.get(name, [])
- l.append(change)
- self.fail_tests[name] = l
-
- def shouldFailTest(self, name, ref):
- l = self.fail_tests.get(name, [])
- for change in l:
- if self.test.ref_has_change(ref, change):
- return True
- return False
-
- def release(self, regex=None):
- builds = self.running_builds[:]
- self.log.debug("releasing build %s (%s)" % (regex,
- len(self.running_builds)))
- for build in builds:
- if not regex or re.match(regex, build.name):
- self.log.debug("releasing build %s" %
- (build.parameters['ZUUL_UUID']))
- build.release()
- else:
- self.log.debug("not releasing build %s" %
- (build.parameters['ZUUL_UUID']))
- self.log.debug("done releasing builds %s (%s)" %
- (regex, len(self.running_builds)))
-
-
-class FakeRealDbUpgradeRunner(RealDbUpgradeRunner):
- def __init__(self, global_config, plugin_config, worker_name, test):
- self.test = test
- super(FakeRealDbUpgradeRunner, self).__init__(global_config,
- plugin_config,
- worker_name)
-
-
-class BuildHistory(object):
- def __init__(self, **kw):
- self.__dict__.update(kw)
-
- def __repr__(self):
- return ("<Completed build, result: %s name: %s #%s changes: %s>" %
- (self.result, self.name, self.number, self.changes))
-
-
-class FakeBuild(threading.Thread):
- def __init__(self, worker, job, number, node):
- threading.Thread.__init__(self)
- self.daemon = True
- self.worker = worker
- self.job = job
- self.name = job.name.split(':')[1]
- self.number = number
- self.node = node
- self.parameters = json.loads(job.arguments)
- self.unique = self.parameters['ZUUL_UUID']
- self.wait_condition = threading.Condition()
- self.waiting = False
- self.aborted = False
- self.created = time.time()
- self.description = ''
-
- def release(self):
- self.wait_condition.acquire()
- self.wait_condition.notify()
- self.waiting = False
- self.log.debug("Build %s released" % self.unique)
- self.wait_condition.release()
-
- def isWaiting(self):
- self.wait_condition.acquire()
- if self.waiting:
- ret = True
- else:
- ret = False
- self.wait_condition.release()
- return ret
-
- def _wait(self):
- self.wait_condition.acquire()
- self.waiting = True
- self.log.debug("Build %s waiting" % self.unique)
- self.wait_condition.wait()
- self.wait_condition.release()
-
- def run(self):
- data = {
- 'url': 'https://server/job/%s/%s/' % (self.name, self.number),
- 'name': self.name,
- 'number': self.number,
- 'manager': self.worker.worker_id,
- }
-
- self.job.sendWorkData(json.dumps(data))
- self.job.sendWorkStatus(0, 100)
-
- if self.worker.hold_jobs_in_build:
- self._wait()
- self.log.debug("Build %s continuing" % self.unique)
-
- self.worker.lock.acquire()
-
- result = 'SUCCESS'
- if (('ZUUL_REF' in self.parameters) and
- self.worker.shouldFailTest(self.name,
- self.parameters['ZUUL_REF'])):
- result = 'FAILURE'
- if self.aborted:
- result = 'ABORTED'
-
- data = {'result': result}
- changes = None
- if 'ZUUL_CHANGE_IDS' in self.parameters:
- changes = self.parameters['ZUUL_CHANGE_IDS']
-
- self.worker.build_history.append(
- BuildHistory(name=self.name, number=self.number,
- result=result, changes=changes, node=self.node,
- uuid=self.unique, description=self.description,
- pipeline=self.parameters['ZUUL_PIPELINE'])
- )
-
- self.job.sendWorkComplete(json.dumps(data))
- del self.worker.gearman_jobs[self.job.unique]
- self.worker.running_builds.remove(self)
- self.worker.lock.release()
-
-
-class FakeGearmanServer(gear.Server):
- def __init__(self, port=4730):
- self.hold_jobs_in_queue = False
- super(FakeGearmanServer, self).__init__(port)
-
- def getJobForConnection(self, connection, peek=False):
- for queue in [self.high_queue, self.normal_queue, self.low_queue]:
- for job in queue:
- if not hasattr(job, 'waiting'):
- if job.name.startswith('build:'):
- job.waiting = self.hold_jobs_in_queue
- else:
- job.waiting = False
- if job.waiting:
- continue
- if job.name in connection.functions:
- if not peek:
- queue.remove(job)
- connection.related_jobs[job.handle] = job
- job.worker_connection = connection
- job.running = True
- return job
- return None
-
- def release(self, regex=None):
- released = False
- qlen = (len(self.high_queue) + len(self.normal_queue) +
- len(self.low_queue))
- self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
- for job in self.getQueue():
- cmd, name = job.name.split(':')
- if cmd != 'build':
- continue
- if not regex or re.match(regex, name):
- self.log.debug("releasing queued job %s" %
- job.unique)
- job.waiting = False
- released = True
- else:
- self.log.debug("not releasing queued job %s" %
- job.unique)
- if released:
- self.wakeConnections()
- qlen = (len(self.high_queue) + len(self.normal_queue) +
- len(self.low_queue))
- self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
-
class FakeJob(object):
def __init__(self):