summaryrefslogtreecommitdiff
path: root/test/testlib/orm.py
blob: d9664f52f2cfe105f13d4f66f83aec85b3bdc41f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import inspect, re
from testlib import config, testing
from testlib.compat import sorted

sa = None
orm = None


__all__ = 'mapper',


_whitespace = re.compile(r'^(\s+)')

def _find_pragma(lines, current):
    m = _whitespace.match(lines[current])
    basis = m and m.group() or ''

    for line in reversed(lines[0:current]):
        if 'testlib.pragma' in line:
            return line
        m = _whitespace.match(line)
        indent = m and m.group() or ''

        # simplistic detection:

        # >> # testlib.pragma foo
        # >> center_line()
        if indent == basis:
            break
        # >> # testlib.pragma foo
        # >> if fleem:
        # >>     center_line()
        if line.endswith(':'):
            break
    return None

def _make_blocker(method_name, fallback):
    """Creates tripwired variant of a method, raising when called.

    To excempt an invocation from blockage, there are two options.

    1) add a pragma in a comment::

        # testlib.pragma exempt:methodname
        offending_line()

    2) add a magic cookie to the function's namespace::
        __sa_baremethodname_exempt__ = True
        ...
        offending_line()
        another_offending_lines()

    The second is useful for testing and development.
    """

    if method_name.startswith('__') and method_name.endswith('__'):
        frame_marker = '__sa_%s_exempt__' % method_name[2:-2]
    else:
        frame_marker = '__sa_%s_exempt__' % method_name
    pragma_marker = 'exempt:' + method_name

    def method(self, *args, **kw):
        frame_r = None
        try:
            frame = inspect.stack()[1][0]
            frame_r = inspect.getframeinfo(frame, 9)

            module = frame.f_globals.get('__name__', '')

            type_ = type(self)

            pragma = _find_pragma(*frame_r[3:5])

            exempt = (
                (not module.startswith('sqlalchemy')) or
                (pragma and pragma_marker in pragma) or
                (frame_marker in frame.f_locals) or
                ('self' in frame.f_locals and
                 getattr(frame.f_locals['self'], frame_marker, False)))

            if exempt:
                supermeth = getattr(super(type_, self), method_name, None)
                if (supermeth is None or
                    getattr(supermeth, 'im_func', None) is method):
                    return fallback(self, *args, **kw)
                else:
                    return supermeth(*args, **kw)
            else:
                raise AssertionError(
                    "%s.%s called in %s, line %s in %s" % (
                    type_.__name__, method_name, module, frame_r[1], frame_r[2]))
        finally:
            del frame
    method.__name__ = method_name
    return method

def mapper(type_, *args, **kw):
    global orm
    if orm is None:
        from sqlalchemy import orm

    forbidden = [
        ('__hash__', 'unhashable', lambda s: id(s)),
        ('__eq__', 'noncomparable', lambda s, o: s is o),
        ('__ne__', 'noncomparable', lambda s, o: s is not o),
        ('__cmp__', 'noncomparable', lambda s, o: object.__cmp__(s, o)),
        ('__le__', 'noncomparable', lambda s, o: object.__le__(s, o)),
        ('__lt__', 'noncomparable', lambda s, o: object.__lt__(s, o)),
        ('__ge__', 'noncomparable', lambda s, o: object.__ge__(s, o)),
        ('__gt__', 'noncomparable', lambda s, o: object.__gt__(s, o)),
        ('__nonzero__', 'truthless', lambda s: 1), ]

    if type_.__bases__ == (object,):
        for method_name, option, fallback in forbidden:
            if (getattr(config.options, option, False) and
                method_name not in type_.__dict__):
                setattr(type_, method_name, _make_blocker(method_name, fallback))

    return orm.mapper(type_, *args, **kw)