diff options
Diffstat (limited to 'deps/v8/third_party/jinja2/bccache.py')
-rw-r--r-- | deps/v8/third_party/jinja2/bccache.py | 190 |
1 files changed, 123 insertions, 67 deletions
diff --git a/deps/v8/third_party/jinja2/bccache.py b/deps/v8/third_party/jinja2/bccache.py index 9c0661030f..d0ddf56ef6 100644 --- a/deps/v8/third_party/jinja2/bccache.py +++ b/deps/v8/third_party/jinja2/bccache.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- """The optional bytecode cache system. This is useful if you have very complex template situations and the compilation of all those templates slows down your application too much. @@ -8,22 +7,30 @@ are initialized on the first request. """ import errno import fnmatch +import marshal import os +import pickle import stat import sys import tempfile +import typing as t from hashlib import sha1 -from os import listdir -from os import path +from io import BytesIO +from types import CodeType -from ._compat import BytesIO -from ._compat import marshal_dump -from ._compat import marshal_load -from ._compat import pickle -from ._compat import text_type -from .utils import open_if_exists +if t.TYPE_CHECKING: + import typing_extensions as te + from .environment import Environment -bc_version = 4 + class _MemcachedClient(te.Protocol): + def get(self, key: str) -> bytes: + ... + + def set(self, key: str, value: bytes, timeout: t.Optional[int] = None) -> None: + ... + + +bc_version = 5 # Magic bytes to identify Jinja bytecode cache files. Contains the # Python major and minor version to avoid loading incompatible bytecode # if a project upgrades its Python version. @@ -34,7 +41,7 @@ bc_magic = ( ) -class Bucket(object): +class Bucket: """Buckets are used to store the bytecode for one template. It's created and initialized by the bytecode cache and passed to the loading functions. @@ -43,17 +50,17 @@ class Bucket(object): cache subclasses don't have to care about cache invalidation. """ - def __init__(self, environment, key, checksum): + def __init__(self, environment: "Environment", key: str, checksum: str) -> None: self.environment = environment self.key = key self.checksum = checksum self.reset() - def reset(self): + def reset(self) -> None: """Resets the bucket (unloads the bytecode).""" - self.code = None + self.code: t.Optional[CodeType] = None - def load_bytecode(self, f): + def load_bytecode(self, f: t.BinaryIO) -> None: """Loads bytecode from a file or file like object.""" # make sure the magic header is correct magic = f.read(len(bc_magic)) @@ -67,31 +74,31 @@ class Bucket(object): return # if marshal_load fails then we need to reload try: - self.code = marshal_load(f) + self.code = marshal.load(f) except (EOFError, ValueError, TypeError): self.reset() return - def write_bytecode(self, f): + def write_bytecode(self, f: t.IO[bytes]) -> None: """Dump the bytecode into the file or file like object passed.""" if self.code is None: raise TypeError("can't write empty bucket") f.write(bc_magic) pickle.dump(self.checksum, f, 2) - marshal_dump(self.code, f) + marshal.dump(self.code, f) - def bytecode_from_string(self, string): - """Load bytecode from a string.""" + def bytecode_from_string(self, string: bytes) -> None: + """Load bytecode from bytes.""" self.load_bytecode(BytesIO(string)) - def bytecode_to_string(self): - """Return the bytecode as string.""" + def bytecode_to_string(self) -> bytes: + """Return the bytecode as bytes.""" out = BytesIO() self.write_bytecode(out) return out.getvalue() -class BytecodeCache(object): +class BytecodeCache: """To implement your own bytecode cache you have to subclass this class and override :meth:`load_bytecode` and :meth:`dump_bytecode`. Both of these methods are passed a :class:`~jinja2.bccache.Bucket`. @@ -120,41 +127,48 @@ class BytecodeCache(object): Jinja. """ - def load_bytecode(self, bucket): + def load_bytecode(self, bucket: Bucket) -> None: """Subclasses have to override this method to load bytecode into a bucket. If they are not able to find code in the cache for the bucket, it must not do anything. """ raise NotImplementedError() - def dump_bytecode(self, bucket): + def dump_bytecode(self, bucket: Bucket) -> None: """Subclasses have to override this method to write the bytecode from a bucket back to the cache. If it unable to do so it must not fail silently but raise an exception. """ raise NotImplementedError() - def clear(self): + def clear(self) -> None: """Clears the cache. This method is not used by Jinja but should be implemented to allow applications to clear the bytecode cache used by a particular environment. """ - def get_cache_key(self, name, filename=None): + def get_cache_key( + self, name: str, filename: t.Optional[t.Union[str]] = None + ) -> str: """Returns the unique hash key for this template name.""" hash = sha1(name.encode("utf-8")) + if filename is not None: - filename = "|" + filename - if isinstance(filename, text_type): - filename = filename.encode("utf-8") - hash.update(filename) + hash.update(f"|{filename}".encode()) + return hash.hexdigest() - def get_source_checksum(self, source): + def get_source_checksum(self, source: str) -> str: """Returns a checksum for the source.""" return sha1(source.encode("utf-8")).hexdigest() - def get_bucket(self, environment, name, filename, source): + def get_bucket( + self, + environment: "Environment", + name: str, + filename: t.Optional[str], + source: str, + ) -> Bucket: """Return a cache bucket for the given template. All arguments are mandatory but filename may be `None`. """ @@ -164,7 +178,7 @@ class BytecodeCache(object): self.load_bytecode(bucket) return bucket - def set_bucket(self, bucket): + def set_bucket(self, bucket: Bucket) -> None: """Put the bucket into the cache.""" self.dump_bytecode(bucket) @@ -187,14 +201,16 @@ class FileSystemBytecodeCache(BytecodeCache): This bytecode cache supports clearing of the cache using the clear method. """ - def __init__(self, directory=None, pattern="__jinja2_%s.cache"): + def __init__( + self, directory: t.Optional[str] = None, pattern: str = "__jinja2_%s.cache" + ) -> None: if directory is None: directory = self._get_default_cache_dir() self.directory = directory self.pattern = pattern - def _get_default_cache_dir(self): - def _unsafe_dir(): + def _get_default_cache_dir(self) -> str: + def _unsafe_dir() -> "te.NoReturn": raise RuntimeError( "Cannot determine safe temp directory. You " "need to explicitly provide one." @@ -209,7 +225,7 @@ class FileSystemBytecodeCache(BytecodeCache): if not hasattr(os, "getuid"): _unsafe_dir() - dirname = "_jinja2-cache-%d" % os.getuid() + dirname = f"_jinja2-cache-{os.getuid()}" actual_dir = os.path.join(tmpdir, dirname) try: @@ -240,34 +256,72 @@ class FileSystemBytecodeCache(BytecodeCache): return actual_dir - def _get_cache_filename(self, bucket): - return path.join(self.directory, self.pattern % bucket.key) + def _get_cache_filename(self, bucket: Bucket) -> str: + return os.path.join(self.directory, self.pattern % (bucket.key,)) + + def load_bytecode(self, bucket: Bucket) -> None: + filename = self._get_cache_filename(bucket) - def load_bytecode(self, bucket): - f = open_if_exists(self._get_cache_filename(bucket), "rb") - if f is not None: + # Don't test for existence before opening the file, since the + # file could disappear after the test before the open. + try: + f = open(filename, "rb") + except (FileNotFoundError, IsADirectoryError, PermissionError): + # PermissionError can occur on Windows when an operation is + # in progress, such as calling clear(). + return + + with f: + bucket.load_bytecode(f) + + def dump_bytecode(self, bucket: Bucket) -> None: + # Write to a temporary file, then rename to the real name after + # writing. This avoids another process reading the file before + # it is fully written. + name = self._get_cache_filename(bucket) + f = tempfile.NamedTemporaryFile( + mode="wb", + dir=os.path.dirname(name), + prefix=os.path.basename(name), + suffix=".tmp", + delete=False, + ) + + def remove_silent() -> None: try: - bucket.load_bytecode(f) - finally: - f.close() + os.remove(f.name) + except OSError: + # Another process may have called clear(). On Windows, + # another program may be holding the file open. + pass - def dump_bytecode(self, bucket): - f = open(self._get_cache_filename(bucket), "wb") try: - bucket.write_bytecode(f) - finally: - f.close() + with f: + bucket.write_bytecode(f) + except BaseException: + remove_silent() + raise - def clear(self): + try: + os.replace(f.name, name) + except OSError: + # Another process may have called clear(). On Windows, + # another program may be holding the file open. + remove_silent() + except BaseException: + remove_silent() + raise + + def clear(self) -> None: # imported lazily here because google app-engine doesn't support # write access on the file system and the function does not exist # normally. from os import remove - files = fnmatch.filter(listdir(self.directory), self.pattern % "*") + files = fnmatch.filter(os.listdir(self.directory), self.pattern % ("*",)) for filename in files: try: - remove(path.join(self.directory, filename)) + remove(os.path.join(self.directory, filename)) except OSError: pass @@ -284,7 +338,7 @@ class MemcachedBytecodeCache(BytecodeCache): - `python-memcached <https://pypi.org/project/python-memcached/>`_ (Unfortunately the django cache interface is not compatible because it - does not support storing binary data, only unicode. You can however pass + does not support storing binary data, only text. You can however pass the underlying cache client to the bytecode cache which is available as `django.core.cache.cache._client`.) @@ -319,32 +373,34 @@ class MemcachedBytecodeCache(BytecodeCache): def __init__( self, - client, - prefix="jinja2/bytecode/", - timeout=None, - ignore_memcache_errors=True, + client: "_MemcachedClient", + prefix: str = "jinja2/bytecode/", + timeout: t.Optional[int] = None, + ignore_memcache_errors: bool = True, ): self.client = client self.prefix = prefix self.timeout = timeout self.ignore_memcache_errors = ignore_memcache_errors - def load_bytecode(self, bucket): + def load_bytecode(self, bucket: Bucket) -> None: try: code = self.client.get(self.prefix + bucket.key) except Exception: if not self.ignore_memcache_errors: raise - code = None - if code is not None: + else: bucket.bytecode_from_string(code) - def dump_bytecode(self, bucket): - args = (self.prefix + bucket.key, bucket.bytecode_to_string()) - if self.timeout is not None: - args += (self.timeout,) + def dump_bytecode(self, bucket: Bucket) -> None: + key = self.prefix + bucket.key + value = bucket.bytecode_to_string() + try: - self.client.set(*args) + if self.timeout is not None: + self.client.set(key, value, self.timeout) + else: + self.client.set(key, value) except Exception: if not self.ignore_memcache_errors: raise |