summaryrefslogtreecommitdiff
path: root/alembic/util/langhelpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'alembic/util/langhelpers.py')
-rw-r--r--alembic/util/langhelpers.py275
1 files changed, 275 insertions, 0 deletions
diff --git a/alembic/util/langhelpers.py b/alembic/util/langhelpers.py
new file mode 100644
index 0000000..904848c
--- /dev/null
+++ b/alembic/util/langhelpers.py
@@ -0,0 +1,275 @@
+import textwrap
+import warnings
+import inspect
+import uuid
+import collections
+
+from .compat import callable, exec_, string_types, with_metaclass
+
+from sqlalchemy.util import format_argspec_plus, update_wrapper
+from sqlalchemy.util.compat import inspect_getfullargspec
+
+
+class _ModuleClsMeta(type):
+ def __setattr__(cls, key, value):
+ super(_ModuleClsMeta, cls).__setattr__(key, value)
+ cls._update_module_proxies(key)
+
+
+class ModuleClsProxy(with_metaclass(_ModuleClsMeta)):
+ """Create module level proxy functions for the
+ methods on a given class.
+
+ The functions will have a compatible signature
+ as the methods.
+
+ """
+
+ _setups = collections.defaultdict(lambda: (set(), []))
+
+ @classmethod
+ def _update_module_proxies(cls, name):
+ attr_names, modules = cls._setups[cls]
+ for globals_, locals_ in modules:
+ cls._add_proxied_attribute(name, globals_, locals_, attr_names)
+
+ def _install_proxy(self):
+ attr_names, modules = self._setups[self.__class__]
+ for globals_, locals_ in modules:
+ globals_['_proxy'] = self
+ for attr_name in attr_names:
+ globals_[attr_name] = getattr(self, attr_name)
+
+ def _remove_proxy(self):
+ attr_names, modules = self._setups[self.__class__]
+ for globals_, locals_ in modules:
+ globals_['_proxy'] = None
+ for attr_name in attr_names:
+ del globals_[attr_name]
+
+ @classmethod
+ def create_module_class_proxy(cls, globals_, locals_):
+ attr_names, modules = cls._setups[cls]
+ modules.append(
+ (globals_, locals_)
+ )
+ cls._setup_proxy(globals_, locals_, attr_names)
+
+ @classmethod
+ def _setup_proxy(cls, globals_, locals_, attr_names):
+ for methname in dir(cls):
+ cls._add_proxied_attribute(methname, globals_, locals_, attr_names)
+
+ @classmethod
+ def _add_proxied_attribute(cls, methname, globals_, locals_, attr_names):
+ if not methname.startswith('_'):
+ meth = getattr(cls, methname)
+ if callable(meth):
+ locals_[methname] = cls._create_method_proxy(
+ methname, globals_, locals_)
+ else:
+ attr_names.add(methname)
+
+ @classmethod
+ def _create_method_proxy(cls, name, globals_, locals_):
+ fn = getattr(cls, name)
+ spec = inspect.getargspec(fn)
+ if spec[0] and spec[0][0] == 'self':
+ spec[0].pop(0)
+ args = inspect.formatargspec(*spec)
+ num_defaults = 0
+ if spec[3]:
+ num_defaults += len(spec[3])
+ name_args = spec[0]
+ if num_defaults:
+ defaulted_vals = name_args[0 - num_defaults:]
+ else:
+ defaulted_vals = ()
+
+ apply_kw = inspect.formatargspec(
+ name_args, spec[1], spec[2],
+ defaulted_vals,
+ formatvalue=lambda x: '=' + x)
+
+ def _name_error(name):
+ raise NameError(
+ "Can't invoke function '%s', as the proxy object has "
+ "not yet been "
+ "established for the Alembic '%s' class. "
+ "Try placing this code inside a callable." % (
+ name, cls.__name__
+ ))
+ globals_['_name_error'] = _name_error
+
+ func_text = textwrap.dedent("""\
+ def %(name)s(%(args)s):
+ %(doc)r
+ try:
+ p = _proxy
+ except NameError:
+ _name_error('%(name)s')
+ return _proxy.%(name)s(%(apply_kw)s)
+ e
+ """ % {
+ 'name': name,
+ 'args': args[1:-1],
+ 'apply_kw': apply_kw[1:-1],
+ 'doc': fn.__doc__,
+ })
+ lcl = {}
+ exec_(func_text, globals_, lcl)
+ return lcl[name]
+
+
+def asbool(value):
+ return value is not None and \
+ value.lower() == 'true'
+
+
+def rev_id():
+ val = int(uuid.uuid4()) % 100000000000000
+ return hex(val)[2:-1]
+
+
+def to_list(x, default=None):
+ if x is None:
+ return default
+ elif isinstance(x, string_types):
+ return [x]
+ elif isinstance(x, collections.Iterable):
+ return list(x)
+ else:
+ raise ValueError("Don't know how to turn %r into a list" % x)
+
+
+def to_tuple(x, default=None):
+ if x is None:
+ return default
+ elif isinstance(x, string_types):
+ return (x, )
+ elif isinstance(x, collections.Iterable):
+ return tuple(x)
+ else:
+ raise ValueError("Don't know how to turn %r into a tuple" % x)
+
+
+class memoized_property(object):
+
+ """A read-only @property that is only evaluated once."""
+
+ def __init__(self, fget, doc=None):
+ self.fget = fget
+ self.__doc__ = doc or fget.__doc__
+ self.__name__ = fget.__name__
+
+ def __get__(self, obj, cls):
+ if obj is None:
+ return self
+ obj.__dict__[self.__name__] = result = self.fget(obj)
+ return result
+
+
+class immutabledict(dict):
+
+ def _immutable(self, *arg, **kw):
+ raise TypeError("%s object is immutable" % self.__class__.__name__)
+
+ __delitem__ = __setitem__ = __setattr__ = \
+ clear = pop = popitem = setdefault = \
+ update = _immutable
+
+ def __new__(cls, *args):
+ new = dict.__new__(cls)
+ dict.__init__(new, *args)
+ return new
+
+ def __init__(self, *args):
+ pass
+
+ def __reduce__(self):
+ return immutabledict, (dict(self), )
+
+ def union(self, d):
+ if not self:
+ return immutabledict(d)
+ else:
+ d2 = immutabledict(self)
+ dict.update(d2, d)
+ return d2
+
+ def __repr__(self):
+ return "immutabledict(%s)" % dict.__repr__(self)
+
+
+def _with_legacy_names(translations):
+ def decorate(fn):
+
+ spec = inspect_getfullargspec(fn)
+ metadata = dict(target='target', fn='fn')
+ metadata.update(format_argspec_plus(spec, grouped=False))
+
+ has_keywords = bool(spec[2])
+
+ if not has_keywords:
+ metadata['args'] += ", **kw"
+ metadata['apply_kw'] += ", **kw"
+
+ def go(*arg, **kw):
+ names = set(kw).difference(spec[0])
+ for oldname, newname in translations:
+ if oldname in kw:
+ kw[newname] = kw.pop(oldname)
+ names.discard(oldname)
+
+ warnings.warn(
+ "Argument '%s' is now named '%s' for function '%s'" %
+ (oldname, newname, fn.__name__))
+ if not has_keywords and names:
+ raise TypeError("Unknown arguments: %s" % ", ".join(names))
+ return fn(*arg, **kw)
+
+ code = 'lambda %(args)s: %(target)s(%(apply_kw)s)' % (
+ metadata)
+ decorated = eval(code, {"target": go})
+ decorated.__defaults__ = getattr(fn, '__func__', fn).__defaults__
+ update_wrapper(decorated, fn)
+ if hasattr(decorated, '__wrapped__'):
+ # update_wrapper in py3k applies __wrapped__, which causes
+ # inspect.getargspec() to ignore the extra arguments on our
+ # wrapper as of Python 3.4. We need this for the
+ # "module class proxy" thing though, so just del the __wrapped__
+ # for now. See #175 as well as bugs.python.org/issue17482
+ del decorated.__wrapped__
+ return decorated
+
+ return decorate
+
+
+class Dispatcher(object):
+ def __init__(self):
+ self._registry = {}
+
+ def dispatch_for(self, target, qualifier='default'):
+ def decorate(fn):
+ assert isinstance(target, type)
+ assert target not in self._registry
+ self._registry[(target, qualifier)] = fn
+ return fn
+ return decorate
+
+ def dispatch(self, obj, qualifier='default'):
+ for spcls in type(obj).__mro__:
+ if qualifier != 'default' and (spcls, qualifier) in self._registry:
+ return self._registry[(spcls, qualifier)]
+ elif (spcls, 'default') in self._registry:
+ return self._registry[(spcls, 'default')]
+ else:
+ raise ValueError("no dispatch function for object: %s" % obj)
+
+ def branch(self):
+ """Return a copy of this dispatcher that is independently
+ writable."""
+
+ d = Dispatcher()
+ d._registry.update(self._registry)
+ return d