diff options
author | roberto@debian32 <roberto@debian32> | 2011-08-04 16:25:17 +0200 |
---|---|---|
committer | roberto@debian32 <roberto@debian32> | 2011-08-04 16:25:17 +0200 |
commit | 8c1b48a88d545bbb9ec38d4acc3cbd5b06638535 (patch) | |
tree | aecf3563f5239824a4e1c2c9920f4930ead1bdd5 /contrib | |
parent | 9bd3022a5ee120c5f45755058edaca4c55e215f0 (diff) | |
download | uwsgi-8c1b48a88d545bbb9ec38d4acc3cbd5b06638535.tar.gz |
a new kick-ass feature for the spooler
Diffstat (limited to 'contrib')
-rw-r--r-- | contrib/spoolqueue/producer.py | 11 | ||||
-rw-r--r-- | contrib/spoolqueue/tasks.py | 9 | ||||
-rw-r--r-- | contrib/spoolqueue/tasksconsumer.py | 44 |
3 files changed, 64 insertions, 0 deletions
diff --git a/contrib/spoolqueue/producer.py b/contrib/spoolqueue/producer.py new file mode 100644 index 00000000..ac6008a7 --- /dev/null +++ b/contrib/spoolqueue/producer.py @@ -0,0 +1,11 @@ +from tasksconsumer import enqueue + +def application(env, sr): + + sr('200 OK', [('Content-Type','text/html')]) + + enqueue(queue='fast', pippo='pluto') + + return "Task enqueued" + + diff --git a/contrib/spoolqueue/tasks.py b/contrib/spoolqueue/tasks.py new file mode 100644 index 00000000..5d6f55f0 --- /dev/null +++ b/contrib/spoolqueue/tasks.py @@ -0,0 +1,9 @@ +from tasksconsumer import * + +@queueconsumer('fast', 4) +def fast_queue(arguments): + print "fast", arguments + +@queueconsumer('slow') +def slow_queue(arguments): + print "foobar", arguments diff --git a/contrib/spoolqueue/tasksconsumer.py b/contrib/spoolqueue/tasksconsumer.py new file mode 100644 index 00000000..3647670b --- /dev/null +++ b/contrib/spoolqueue/tasksconsumer.py @@ -0,0 +1,44 @@ +from uwsgidecorators import * +import Queue +from threading import Thread + +queues = {} + +class queueconsumer(object): + + def __init__(self, name, num=1, **kwargs): + self.name = name + self.num = num + self.queue = Queue.Queue() + self.threads = [] + self.func = None + queues[self.name] = self + + + @staticmethod + def consumer(self): + while True: + req = self.queue.get() + print req + self.func(req) + self.queue.task_done() + + def __call__(self, f): + self.func = f + for i in range(self.num): + t = Thread(target=self.consumer,args=(self,)) + self.threads.append(t) + t.daemon = True + t.start() + +@spool +def spooler_enqueuer(arguments): + if 'queue' in arguments: + queue = arguments['queue'] + queues[queue].queue.put(arguments) + else: + raise Exception("You have to specify a queue name") + + +def enqueue(*args, **kwargs): + return spooler_enqueuer.spool(*args, **kwargs) |