summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClaudiu Popa <pcmanticore@gmail.com>2016-05-31 18:44:55 +0100
committerClaudiu Popa <pcmanticore@gmail.com>2016-05-31 18:44:55 +0100
commit752da0693be421de04a30cdab2f55380875122a7 (patch)
treefb8f4f6fa8365bb85f063623866625e912bce4be
parent0442a3c822e902f40c4bc89d53375780354c7411 (diff)
downloadastroid-git-752da0693be421de04a30cdab2f55380875122a7.tar.gz
Split brain_stdlib into multiple modules and finally kill it.
-rw-r--r--astroid/brain/brain_collections.py35
-rw-r--r--astroid/brain/brain_hashlib.py38
-rw-r--r--astroid/brain/brain_multiprocessing.py104
-rw-r--r--astroid/brain/brain_namedtuple_enum.py (renamed from astroid/brain/brain_stdlib.py)232
-rw-r--r--astroid/brain/brain_subprocess.py78
-rw-r--r--astroid/brain/brain_threading.py20
6 files changed, 276 insertions, 231 deletions
diff --git a/astroid/brain/brain_collections.py b/astroid/brain/brain_collections.py
new file mode 100644
index 00000000..ad378618
--- /dev/null
+++ b/astroid/brain/brain_collections.py
@@ -0,0 +1,35 @@
+# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
+# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER
+
+import astroid
+
+
+def _collections_transform():
+ return astroid.parse('''
+ class defaultdict(dict):
+ default_factory = None
+ def __missing__(self, key): pass
+
+ class deque(object):
+ maxlen = 0
+ def __init__(self, iterable=None, maxlen=None):
+ self.iterable = iterable
+ def append(self, x): pass
+ def appendleft(self, x): pass
+ def clear(self): pass
+ def count(self, x): return 0
+ def extend(self, iterable): pass
+ def extendleft(self, iterable): pass
+ def pop(self): pass
+ def popleft(self): pass
+ def remove(self, value): pass
+ def reverse(self): pass
+ def rotate(self, n): pass
+ def __iter__(self): return self
+ def __reversed__(self): return self.iterable[::-1]
+ def __getitem__(self, index): pass
+ def __setitem__(self, index, value): pass
+ def __delitem__(self, index): pass
+ ''')
+
+astroid.register_module_extender(astroid.MANAGER, 'collections', _collections_transform)
diff --git a/astroid/brain/brain_hashlib.py b/astroid/brain/brain_hashlib.py
new file mode 100644
index 00000000..2ff43858
--- /dev/null
+++ b/astroid/brain/brain_hashlib.py
@@ -0,0 +1,38 @@
+# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
+# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER
+
+import six
+
+import astroid
+
+
+def _hashlib_transform():
+ template = '''
+ class %(name)s(object):
+ def __init__(self, value=''): pass
+ def digest(self):
+ return %(digest)s
+ def copy(self):
+ return self
+ def update(self, value): pass
+ def hexdigest(self):
+ return ''
+ @property
+ def name(self):
+ return %(name)r
+ @property
+ def block_size(self):
+ return 1
+ @property
+ def digest_size(self):
+ return 1
+ '''
+ algorithms = ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512')
+ classes = "".join(
+ template % {'name': hashfunc, 'digest': 'b""' if six.PY3 else '""'}
+ for hashfunc in algorithms)
+ return astroid.parse(classes)
+
+
+astroid.register_module_extender(astroid.MANAGER, 'hashlib', _hashlib_transform)
+
diff --git a/astroid/brain/brain_multiprocessing.py b/astroid/brain/brain_multiprocessing.py
new file mode 100644
index 00000000..d0fb0724
--- /dev/null
+++ b/astroid/brain/brain_multiprocessing.py
@@ -0,0 +1,104 @@
+
+# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
+# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER
+
+import sys
+
+import astroid
+from astroid import exceptions
+from astroid.interpreter import objects
+from astroid.tree.node_classes import InterpreterObject
+
+
+PY34 = sys.version_info[:2] >= (3, 4)
+
+
+def _multiprocessing_transform():
+ module = astroid.parse('''
+ from multiprocessing.managers import SyncManager
+ def Manager():
+ return SyncManager()
+ ''')
+ if not PY34:
+ return module
+
+ # On Python 3.4, multiprocessing uses a getattr lookup inside contexts,
+ # in order to get the attributes they need. Since it's extremely
+ # dynamic, we use this approach to fake it.
+ node = astroid.parse('''
+ from multiprocessing.context import DefaultContext, BaseContext
+ default = DefaultContext()
+ base = BaseContext()
+ ''')
+ try:
+ context = next(node['default'].infer())
+ base = next(node['base'].infer())
+ except exceptions.InferenceError:
+ return module
+
+ for node in (context, base):
+ for key, value in node.locals.items():
+ if key.startswith("_"):
+ continue
+
+ value = value[0]
+ if isinstance(value, astroid.FunctionDef):
+ # We need to rebind this, since otherwise
+ # it will have an extra argument (self).
+ value = objects.BoundMethod(value, node)
+ module.body.append(InterpreterObject(object_=value, name=key, parent=module))
+ return module
+
+
+def _multiprocessing_managers_transform():
+ return astroid.parse('''
+ import array
+ import threading
+ import multiprocessing.pool as pool
+
+ import six
+
+ class Namespace(object):
+ pass
+
+ class Value(object):
+ def __init__(self, typecode, value, lock=True):
+ self._typecode = typecode
+ self._value = value
+ def get(self):
+ return self._value
+ def set(self, value):
+ self._value = value
+ def __repr__(self):
+ return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
+ value = property(get, set)
+
+ def Array(typecode, sequence, lock=True):
+ return array.array(typecode, sequence)
+
+ class SyncManager(object):
+ Queue = JoinableQueue = six.moves.queue.Queue
+ Event = threading.Event
+ RLock = threading.RLock
+ BoundedSemaphore = threading.BoundedSemaphore
+ Condition = threading.Condition
+ Barrier = threading.Barrier
+ Pool = pool.Pool
+ list = list
+ dict = dict
+ Value = Value
+ Array = Array
+ Namespace = Namespace
+ __enter__ = lambda self: self
+ __exit__ = lambda *args: args
+
+ def start(self, initializer=None, initargs=None):
+ pass
+ def shutdown(self):
+ pass
+ ''')
+
+
+astroid.register_module_extender(astroid.MANAGER, 'multiprocessing', _multiprocessing_transform)
+astroid.register_module_extender(astroid.MANAGER, 'multiprocessing.managers',
+ _multiprocessing_managers_transform)
diff --git a/astroid/brain/brain_stdlib.py b/astroid/brain/brain_namedtuple_enum.py
index 6144f1bf..38c6c513 100644
--- a/astroid/brain/brain_stdlib.py
+++ b/astroid/brain/brain_namedtuple_enum.py
@@ -10,10 +10,8 @@ import sys
import textwrap
from astroid import (
- MANAGER, UseInferenceDefault, inference_tip,
- InferenceError, register_module_extender)
+ MANAGER, UseInferenceDefault, inference_tip)
from astroid import exceptions
-from astroid.interpreter.objects import BoundMethod
from astroid import nodes
from astroid.builder import AstroidBuilder
from astroid import parse
@@ -36,131 +34,6 @@ def infer_first(node, context):
return value
-# module specific transformation functions #####################################
-
-def hashlib_transform():
- template = '''
-
-class %(name)s(object):
- def __init__(self, value=''): pass
- def digest(self):
- return %(digest)s
- def copy(self):
- return self
- def update(self, value): pass
- def hexdigest(self):
- return ''
- @property
- def name(self):
- return %(name)r
- @property
- def block_size(self):
- return 1
- @property
- def digest_size(self):
- return 1
-'''
- algorithms = ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512')
- classes = "".join(
- template % {'name': hashfunc, 'digest': 'b""' if PY3K else '""'}
- for hashfunc in algorithms)
- return AstroidBuilder(MANAGER).string_build(classes)
-
-
-def collections_transform():
- return AstroidBuilder(MANAGER).string_build('''
-
-class defaultdict(dict):
- default_factory = None
- def __missing__(self, key): pass
-
-class deque(object):
- maxlen = 0
- def __init__(self, iterable=None, maxlen=None):
- self.iterable = iterable
- def append(self, x): pass
- def appendleft(self, x): pass
- def clear(self): pass
- def count(self, x): return 0
- def extend(self, iterable): pass
- def extendleft(self, iterable): pass
- def pop(self): pass
- def popleft(self): pass
- def remove(self, value): pass
- def reverse(self): pass
- def rotate(self, n): pass
- def __iter__(self): return self
- def __reversed__(self): return self.iterable[::-1]
- def __getitem__(self, index): pass
- def __setitem__(self, index, value): pass
- def __delitem__(self, index): pass
-''')
-
-
-def subprocess_transform():
- if PY3K:
- communicate = (bytes('string', 'ascii'), bytes('string', 'ascii'))
- communicate_signature = 'def communicate(self, input=None, timeout=None)'
- init = """
- def __init__(self, args, bufsize=0, executable=None,
- stdin=None, stdout=None, stderr=None,
- preexec_fn=None, close_fds=False, shell=False,
- cwd=None, env=None, universal_newlines=False,
- startupinfo=None, creationflags=0, restore_signals=True,
- start_new_session=False, pass_fds=()):
- pass
- """
- else:
- communicate = ('string', 'string')
- communicate_signature = 'def communicate(self, input=None)'
- init = """
- def __init__(self, args, bufsize=0, executable=None,
- stdin=None, stdout=None, stderr=None,
- preexec_fn=None, close_fds=False, shell=False,
- cwd=None, env=None, universal_newlines=False,
- startupinfo=None, creationflags=0):
- pass
- """
- if PY33:
- wait_signature = 'def wait(self, timeout=None)'
- else:
- wait_signature = 'def wait(self)'
- if PY3K:
- ctx_manager = '''
- def __enter__(self): return self
- def __exit__(self, *args): pass
- '''
- else:
- ctx_manager = ''
- code = textwrap.dedent('''
-
- class Popen(object):
- returncode = pid = 0
- stdin = stdout = stderr = file()
-
- %(init)s
-
- %(communicate_signature)s:
- return %(communicate)r
- %(wait_signature)s:
- return self.returncode
- def poll(self):
- return self.returncode
- def send_signal(self, signal):
- pass
- def terminate(self):
- pass
- def kill(self):
- pass
- %(ctx_manager)s
- ''' % {'init': init,
- 'communicate': communicate,
- 'communicate_signature': communicate_signature,
- 'wait_signature': wait_signature,
- 'ctx_manager': ctx_manager})
- return AstroidBuilder(MANAGER).string_build(code)
-
-
# namedtuple and Enum support
def _looks_like(node, name):
@@ -341,112 +214,9 @@ def infer_enum_class(enum_node):
break
return enum_node
-def multiprocessing_transform():
- module = AstroidBuilder(MANAGER).string_build(textwrap.dedent('''
- from multiprocessing.managers import SyncManager
- def Manager():
- return SyncManager()
- '''))
- if not PY34:
- return module
-
- # On Python 3.4, multiprocessing uses a getattr lookup inside contexts,
- # in order to get the attributes they need. Since it's extremely
- # dynamic, we use this approach to fake it.
- node = AstroidBuilder(MANAGER).string_build(textwrap.dedent('''
- from multiprocessing.context import DefaultContext, BaseContext
- default = DefaultContext()
- base = BaseContext()
- '''))
- try:
- context = next(node['default'].infer())
- base = next(node['base'].infer())
- except InferenceError:
- return module
-
- for node in (context, base):
- for key, value in node.locals.items():
- if key.startswith("_"):
- continue
-
- value = value[0]
- if isinstance(value, nodes.FunctionDef):
- # We need to rebind this, since otherwise
- # it will have an extra argument (self).
- value = BoundMethod(value, node)
- module.body.append(nodes.InterpreterObject(object_=value, name=key,
- parent=module))
- return module
-
-def multiprocessing_managers_transform():
- return AstroidBuilder(MANAGER).string_build(textwrap.dedent('''
- import array
- import threading
- import multiprocessing.pool as pool
-
- import six
-
- class Namespace(object):
- pass
-
- class Value(object):
- def __init__(self, typecode, value, lock=True):
- self._typecode = typecode
- self._value = value
- def get(self):
- return self._value
- def set(self, value):
- self._value = value
- def __repr__(self):
- return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
- value = property(get, set)
-
- def Array(typecode, sequence, lock=True):
- return array.array(typecode, sequence)
-
- class SyncManager(object):
- Queue = JoinableQueue = six.moves.queue.Queue
- Event = threading.Event
- RLock = threading.RLock
- BoundedSemaphore = threading.BoundedSemaphore
- Condition = threading.Condition
- Barrier = threading.Barrier
- Pool = pool.Pool
- list = list
- dict = dict
- Value = Value
- Array = Array
- Namespace = Namespace
- __enter__ = lambda self: self
- __exit__ = lambda *args: args
-
- def start(self, initializer=None, initargs=None):
- pass
- def shutdown(self):
- pass
- '''))
-
-def thread_transform():
- return AstroidBuilder(MANAGER).string_build('''
-class lock(object):
- def acquire(self, blocking=True):
- pass
- def release(self):
- pass
-
-def Lock():
- return lock()
-''')
MANAGER.register_transform(nodes.Call, inference_tip(infer_namedtuple),
_looks_like_namedtuple)
MANAGER.register_transform(nodes.Call, inference_tip(infer_enum),
_looks_like_enum)
MANAGER.register_transform(nodes.ClassDef, infer_enum_class)
-register_module_extender(MANAGER, 'hashlib', hashlib_transform)
-register_module_extender(MANAGER, 'collections', collections_transform)
-register_module_extender(MANAGER, 'subprocess', subprocess_transform)
-register_module_extender(MANAGER, 'multiprocessing.managers',
- multiprocessing_managers_transform)
-register_module_extender(MANAGER, 'multiprocessing', multiprocessing_transform)
-register_module_extender(MANAGER, 'threading', thread_transform)
diff --git a/astroid/brain/brain_subprocess.py b/astroid/brain/brain_subprocess.py
new file mode 100644
index 00000000..3136f450
--- /dev/null
+++ b/astroid/brain/brain_subprocess.py
@@ -0,0 +1,78 @@
+
+# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
+# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER
+
+import sys
+
+import astroid
+
+
+PY3K = sys.version_info[0] >= 3
+PY33 = sys.version_info >= (3, 3)
+
+
+def _subprocess_transform():
+ if PY3K:
+ communicate = (bytes('string', 'ascii'), bytes('string', 'ascii'))
+ communicate_signature = 'def communicate(self, input=None, timeout=None)'
+ init = """
+ def __init__(self, args, bufsize=0, executable=None,
+ stdin=None, stdout=None, stderr=None,
+ preexec_fn=None, close_fds=False, shell=False,
+ cwd=None, env=None, universal_newlines=False,
+ startupinfo=None, creationflags=0, restore_signals=True,
+ start_new_session=False, pass_fds=()):
+ pass
+ """
+ else:
+ communicate = ('string', 'string')
+ communicate_signature = 'def communicate(self, input=None)'
+ init = """
+ def __init__(self, args, bufsize=0, executable=None,
+ stdin=None, stdout=None, stderr=None,
+ preexec_fn=None, close_fds=False, shell=False,
+ cwd=None, env=None, universal_newlines=False,
+ startupinfo=None, creationflags=0):
+ pass
+ """
+ if PY33:
+ wait_signature = 'def wait(self, timeout=None)'
+ else:
+ wait_signature = 'def wait(self)'
+ if PY3K:
+ ctx_manager = '''
+ def __enter__(self): return self
+ def __exit__(self, *args): pass
+ '''
+ else:
+ ctx_manager = ''
+ code = '''
+
+ class Popen(object):
+ returncode = pid = 0
+ stdin = stdout = stderr = file()
+
+ %(init)s
+
+ %(communicate_signature)s:
+ return %(communicate)r
+ %(wait_signature)s:
+ return self.returncode
+ def poll(self):
+ return self.returncode
+ def send_signal(self, signal):
+ pass
+ def terminate(self):
+ pass
+ def kill(self):
+ pass
+ %(ctx_manager)s
+ ''' % {'init': init,
+ 'communicate': communicate,
+ 'communicate_signature': communicate_signature,
+ 'wait_signature': wait_signature,
+ 'ctx_manager': ctx_manager}
+ return astroid.parse(code)
+
+
+astroid.register_module_extender(astroid.MANAGER, 'subprocess', _subprocess_transform)
diff --git a/astroid/brain/brain_threading.py b/astroid/brain/brain_threading.py
new file mode 100644
index 00000000..98f7fb0f
--- /dev/null
+++ b/astroid/brain/brain_threading.py
@@ -0,0 +1,20 @@
+# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
+# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER
+
+import astroid
+
+
+def _thread_transform():
+ return astroid.parse('''
+ class lock(object):
+ def acquire(self, blocking=True):
+ pass
+ def release(self):
+ pass
+
+ def Lock():
+ return lock()
+ ''')
+
+
+astroid.register_module_extender(astroid.MANAGER, 'threading', _thread_transform)