summaryrefslogtreecommitdiff
path: root/js/src/tests/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'js/src/tests/workers.py')
-rw-r--r--js/src/tests/workers.py91
1 files changed, 91 insertions, 0 deletions
diff --git a/js/src/tests/workers.py b/js/src/tests/workers.py
new file mode 100644
index 0000000..0535238
--- /dev/null
+++ b/js/src/tests/workers.py
@@ -0,0 +1,91 @@
+# Multiprocess activities with a push-driven divide-process-collect model.
+
+import os, sys, time
+from threading import Thread, Lock
+from Queue import Queue, Empty
+from datetime import datetime
+
+class Source:
+ def __init__(self, task_list, results, verbose = False):
+ self.tasks = Queue()
+ for task in task_list:
+ self.tasks.put_nowait(task)
+
+ self.results = results
+ self.verbose = verbose
+
+ def start(self, worker_count):
+ t0 = datetime.now()
+
+ sink = Sink(self.results)
+ self.workers = [ Worker(_+1, self.tasks, sink, self.verbose) for _ in range(worker_count) ]
+ if self.verbose: print '[P] Starting workers.'
+ for w in self.workers:
+ w.t0 = t0
+ w.start()
+ ans = self.join_workers()
+ if self.verbose: print '[P] Finished.'
+
+ t1 = datetime.now()
+ dt = t1-t0
+
+ return ans
+
+ def join_workers(self):
+ try:
+ for w in self.workers:
+ w.thread.join(20000)
+ return True
+ except KeyboardInterrupt:
+ for w in self.workers:
+ w.stop = True
+ return False
+
+class Sink:
+ def __init__(self, results):
+ self.results = results
+ self.lock = Lock()
+
+ def push(self, result):
+ self.lock.acquire()
+ try:
+ self.results.push(result)
+ finally:
+ self.lock.release()
+
+class Worker(object):
+ def __init__(self, id, tasks, sink, verbose):
+ self.id = id
+ self.tasks = tasks
+ self.sink = sink
+ self.verbose = verbose
+
+ self.thread = None
+ self.stop = False
+
+ def log(self, msg):
+ dd = datetime.now() - self.t0
+ dt = dd.seconds + 1e-6 * dd.microseconds
+
+ if self.verbose:
+ print '[W%d %.3f] %s' % (self.id, dt, msg)
+
+ def start(self):
+ self.thread = Thread(target=self.run)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ def run(self):
+ try:
+ while True:
+ if self.stop:
+ break
+ self.log('Get next task.')
+ task = self.tasks.get(False)
+ self.log('Start task %s.'%str(task))
+ result = task()
+ self.log('Finished task.')
+ self.sink.push(result)
+ self.log('Pushed result.')
+ except Empty:
+ pass