diff options
Diffstat (limited to 'tests/fakes.py')
-rw-r--r-- | tests/fakes.py | 286 |
1 files changed, 0 insertions, 286 deletions
diff --git a/tests/fakes.py b/tests/fakes.py index 07bac13..5f78fbf 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -14,292 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import gear -import json -import threading -import os -import re -import time - -from turbo_hipster.worker_manager import ZuulManager -from turbo_hipster.task_plugins.gate_real_db_upgrade.task import Runner\ - as RealDbUpgradeRunner - - -class FakeZuulManager(ZuulManager): - def __init__(self, config, tasks, test): - self.test = test - super(FakeZuulManager, self).__init__(config, tasks) - - def setup_gearman(self): - hostname = os.uname()[1] - self.gearman_worker = FakeWorker('turbo-hipster-manager-%s' - % hostname, self.test) - self.gearman_worker.addServer( - self.config['zuul_server']['gearman_host'], - self.config['zuul_server']['gearman_port'] - ) - self.gearman_worker.registerFunction( - 'stop:turbo-hipster-manager-%s' % hostname) - - -class FakeWorker(gear.Worker): - def __init__(self, worker_id, test): - super(FakeWorker, self).__init__(worker_id) - self.gearman_jobs = {} - self.build_history = [] - self.running_builds = [] - self.build_counter = 0 - self.fail_tests = {} - self.test = test - - self.hold_jobs_in_build = False - self.lock = threading.Lock() - self.__work_thread = threading.Thread(target=self.work) - self.__work_thread.daemon = True - self.__work_thread.start() - - def handleJob(self, job): - parts = job.name.split(":") - cmd = parts[0] - name = parts[1] - if len(parts) > 2: - node = parts[2] - else: - node = None - if cmd == 'build': - self.handleBuild(job, name, node) - elif cmd == 'stop': - self.handleStop(job, name) - elif cmd == 'set_description': - self.handleSetDescription(job, name) - - def handleBuild(self, job, name, node): - build = FakeBuild(self, job, self.build_counter, node) - job.build = build - self.gearman_jobs[job.unique] = job - self.build_counter += 1 - - self.running_builds.append(build) - build.start() - - def handleStop(self, job, name): - self.log.debug("handle stop") - parameters = json.loads(job.arguments) - name = parameters['name'] - number = parameters['number'] - for build in self.running_builds: - if build.name == name and build.number == number: - build.aborted = True - build.release() - job.sendWorkComplete() - return - job.sendWorkFail() - - def handleSetDescription(self, job, name): - self.log.debug("handle set description") - parameters = json.loads(job.arguments) - name = parameters['name'] - number = parameters['number'] - descr = parameters['html_description'] - for build in self.running_builds: - if build.name == name and build.number == number: - build.description = descr - job.sendWorkComplete() - return - for build in self.build_history: - if build.name == name and build.number == number: - build.description = descr - job.sendWorkComplete() - return - job.sendWorkFail() - - def work(self): - while self.running: - try: - job = self.getJob() - except gear.InterruptedError: - continue - try: - self.handleJob(job) - except: - self.log.exception("Worker exception:") - - def addFailTest(self, name, change): - l = self.fail_tests.get(name, []) - l.append(change) - self.fail_tests[name] = l - - def shouldFailTest(self, name, ref): - l = self.fail_tests.get(name, []) - for change in l: - if self.test.ref_has_change(ref, change): - return True - return False - - def release(self, regex=None): - builds = self.running_builds[:] - self.log.debug("releasing build %s (%s)" % (regex, - len(self.running_builds))) - for build in builds: - if not regex or re.match(regex, build.name): - self.log.debug("releasing build %s" % - (build.parameters['ZUUL_UUID'])) - build.release() - else: - self.log.debug("not releasing build %s" % - (build.parameters['ZUUL_UUID'])) - self.log.debug("done releasing builds %s (%s)" % - (regex, len(self.running_builds))) - - -class FakeRealDbUpgradeRunner(RealDbUpgradeRunner): - def __init__(self, global_config, plugin_config, worker_name, test): - self.test = test - super(FakeRealDbUpgradeRunner, self).__init__(global_config, - plugin_config, - worker_name) - - -class BuildHistory(object): - def __init__(self, **kw): - self.__dict__.update(kw) - - def __repr__(self): - return ("<Completed build, result: %s name: %s #%s changes: %s>" % - (self.result, self.name, self.number, self.changes)) - - -class FakeBuild(threading.Thread): - def __init__(self, worker, job, number, node): - threading.Thread.__init__(self) - self.daemon = True - self.worker = worker - self.job = job - self.name = job.name.split(':')[1] - self.number = number - self.node = node - self.parameters = json.loads(job.arguments) - self.unique = self.parameters['ZUUL_UUID'] - self.wait_condition = threading.Condition() - self.waiting = False - self.aborted = False - self.created = time.time() - self.description = '' - - def release(self): - self.wait_condition.acquire() - self.wait_condition.notify() - self.waiting = False - self.log.debug("Build %s released" % self.unique) - self.wait_condition.release() - - def isWaiting(self): - self.wait_condition.acquire() - if self.waiting: - ret = True - else: - ret = False - self.wait_condition.release() - return ret - - def _wait(self): - self.wait_condition.acquire() - self.waiting = True - self.log.debug("Build %s waiting" % self.unique) - self.wait_condition.wait() - self.wait_condition.release() - - def run(self): - data = { - 'url': 'https://server/job/%s/%s/' % (self.name, self.number), - 'name': self.name, - 'number': self.number, - 'manager': self.worker.worker_id, - } - - self.job.sendWorkData(json.dumps(data)) - self.job.sendWorkStatus(0, 100) - - if self.worker.hold_jobs_in_build: - self._wait() - self.log.debug("Build %s continuing" % self.unique) - - self.worker.lock.acquire() - - result = 'SUCCESS' - if (('ZUUL_REF' in self.parameters) and - self.worker.shouldFailTest(self.name, - self.parameters['ZUUL_REF'])): - result = 'FAILURE' - if self.aborted: - result = 'ABORTED' - - data = {'result': result} - changes = None - if 'ZUUL_CHANGE_IDS' in self.parameters: - changes = self.parameters['ZUUL_CHANGE_IDS'] - - self.worker.build_history.append( - BuildHistory(name=self.name, number=self.number, - result=result, changes=changes, node=self.node, - uuid=self.unique, description=self.description, - pipeline=self.parameters['ZUUL_PIPELINE']) - ) - - self.job.sendWorkComplete(json.dumps(data)) - del self.worker.gearman_jobs[self.job.unique] - self.worker.running_builds.remove(self) - self.worker.lock.release() - - -class FakeGearmanServer(gear.Server): - def __init__(self, port=4730): - self.hold_jobs_in_queue = False - super(FakeGearmanServer, self).__init__(port) - - def getJobForConnection(self, connection, peek=False): - for queue in [self.high_queue, self.normal_queue, self.low_queue]: - for job in queue: - if not hasattr(job, 'waiting'): - if job.name.startswith('build:'): - job.waiting = self.hold_jobs_in_queue - else: - job.waiting = False - if job.waiting: - continue - if job.name in connection.functions: - if not peek: - queue.remove(job) - connection.related_jobs[job.handle] = job - job.worker_connection = connection - job.running = True - return job - return None - - def release(self, regex=None): - released = False - qlen = (len(self.high_queue) + len(self.normal_queue) + - len(self.low_queue)) - self.log.debug("releasing queued job %s (%s)" % (regex, qlen)) - for job in self.getQueue(): - cmd, name = job.name.split(':') - if cmd != 'build': - continue - if not regex or re.match(regex, name): - self.log.debug("releasing queued job %s" % - job.unique) - job.waiting = False - released = True - else: - self.log.debug("not releasing queued job %s" % - job.unique) - if released: - self.wakeConnections() - qlen = (len(self.high_queue) + len(self.normal_queue) + - len(self.low_queue)) - self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen)) - class FakeJob(object): def __init__(self): |