summaryrefslogtreecommitdiff
path: root/tests/memdbg.py
blob: 0dd8c318e4ef51c8ff74a39b6ae7443f8f368dd7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import sys
import traceback

from cffi import api as _api


sys.modules["ssl"] = None
sys.modules["_hashlib"] = None


_ffi = _api.FFI()
_ffi.cdef(
    """
    void *malloc(size_t size);
    void free(void *ptr);
    void *realloc(void *ptr, size_t size);

    int  CRYPTO_set_mem_functions(
        void *(*m)(size_t),void *(*r)(void *,size_t), void (*f)(void *));

    int backtrace(void **buffer, int size);
    char **backtrace_symbols(void *const *buffer, int size);
    void backtrace_symbols_fd(void *const *buffer, int size, int fd);
    """
)  # noqa
_api = _ffi.verify(
    """
    #include <openssl/crypto.h>
    #include <stdlib.h>
    #include <execinfo.h>
    """,
    libraries=["crypto"],
)
C = _ffi.dlopen(None)

verbose = False


def log(s):
    if verbose:
        print(s)


def _backtrace():
    buf = _ffi.new("void*[]", 64)
    result = _api.backtrace(buf, len(buf))
    strings = _api.backtrace_symbols(buf, result)
    stack = [_ffi.string(strings[i]) for i in range(result)]
    C.free(strings)
    return stack


@_ffi.callback("void*(*)(size_t)")
def malloc(n):
    memory = C.malloc(n)
    python_stack = traceback.extract_stack(limit=3)
    c_stack = _backtrace()
    heap[memory] = [(n, python_stack, c_stack)]
    log("malloc(%d) -> %s" % (n, memory))
    return memory


@_ffi.callback("void*(*)(void*, size_t)")
def realloc(p, n):
    memory = C.realloc(p, n)
    old = heap.pop(p)

    python_stack = traceback.extract_stack(limit=3)
    c_stack = _backtrace()

    old.append((n, python_stack, c_stack))
    heap[memory] = old
    log("realloc(0x%x, %d) -> %s" % (int(_ffi.cast("int", p)), n, memory))
    return memory


@_ffi.callback("void(*)(void*)")
def free(p):
    if p != _ffi.NULL:
        C.free(p)
        del heap[p]
        log("free(0x%x)" % (int(_ffi.cast("int", p)),))


if _api.CRYPTO_set_mem_functions(malloc, realloc, free):
    log("Enabled memory debugging")
    heap = {}
else:
    log("Failed to enable memory debugging")
    heap = None