summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorPierre Sassoulas <pierre.sassoulas@gmail.com>2023-02-12 17:12:23 +0100
committerPierre Sassoulas <pierre.sassoulas@gmail.com>2023-02-12 20:07:53 +0100
commitd57e34fd24df75ddf9876f911c1665c9c36b8b05 (patch)
tree5f24ea6329c940ab921037cfdc5927949b82debb /tests
parente809036dec464c49679768ab468066d544d06ad0 (diff)
downloadastroid-git-d57e34fd24df75ddf9876f911c1665c9c36b8b05.tar.gz
[brain tests] Burst threading from the main file
Diffstat (limited to 'tests')
-rw-r--r--tests/brain/test_brain.py43
-rw-r--r--tests/brain/test_exceptions.py0
-rw-r--r--tests/brain/test_threading.py54
3 files changed, 54 insertions, 43 deletions
diff --git a/tests/brain/test_brain.py b/tests/brain/test_brain.py
index 316e742f..dc12ea28 100644
--- a/tests/brain/test_brain.py
+++ b/tests/brain/test_brain.py
@@ -100,49 +100,6 @@ class ModuleExtenderTest(unittest.TestCase):
extender(n)
-class ThreadingBrainTest(unittest.TestCase):
- def test_lock(self) -> None:
- lock_instance = builder.extract_node(
- """
- import threading
- threading.Lock()
- """
- )
- inferred = next(lock_instance.infer())
- self.assert_is_valid_lock(inferred)
-
- acquire_method = inferred.getattr("acquire")[0]
- parameters = [param.name for param in acquire_method.args.args[1:]]
- assert parameters == ["blocking", "timeout"]
-
- assert inferred.getattr("locked")
-
- def test_rlock(self) -> None:
- self._test_lock_object("RLock")
-
- def test_semaphore(self) -> None:
- self._test_lock_object("Semaphore")
-
- def test_boundedsemaphore(self) -> None:
- self._test_lock_object("BoundedSemaphore")
-
- def _test_lock_object(self, object_name: str) -> None:
- lock_instance = builder.extract_node(
- f"""
- import threading
- threading.{object_name}()
- """
- )
- inferred = next(lock_instance.infer())
- self.assert_is_valid_lock(inferred)
-
- def assert_is_valid_lock(self, inferred: Instance) -> None:
- self.assertIsInstance(inferred, astroid.Instance)
- self.assertEqual(inferred.root().name, "threading")
- for method in ("acquire", "release", "__enter__", "__exit__"):
- self.assertIsInstance(next(inferred.igetattr(method)), astroid.BoundMethod)
-
-
def streams_are_fine():
"""Check if streams are being overwritten,
for example, by pytest
diff --git a/tests/brain/test_exceptions.py b/tests/brain/test_exceptions.py
deleted file mode 100644
index e69de29b..00000000
--- a/tests/brain/test_exceptions.py
+++ /dev/null
diff --git a/tests/brain/test_threading.py b/tests/brain/test_threading.py
new file mode 100644
index 00000000..f7da03d0
--- /dev/null
+++ b/tests/brain/test_threading.py
@@ -0,0 +1,54 @@
+# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
+# For details: https://github.com/PyCQA/astroid/blob/main/LICENSE
+# Copyright (c) https://github.com/PyCQA/astroid/blob/main/CONTRIBUTORS.txt
+
+from __future__ import annotations
+
+import unittest
+
+import astroid
+from astroid import builder
+from astroid.bases import Instance
+
+
+class ThreadingBrainTest(unittest.TestCase):
+ def test_lock(self) -> None:
+ lock_instance = builder.extract_node(
+ """
+ import threading
+ threading.Lock()
+ """
+ )
+ inferred = next(lock_instance.infer())
+ self.assert_is_valid_lock(inferred)
+
+ acquire_method = inferred.getattr("acquire")[0]
+ parameters = [param.name for param in acquire_method.args.args[1:]]
+ assert parameters == ["blocking", "timeout"]
+
+ assert inferred.getattr("locked")
+
+ def test_rlock(self) -> None:
+ self._test_lock_object("RLock")
+
+ def test_semaphore(self) -> None:
+ self._test_lock_object("Semaphore")
+
+ def test_boundedsemaphore(self) -> None:
+ self._test_lock_object("BoundedSemaphore")
+
+ def _test_lock_object(self, object_name: str) -> None:
+ lock_instance = builder.extract_node(
+ f"""
+ import threading
+ threading.{object_name}()
+ """
+ )
+ inferred = next(lock_instance.infer())
+ self.assert_is_valid_lock(inferred)
+
+ def assert_is_valid_lock(self, inferred: Instance) -> None:
+ self.assertIsInstance(inferred, astroid.Instance)
+ self.assertEqual(inferred.root().name, "threading")
+ for method in ("acquire", "release", "__enter__", "__exit__"):
+ self.assertIsInstance(next(inferred.igetattr(method)), astroid.BoundMethod)