summaryrefslogtreecommitdiff
path: root/alembic/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'alembic/util.py')
-rw-r--r--alembic/util.py405
1 files changed, 0 insertions, 405 deletions
diff --git a/alembic/util.py b/alembic/util.py
deleted file mode 100644
index 2e0f731..0000000
--- a/alembic/util.py
+++ /dev/null
@@ -1,405 +0,0 @@
-import sys
-import os
-import textwrap
-import warnings
-import re
-import inspect
-import uuid
-import collections
-
-from mako.template import Template
-from sqlalchemy.engine import url
-from sqlalchemy import __version__
-
-from .compat import callable, exec_, load_module_py, load_module_pyc, \
- binary_type, string_types, py27
-
-
-class CommandError(Exception):
- pass
-
-
-def _safe_int(value):
- try:
- return int(value)
- except:
- return value
-_vers = tuple(
- [_safe_int(x) for x in re.findall(r'(\d+|[abc]\d)', __version__)])
-sqla_07 = _vers > (0, 7, 2)
-sqla_079 = _vers >= (0, 7, 9)
-sqla_08 = _vers >= (0, 8, 0)
-sqla_083 = _vers >= (0, 8, 3)
-sqla_084 = _vers >= (0, 8, 4)
-sqla_09 = _vers >= (0, 9, 0)
-sqla_092 = _vers >= (0, 9, 2)
-sqla_094 = _vers >= (0, 9, 4)
-sqla_094 = _vers >= (0, 9, 4)
-sqla_099 = _vers >= (0, 9, 9)
-sqla_100 = _vers >= (1, 0, 0)
-sqla_105 = _vers >= (1, 0, 5)
-if not sqla_07:
- raise CommandError(
- "SQLAlchemy 0.7.3 or greater is required. ")
-
-from sqlalchemy.util import format_argspec_plus, update_wrapper
-from sqlalchemy.util.compat import inspect_getfullargspec
-
-import logging
-log = logging.getLogger(__name__)
-
-if py27:
- # disable "no handler found" errors
- logging.getLogger('alembic').addHandler(logging.NullHandler())
-
-
-try:
- import fcntl
- import termios
- import struct
- ioctl = fcntl.ioctl(0, termios.TIOCGWINSZ,
- struct.pack('HHHH', 0, 0, 0, 0))
- _h, TERMWIDTH, _hp, _wp = struct.unpack('HHHH', ioctl)
- if TERMWIDTH <= 0: # can occur if running in emacs pseudo-tty
- TERMWIDTH = None
-except (ImportError, IOError):
- TERMWIDTH = None
-
-
-def template_to_file(template_file, dest, output_encoding, **kw):
- with open(dest, 'wb') as f:
- template = Template(filename=template_file)
- f.write(
- template.render_unicode(**kw).encode(output_encoding)
- )
-
-
-def create_module_class_proxy(cls, globals_, locals_):
- """Create module level proxy functions for the
- methods on a given class.
-
- The functions will have a compatible signature
- as the methods. A proxy is established
- using the ``_install_proxy(obj)`` function,
- and removed using ``_remove_proxy()``, both
- installed by calling this function.
-
- """
- attr_names = set()
-
- def _install_proxy(obj):
- globals_['_proxy'] = obj
- for name in attr_names:
- globals_[name] = getattr(obj, name)
-
- def _remove_proxy():
- globals_['_proxy'] = None
- for name in attr_names:
- del globals_[name]
-
- globals_['_install_proxy'] = _install_proxy
- globals_['_remove_proxy'] = _remove_proxy
-
- def _create_op_proxy(name):
- fn = getattr(cls, name)
- spec = inspect.getargspec(fn)
- if spec[0] and spec[0][0] == 'self':
- spec[0].pop(0)
- args = inspect.formatargspec(*spec)
- num_defaults = 0
- if spec[3]:
- num_defaults += len(spec[3])
- name_args = spec[0]
- if num_defaults:
- defaulted_vals = name_args[0 - num_defaults:]
- else:
- defaulted_vals = ()
-
- apply_kw = inspect.formatargspec(
- name_args, spec[1], spec[2],
- defaulted_vals,
- formatvalue=lambda x: '=' + x)
-
- def _name_error(name):
- raise NameError(
- "Can't invoke function '%s', as the proxy object has "
- "not yet been "
- "established for the Alembic '%s' class. "
- "Try placing this code inside a callable." % (
- name, cls.__name__
- ))
- globals_['_name_error'] = _name_error
-
- func_text = textwrap.dedent("""\
- def %(name)s(%(args)s):
- %(doc)r
- try:
- p = _proxy
- except NameError:
- _name_error('%(name)s')
- return _proxy.%(name)s(%(apply_kw)s)
- e
- """ % {
- 'name': name,
- 'args': args[1:-1],
- 'apply_kw': apply_kw[1:-1],
- 'doc': fn.__doc__,
- })
- lcl = {}
- exec_(func_text, globals_, lcl)
- return lcl[name]
-
- for methname in dir(cls):
- if not methname.startswith('_'):
- if callable(getattr(cls, methname)):
- locals_[methname] = _create_op_proxy(methname)
- else:
- attr_names.add(methname)
-
-
-def write_outstream(stream, *text):
- encoding = getattr(stream, 'encoding', 'ascii') or 'ascii'
- for t in text:
- if not isinstance(t, binary_type):
- t = t.encode(encoding, 'replace')
- t = t.decode(encoding)
- try:
- stream.write(t)
- except IOError:
- # suppress "broken pipe" errors.
- # no known way to handle this on Python 3 however
- # as the exception is "ignored" (noisily) in TextIOWrapper.
- break
-
-
-def coerce_resource_to_filename(fname):
- """Interpret a filename as either a filesystem location or as a package
- resource.
-
- Names that are non absolute paths and contain a colon
- are interpreted as resources and coerced to a file location.
-
- """
- if not os.path.isabs(fname) and ":" in fname:
- import pkg_resources
- fname = pkg_resources.resource_filename(*fname.split(':'))
- return fname
-
-
-def status(_statmsg, fn, *arg, **kw):
- msg(_statmsg + " ...", False)
- try:
- ret = fn(*arg, **kw)
- write_outstream(sys.stdout, " done\n")
- return ret
- except:
- write_outstream(sys.stdout, " FAILED\n")
- raise
-
-
-def err(message):
- log.error(message)
- msg("FAILED: %s" % message)
- sys.exit(-1)
-
-
-def obfuscate_url_pw(u):
- u = url.make_url(u)
- if u.password:
- u.password = 'XXXXX'
- return str(u)
-
-
-def asbool(value):
- return value is not None and \
- value.lower() == 'true'
-
-
-def warn(msg):
- warnings.warn(msg)
-
-
-def msg(msg, newline=True):
- if TERMWIDTH is None:
- write_outstream(sys.stdout, msg)
- if newline:
- write_outstream(sys.stdout, "\n")
- else:
- # left indent output lines
- lines = textwrap.wrap(msg, TERMWIDTH)
- if len(lines) > 1:
- for line in lines[0:-1]:
- write_outstream(sys.stdout, " ", line, "\n")
- write_outstream(sys.stdout, " ", lines[-1], ("\n" if newline else ""))
-
-
-def load_python_file(dir_, filename):
- """Load a file from the given path as a Python module."""
-
- module_id = re.sub(r'\W', "_", filename)
- path = os.path.join(dir_, filename)
- _, ext = os.path.splitext(filename)
- if ext == ".py":
- if os.path.exists(path):
- module = load_module_py(module_id, path)
- elif os.path.exists(simple_pyc_file_from_path(path)):
- # look for sourceless load
- module = load_module_pyc(
- module_id, simple_pyc_file_from_path(path))
- else:
- raise ImportError("Can't find Python file %s" % path)
- elif ext in (".pyc", ".pyo"):
- module = load_module_pyc(module_id, path)
- del sys.modules[module_id]
- return module
-
-
-def simple_pyc_file_from_path(path):
- """Given a python source path, return the so-called
- "sourceless" .pyc or .pyo path.
-
- This just a .pyc or .pyo file where the .py file would be.
-
- Even with PEP-3147, which normally puts .pyc/.pyo files in __pycache__,
- this use case remains supported as a so-called "sourceless module import".
-
- """
- if sys.flags.optimize:
- return path + "o" # e.g. .pyo
- else:
- return path + "c" # e.g. .pyc
-
-
-def pyc_file_from_path(path):
- """Given a python source path, locate the .pyc.
-
- See http://www.python.org/dev/peps/pep-3147/
- #detecting-pep-3147-availability
- http://www.python.org/dev/peps/pep-3147/#file-extension-checks
-
- """
- import imp
- has3147 = hasattr(imp, 'get_tag')
- if has3147:
- return imp.cache_from_source(path)
- else:
- return simple_pyc_file_from_path(path)
-
-
-def rev_id():
- val = int(uuid.uuid4()) % 100000000000000
- return hex(val)[2:-1]
-
-
-def to_tuple(x, default=None):
- if x is None:
- return default
- elif isinstance(x, string_types):
- return (x, )
- elif isinstance(x, collections.Iterable):
- return tuple(x)
- else:
- raise ValueError("Don't know how to turn %r into a tuple" % x)
-
-
-def format_as_comma(value):
- if value is None:
- return ""
- elif isinstance(value, string_types):
- return value
- elif isinstance(value, collections.Iterable):
- return ", ".join(value)
- else:
- raise ValueError("Don't know how to comma-format %r" % value)
-
-
-class memoized_property(object):
-
- """A read-only @property that is only evaluated once."""
-
- def __init__(self, fget, doc=None):
- self.fget = fget
- self.__doc__ = doc or fget.__doc__
- self.__name__ = fget.__name__
-
- def __get__(self, obj, cls):
- if obj is None:
- return self
- obj.__dict__[self.__name__] = result = self.fget(obj)
- return result
-
-
-class immutabledict(dict):
-
- def _immutable(self, *arg, **kw):
- raise TypeError("%s object is immutable" % self.__class__.__name__)
-
- __delitem__ = __setitem__ = __setattr__ = \
- clear = pop = popitem = setdefault = \
- update = _immutable
-
- def __new__(cls, *args):
- new = dict.__new__(cls)
- dict.__init__(new, *args)
- return new
-
- def __init__(self, *args):
- pass
-
- def __reduce__(self):
- return immutabledict, (dict(self), )
-
- def union(self, d):
- if not self:
- return immutabledict(d)
- else:
- d2 = immutabledict(self)
- dict.update(d2, d)
- return d2
-
- def __repr__(self):
- return "immutabledict(%s)" % dict.__repr__(self)
-
-
-def _with_legacy_names(translations):
- def decorate(fn):
-
- spec = inspect_getfullargspec(fn)
- metadata = dict(target='target', fn='fn')
- metadata.update(format_argspec_plus(spec, grouped=False))
-
- has_keywords = bool(spec[2])
-
- if not has_keywords:
- metadata['args'] += ", **kw"
- metadata['apply_kw'] += ", **kw"
-
- def go(*arg, **kw):
- names = set(kw).difference(spec[0])
- for oldname, newname in translations:
- if oldname in kw:
- kw[newname] = kw.pop(oldname)
- names.discard(oldname)
-
- warnings.warn(
- "Argument '%s' is now named '%s' for function '%s'" %
- (oldname, newname, fn.__name__))
- if not has_keywords and names:
- raise TypeError("Unknown arguments: %s" % ", ".join(names))
- return fn(*arg, **kw)
-
- code = 'lambda %(args)s: %(target)s(%(apply_kw)s)' % (
- metadata)
- decorated = eval(code, {"target": go})
- decorated.__defaults__ = getattr(fn, '__func__', fn).__defaults__
- update_wrapper(decorated, fn)
- if hasattr(decorated, '__wrapped__'):
- # update_wrapper in py3k applies __wrapped__, which causes
- # inspect.getargspec() to ignore the extra arguments on our
- # wrapper as of Python 3.4. We need this for the
- # "module class proxy" thing though, so just del the __wrapped__
- # for now. See #175 as well as bugs.python.org/issue17482
- del decorated.__wrapped__
- return decorated
-
- return decorate