summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDonald Stufft <donald@stufft.io>2013-07-24 22:54:05 -0400
committerDonald Stufft <donald@stufft.io>2013-07-24 22:54:05 -0400
commitea1ea42752e5e04ff0f22f152284ccd9518afcc8 (patch)
tree6694b2b9d56e6a5c51ec0b401ab75334fb5abf87
parent3317ec0b3cee60db516909ba407289b0eaf1cb92 (diff)
downloaddecorator-ea1ea42752e5e04ff0f22f152284ccd9518afcc8.tar.gz
Use RQ to handle purging asynchronously
-rw-r--r--store.py111
-rw-r--r--tasks.py41
-rw-r--r--webui.py6
3 files changed, 71 insertions, 87 deletions
diff --git a/store.py b/store.py
index 6fe14f5..e3d2188 100644
--- a/store.py
+++ b/store.py
@@ -28,6 +28,8 @@ import urlparse
import time
from functools import wraps
+import tasks
+
# we import both the old and new (PEP 386) methods of handling versions since
# some version strings are not compatible with the new method and we can fall
# back on the old version
@@ -75,51 +77,6 @@ connection = None
keep_trove = True
-def retry(ExceptionToCheck, tries=4, delay=3, backoff=2, logger=None):
- """Retry calling the decorated function using an exponential backoff.
-
- http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
- original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
-
- :param ExceptionToCheck: the exception to check. may be a tuple of
- exceptions to check
- :type ExceptionToCheck: Exception or tuple
- :param tries: number of times to try (not retry) before giving up
- :type tries: int
- :param delay: initial delay between retries in seconds
- :type delay: int
- :param backoff: backoff multiplier e.g. value of 2 will double the delay
- each retry
- :type backoff: int
- :param logger: logger to use. If None, print
- :type logger: logging.Logger instance
- """
- def deco_retry(f):
-
- @wraps(f)
- def f_retry(*args, **kwargs):
- mtries, mdelay = tries, delay
- while mtries > 1:
- try:
- return f(*args, **kwargs)
- except ExceptionToCheck as e:
- msg = "%s, Retrying in %d seconds..." % (str(e), mdelay)
- if logger:
- logger.warning(msg)
- else:
- print(msg)
- time.sleep(mdelay)
- mtries -= 1
- mdelay *= backoff
- if not mtries:
- raise
- return f(*args, **kwargs)
-
- return f_retry # true decorator
-
- return deco_retry
-
-
def normalize_package_name(n):
"Return lower-cased version of safe_name of n."
return safe_name(n).lower()
@@ -275,7 +232,7 @@ class Store:
XXX update schema info ...
Packages are unique by (name, version).
'''
- def __init__(self, config):
+ def __init__(self, config, queue=None):
self.config = config
self.username = None
self.userip = None
@@ -289,8 +246,16 @@ class Store:
self.true, self.false = 'TRUE', 'FALSE'
self.can_lock = True
+ self.queue = queue
+
self._changed_packages = set()
+ def enqueue(self, func, *args, **kwargs):
+ if self.queue is None:
+ func(*args, **kwargs)
+ else:
+ self.queue.enqueue(func, *args, **kwargs)
+
def last_id(self, tablename):
''' Return an SQL expression that returns the last inserted row,
where the row is in the given table.
@@ -2342,48 +2307,22 @@ class Store:
def _add_invalidation(self, package=None):
self._changed_packages.add(package)
- @retry(Exception, tries=5, delay=1, backoff=1)
def _invalidate_cache(self):
- # Purge all tags from Fastly
if self.config.fastly_api_key:
- api_domain = self.config.fastly_api_domain
- service_id = self.config.fastly_service_id
-
- session = requests.session()
- count = 0
-
- while self._changed_packages and count <= 10:
- count += 1
- purges = {}
-
- headers = {
- "X-Fastly-Key": self.config.fastly_api_key,
- "Accept": "application/json",
- }
-
- for package in set(self._changed_packages):
- if package is None:
- tag = "simple-index"
- else:
- tag = "pkg~%s" % package
-
- # Issue the purge
- url_path = "/service/%s/purge/%s" % (service_id, tag)
- url = urlparse.urljoin(api_domain, url_path)
- resp = session.post(url, headers=headers)
- resp.raise_for_status()
- purges[package] = resp.json()["id"]
-
- for package, pid in purges.iteritems():
- # Ensure that the purge completed successfully
- url = urlparse.urljoin(api_domain, "/purge")
- status = session.get(url, params={"id": pid})
- status.raise_for_status()
-
- if status.json().get("results", {}).get("complete", None):
- self._changed_packages.remove(package)
- else:
- self._changed_packages = set()
+ # Build up a list of tags we want to purge
+ tags = ["pkg~%s" % pkg if pkg is not None else "simple-index"
+ for pkg in self._changed_packages]
+
+ # Enqueue the purge
+ self.enqueue(tasks.purge_fastly_tags,
+ self.config.fastly_api_domain,
+ self.config.fastly_api_key,
+ self.config.fastly_service_id,
+ *tags
+ )
+
+ # Empty our changed packages
+ self._changed_packages = set()
def close(self):
if self._conn is None:
diff --git a/tasks.py b/tasks.py
new file mode 100644
index 0000000..dab4427
--- /dev/null
+++ b/tasks.py
@@ -0,0 +1,41 @@
+import urlparse
+
+import requests
+
+
+def purge_fastly_tags(domain, api_key, service_id, *tags, max_tries=25):
+ session = requests.session()
+ headers = {"X-Fastly-Key": api_key, "Accept": "application/json"}
+
+ all_tags = set(tags)
+ purges = {}
+
+ count = 0
+
+ while all_tags and not count > max_tries:
+ try:
+ for tag in set(all_tags):
+ # Build the URL
+ url_path = "/service/%s/purge/%s" % (service_id, tag)
+ url = urlparse.urljoin(domain, url_path)
+
+ # Issue the Purge
+ resp = session.post(url, headers=headers)
+ resp.raise_for_status()
+
+ # Store the Purge ID so we can track it later
+ purges[tag] = resp.json()["id"]
+
+ for tag, purge_id in purges.iteritems():
+ # Ensure that the purge completed successfully
+ url = urlparse.urljoin(domain, "/purge")
+ status = session.get(url, params={"id": purge_id})
+ status.raise_for_status()
+
+ # If the purge completely successfully remove the tag from
+ # our list.
+ if status.json().get("results", {}).get("complete", None):
+ all_tags.remove(tag)
+ except Exception:
+ if count > max_tries:
+ raise
diff --git a/webui.py b/webui.py
index f5aad44..8a939cf 100644
--- a/webui.py
+++ b/webui.py
@@ -14,6 +14,7 @@ from distutils2.metadata import Metadata
from xml.etree import cElementTree
import itsdangerous
import redis
+import rq
try:
import json
@@ -257,6 +258,9 @@ class WebUI:
self.usercookie = None
self.failed = None # error message if initialization already produced a failure
+ # Queue to handle asynchronous tasks
+ self.queue = Queue(connection=self.redis)
+
# XMLRPC request or not?
if self.env.get('CONTENT_TYPE') != 'text/xml':
fs = cgi.FieldStorage(fp=handler.rfile, environ=env)
@@ -310,7 +314,7 @@ class WebUI:
# failed during initialization
self.fail(self.failed)
return
- self.store = store.Store(self.config)
+ self.store = store.Store(self.config, queue=self.queue)
try:
try:
self.store.get_cursor() # make sure we can connect