summaryrefslogtreecommitdiff
path: root/Lib/test/test_importlib/test_locks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_importlib/test_locks.py')
-rw-r--r--Lib/test/test_importlib/test_locks.py129
1 files changed, 129 insertions, 0 deletions
diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py
new file mode 100644
index 0000000000..c373b11256
--- /dev/null
+++ b/Lib/test/test_importlib/test_locks.py
@@ -0,0 +1,129 @@
+from importlib import _bootstrap
+import sys
+import time
+import unittest
+import weakref
+
+from test import support
+
+try:
+ import threading
+except ImportError:
+ threading = None
+else:
+ from test import lock_tests
+
+
+LockType = _bootstrap._ModuleLock
+DeadlockError = _bootstrap._DeadlockError
+
+
+if threading is not None:
+ class ModuleLockAsRLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(lambda: LockType("some_lock"))
+
+ # _is_owned() unsupported
+ test__is_owned = None
+ # acquire(blocking=False) unsupported
+ test_try_acquire = None
+ test_try_acquire_contended = None
+ # `with` unsupported
+ test_with = None
+ # acquire(timeout=...) unsupported
+ test_timeout = None
+ # _release_save() unsupported
+ test_release_save_unacquired = None
+
+else:
+ class ModuleLockAsRLockTests(unittest.TestCase):
+ pass
+
+
+@unittest.skipUnless(threading, "threads needed for this test")
+class DeadlockAvoidanceTests(unittest.TestCase):
+
+ def setUp(self):
+ try:
+ self.old_switchinterval = sys.getswitchinterval()
+ sys.setswitchinterval(0.000001)
+ except AttributeError:
+ self.old_switchinterval = None
+
+ def tearDown(self):
+ if self.old_switchinterval is not None:
+ sys.setswitchinterval(self.old_switchinterval)
+
+ def run_deadlock_avoidance_test(self, create_deadlock):
+ NLOCKS = 10
+ locks = [LockType(str(i)) for i in range(NLOCKS)]
+ pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
+ if create_deadlock:
+ NTHREADS = NLOCKS
+ else:
+ NTHREADS = NLOCKS - 1
+ barrier = threading.Barrier(NTHREADS)
+ results = []
+ def _acquire(lock):
+ """Try to acquire the lock. Return True on success, False on deadlock."""
+ try:
+ lock.acquire()
+ except DeadlockError:
+ return False
+ else:
+ return True
+ def f():
+ a, b = pairs.pop()
+ ra = _acquire(a)
+ barrier.wait()
+ rb = _acquire(b)
+ results.append((ra, rb))
+ if rb:
+ b.release()
+ if ra:
+ a.release()
+ lock_tests.Bunch(f, NTHREADS).wait_for_finished()
+ self.assertEqual(len(results), NTHREADS)
+ return results
+
+ def test_deadlock(self):
+ results = self.run_deadlock_avoidance_test(True)
+ # At least one of the threads detected a potential deadlock on its
+ # second acquire() call. It may be several of them, because the
+ # deadlock avoidance mechanism is conservative.
+ nb_deadlocks = results.count((True, False))
+ self.assertGreaterEqual(nb_deadlocks, 1)
+ self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
+
+ def test_no_deadlock(self):
+ results = self.run_deadlock_avoidance_test(False)
+ self.assertEqual(results.count((True, False)), 0)
+ self.assertEqual(results.count((True, True)), len(results))
+
+
+class LifetimeTests(unittest.TestCase):
+
+ def test_lock_lifetime(self):
+ name = "xyzzy"
+ self.assertNotIn(name, _bootstrap._module_locks)
+ lock = _bootstrap._get_module_lock(name)
+ self.assertIn(name, _bootstrap._module_locks)
+ wr = weakref.ref(lock)
+ del lock
+ support.gc_collect()
+ self.assertNotIn(name, _bootstrap._module_locks)
+ self.assertIsNone(wr())
+
+ def test_all_locks(self):
+ support.gc_collect()
+ self.assertEqual(0, len(_bootstrap._module_locks), _bootstrap._module_locks)
+
+
+@support.reap_threads
+def test_main():
+ support.run_unittest(ModuleLockAsRLockTests,
+ DeadlockAvoidanceTests,
+ LifetimeTests)
+
+
+if __name__ == '__main__':
+ test_main()