1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
from . import util
# The package-local test helper hands back two importlib objects, bound here
# under the names "frozen" and "source".  The test classes below come in
# Frozen_*/Source_* pairs so that each check runs against both variants.
frozen_init, source_init = util.import_importlib('importlib')
# The `_bootstrap` attribute of each variant supplies the lock machinery
# under test (_ModuleLock, _DeadlockError, _module_locks, _get_module_lock).
frozen_bootstrap = frozen_init._bootstrap
source_bootstrap = source_init._bootstrap
import sys
import time
import unittest
import weakref
from test import support
try:
import threading
except ImportError:
threading = None
else:
from test import lock_tests
if threading is None:
    # Without thread support the shared lock test suite was never imported,
    # so only empty placeholders are defined; test_main() can still refer to
    # both names.
    class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
        """Empty stand-in used when the threading module is unavailable."""

    class Source_ModuleLockAsRLockTests(unittest.TestCase):
        """Empty stand-in used when the threading module is unavailable."""

else:
    class ModuleLockAsRLockTests:
        """Mixin that runs the generic RLock test suite on a module lock.

        Concrete subclasses provide ``LockType``; the inherited tests obtain
        their lock instances through ``locktype()``.
        """

        @classmethod
        def locktype(cls):
            # The module lock constructor takes a name argument.
            return cls.LockType("some_lock")

        # Inherited tests that depend on features the module lock does not
        # offer are replaced by None.

        # _is_owned() unsupported
        test__is_owned = None

        # acquire(blocking=False) unsupported
        test_try_acquire = test_try_acquire_contended = None

        # `with` unsupported
        test_with = None

        # acquire(timeout=...) unsupported
        test_timeout = None

        # _release_save() unsupported
        test_release_save_unacquired = None

    class Frozen_ModuleLockAsRLockTests(ModuleLockAsRLockTests,
                                        lock_tests.RLockTests):
        """RLock test suite applied to the frozen bootstrap's module lock."""

        LockType = frozen_bootstrap._ModuleLock

    class Source_ModuleLockAsRLockTests(ModuleLockAsRLockTests,
                                        lock_tests.RLockTests):
        """RLock test suite applied to the source bootstrap's module lock."""

        LockType = source_bootstrap._ModuleLock
class DeadlockAvoidanceTests:
    """Mixin checking that module locks report deadlocks instead of hanging.

    Concrete subclasses must define ``LockType`` (the lock class to
    instantiate) and ``DeadlockError`` (the exception ``acquire()`` raises
    when it refuses to block).
    """

    def setUp(self):
        # Use a very small thread switch interval while the test runs, and
        # remember the previous value so tearDown() can put it back.  When
        # the interpreter lacks the switch-interval API, None is recorded.
        try:
            previous = sys.getswitchinterval()
            sys.setswitchinterval(0.000001)
        except AttributeError:
            previous = None
        self.old_switchinterval = previous

    def tearDown(self):
        saved = self.old_switchinterval
        if saved is None:
            # setUp() did not change anything, so there is nothing to undo.
            return
        sys.setswitchinterval(saved)

    def run_deadlock_avoidance_test(self, create_deadlock):
        """Run threads that each take two neighbouring locks from a ring.

        Ten locks are arranged in a ring and every thread claims one
        (lock, next lock) pair.  With *create_deadlock* true there is one
        thread per pair, closing the ring; otherwise one pair is left
        unclaimed.  Returns a list with one ``(first_ok, second_ok)`` tuple
        per thread, telling whether each acquire succeeded.
        """
        lock_count = 10
        locks = [self.LockType(str(index)) for index in range(lock_count)]
        # Pair each lock with its successor; the last wraps to the first.
        pairs = list(zip(locks, locks[1:] + locks[:1]))
        thread_count = lock_count if create_deadlock else lock_count - 1
        barrier = threading.Barrier(thread_count)
        results = []

        def try_acquire(lock):
            """Acquire *lock*; True on success, False on a reported deadlock."""
            try:
                lock.acquire()
            except self.DeadlockError:
                return False
            return True

        def worker():
            first, second = pairs.pop()
            got_first = try_acquire(first)
            # Every thread holds its first lock before any attempts a second.
            barrier.wait()
            got_second = try_acquire(second)
            results.append((got_first, got_second))
            # Release in reverse order of acquisition.
            if got_second:
                second.release()
            if got_first:
                first.release()

        lock_tests.Bunch(worker, thread_count).wait_for_finished()
        self.assertEqual(len(results), thread_count)
        return results

    def test_deadlock(self):
        outcomes = self.run_deadlock_avoidance_test(True)
        # At least one thread must have detected a potential deadlock on its
        # second acquire() call.  Several may do so, since the avoidance
        # mechanism is conservative.
        deadlocked = outcomes.count((True, False))
        self.assertGreaterEqual(deadlocked, 1)
        # All remaining threads must have obtained both of their locks.
        self.assertEqual(outcomes.count((True, True)),
                         len(outcomes) - deadlocked)

    def test_no_deadlock(self):
        outcomes = self.run_deadlock_avoidance_test(False)
        # With the ring left open, no thread may report a deadlock and all
        # must have obtained both locks.
        self.assertEqual(outcomes.count((True, False)), 0)
        self.assertEqual(outcomes.count((True, True)), len(outcomes))
