summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Hesketh <josh@nitrotech.org>2014-03-05 14:15:31 +1100
committerJoshua Hesketh <josh@nitrotech.org>2014-03-05 14:30:45 +1100
commit1d8c4e5375fe3d6aefc6c4e47a60dcc2894de14f (patch)
treec1a64cb87c04fd902acbcbc94a82728a3c7d79b8
parent81b5fb85317e5b346876c5814d75d2a1fa6af7fe (diff)
downloadturbo-hipster-1d8c4e5375fe3d6aefc6c4e47a60dcc2894de14f.tar.gz
Thread the server object
Instead of the server being the main process allow it to be threaded to make testing and importing easier. Change-Id: I5950b29188ed41a280f467f06bb4d3eb1f65b12a
-rw-r--r--turbo_hipster/cmd/server.py44
-rwxr-xr-xturbo_hipster/worker_server.py46
2 files changed, 51 insertions, 39 deletions
diff --git a/turbo_hipster/cmd/server.py b/turbo_hipster/cmd/server.py
index 44d2604..d54978d 100644
--- a/turbo_hipster/cmd/server.py
+++ b/turbo_hipster/cmd/server.py
@@ -20,6 +20,7 @@ import daemon
import extras
import json
import os
+import signal
import sys
from turbo_hipster import worker_server
@@ -29,7 +30,32 @@ from turbo_hipster import worker_server
PID_FILE_MODULE = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
-def main():
+def main(args):
+
+ with open(args.config, 'r') as config_stream:
+ config = json.load(config_stream)
+
+ server = worker_server.Server(config)
+
+ def term_handler(signum, frame):
+ server.stop()
+ signal.signal(signal.SIGTERM, term_handler)
+
+ if args.background:
+ server.daemon = True
+ server.start()
+
+ while not server.stopped():
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print "Ctrl + C: asking tasks to exit nicely...\n"
+ server.stop()
+
+
+if __name__ == '__main__':
+ sys.path.insert(0, os.path.abspath(
+ os.path.join(os.path.dirname(__file__), '../')))
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config',
default=
@@ -42,21 +68,9 @@ def main():
'turbo-hipster-worker-server.pid',
help='PID file to lock during daemonization.')
args = parser.parse_args()
-
- with open(args.config, 'r') as config_stream:
- config = json.load(config_stream)
-
- server = worker_server.Server(config)
-
if args.background:
pidfile = PID_FILE_MODULE.TimeoutPIDLockFile(args.pidfile, 10)
with daemon.DaemonContext(pidfile=pidfile):
- server.main()
+ main(args)
else:
- server.main()
-
-
-if __name__ == '__main__':
- sys.path.insert(0, os.path.abspath(
- os.path.join(os.path.dirname(__file__), '../')))
- main()
+ main(args)
diff --git a/turbo_hipster/worker_server.py b/turbo_hipster/worker_server.py
index dd13c08..fa6f7c0 100755
--- a/turbo_hipster/worker_server.py
+++ b/turbo_hipster/worker_server.py
@@ -20,21 +20,23 @@ task_plugins. """
import logging
import os
-import signal
-import sys
+import threading
import worker_manager
-class Server(object):
+class Server(threading.Thread):
""" This is the worker server object to be daemonized """
log = logging.getLogger("worker_server.Server")
def __init__(self, config):
+ super(Server, self).__init__()
+ self._stop = threading.Event()
self.config = config
# Python logging output file.
self.debug_log = self.config['debug_log']
+ self.setup_logging()
# Config init
self.zuul_manager = None
@@ -56,7 +58,8 @@ class Server(object):
# in lib.utils.execute_to_log to work correctly.
if not os.path.isdir(os.path.dirname(self.debug_log)):
os.makedirs(os.path.dirname(self.debug_log))
- logging.basicConfig(format='%(asctime)s %(name)s %(message)s',
+ logging.basicConfig(format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s',
filename=self.debug_log, level=logging.DEBUG)
def load_plugins(self):
@@ -73,9 +76,9 @@ class Server(object):
})
self.log.debug('Plugin %s loaded' % plugin['name'])
- def start_gearman_workers(self):
+ def start_zuul_client(self):
""" Run the tasks """
- self.log.debug('Starting gearman workers')
+ self.log.debug('Starting zuul client')
self.zuul_client = worker_manager.ZuulClient(self.config,
self.worker_name)
@@ -92,28 +95,23 @@ class Server(object):
self.tasks[job_name])
self.zuul_client.register_functions()
- self.zuul_client.daemon = True
self.zuul_client.start()
+ def start_zuul_manager(self):
self.zuul_manager = worker_manager.ZuulManager(self.config, self.tasks)
- self.zuul_manager.daemon = True
self.zuul_manager.start()
- def exit_handler(self, signum):
+ def stop(self):
+ self._stop.set()
self.log.debug('Exiting...')
- signal.signal(signal.SIGUSR1, signal.SIG_IGN)
- for task_name, task in self.tasks.items():
- task.stop()
- self.manager.stop()
- sys.exit(0)
+ self.zuul_client.stop()
+ self.zuul_manager.stop()
- def main(self):
- self.setup_logging()
- self.start_gearman_workers()
-
- while True:
- try:
- signal.pause()
- except KeyboardInterrupt:
- print "Ctrl + C: asking tasks to exit nicely...\n"
- self.exit_handler(signal.SIGINT)
+ def stopped(self):
+ return self._stop.isSet()
+
+ def run(self):
+ self.start_zuul_client()
+ self.start_zuul_manager()
+ while not self.stopped():
+ self._stop.wait()