summaryrefslogtreecommitdiff
path: root/mako/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'mako/util.py')
-rw-r--r--mako/util.py91
1 files changed, 54 insertions, 37 deletions
diff --git a/mako/util.py b/mako/util.py
index 2f089ff..cee911b 100644
--- a/mako/util.py
+++ b/mako/util.py
@@ -4,12 +4,13 @@
# This module is part of Mako and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-import re
-import collections
import codecs
+import collections
+import operator
import os
+import re
+
from mako import compat
-import operator
def update_wrapper(decorated, fn):
@@ -19,7 +20,6 @@ def update_wrapper(decorated, fn):
class PluginLoader(object):
-
def __init__(self, group):
self.group = group
self.impls = {}
@@ -29,16 +29,16 @@ class PluginLoader(object):
return self.impls[name]()
else:
import pkg_resources
- for impl in pkg_resources.iter_entry_points(
- self.group,
- name):
+
+ for impl in pkg_resources.iter_entry_points(self.group, name):
self.impls[name] = impl.load
return impl.load()
else:
from mako import exceptions
+
raise exceptions.RuntimeException(
- "Can't load plugin %s %s" %
- (self.group, name))
+ "Can't load plugin %s %s" % (self.group, name)
+ )
def register(self, name, modulepath, objname):
def load():
@@ -46,18 +46,19 @@ class PluginLoader(object):
for token in modulepath.split(".")[1:]:
mod = getattr(mod, token)
return getattr(mod, objname)
+
self.impls[name] = load
-def verify_directory(dir):
+def verify_directory(dir_):
"""create and/or verify a filesystem directory."""
tries = 0
- while not os.path.exists(dir):
+ while not os.path.exists(dir_):
try:
tries += 1
- os.makedirs(dir, compat.octal("0775"))
+ os.makedirs(dir_, compat.octal("0775"))
except:
if tries > 5:
raise
@@ -109,11 +110,15 @@ class memoized_instancemethod(object):
def oneshot(*args, **kw):
result = self.fget(obj, *args, **kw)
- memo = lambda *a, **kw: result
+
+ def memo(*a, **kw):
+ return result
+
memo.__name__ = self.__name__
memo.__doc__ = self.__doc__
obj.__dict__[self.__name__] = memo
return result
+
oneshot.__name__ = self.__name__
oneshot.__doc__ = self.__doc__
return oneshot
@@ -137,13 +142,13 @@ class FastEncodingBuffer(object):
"""a very rudimentary buffer that is faster than StringIO,
but doesn't crash on unicode data like cStringIO."""
- def __init__(self, encoding=None, errors='strict', as_unicode=False):
+ def __init__(self, encoding=None, errors="strict", as_unicode=False):
self.data = collections.deque()
self.encoding = encoding
if as_unicode:
- self.delim = compat.u('')
+ self.delim = compat.u("")
else:
- self.delim = ''
+ self.delim = ""
self.as_unicode = as_unicode
self.errors = errors
self.write = self.data.append
@@ -154,8 +159,9 @@ class FastEncodingBuffer(object):
def getvalue(self):
if self.encoding:
- return self.delim.join(self.data).encode(self.encoding,
- self.errors)
+ return self.delim.join(self.data).encode(
+ self.encoding, self.errors
+ )
else:
return self.delim.join(self.data)
@@ -171,7 +177,6 @@ class LRUCache(dict):
"""
class _Item(object):
-
def __init__(self, key, value):
self.key = key
self.value = value
@@ -180,7 +185,7 @@ class LRUCache(dict):
def __repr__(self):
return repr(self.value)
- def __init__(self, capacity, threshold=.5):
+ def __init__(self, capacity, threshold=0.5):
self.capacity = capacity
self.threshold = threshold
@@ -210,9 +215,12 @@ class LRUCache(dict):
def _manage_size(self):
while len(self) > self.capacity + self.capacity * self.threshold:
- bytime = sorted(dict.values(self),
- key=operator.attrgetter('timestamp'), reverse=True)
- for item in bytime[self.capacity:]:
+ bytime = sorted(
+ dict.values(self),
+ key=operator.attrgetter("timestamp"),
+ reverse=True,
+ )
+ for item in bytime[self.capacity :]:
try:
del self[item.key]
except KeyError:
@@ -220,10 +228,11 @@ class LRUCache(dict):
# broke in on us. loop around and try again
break
+
# Regexp to match python magic encoding line
_PYTHON_MAGIC_COMMENT_re = re.compile(
- r'[ \t\f]* \# .* coding[=:][ \t]*([-\w.]+)',
- re.VERBOSE)
+ r"[ \t\f]* \# .* coding[=:][ \t]*([-\w.]+)", re.VERBOSE
+)
def parse_encoding(fp):
@@ -242,13 +251,14 @@ def parse_encoding(fp):
line1 = fp.readline()
has_bom = line1.startswith(codecs.BOM_UTF8)
if has_bom:
- line1 = line1[len(codecs.BOM_UTF8):]
+ line1 = line1[len(codecs.BOM_UTF8) :]
- m = _PYTHON_MAGIC_COMMENT_re.match(line1.decode('ascii', 'ignore'))
+ m = _PYTHON_MAGIC_COMMENT_re.match(line1.decode("ascii", "ignore"))
if not m:
try:
import parser
- parser.suite(line1.decode('ascii', 'ignore'))
+
+ parser.suite(line1.decode("ascii", "ignore"))
except (ImportError, SyntaxError):
# Either it's a real syntax error, in which case the source
# is not valid python source, or line2 is a continuation of
@@ -258,14 +268,16 @@ def parse_encoding(fp):
else:
line2 = fp.readline()
m = _PYTHON_MAGIC_COMMENT_re.match(
- line2.decode('ascii', 'ignore'))
+ line2.decode("ascii", "ignore")
+ )
if has_bom:
if m:
raise SyntaxError(
"python refuses to compile code with both a UTF8"
- " byte-order-mark and a magic encoding comment")
- return 'utf_8'
+ " byte-order-mark and a magic encoding comment"
+ )
+ return "utf_8"
elif m:
return m.group(1)
else:
@@ -289,10 +301,11 @@ def restore__ast(_ast):
"""Attempt to restore the required classes to the _ast module if it
appears to be missing them
"""
- if hasattr(_ast, 'AST'):
+ if hasattr(_ast, "AST"):
return
_ast.PyCF_ONLY_AST = 2 << 9
- m = compile("""\
+ m = compile(
+ """\
def foo(): pass
class Bar(object): pass
if False: pass
@@ -305,13 +318,17 @@ baz = 'mako'
baz and 'foo' or 'bar'
(mako is baz == baz) is not baz != mako
mako > baz < mako >= baz <= mako
-mako in baz not in mako""", '<unknown>', 'exec', _ast.PyCF_ONLY_AST)
+mako in baz not in mako""",
+ "<unknown>",
+ "exec",
+ _ast.PyCF_ONLY_AST,
+ )
_ast.Module = type(m)
for cls in _ast.Module.__mro__:
- if cls.__name__ == 'mod':
+ if cls.__name__ == "mod":
_ast.mod = cls
- elif cls.__name__ == 'AST':
+ elif cls.__name__ == "AST":
_ast.AST = cls
_ast.FunctionDef = type(m.body[0])
@@ -361,7 +378,7 @@ mako in baz not in mako""", '<unknown>', 'exec', _ast.PyCF_ONLY_AST)
_ast.NotIn = type(m.body[12].value.ops[1])
-def read_file(path, mode='rb'):
+def read_file(path, mode="rb"):
fp = open(path, mode)
try:
data = fp.read()