summaryrefslogtreecommitdiff
path: root/Lib/test/test_gc.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_gc.py')
-rw-r--r--Lib/test/test_gc.py143
1 files changed, 141 insertions, 2 deletions
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py
index e1c124d160..c59b72eacf 100644
--- a/Lib/test/test_gc.py
+++ b/Lib/test/test_gc.py
@@ -1,5 +1,6 @@
import unittest
-from test.support import verbose, run_unittest, strip_python_stderr
+from test.support import (verbose, refcount_test, run_unittest,
+ strip_python_stderr)
import sys
import time
import gc
@@ -37,6 +38,20 @@ class GC_Detector(object):
# gc collects it.
self.wr = weakref.ref(C1055820(666), it_happened)
+class Uncollectable(object):
+ """Create a reference cycle with multiple __del__ methods.
+
+ An object in a reference cycle will never have zero references,
+ and so must be garbage collected. If one or more objects in the
+ cycle have __del__ methods, the gc refuses to guess an order,
+ and leaves the cycle uncollected."""
+ def __init__(self, partner=None):
+ if partner is None:
+ self.partner = Uncollectable(partner=self)
+ else:
+ self.partner = partner
+ def __del__(self):
+ pass
### Tests
###############################################################################
@@ -181,6 +196,7 @@ class GCTests(unittest.TestCase):
del d
self.assertEqual(gc.collect(), 2)
+ @refcount_test
def test_frame(self):
def f():
frame = sys._getframe()
@@ -248,6 +264,7 @@ class GCTests(unittest.TestCase):
# For example, disposed tuples are not freed, but reused.
# To minimize variations, though, we first store the get_count() results
# and check them at the end.
+ @refcount_test
def test_get_count(self):
gc.collect()
a, b, c = gc.get_count()
@@ -261,6 +278,7 @@ class GCTests(unittest.TestCase):
# created (the list).
self.assertGreater(d, a)
+ @refcount_test
def test_collect_generations(self):
gc.collect()
# This object will "trickle" into generation N + 1 after
@@ -593,6 +611,127 @@ class GCTests(unittest.TestCase):
self.assertNotIn(b"uncollectable objects at shutdown", stderr)
+class GCCallbackTests(unittest.TestCase):
+ def setUp(self):
+ # Save gc state and disable it.
+ self.enabled = gc.isenabled()
+ gc.disable()
+ self.debug = gc.get_debug()
+ gc.set_debug(0)
+ gc.callbacks.append(self.cb1)
+ gc.callbacks.append(self.cb2)
+ self.othergarbage = []
+
+ def tearDown(self):
+ # Restore gc state
+ del self.visit
+ gc.callbacks.remove(self.cb1)
+ gc.callbacks.remove(self.cb2)
+ gc.set_debug(self.debug)
+ if self.enabled:
+ gc.enable()
+ # destroy any uncollectables
+ gc.collect()
+ for obj in gc.garbage:
+ if isinstance(obj, Uncollectable):
+ obj.partner = None
+ del gc.garbage[:]
+ del self.othergarbage
+ gc.collect()
+
+ def preclean(self):
+ # Remove all fluff from the system. Invoke this function
+ # manually rather than through self.setUp() for maximum
+ # safety.
+ self.visit = []
+ gc.collect()
+ garbage, gc.garbage[:] = gc.garbage[:], []
+ self.othergarbage.append(garbage)
+ self.visit = []
+
+ def cb1(self, phase, info):
+ self.visit.append((1, phase, dict(info)))
+
+ def cb2(self, phase, info):
+ self.visit.append((2, phase, dict(info)))
+ if phase == "stop" and hasattr(self, "cleanup"):
+ # Clean Uncollectable from garbage
+ uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
+ gc.garbage[:] = [e for e in gc.garbage
+ if not isinstance(e, Uncollectable)]
+ for e in uc:
+ e.partner = None
+
+ def test_collect(self):
+ self.preclean()
+ gc.collect()
+ # Algorithmically verify the contents of self.visit
+ # because it is long and tortuous.
+
+ # Count the number of visits to each callback
+ n = [v[0] for v in self.visit]
+ n1 = [i for i in n if i == 1]
+ n2 = [i for i in n if i == 2]
+ self.assertEqual(n1, [1]*2)
+ self.assertEqual(n2, [2]*2)
+
+ # Count that we got the right number of start and stop callbacks.
+ n = [v[1] for v in self.visit]
+ n1 = [i for i in n if i == "start"]
+ n2 = [i for i in n if i == "stop"]
+ self.assertEqual(n1, ["start"]*2)
+ self.assertEqual(n2, ["stop"]*2)
+
+ # Check that we got the right info dict for all callbacks
+ for v in self.visit:
+ info = v[2]
+ self.assertTrue("generation" in info)
+ self.assertTrue("collected" in info)
+ self.assertTrue("uncollectable" in info)
+
+ def test_collect_generation(self):
+ self.preclean()
+ gc.collect(2)
+ for v in self.visit:
+ info = v[2]
+ self.assertEqual(info["generation"], 2)
+
+ def test_collect_garbage(self):
+ self.preclean()
+ # Each of these cause four objects to be garbage: Two
+ # Uncolectables and their instance dicts.
+ Uncollectable()
+ Uncollectable()
+ C1055820(666)
+ gc.collect()
+ for v in self.visit:
+ if v[1] != "stop":
+ continue
+ info = v[2]
+ self.assertEqual(info["collected"], 2)
+ self.assertEqual(info["uncollectable"], 8)
+
+ # We should now have the Uncollectables in gc.garbage
+ self.assertEqual(len(gc.garbage), 4)
+ for e in gc.garbage:
+ self.assertIsInstance(e, Uncollectable)
+
+ # Now, let our callback handle the Uncollectable instances
+ self.cleanup=True
+ self.visit = []
+ gc.garbage[:] = []
+ gc.collect()
+ for v in self.visit:
+ if v[1] != "stop":
+ continue
+ info = v[2]
+ self.assertEqual(info["collected"], 0)
+ self.assertEqual(info["uncollectable"], 4)
+
+ # Uncollectables should be gone
+ self.assertEqual(len(gc.garbage), 0)
+
+
class GCTogglingTests(unittest.TestCase):
def setUp(self):
gc.enable()
@@ -746,7 +885,7 @@ def test_main():
try:
gc.collect() # Delete 2nd generation garbage
- run_unittest(GCTests, GCTogglingTests)
+ run_unittest(GCTests, GCTogglingTests, GCCallbackTests)
finally:
gc.set_debug(debug)
# test gc.enable() even if GC is disabled by default