summaryrefslogtreecommitdiff
path: root/Lib/test/test_tracemalloc.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_tracemalloc.py')
-rw-r--r--Lib/test/test_tracemalloc.py219
1 files changed, 192 insertions, 27 deletions
diff --git a/Lib/test/test_tracemalloc.py b/Lib/test/test_tracemalloc.py
index da89a9a871..790ab7ee60 100644
--- a/Lib/test/test_tracemalloc.py
+++ b/Lib/test/test_tracemalloc.py
@@ -11,9 +11,15 @@ try:
import threading
except ImportError:
threading = None
+try:
+ import _testcapi
+except ImportError:
+ _testcapi = None
+
EMPTY_STRING_SIZE = sys.getsizeof(b'')
+
def get_frames(nframe, lineno_delta):
frames = []
frame = sys._getframe(1)
@@ -37,28 +43,31 @@ def allocate_bytes(size):
def create_snapshots():
traceback_limit = 2
+ # _tracemalloc._get_traces() returns a list of (domain, size,
+ # traceback_frames) tuples. traceback_frames is a tuple of (filename,
+ # line_number) tuples.
raw_traces = [
- (10, (('a.py', 2), ('b.py', 4))),
- (10, (('a.py', 2), ('b.py', 4))),
- (10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
- (2, (('a.py', 5), ('b.py', 4))),
+ (1, 2, (('a.py', 5), ('b.py', 4))),
- (66, (('b.py', 1),)),
+ (2, 66, (('b.py', 1),)),
- (7, (('<unknown>', 0),)),
+ (3, 7, (('<unknown>', 0),)),
]
snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)
raw_traces2 = [
- (10, (('a.py', 2), ('b.py', 4))),
- (10, (('a.py', 2), ('b.py', 4))),
- (10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
- (2, (('a.py', 5), ('b.py', 4))),
- (5000, (('a.py', 5), ('b.py', 4))),
+ (2, 2, (('a.py', 5), ('b.py', 4))),
+ (2, 5000, (('a.py', 5), ('b.py', 4))),
- (400, (('c.py', 578),)),
+ (4, 400, (('c.py', 578),)),
]
snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)
@@ -126,7 +135,7 @@ class TestTracemallocEnabled(unittest.TestCase):
def find_trace(self, traces, traceback):
for trace in traces:
- if trace[1] == traceback._frames:
+ if trace[2] == traceback._frames:
return trace
self.fail("trace not found")
@@ -140,7 +149,7 @@ class TestTracemallocEnabled(unittest.TestCase):
trace = self.find_trace(traces, obj_traceback)
self.assertIsInstance(trace, tuple)
- size, traceback = trace
+ domain, size, traceback = trace
self.assertEqual(size, obj_size)
self.assertEqual(traceback, obj_traceback._frames)
@@ -167,9 +176,8 @@ class TestTracemallocEnabled(unittest.TestCase):
trace1 = self.find_trace(traces, obj1_traceback)
trace2 = self.find_trace(traces, obj2_traceback)
- size1, traceback1 = trace1
- size2, traceback2 = trace2
- self.assertEqual(traceback2, traceback1)
+ domain1, size1, traceback1 = trace1
+ domain2, size2, traceback2 = trace2
self.assertIs(traceback2, traceback1)
def test_get_traced_memory(self):
@@ -292,7 +300,7 @@ class TestSnapshot(unittest.TestCase):
maxDiff = 4000
def test_create_snapshot(self):
- raw_traces = [(5, (('a.py', 2),))]
+ raw_traces = [(0, 5, (('a.py', 2),))]
with contextlib.ExitStack() as stack:
stack.enter_context(patch.object(tracemalloc, 'is_tracing',
@@ -322,11 +330,11 @@ class TestSnapshot(unittest.TestCase):
# exclude b.py
snapshot3 = snapshot.filter_traces((filter1,))
self.assertEqual(snapshot3.traces._traces, [
- (10, (('a.py', 2), ('b.py', 4))),
- (10, (('a.py', 2), ('b.py', 4))),
- (10, (('a.py', 2), ('b.py', 4))),
- (2, (('a.py', 5), ('b.py', 4))),
- (7, (('<unknown>', 0),)),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (1, 2, (('a.py', 5), ('b.py', 4))),
+ (3, 7, (('<unknown>', 0),)),
])
# filter_traces() must not touch the original snapshot
@@ -335,10 +343,10 @@ class TestSnapshot(unittest.TestCase):
# only include two lines of a.py
snapshot4 = snapshot3.filter_traces((filter2, filter3))
self.assertEqual(snapshot4.traces._traces, [
- (10, (('a.py', 2), ('b.py', 4))),
- (10, (('a.py', 2), ('b.py', 4))),
- (10, (('a.py', 2), ('b.py', 4))),
- (2, (('a.py', 5), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (1, 2, (('a.py', 5), ('b.py', 4))),
])
# No filter: just duplicate the snapshot
@@ -349,6 +357,54 @@ class TestSnapshot(unittest.TestCase):
self.assertRaises(TypeError, snapshot.filter_traces, filter1)
+ def test_filter_traces_domain(self):
+ snapshot, snapshot2 = create_snapshots()
+ filter1 = tracemalloc.Filter(False, "a.py", domain=1)
+ filter2 = tracemalloc.Filter(True, "a.py", domain=1)
+
+ original_traces = list(snapshot.traces._traces)
+
+ # exclude a.py of domain 1
+ snapshot3 = snapshot.filter_traces((filter1,))
+ self.assertEqual(snapshot3.traces._traces, [
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (2, 66, (('b.py', 1),)),
+ (3, 7, (('<unknown>', 0),)),
+ ])
+
+ # include domain 1
+ snapshot3 = snapshot.filter_traces((filter1,))
+ self.assertEqual(snapshot3.traces._traces, [
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (2, 66, (('b.py', 1),)),
+ (3, 7, (('<unknown>', 0),)),
+ ])
+
+ def test_filter_traces_domain_filter(self):
+ snapshot, snapshot2 = create_snapshots()
+ filter1 = tracemalloc.DomainFilter(False, domain=3)
+ filter2 = tracemalloc.DomainFilter(True, domain=3)
+
+ # exclude domain 2
+ snapshot3 = snapshot.filter_traces((filter1,))
+ self.assertEqual(snapshot3.traces._traces, [
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (0, 10, (('a.py', 2), ('b.py', 4))),
+ (1, 2, (('a.py', 5), ('b.py', 4))),
+ (2, 66, (('b.py', 1),)),
+ ])
+
+ # include domain 2
+ snapshot3 = snapshot.filter_traces((filter2,))
+ self.assertEqual(snapshot3.traces._traces, [
+ (3, 7, (('<unknown>', 0),)),
+ ])
+
def test_snapshot_group_by_line(self):
snapshot, snapshot2 = create_snapshots()
tb_0 = traceback_lineno('<unknown>', 0)
@@ -816,12 +872,121 @@ class TestCommandLine(unittest.TestCase):
assert_python_ok('-X', 'tracemalloc', '-c', code)
+@unittest.skipIf(_testcapi is None, 'need _testcapi')
+class TestCAPI(unittest.TestCase):
+ maxDiff = 80 * 20
+
+ def setUp(self):
+ if tracemalloc.is_tracing():
+ self.skipTest("tracemalloc must be stopped before the test")
+
+ self.domain = 5
+ self.size = 123
+ self.obj = allocate_bytes(self.size)[0]
+
+ # for the type "object", id(obj) is the address of its memory block.
+ # This type is not tracked by the garbage collector
+ self.ptr = id(self.obj)
+
+ def tearDown(self):
+ tracemalloc.stop()
+
+ def get_traceback(self):
+ frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
+ if frames is not None:
+ return tracemalloc.Traceback(frames)
+ else:
+ return None
+
+ def track(self, release_gil=False, nframe=1):
+ frames = get_frames(nframe, 2)
+ _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
+ release_gil)
+ return frames
+
+ def untrack(self):
+ _testcapi.tracemalloc_untrack(self.domain, self.ptr)
+
+ def get_traced_memory(self):
+ # Get the traced size in the domain
+ snapshot = tracemalloc.take_snapshot()
+ domain_filter = tracemalloc.DomainFilter(True, self.domain)
+ snapshot = snapshot.filter_traces([domain_filter])
+ return sum(trace.size for trace in snapshot.traces)
+
+ def check_track(self, release_gil):
+ nframe = 5
+ tracemalloc.start(nframe)
+
+ size = tracemalloc.get_traced_memory()[0]
+
+ frames = self.track(release_gil, nframe)
+ self.assertEqual(self.get_traceback(),
+ tracemalloc.Traceback(frames))
+
+ self.assertEqual(self.get_traced_memory(), self.size)
+
+ def test_track(self):
+ self.check_track(False)
+
+ def test_track_without_gil(self):
+ # check that calling _PyTraceMalloc_Track() without holding the GIL
+ # works too
+ self.check_track(True)
+
+ def test_track_already_tracked(self):
+ nframe = 5
+ tracemalloc.start(nframe)
+
+ # track a first time
+ self.track()
+
+ # calling _PyTraceMalloc_Track() must remove the old trace and add
+ # a new trace with the new traceback
+ frames = self.track(nframe=nframe)
+ self.assertEqual(self.get_traceback(),
+ tracemalloc.Traceback(frames))
+
+ def test_untrack(self):
+ tracemalloc.start()
+
+ self.track()
+ self.assertIsNotNone(self.get_traceback())
+ self.assertEqual(self.get_traced_memory(), self.size)
+
+ # untrack must remove the trace
+ self.untrack()
+ self.assertIsNone(self.get_traceback())
+ self.assertEqual(self.get_traced_memory(), 0)
+
+ # calling _PyTraceMalloc_Untrack() multiple times must not crash
+ self.untrack()
+ self.untrack()
+
+ def test_stop_track(self):
+ tracemalloc.start()
+ tracemalloc.stop()
+
+ with self.assertRaises(RuntimeError):
+ self.track()
+ self.assertIsNone(self.get_traceback())
+
+ def test_stop_untrack(self):
+ tracemalloc.start()
+ self.track()
+
+ tracemalloc.stop()
+ with self.assertRaises(RuntimeError):
+ self.untrack()
+
+
def test_main():
support.run_unittest(
TestTracemallocEnabled,
TestSnapshot,
TestFilters,
TestCommandLine,
+ TestCAPI,
)
if __name__ == "__main__":