summaryrefslogtreecommitdiff
path: root/contrib
diff options
context:
space:
mode:
authorroberto@debian32 <roberto@debian32>2011-08-04 16:25:17 +0200
committerroberto@debian32 <roberto@debian32>2011-08-04 16:25:17 +0200
commit8c1b48a88d545bbb9ec38d4acc3cbd5b06638535 (patch)
treeaecf3563f5239824a4e1c2c9920f4930ead1bdd5 /contrib
parent9bd3022a5ee120c5f45755058edaca4c55e215f0 (diff)
downloaduwsgi-8c1b48a88d545bbb9ec38d4acc3cbd5b06638535.tar.gz
a new kick-ass feature for the spooler
Diffstat (limited to 'contrib')
-rw-r--r--contrib/spoolqueue/producer.py11
-rw-r--r--contrib/spoolqueue/tasks.py9
-rw-r--r--contrib/spoolqueue/tasksconsumer.py44
3 files changed, 64 insertions, 0 deletions
diff --git a/contrib/spoolqueue/producer.py b/contrib/spoolqueue/producer.py
new file mode 100644
index 00000000..ac6008a7
--- /dev/null
+++ b/contrib/spoolqueue/producer.py
@@ -0,0 +1,11 @@
+from tasksconsumer import enqueue
+
+def application(env, sr):
+
+ sr('200 OK', [('Content-Type','text/html')])
+
+ enqueue(queue='fast', pippo='pluto')
+
+ return "Task enqueued"
+
+
diff --git a/contrib/spoolqueue/tasks.py b/contrib/spoolqueue/tasks.py
new file mode 100644
index 00000000..5d6f55f0
--- /dev/null
+++ b/contrib/spoolqueue/tasks.py
@@ -0,0 +1,9 @@
+from tasksconsumer import *
+
+@queueconsumer('fast', 4)
+def fast_queue(arguments):
+ print "fast", arguments
+
+@queueconsumer('slow')
+def slow_queue(arguments):
+ print "foobar", arguments
diff --git a/contrib/spoolqueue/tasksconsumer.py b/contrib/spoolqueue/tasksconsumer.py
new file mode 100644
index 00000000..3647670b
--- /dev/null
+++ b/contrib/spoolqueue/tasksconsumer.py
@@ -0,0 +1,44 @@
+from uwsgidecorators import *
+import Queue
+from threading import Thread
+
+queues = {}
+
+class queueconsumer(object):
+
+ def __init__(self, name, num=1, **kwargs):
+ self.name = name
+ self.num = num
+ self.queue = Queue.Queue()
+ self.threads = []
+ self.func = None
+ queues[self.name] = self
+
+
+ @staticmethod
+ def consumer(self):
+ while True:
+ req = self.queue.get()
+ print req
+ self.func(req)
+ self.queue.task_done()
+
+ def __call__(self, f):
+ self.func = f
+ for i in range(self.num):
+ t = Thread(target=self.consumer,args=(self,))
+ self.threads.append(t)
+ t.daemon = True
+ t.start()
+
+@spool
+def spooler_enqueuer(arguments):
+ if 'queue' in arguments:
+ queue = arguments['queue']
+ queues[queue].queue.put(arguments)
+ else:
+ raise Exception("You have to specify a queue name")
+
+
+def enqueue(*args, **kwargs):
+ return spooler_enqueuer.spool(*args, **kwargs)