summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPedro Alvarez <pedro.alvarez@codethink.co.uk>2016-03-28 16:38:42 +0000
committerPedro Alvarez <pedro.alvarez@codethink.co.uk>2016-03-28 16:38:42 +0000
commit425015014eab514b377e6454771540d284e48a58 (patch)
tree933e1f2a72b6c86d0b3f2adf89524cb579b49529
parent8a8b0d7110841f602559d9511abf75da1bf74eed (diff)
downloadmorph-425015014eab514b377e6454771540d284e48a58.tar.gz
Start creating RequestsManager to listen to requests
Change-Id: I5b660939333c57a6a47f384dc4f346a48334ffe9
-rw-r--r--gear/client.py71
1 files changed, 61 insertions, 10 deletions
diff --git a/gear/client.py b/gear/client.py
index dea67a0d..9dc1df60 100644
--- a/gear/client.py
+++ b/gear/client.py
@@ -3,6 +3,8 @@ import sys
import json
import threading
import requests
+import time
+import signal
import urlparse
import distbuild
#TODO: values from settings
@@ -263,17 +265,20 @@ class BuilderClient(gear.Client):
job.data = []
return job
-class RequestsController():
+class RequestsController(threading.Thread):
def __init__(self):
+ super(RequestsController, self).__init__()
self.building_list = []
self.next_id = 1
self.new_request_lock = threading.Lock()
self.lock_queue = threading.Condition()
self.build_requests = []
self.build_status_lock = threading.Lock()
+ self.stopped = False
def add_request(self, request):
json_request = json.dumps(request)
+ print "DEBUG: adding request - %s" % json_request
request_data = {}
with self.new_request_lock:
request_data['id'] = self.next_id
@@ -325,20 +330,66 @@ class RequestsController():
artifact.state = BUILDING
print "TO %s: Artifact %s building" % (request['id'],artifact.name)
- #consider chunks case
+ def shutdown(self):
+ self.stopped = True
+ with self.lock_queue:
+ self.lock_queue.notify()
- def loop(self):
- while True:
+ def run(self):
+ while not self.stopped:
with self.lock_queue:
- self.lock_queue.wait(20)
- self.queue_if_possible()
+ self.lock_queue.wait()
+ if not self.stopped:
+ self.queue_if_possible()
+
+
+class RequestsManager(threading.Thread):
+ def __init__(self, requests_controller):
+ super(RequestsManager, self).__init__()
+ self.requests_controller = requests_controller
+ self.worker = gear.Worker('controller')
+ self.worker.addServer('localhost')
+ self.worker.registerFunction("build-request")
+ self.stopped = False
+
+ def run(self):
+ while not self.stopped:
+ try:
+ print "DEBUG: Waiting for job"
+ job = self.worker.getJob()
+ self._handle_job(job)
+ except gear.InterruptedError:
+ print 'We were asked to stop waiting for jobs'
+
+ def shutdown(self):
+ self.stopped = True
+ self.worker.stopWaitingForJobs()
+
+ def _handle_job(self, job):
+ build_request=json.loads(job.arguments)
+ self.requests_controller.add_request(build_request)
request = {}
request['repo'] = "baserock:baserock/definitions"
request['ref'] = "fbce45e45da79e5c35341845ec3b3d7c321e6ff2"
request['system'] = "systems/minimal-system-x86_64-generic.morph"
-requests_controller = RequestsController()
-requests_controller.add_request(request)
-requests_controller.add_request(request)
-requests_controller.loop()
+# Command line
+requests_controller = RequestsController()
+requests_manager = RequestsManager(requests_controller)
+
+def term_handler(signum, frame):
+ requests_manager.shutdown()
+ requests_controller.shutdown()
+signal.signal(signal.SIGTERM, term_handler)
+
+requests_controller.start()
+requests_manager.start()
+
+while not requests_controller.stopped:
+ try:
+ time.sleep(3)
+ except KeyboardInterrupt:
+ print "Ctrl + C: asking tasks to exit nicely...\n"
+ requests_manager.shutdown()
+ requests_controller.shutdown()