1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""OS information for testing."""
from coverage import env
if env.WINDOWS:
# Windows implementation
def process_ram():
"""How much RAM is this process using? (Windows)"""
import ctypes
# From: http://lists.ubuntu.com/archives/bazaar-commits/2009-February/011990.html
class PROCESS_MEMORY_COUNTERS_EX(ctypes.Structure):
"""Used by GetProcessMemoryInfo"""
_fields_ = [
('cb', ctypes.c_ulong),
('PageFaultCount', ctypes.c_ulong),
('PeakWorkingSetSize', ctypes.c_size_t),
('WorkingSetSize', ctypes.c_size_t),
('QuotaPeakPagedPoolUsage', ctypes.c_size_t),
('QuotaPagedPoolUsage', ctypes.c_size_t),
('QuotaPeakNonPagedPoolUsage', ctypes.c_size_t),
('QuotaNonPagedPoolUsage', ctypes.c_size_t),
('PagefileUsage', ctypes.c_size_t),
('PeakPagefileUsage', ctypes.c_size_t),
('PrivateUsage', ctypes.c_size_t),
]
mem_struct = PROCESS_MEMORY_COUNTERS_EX()
ret = ctypes.windll.psapi.GetProcessMemoryInfo(
ctypes.windll.kernel32.GetCurrentProcess(),
ctypes.byref(mem_struct),
ctypes.sizeof(mem_struct)
)
if not ret: # pragma: part covered
return 0 # pragma: cant happen
return mem_struct.PrivateUsage
elif env.LINUX:
# Linux implementation
import os
_scale = {'kb': 1024, 'mb': 1024*1024}
def _VmB(key):
"""Read the /proc/PID/status file to find memory use."""
try:
# Get pseudo file /proc/<pid>/status
with open('/proc/%d/status' % os.getpid()) as t:
v = t.read()
except OSError: # pragma: cant happen
return 0 # non-Linux?
# Get VmKey line e.g. 'VmRSS: 9999 kB\n ...'
i = v.index(key)
v = v[i:].split(None, 3)
if len(v) < 3: # pragma: part covered
return 0 # pragma: cant happen
# Convert Vm value to bytes.
return int(float(v[1]) * _scale[v[2].lower()])
def process_ram():
"""How much RAM is this process using? (Linux implementation)"""
return _VmB('VmRSS')
else:
# Generic implementation.
def process_ram():
"""How much RAM is this process using? (stdlib implementation)"""
import resource
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
|