summaryrefslogtreecommitdiff
path: root/test/test_memory_leaks.py
blob: a43e941eb56e9e9aa54ab72d4f6296d23247370f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
A test script which attempts to detect memory leaks by calling C
functions many times and compare process memory usage before and
after the calls.  It might produce false positives.
"""

import functools
import gc
import os
import socket
import sys
import threading
import time

import psutil
import psutil._common
from psutil._compat import callable
from psutil._compat import xrange
from test_psutil import FREEBSD
from test_psutil import get_test_subprocess
from test_psutil import LINUX
from test_psutil import OPENBSD
from test_psutil import OSX
from test_psutil import POSIX
from test_psutil import reap_children
from test_psutil import RLIMIT_SUPPORT
from test_psutil import safe_remove
from test_psutil import SUNOS
from test_psutil import supports_ipv6
from test_psutil import TESTFN
from test_psutil import TRAVIS
from test_psutil import WINDOWS

if sys.version_info < (2, 7):
    import unittest2 as unittest  # https://pypi.python.org/pypi/unittest2
else:
    import unittest


LOOPS = 1000
MEMORY_TOLERANCE = 4096
SKIP_PYTHON_IMPL = True


def skip_if_linux():
    return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
                           "not worth being tested on LINUX (pure python)")


class Base(unittest.TestCase):
    proc = psutil.Process()

    def execute(self, function, *args, **kwargs):
        def call_many_times():
            for x in xrange(LOOPS - 1):
                self.call(function, *args, **kwargs)
            del x
            gc.collect()
            return self.get_mem()

        self.call(function, *args, **kwargs)
        self.assertEqual(gc.garbage, [])
        self.assertEqual(threading.active_count(), 1)

        # RSS comparison
        # step 1
        rss1 = call_many_times()
        # step 2
        rss2 = call_many_times()

        difference = rss2 - rss1
        if difference > MEMORY_TOLERANCE:
            # This doesn't necessarily mean we have a leak yet.
            # At this point we assume that after having called the
            # function so many times the memory usage is stabilized
            # and if there are no leaks it should not increase any
            # more.
            # Let's keep calling fun for 3 more seconds and fail if
            # we notice any difference.
            stop_at = time.time() + 3
            while True:
                self.call(function, *args, **kwargs)
                if time.time() >= stop_at:
                    break
            del stop_at
            gc.collect()
            rss3 = self.get_mem()
            difference = rss3 - rss2
            if rss3 > rss2:
                self.fail("rss2=%s, rss3=%s, difference=%s"
                          % (rss2, rss3, difference))

    def execute_w_exc(self, exc, function, *args, **kwargs):
        kwargs['_exc'] = exc
        self.execute(function, *args, **kwargs)

    def get_mem(self):
        return psutil.Process().memory_info()[0]

    def call(self, function, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")


class TestProcessObjectLeaks(Base):
    """Test leaks of Process class methods and properties"""

    def setUp(self):
        gc.collect()

    def tearDown(self):
        reap_children()

    def call(self, function, *args, **kwargs):
        if callable(function):
            if '_exc' in kwargs:
                exc = kwargs.pop('_exc')
                self.assertRaises(exc, function, *args, **kwargs)
            else:
                try:
                    function(*args, **kwargs)
                except psutil.Error:
                    pass
        else:
            meth = getattr(self.proc, function)
            if '_exc' in kwargs:
                exc = kwargs.pop('_exc')
                self.assertRaises(exc, meth, *args, **kwargs)
            else:
                try:
                    meth(*args, **kwargs)
                except psutil.Error:
                    pass

    @skip_if_linux()
    def test_name(self):
        self.execute('name')

    @skip_if_linux()
    def test_cmdline(self):
        self.execute('cmdline')

    @skip_if_linux()
    def test_exe(self):
        self.execute('exe')

    @skip_if_linux()
    def test_ppid(self):
        self.execute('ppid')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_uids(self):
        self.execute('uids')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_gids(self):
        self.execute('gids')

    @skip_if_linux()
    def test_status(self):
        self.execute('status')

    def test_nice_get(self):
        self.execute('nice')

    def test_nice_set(self):
        niceness = psutil.Process().nice()
        self.execute('nice', niceness)

    @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
                         "Linux and Windows Vista only")
    def test_ionice_get(self):
        self.execute('ionice')

    @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
                         "Linux and Windows Vista only")
    def test_ionice_set(self):
        if WINDOWS:
            value = psutil.Process().ionice()
            self.execute('ionice', value)
        else:
            from psutil._pslinux import cext
            self.execute('ionice', psutil.IOPRIO_CLASS_NONE)
            fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, fun)

    @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform")
    @skip_if_linux()
    def test_io_counters(self):
        self.execute('io_counters')

    @unittest.skipUnless(WINDOWS, "not worth being tested on posix")
    def test_username(self):
        self.execute('username')

    @skip_if_linux()
    def test_create_time(self):
        self.execute('create_time')

    @skip_if_linux()
    def test_num_threads(self):
        self.execute('num_threads')

    @unittest.skipUnless(WINDOWS, "Windows only")
    def test_num_handles(self):
        self.execute('num_handles')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_num_fds(self):
        self.execute('num_fds')

    @skip_if_linux()
    def test_threads(self):
        self.execute('threads')

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute('cpu_times')

    @skip_if_linux()
    def test_memory_info(self):
        self.execute('memory_info')

    @skip_if_linux()
    def test_memory_info_ex(self):
        self.execute('memory_info_ex')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_terminal(self):
        self.execute('terminal')

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_resume(self):
        self.execute('resume')

    @skip_if_linux()
    def test_cwd(self):
        self.execute('cwd')

    @unittest.skipUnless(WINDOWS or LINUX or FREEBSD,
                         "Windows or Linux or BSD only")
    def test_cpu_affinity_get(self):
        self.execute('cpu_affinity')

    @unittest.skipUnless(WINDOWS or LINUX or FREEBSD,
                         "Windows or Linux or BSD only")
    def test_cpu_affinity_set(self):
        affinity = psutil.Process().cpu_affinity()
        self.execute('cpu_affinity', affinity)
        if not TRAVIS:
            self.execute_w_exc(ValueError, 'cpu_affinity', [-1])

    @skip_if_linux()
    def test_open_files(self):
        safe_remove(TESTFN)  # needed after UNIX socket test has run
        with open(TESTFN, 'w'):
            self.execute('open_files')

    # OSX implementation is unbelievably slow
    @unittest.skipIf(OSX, "OSX implementation is too slow")
    @unittest.skipIf(OPENBSD, "not implemented on OpenBSD")
    @skip_if_linux()
    def test_memory_maps(self):
        self.execute('memory_maps')

    @unittest.skipUnless(LINUX, "Linux only")
    @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                         "only available on Linux >= 2.6.36")
    def test_rlimit_get(self):
        self.execute('rlimit', psutil.RLIMIT_NOFILE)

    @unittest.skipUnless(LINUX, "Linux only")
    @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                         "only available on Linux >= 2.6.36")
    def test_rlimit_set(self):
        limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
        self.execute('rlimit', psutil.RLIMIT_NOFILE, limit)
        self.execute_w_exc(OSError, 'rlimit', -1)

    @skip_if_linux()
    # Windows implementation is based on a single system-wide function
    @unittest.skipIf(WINDOWS, "tested later")
    def test_connections(self):
        def create_socket(family, type):
            sock = socket.socket(family, type)
            sock.bind(('', 0))
            if type == socket.SOCK_STREAM:
                sock.listen(1)
            return sock

        socks = []
        socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM))
        socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM))
        if supports_ipv6():
            socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM))
            socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM))
        if hasattr(socket, 'AF_UNIX'):
            safe_remove(TESTFN)
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            s.bind(TESTFN)
            s.listen(1)
            socks.append(s)
        kind = 'all'
        # TODO: UNIX sockets are temporarily implemented by parsing
        # 'pfiles' cmd  output; we don't want that part of the code to
        # be executed.
        if SUNOS:
            kind = 'inet'
        try:
            self.execute('connections', kind=kind)
        finally:
            for s in socks:
                s.close()

    @unittest.skipUnless(hasattr(psutil.Process, 'environ'),
                         "Linux and OSX")
    def test_environ(self):
        self.execute("environ")


p = get_test_subprocess()
DEAD_PROC = psutil.Process(p.pid)
DEAD_PROC.kill()
DEAD_PROC.wait()
del p


class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
    """Same as above but looks for leaks occurring when dealing with
    zombie processes raising NoSuchProcess exception.
    """
    proc = DEAD_PROC

    def call(self, *args, **kwargs):
        try:
            TestProcessObjectLeaks.call(self, *args, **kwargs)
        except psutil.NoSuchProcess:
            pass

    if not POSIX:
        def test_kill(self):
            self.execute('kill')

        def test_terminate(self):
            self.execute('terminate')

        def test_suspend(self):
            self.execute('suspend')

        def test_resume(self):
            self.execute('resume')

        def test_wait(self):
            self.execute('wait')


class TestModuleFunctionsLeaks(Base):
    """Test leaks of psutil module functions."""

    def setUp(self):
        gc.collect()

    def call(self, function, *args, **kwargs):
        fun = getattr(psutil, function)
        fun(*args, **kwargs)

    @skip_if_linux()
    def test_cpu_count_logical(self):
        self.execute('cpu_count', logical=True)

    @skip_if_linux()
    def test_cpu_count_physical(self):
        self.execute('cpu_count', logical=False)

    @skip_if_linux()
    def test_boot_time(self):
        self.execute('boot_time')

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_pid_exists(self):
        self.execute('pid_exists', os.getpid())

    def test_virtual_memory(self):
        self.execute('virtual_memory')

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS,
                     "not worth being tested on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute('swap_memory')

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute('cpu_times')

    @skip_if_linux()
    def test_per_cpu_times(self):
        self.execute('cpu_times', percpu=True)

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_disk_usage(self):
        self.execute('disk_usage', '.')

    def test_disk_partitions(self):
        self.execute('disk_partitions')

    @skip_if_linux()
    def test_net_io_counters(self):
        self.execute('net_io_counters')

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @skip_if_linux()
    def test_disk_io_counters(self):
        self.execute('disk_io_counters')

    # XXX - on Windows this produces a false positive
    @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
    def test_users(self):
        self.execute('users')

    @unittest.skipIf(LINUX,
                     "not worth being tested on Linux (pure python)")
    @unittest.skipIf(OSX and os.getuid() != 0, "need root access")
    def test_net_connections(self):
        self.execute('net_connections')

    def test_net_if_addrs(self):
        self.execute('net_if_addrs')

    @unittest.skipIf(TRAVIS, "EPERM on travis")
    def test_net_if_stats(self):
        self.execute('net_if_stats')


def main():
    test_suite = unittest.TestSuite()
    tests = [TestProcessObjectLeaksZombie,
             TestProcessObjectLeaks,
             TestModuleFunctionsLeaks]
    for test in tests:
        test_suite.addTest(unittest.makeSuite(test))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()

if __name__ == '__main__':
    if not main():
        sys.exit(1)