@unittest.skipUnless(threading, "threads needed for this test")
class Frozen_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
    """Deadlock avoidance tests bound to the frozen bootstrap module."""

    DeadlockError = frozen_bootstrap._DeadlockError
    LockType = frozen_bootstrap._ModuleLock
@unittest.skipUnless(threading, "threads needed for this test")
class Source_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
    """Deadlock avoidance tests bound to the source bootstrap module."""

    DeadlockError = source_bootstrap._DeadlockError
    LockType = source_bootstrap._ModuleLock
class LifetimeTests:
    """Mixin checking how long entries stay in the module lock registry.

    Concrete subclasses must define ``bootstrap``, the module whose
    ``_module_locks`` mapping and ``_get_module_lock()`` are examined.
    """

    def test_lock_lifetime(self):
        bootstrap = self.bootstrap
        module_name = "xyzzy"
        # The name is unknown until a lock is requested for it.
        self.assertNotIn(module_name, bootstrap._module_locks)
        lock = bootstrap._get_module_lock(module_name)
        self.assertIn(module_name, bootstrap._module_locks)
        # Drop the only strong reference, keeping a weak one to observe it.
        lock_ref = weakref.ref(lock)
        del lock
        support.gc_collect()
        # After collection the registry entry and the lock itself are gone.
        self.assertNotIn(module_name, bootstrap._module_locks)
        self.assertIsNone(lock_ref())

    def test_all_locks(self):
        support.gc_collect()
        # After a collection the registry is expected to be empty; its
        # contents are passed as the failure message.
        registry = self.bootstrap._module_locks
        self.assertEqual(0, len(registry), registry)
class Frozen_LifetimeTests(LifetimeTests, unittest.TestCase):
    """Lock lifetime tests bound to the frozen bootstrap module."""

    bootstrap = frozen_bootstrap
class Source_LifetimeTests(LifetimeTests, unittest.TestCase):
    """Lock lifetime tests bound to the source bootstrap module."""

    bootstrap = source_bootstrap
@support.reap_threads
def test_main():
    """Run every test class of this module through support.run_unittest()."""
    test_classes = (
        Frozen_ModuleLockAsRLockTests,
        Source_ModuleLockAsRLockTests,
        Frozen_DeadlockAvoidanceTests,
        Source_DeadlockAvoidanceTests,
        Frozen_LifetimeTests,
        Source_LifetimeTests,
    )
    support.run_unittest(*test_classes)
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
|