diff options
Diffstat (limited to 'alembic/util.py')
-rw-r--r-- | alembic/util.py | 405 |
1 files changed, 0 insertions, 405 deletions
diff --git a/alembic/util.py b/alembic/util.py deleted file mode 100644 index 2e0f731..0000000 --- a/alembic/util.py +++ /dev/null @@ -1,405 +0,0 @@ -import sys -import os -import textwrap -import warnings -import re -import inspect -import uuid -import collections - -from mako.template import Template -from sqlalchemy.engine import url -from sqlalchemy import __version__ - -from .compat import callable, exec_, load_module_py, load_module_pyc, \ - binary_type, string_types, py27 - - -class CommandError(Exception): - pass - - -def _safe_int(value): - try: - return int(value) - except: - return value -_vers = tuple( - [_safe_int(x) for x in re.findall(r'(\d+|[abc]\d)', __version__)]) -sqla_07 = _vers > (0, 7, 2) -sqla_079 = _vers >= (0, 7, 9) -sqla_08 = _vers >= (0, 8, 0) -sqla_083 = _vers >= (0, 8, 3) -sqla_084 = _vers >= (0, 8, 4) -sqla_09 = _vers >= (0, 9, 0) -sqla_092 = _vers >= (0, 9, 2) -sqla_094 = _vers >= (0, 9, 4) -sqla_094 = _vers >= (0, 9, 4) -sqla_099 = _vers >= (0, 9, 9) -sqla_100 = _vers >= (1, 0, 0) -sqla_105 = _vers >= (1, 0, 5) -if not sqla_07: - raise CommandError( - "SQLAlchemy 0.7.3 or greater is required. ") - -from sqlalchemy.util import format_argspec_plus, update_wrapper -from sqlalchemy.util.compat import inspect_getfullargspec - -import logging -log = logging.getLogger(__name__) - -if py27: - # disable "no handler found" errors - logging.getLogger('alembic').addHandler(logging.NullHandler()) - - -try: - import fcntl - import termios - import struct - ioctl = fcntl.ioctl(0, termios.TIOCGWINSZ, - struct.pack('HHHH', 0, 0, 0, 0)) - _h, TERMWIDTH, _hp, _wp = struct.unpack('HHHH', ioctl) - if TERMWIDTH <= 0: # can occur if running in emacs pseudo-tty - TERMWIDTH = None -except (ImportError, IOError): - TERMWIDTH = None - - -def template_to_file(template_file, dest, output_encoding, **kw): - with open(dest, 'wb') as f: - template = Template(filename=template_file) - f.write( - template.render_unicode(**kw).encode(output_encoding) - ) - - -def create_module_class_proxy(cls, globals_, locals_): - """Create module level proxy functions for the - methods on a given class. - - The functions will have a compatible signature - as the methods. A proxy is established - using the ``_install_proxy(obj)`` function, - and removed using ``_remove_proxy()``, both - installed by calling this function. - - """ - attr_names = set() - - def _install_proxy(obj): - globals_['_proxy'] = obj - for name in attr_names: - globals_[name] = getattr(obj, name) - - def _remove_proxy(): - globals_['_proxy'] = None - for name in attr_names: - del globals_[name] - - globals_['_install_proxy'] = _install_proxy - globals_['_remove_proxy'] = _remove_proxy - - def _create_op_proxy(name): - fn = getattr(cls, name) - spec = inspect.getargspec(fn) - if spec[0] and spec[0][0] == 'self': - spec[0].pop(0) - args = inspect.formatargspec(*spec) - num_defaults = 0 - if spec[3]: - num_defaults += len(spec[3]) - name_args = spec[0] - if num_defaults: - defaulted_vals = name_args[0 - num_defaults:] - else: - defaulted_vals = () - - apply_kw = inspect.formatargspec( - name_args, spec[1], spec[2], - defaulted_vals, - formatvalue=lambda x: '=' + x) - - def _name_error(name): - raise NameError( - "Can't invoke function '%s', as the proxy object has " - "not yet been " - "established for the Alembic '%s' class. " - "Try placing this code inside a callable." % ( - name, cls.__name__ - )) - globals_['_name_error'] = _name_error - - func_text = textwrap.dedent("""\ - def %(name)s(%(args)s): - %(doc)r - try: - p = _proxy - except NameError: - _name_error('%(name)s') - return _proxy.%(name)s(%(apply_kw)s) - e - """ % { - 'name': name, - 'args': args[1:-1], - 'apply_kw': apply_kw[1:-1], - 'doc': fn.__doc__, - }) - lcl = {} - exec_(func_text, globals_, lcl) - return lcl[name] - - for methname in dir(cls): - if not methname.startswith('_'): - if callable(getattr(cls, methname)): - locals_[methname] = _create_op_proxy(methname) - else: - attr_names.add(methname) - - -def write_outstream(stream, *text): - encoding = getattr(stream, 'encoding', 'ascii') or 'ascii' - for t in text: - if not isinstance(t, binary_type): - t = t.encode(encoding, 'replace') - t = t.decode(encoding) - try: - stream.write(t) - except IOError: - # suppress "broken pipe" errors. - # no known way to handle this on Python 3 however - # as the exception is "ignored" (noisily) in TextIOWrapper. - break - - -def coerce_resource_to_filename(fname): - """Interpret a filename as either a filesystem location or as a package - resource. - - Names that are non absolute paths and contain a colon - are interpreted as resources and coerced to a file location. - - """ - if not os.path.isabs(fname) and ":" in fname: - import pkg_resources - fname = pkg_resources.resource_filename(*fname.split(':')) - return fname - - -def status(_statmsg, fn, *arg, **kw): - msg(_statmsg + " ...", False) - try: - ret = fn(*arg, **kw) - write_outstream(sys.stdout, " done\n") - return ret - except: - write_outstream(sys.stdout, " FAILED\n") - raise - - -def err(message): - log.error(message) - msg("FAILED: %s" % message) - sys.exit(-1) - - -def obfuscate_url_pw(u): - u = url.make_url(u) - if u.password: - u.password = 'XXXXX' - return str(u) - - -def asbool(value): - return value is not None and \ - value.lower() == 'true' - - -def warn(msg): - warnings.warn(msg) - - -def msg(msg, newline=True): - if TERMWIDTH is None: - write_outstream(sys.stdout, msg) - if newline: - write_outstream(sys.stdout, "\n") - else: - # left indent output lines - lines = textwrap.wrap(msg, TERMWIDTH) - if len(lines) > 1: - for line in lines[0:-1]: - write_outstream(sys.stdout, " ", line, "\n") - write_outstream(sys.stdout, " ", lines[-1], ("\n" if newline else "")) - - -def load_python_file(dir_, filename): - """Load a file from the given path as a Python module.""" - - module_id = re.sub(r'\W', "_", filename) - path = os.path.join(dir_, filename) - _, ext = os.path.splitext(filename) - if ext == ".py": - if os.path.exists(path): - module = load_module_py(module_id, path) - elif os.path.exists(simple_pyc_file_from_path(path)): - # look for sourceless load - module = load_module_pyc( - module_id, simple_pyc_file_from_path(path)) - else: - raise ImportError("Can't find Python file %s" % path) - elif ext in (".pyc", ".pyo"): - module = load_module_pyc(module_id, path) - del sys.modules[module_id] - return module - - -def simple_pyc_file_from_path(path): - """Given a python source path, return the so-called - "sourceless" .pyc or .pyo path. - - This just a .pyc or .pyo file where the .py file would be. - - Even with PEP-3147, which normally puts .pyc/.pyo files in __pycache__, - this use case remains supported as a so-called "sourceless module import". - - """ - if sys.flags.optimize: - return path + "o" # e.g. .pyo - else: - return path + "c" # e.g. .pyc - - -def pyc_file_from_path(path): - """Given a python source path, locate the .pyc. - - See http://www.python.org/dev/peps/pep-3147/ - #detecting-pep-3147-availability - http://www.python.org/dev/peps/pep-3147/#file-extension-checks - - """ - import imp - has3147 = hasattr(imp, 'get_tag') - if has3147: - return imp.cache_from_source(path) - else: - return simple_pyc_file_from_path(path) - - -def rev_id(): - val = int(uuid.uuid4()) % 100000000000000 - return hex(val)[2:-1] - - -def to_tuple(x, default=None): - if x is None: - return default - elif isinstance(x, string_types): - return (x, ) - elif isinstance(x, collections.Iterable): - return tuple(x) - else: - raise ValueError("Don't know how to turn %r into a tuple" % x) - - -def format_as_comma(value): - if value is None: - return "" - elif isinstance(value, string_types): - return value - elif isinstance(value, collections.Iterable): - return ", ".join(value) - else: - raise ValueError("Don't know how to comma-format %r" % value) - - -class memoized_property(object): - - """A read-only @property that is only evaluated once.""" - - def __init__(self, fget, doc=None): - self.fget = fget - self.__doc__ = doc or fget.__doc__ - self.__name__ = fget.__name__ - - def __get__(self, obj, cls): - if obj is None: - return self - obj.__dict__[self.__name__] = result = self.fget(obj) - return result - - -class immutabledict(dict): - - def _immutable(self, *arg, **kw): - raise TypeError("%s object is immutable" % self.__class__.__name__) - - __delitem__ = __setitem__ = __setattr__ = \ - clear = pop = popitem = setdefault = \ - update = _immutable - - def __new__(cls, *args): - new = dict.__new__(cls) - dict.__init__(new, *args) - return new - - def __init__(self, *args): - pass - - def __reduce__(self): - return immutabledict, (dict(self), ) - - def union(self, d): - if not self: - return immutabledict(d) - else: - d2 = immutabledict(self) - dict.update(d2, d) - return d2 - - def __repr__(self): - return "immutabledict(%s)" % dict.__repr__(self) - - -def _with_legacy_names(translations): - def decorate(fn): - - spec = inspect_getfullargspec(fn) - metadata = dict(target='target', fn='fn') - metadata.update(format_argspec_plus(spec, grouped=False)) - - has_keywords = bool(spec[2]) - - if not has_keywords: - metadata['args'] += ", **kw" - metadata['apply_kw'] += ", **kw" - - def go(*arg, **kw): - names = set(kw).difference(spec[0]) - for oldname, newname in translations: - if oldname in kw: - kw[newname] = kw.pop(oldname) - names.discard(oldname) - - warnings.warn( - "Argument '%s' is now named '%s' for function '%s'" % - (oldname, newname, fn.__name__)) - if not has_keywords and names: - raise TypeError("Unknown arguments: %s" % ", ".join(names)) - return fn(*arg, **kw) - - code = 'lambda %(args)s: %(target)s(%(apply_kw)s)' % ( - metadata) - decorated = eval(code, {"target": go}) - decorated.__defaults__ = getattr(fn, '__func__', fn).__defaults__ - update_wrapper(decorated, fn) - if hasattr(decorated, '__wrapped__'): - # update_wrapper in py3k applies __wrapped__, which causes - # inspect.getargspec() to ignore the extra arguments on our - # wrapper as of Python 3.4. We need this for the - # "module class proxy" thing though, so just del the __wrapped__ - # for now. See #175 as well as bugs.python.org/issue17482 - del decorated.__wrapped__ - return decorated - - return decorate |