From ea1ea42752e5e04ff0f22f152284ccd9518afcc8 Mon Sep 17 00:00:00 2001
From: Donald Stufft
Date: Wed, 24 Jul 2013 22:54:05 -0400
Subject: Use RQ to handle purging asynchronously

---
 store.py | 111 ++++++++++++++-------------------------------------------------
 tasks.py |  41 +++++++++++++++++++++++
 webui.py |   6 +++-
 3 files changed, 71 insertions(+), 87 deletions(-)
 create mode 100644 tasks.py

diff --git a/store.py b/store.py
index 6fe14f5..e3d2188 100644
--- a/store.py
+++ b/store.py
@@ -28,6 +28,8 @@ import urlparse
 import time
 from functools import wraps
 
+import tasks
+
 # we import both the old and new (PEP 386) methods of handling versions since
 # some version strings are not compatible with the new method and we can fall
 # back on the old version
@@ -75,51 +77,6 @@ connection = None
 keep_trove = True
 
 
-def retry(ExceptionToCheck, tries=4, delay=3, backoff=2, logger=None):
-    """Retry calling the decorated function using an exponential backoff.
-
-    http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
-    original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
-
-    :param ExceptionToCheck: the exception to check. may be a tuple of
-        exceptions to check
-    :type ExceptionToCheck: Exception or tuple
-    :param tries: number of times to try (not retry) before giving up
-    :type tries: int
-    :param delay: initial delay between retries in seconds
-    :type delay: int
-    :param backoff: backoff multiplier e.g. value of 2 will double the delay
-        each retry
-    :type backoff: int
-    :param logger: logger to use. If None, print
-    :type logger: logging.Logger instance
-    """
-    def deco_retry(f):
-
-        @wraps(f)
-        def f_retry(*args, **kwargs):
-            mtries, mdelay = tries, delay
-            while mtries > 1:
-                try:
-                    return f(*args, **kwargs)
-                except ExceptionToCheck as e:
-                    msg = "%s, Retrying in %d seconds..." % (str(e), mdelay)
-                    if logger:
-                        logger.warning(msg)
-                    else:
-                        print(msg)
-                    time.sleep(mdelay)
-                    mtries -= 1
-                    mdelay *= backoff
-                if not mtries:
-                    raise
-            return f(*args, **kwargs)
-
-        return f_retry  # true decorator
-
-    return deco_retry
-
-
 def normalize_package_name(n):
     "Return lower-cased version of safe_name of n."
     return safe_name(n).lower()
@@ -275,7 +232,7 @@ class Store:
     XXX update schema info ...
         Packages are unique by (name, version).
     '''
-    def __init__(self, config):
+    def __init__(self, config, queue=None):
        self.config = config
        self.username = None
        self.userip = None
@@ -289,8 +246,16 @@ class Store:
             self.true, self.false = 'TRUE', 'FALSE'
         self.can_lock = True
 
+        self.queue = queue
+
         self._changed_packages = set()
 
+    def enqueue(self, func, *args, **kwargs):
+        if self.queue is None:
+            func(*args, **kwargs)
+        else:
+            self.queue.enqueue(func, *args, **kwargs)
+
     def last_id(self, tablename):
         ''' Return an SQL expression that returns the last inserted row,
         where the row is in the given table.
@@ -2342,48 +2307,22 @@ class Store:
     def _add_invalidation(self, package=None):
         self._changed_packages.add(package)
 
-    @retry(Exception, tries=5, delay=1, backoff=1)
     def _invalidate_cache(self):
-        # Purge all tags from Fastly
         if self.config.fastly_api_key:
-            api_domain = self.config.fastly_api_domain
-            service_id = self.config.fastly_service_id
-
-            session = requests.session()
-            count = 0
-
-            while self._changed_packages and count <= 10:
-                count += 1
-                purges = {}
-
-                headers = {
-                    "X-Fastly-Key": self.config.fastly_api_key,
-                    "Accept": "application/json",
-                }
-
-                for package in set(self._changed_packages):
-                    if package is None:
-                        tag = "simple-index"
-                    else:
-                        tag = "pkg~%s" % package
-
-                    # Issue the purge
-                    url_path = "/service/%s/purge/%s" % (service_id, tag)
-                    url = urlparse.urljoin(api_domain, url_path)
-                    resp = session.post(url, headers=headers)
-                    resp.raise_for_status()
-                    purges[package] = resp.json()["id"]
-
-                for package, pid in purges.iteritems():
-                    # Ensure that the purge completed successfully
-                    url = urlparse.urljoin(api_domain, "/purge")
-                    status = session.get(url, params={"id": pid})
-                    status.raise_for_status()
-
-                    if status.json().get("results", {}).get("complete", None):
-                        self._changed_packages.remove(package)
-        else:
-            self._changed_packages = set()
+            # Build up a list of tags we want to purge
+            tags = ["pkg~%s" % pkg if pkg is not None else "simple-index"
+                    for pkg in self._changed_packages]
+
+            # Enqueue the purge
+            self.enqueue(tasks.purge_fastly_tags,
+                self.config.fastly_api_domain,
+                self.config.fastly_api_key,
+                self.config.fastly_service_id,
+                *tags
+            )
+
+        # Empty our changed packages
+        self._changed_packages = set()
 
     def close(self):
         if self._conn is None:
diff --git a/tasks.py b/tasks.py
new file mode 100644
index 0000000..dab4427
--- /dev/null
+++ b/tasks.py
@@ -0,0 +1,41 @@
+import urlparse
+
+import requests
+
+
+def purge_fastly_tags(domain, api_key, service_id, *tags, max_tries=25):
+    session = requests.session()
+    headers = {"X-Fastly-Key": api_key, "Accept": "application/json"}
+
+    all_tags = set(tags)
+    purges = {}
+
+    count = 0
+
+    while all_tags and not count > max_tries:
+        try:
+            for tag in set(all_tags):
+                # Build the URL
+                url_path = "/service/%s/purge/%s" % (service_id, tag)
+                url = urlparse.urljoin(domain, url_path)
+
+                # Issue the Purge
+                resp = session.post(url, headers=headers)
+                resp.raise_for_status()
+
+                # Store the Purge ID so we can track it later
+                purges[tag] = resp.json()["id"]
+
+            for tag, purge_id in purges.iteritems():
+                # Ensure that the purge completed successfully
+                url = urlparse.urljoin(domain, "/purge")
+                status = session.get(url, params={"id": purge_id})
+                status.raise_for_status()
+
+                # If the purge completed successfully, remove the tag from
+                # our list.
+                if status.json().get("results", {}).get("complete", None):
+                    all_tags.remove(tag)
+        except Exception:
+            if count > max_tries:
+                raise
diff --git a/webui.py b/webui.py
index f5aad44..8a939cf 100644
--- a/webui.py
+++ b/webui.py
@@ -14,6 +14,7 @@ from distutils2.metadata import Metadata
 from xml.etree import cElementTree
 import itsdangerous
 import redis
+import rq
 
 try:
     import json
@@ -257,6 +258,9 @@ class WebUI:
         self.usercookie = None
         self.failed = None  # error message if initialization already produced a failure
 
+        # Queue to handle asynchronous tasks
+        self.queue = rq.Queue(connection=self.redis)
+
         # XMLRPC request or not?
         if self.env.get('CONTENT_TYPE') != 'text/xml':
             fs = cgi.FieldStorage(fp=handler.rfile, environ=env)
@@ -310,7 +314,7 @@ class WebUI:
             # failed during initialization
             self.fail(self.failed)
             return
-        self.store = store.Store(self.config)
+        self.store = store.Store(self.config, queue=self.queue)
        try:
            try:
                self.store.get_cursor() # make sure we can connect
--
cgit v1.2.1
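
Two details of the new tasks.py are worth flagging. The codebase is Python 2 (it imports urlparse and uses dict.iteritems()), and Python 2 does not accept a keyword argument such as max_tries=25 after *tags, so the function definition as written is a SyntaxError there. Separately, count is initialised to 0 but never incremented, so a purge that keeps failing loops indefinitely instead of giving up after max_tries attempts. The sketch below is one way the task could look with those two points addressed; it reuses the same Fastly purge endpoints the patch already calls, is not part of the commit, and the one-second pause between attempts is an arbitrary choice.

    import time
    import urlparse

    import requests


    def purge_fastly_tags(domain, api_key, service_id, *tags, **kwargs):
        # Python 2 cannot declare keyword-only arguments after *tags, so
        # max_tries is pulled out of **kwargs instead.
        max_tries = kwargs.pop("max_tries", 25)

        session = requests.session()
        headers = {"X-Fastly-Key": api_key, "Accept": "application/json"}

        all_tags = set(tags)
        purges = {}

        for attempt in range(1, max_tries + 1):
            try:
                for tag in set(all_tags):
                    # Issue a purge for each tag that is still outstanding
                    url_path = "/service/%s/purge/%s" % (service_id, tag)
                    resp = session.post(urlparse.urljoin(domain, url_path),
                                        headers=headers)
                    resp.raise_for_status()
                    purges[tag] = resp.json()["id"]

                for tag, purge_id in purges.items():
                    # Ensure that the purge completed successfully
                    status = session.get(urlparse.urljoin(domain, "/purge"),
                                         params={"id": purge_id})
                    status.raise_for_status()
                    if status.json().get("results", {}).get("complete", None):
                        all_tags.discard(tag)
            except Exception:
                # The attempt counter advances with the loop, so a Fastly
                # outage re-raises after max_tries attempts instead of
                # spinning forever.
                if attempt == max_tries:
                    raise

            if not all_tags:
                break
            time.sleep(1)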
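
Store.enqueue() falls back to calling the function inline when no queue is passed, so code that constructs Store(config) directly keeps working, but jobs pushed onto the RQ queue from webui.py only run if a worker process is listening on the same Redis instance. A minimal worker bootstrap might look like the sketch below; the Redis connection settings are placeholders, since this patch does not show how webui.py builds self.redis, and depending on the RQ version the bundled rqworker (or rq worker) console script can be used instead of a hand-rolled script.

    # worker.py -- illustrative only, not part of this patch.  It listens on
    # RQ's default queue, which is where rq.Queue(connection=...) in
    # webui.py enqueues jobs.
    import redis
    from rq import Connection, Queue, Worker

    if __name__ == "__main__":
        # Placeholder connection settings; use whatever webui.py's
        # self.redis is actually configured with.
        conn = redis.StrictRedis(host="localhost", port=6379, db=0)

        with Connection(conn):
            Worker([Queue()]).work()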