diff options
author | Claudiu Popa <pcmanticore@gmail.com> | 2016-05-31 18:44:55 +0100 |
---|---|---|
committer | Claudiu Popa <pcmanticore@gmail.com> | 2016-05-31 18:44:55 +0100 |
commit | 752da0693be421de04a30cdab2f55380875122a7 (patch) | |
tree | fb8f4f6fa8365bb85f063623866625e912bce4be | |
parent | 0442a3c822e902f40c4bc89d53375780354c7411 (diff) | |
download | astroid-git-752da0693be421de04a30cdab2f55380875122a7.tar.gz |
Split brain_stdlib into multiple modules and finally kill it.
-rw-r--r-- | astroid/brain/brain_collections.py | 35 | ||||
-rw-r--r-- | astroid/brain/brain_hashlib.py | 38 | ||||
-rw-r--r-- | astroid/brain/brain_multiprocessing.py | 104 | ||||
-rw-r--r-- | astroid/brain/brain_namedtuple_enum.py (renamed from astroid/brain/brain_stdlib.py) | 232 | ||||
-rw-r--r-- | astroid/brain/brain_subprocess.py | 78 | ||||
-rw-r--r-- | astroid/brain/brain_threading.py | 20 |
6 files changed, 276 insertions, 231 deletions
diff --git a/astroid/brain/brain_collections.py b/astroid/brain/brain_collections.py new file mode 100644 index 00000000..ad378618 --- /dev/null +++ b/astroid/brain/brain_collections.py @@ -0,0 +1,35 @@ +# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html +# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER + +import astroid + + +def _collections_transform(): + return astroid.parse(''' + class defaultdict(dict): + default_factory = None + def __missing__(self, key): pass + + class deque(object): + maxlen = 0 + def __init__(self, iterable=None, maxlen=None): + self.iterable = iterable + def append(self, x): pass + def appendleft(self, x): pass + def clear(self): pass + def count(self, x): return 0 + def extend(self, iterable): pass + def extendleft(self, iterable): pass + def pop(self): pass + def popleft(self): pass + def remove(self, value): pass + def reverse(self): pass + def rotate(self, n): pass + def __iter__(self): return self + def __reversed__(self): return self.iterable[::-1] + def __getitem__(self, index): pass + def __setitem__(self, index, value): pass + def __delitem__(self, index): pass + ''') + +astroid.register_module_extender(astroid.MANAGER, 'collections', _collections_transform) diff --git a/astroid/brain/brain_hashlib.py b/astroid/brain/brain_hashlib.py new file mode 100644 index 00000000..2ff43858 --- /dev/null +++ b/astroid/brain/brain_hashlib.py @@ -0,0 +1,38 @@ +# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html +# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER + +import six + +import astroid + + +def _hashlib_transform(): + template = ''' + class %(name)s(object): + def __init__(self, value=''): pass + def digest(self): + return %(digest)s + def copy(self): + return self + def update(self, value): pass + def hexdigest(self): + return '' + @property + def name(self): + return %(name)r + @property + def block_size(self): + return 1 + @property + def digest_size(self): + return 1 + ''' + algorithms = ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512') + classes = "".join( + template % {'name': hashfunc, 'digest': 'b""' if six.PY3 else '""'} + for hashfunc in algorithms) + return astroid.parse(classes) + + +astroid.register_module_extender(astroid.MANAGER, 'hashlib', _hashlib_transform) + diff --git a/astroid/brain/brain_multiprocessing.py b/astroid/brain/brain_multiprocessing.py new file mode 100644 index 00000000..d0fb0724 --- /dev/null +++ b/astroid/brain/brain_multiprocessing.py @@ -0,0 +1,104 @@ + +# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html +# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER + +import sys + +import astroid +from astroid import exceptions +from astroid.interpreter import objects +from astroid.tree.node_classes import InterpreterObject + + +PY34 = sys.version_info[:2] >= (3, 4) + + +def _multiprocessing_transform(): + module = astroid.parse(''' + from multiprocessing.managers import SyncManager + def Manager(): + return SyncManager() + ''') + if not PY34: + return module + + # On Python 3.4, multiprocessing uses a getattr lookup inside contexts, + # in order to get the attributes they need. Since it's extremely + # dynamic, we use this approach to fake it. + node = astroid.parse(''' + from multiprocessing.context import DefaultContext, BaseContext + default = DefaultContext() + base = BaseContext() + ''') + try: + context = next(node['default'].infer()) + base = next(node['base'].infer()) + except exceptions.InferenceError: + return module + + for node in (context, base): + for key, value in node.locals.items(): + if key.startswith("_"): + continue + + value = value[0] + if isinstance(value, astroid.FunctionDef): + # We need to rebind this, since otherwise + # it will have an extra argument (self). + value = objects.BoundMethod(value, node) + module.body.append(InterpreterObject(object_=value, name=key, parent=module)) + return module + + +def _multiprocessing_managers_transform(): + return astroid.parse(''' + import array + import threading + import multiprocessing.pool as pool + + import six + + class Namespace(object): + pass + + class Value(object): + def __init__(self, typecode, value, lock=True): + self._typecode = typecode + self._value = value + def get(self): + return self._value + def set(self, value): + self._value = value + def __repr__(self): + return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) + value = property(get, set) + + def Array(typecode, sequence, lock=True): + return array.array(typecode, sequence) + + class SyncManager(object): + Queue = JoinableQueue = six.moves.queue.Queue + Event = threading.Event + RLock = threading.RLock + BoundedSemaphore = threading.BoundedSemaphore + Condition = threading.Condition + Barrier = threading.Barrier + Pool = pool.Pool + list = list + dict = dict + Value = Value + Array = Array + Namespace = Namespace + __enter__ = lambda self: self + __exit__ = lambda *args: args + + def start(self, initializer=None, initargs=None): + pass + def shutdown(self): + pass + ''') + + +astroid.register_module_extender(astroid.MANAGER, 'multiprocessing', _multiprocessing_transform) +astroid.register_module_extender(astroid.MANAGER, 'multiprocessing.managers', + _multiprocessing_managers_transform) diff --git a/astroid/brain/brain_stdlib.py b/astroid/brain/brain_namedtuple_enum.py index 6144f1bf..38c6c513 100644 --- a/astroid/brain/brain_stdlib.py +++ b/astroid/brain/brain_namedtuple_enum.py @@ -10,10 +10,8 @@ import sys import textwrap from astroid import ( - MANAGER, UseInferenceDefault, inference_tip, - InferenceError, register_module_extender) + MANAGER, UseInferenceDefault, inference_tip) from astroid import exceptions -from astroid.interpreter.objects import BoundMethod from astroid import nodes from astroid.builder import AstroidBuilder from astroid import parse @@ -36,131 +34,6 @@ def infer_first(node, context): return value -# module specific transformation functions ##################################### - -def hashlib_transform(): - template = ''' - -class %(name)s(object): - def __init__(self, value=''): pass - def digest(self): - return %(digest)s - def copy(self): - return self - def update(self, value): pass - def hexdigest(self): - return '' - @property - def name(self): - return %(name)r - @property - def block_size(self): - return 1 - @property - def digest_size(self): - return 1 -''' - algorithms = ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512') - classes = "".join( - template % {'name': hashfunc, 'digest': 'b""' if PY3K else '""'} - for hashfunc in algorithms) - return AstroidBuilder(MANAGER).string_build(classes) - - -def collections_transform(): - return AstroidBuilder(MANAGER).string_build(''' - -class defaultdict(dict): - default_factory = None - def __missing__(self, key): pass - -class deque(object): - maxlen = 0 - def __init__(self, iterable=None, maxlen=None): - self.iterable = iterable - def append(self, x): pass - def appendleft(self, x): pass - def clear(self): pass - def count(self, x): return 0 - def extend(self, iterable): pass - def extendleft(self, iterable): pass - def pop(self): pass - def popleft(self): pass - def remove(self, value): pass - def reverse(self): pass - def rotate(self, n): pass - def __iter__(self): return self - def __reversed__(self): return self.iterable[::-1] - def __getitem__(self, index): pass - def __setitem__(self, index, value): pass - def __delitem__(self, index): pass -''') - - -def subprocess_transform(): - if PY3K: - communicate = (bytes('string', 'ascii'), bytes('string', 'ascii')) - communicate_signature = 'def communicate(self, input=None, timeout=None)' - init = """ - def __init__(self, args, bufsize=0, executable=None, - stdin=None, stdout=None, stderr=None, - preexec_fn=None, close_fds=False, shell=False, - cwd=None, env=None, universal_newlines=False, - startupinfo=None, creationflags=0, restore_signals=True, - start_new_session=False, pass_fds=()): - pass - """ - else: - communicate = ('string', 'string') - communicate_signature = 'def communicate(self, input=None)' - init = """ - def __init__(self, args, bufsize=0, executable=None, - stdin=None, stdout=None, stderr=None, - preexec_fn=None, close_fds=False, shell=False, - cwd=None, env=None, universal_newlines=False, - startupinfo=None, creationflags=0): - pass - """ - if PY33: - wait_signature = 'def wait(self, timeout=None)' - else: - wait_signature = 'def wait(self)' - if PY3K: - ctx_manager = ''' - def __enter__(self): return self - def __exit__(self, *args): pass - ''' - else: - ctx_manager = '' - code = textwrap.dedent(''' - - class Popen(object): - returncode = pid = 0 - stdin = stdout = stderr = file() - - %(init)s - - %(communicate_signature)s: - return %(communicate)r - %(wait_signature)s: - return self.returncode - def poll(self): - return self.returncode - def send_signal(self, signal): - pass - def terminate(self): - pass - def kill(self): - pass - %(ctx_manager)s - ''' % {'init': init, - 'communicate': communicate, - 'communicate_signature': communicate_signature, - 'wait_signature': wait_signature, - 'ctx_manager': ctx_manager}) - return AstroidBuilder(MANAGER).string_build(code) - - # namedtuple and Enum support def _looks_like(node, name): @@ -341,112 +214,9 @@ def infer_enum_class(enum_node): break return enum_node -def multiprocessing_transform(): - module = AstroidBuilder(MANAGER).string_build(textwrap.dedent(''' - from multiprocessing.managers import SyncManager - def Manager(): - return SyncManager() - ''')) - if not PY34: - return module - - # On Python 3.4, multiprocessing uses a getattr lookup inside contexts, - # in order to get the attributes they need. Since it's extremely - # dynamic, we use this approach to fake it. - node = AstroidBuilder(MANAGER).string_build(textwrap.dedent(''' - from multiprocessing.context import DefaultContext, BaseContext - default = DefaultContext() - base = BaseContext() - ''')) - try: - context = next(node['default'].infer()) - base = next(node['base'].infer()) - except InferenceError: - return module - - for node in (context, base): - for key, value in node.locals.items(): - if key.startswith("_"): - continue - - value = value[0] - if isinstance(value, nodes.FunctionDef): - # We need to rebind this, since otherwise - # it will have an extra argument (self). - value = BoundMethod(value, node) - module.body.append(nodes.InterpreterObject(object_=value, name=key, - parent=module)) - return module - -def multiprocessing_managers_transform(): - return AstroidBuilder(MANAGER).string_build(textwrap.dedent(''' - import array - import threading - import multiprocessing.pool as pool - - import six - - class Namespace(object): - pass - - class Value(object): - def __init__(self, typecode, value, lock=True): - self._typecode = typecode - self._value = value - def get(self): - return self._value - def set(self, value): - self._value = value - def __repr__(self): - return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) - value = property(get, set) - - def Array(typecode, sequence, lock=True): - return array.array(typecode, sequence) - - class SyncManager(object): - Queue = JoinableQueue = six.moves.queue.Queue - Event = threading.Event - RLock = threading.RLock - BoundedSemaphore = threading.BoundedSemaphore - Condition = threading.Condition - Barrier = threading.Barrier - Pool = pool.Pool - list = list - dict = dict - Value = Value - Array = Array - Namespace = Namespace - __enter__ = lambda self: self - __exit__ = lambda *args: args - - def start(self, initializer=None, initargs=None): - pass - def shutdown(self): - pass - ''')) - -def thread_transform(): - return AstroidBuilder(MANAGER).string_build(''' -class lock(object): - def acquire(self, blocking=True): - pass - def release(self): - pass - -def Lock(): - return lock() -''') MANAGER.register_transform(nodes.Call, inference_tip(infer_namedtuple), _looks_like_namedtuple) MANAGER.register_transform(nodes.Call, inference_tip(infer_enum), _looks_like_enum) MANAGER.register_transform(nodes.ClassDef, infer_enum_class) -register_module_extender(MANAGER, 'hashlib', hashlib_transform) -register_module_extender(MANAGER, 'collections', collections_transform) -register_module_extender(MANAGER, 'subprocess', subprocess_transform) -register_module_extender(MANAGER, 'multiprocessing.managers', - multiprocessing_managers_transform) -register_module_extender(MANAGER, 'multiprocessing', multiprocessing_transform) -register_module_extender(MANAGER, 'threading', thread_transform) diff --git a/astroid/brain/brain_subprocess.py b/astroid/brain/brain_subprocess.py new file mode 100644 index 00000000..3136f450 --- /dev/null +++ b/astroid/brain/brain_subprocess.py @@ -0,0 +1,78 @@ + +# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html +# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER + +import sys + +import astroid + + +PY3K = sys.version_info[0] >= 3 +PY33 = sys.version_info >= (3, 3) + + +def _subprocess_transform(): + if PY3K: + communicate = (bytes('string', 'ascii'), bytes('string', 'ascii')) + communicate_signature = 'def communicate(self, input=None, timeout=None)' + init = """ + def __init__(self, args, bufsize=0, executable=None, + stdin=None, stdout=None, stderr=None, + preexec_fn=None, close_fds=False, shell=False, + cwd=None, env=None, universal_newlines=False, + startupinfo=None, creationflags=0, restore_signals=True, + start_new_session=False, pass_fds=()): + pass + """ + else: + communicate = ('string', 'string') + communicate_signature = 'def communicate(self, input=None)' + init = """ + def __init__(self, args, bufsize=0, executable=None, + stdin=None, stdout=None, stderr=None, + preexec_fn=None, close_fds=False, shell=False, + cwd=None, env=None, universal_newlines=False, + startupinfo=None, creationflags=0): + pass + """ + if PY33: + wait_signature = 'def wait(self, timeout=None)' + else: + wait_signature = 'def wait(self)' + if PY3K: + ctx_manager = ''' + def __enter__(self): return self + def __exit__(self, *args): pass + ''' + else: + ctx_manager = '' + code = ''' + + class Popen(object): + returncode = pid = 0 + stdin = stdout = stderr = file() + + %(init)s + + %(communicate_signature)s: + return %(communicate)r + %(wait_signature)s: + return self.returncode + def poll(self): + return self.returncode + def send_signal(self, signal): + pass + def terminate(self): + pass + def kill(self): + pass + %(ctx_manager)s + ''' % {'init': init, + 'communicate': communicate, + 'communicate_signature': communicate_signature, + 'wait_signature': wait_signature, + 'ctx_manager': ctx_manager} + return astroid.parse(code) + + +astroid.register_module_extender(astroid.MANAGER, 'subprocess', _subprocess_transform) diff --git a/astroid/brain/brain_threading.py b/astroid/brain/brain_threading.py new file mode 100644 index 00000000..98f7fb0f --- /dev/null +++ b/astroid/brain/brain_threading.py @@ -0,0 +1,20 @@ +# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html +# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER + +import astroid + + +def _thread_transform(): + return astroid.parse(''' + class lock(object): + def acquire(self, blocking=True): + pass + def release(self): + pass + + def Lock(): + return lock() + ''') + + +astroid.register_module_extender(astroid.MANAGER, 'threading', _thread_transform) |