summaryrefslogtreecommitdiff
path: root/redis/commands/graph/execution_plan.py
blob: a9b4ad076a6ce1131153c319ee0c421373c480ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import re


class ProfileStats:
    """
    ProfileStats, runtime execution statistics of operation.
    """

    def __init__(self, records_produced, execution_time):
        self.records_produced = records_produced
        self.execution_time = execution_time


class Operation:
    """
    Operation, single operation within execution plan.
    """

    def __init__(self, name, args=None, profile_stats=None):
        """
        Create a new operation.

        Args:
            name: string that represents the name of the operation
            args: operation arguments
            profile_stats: profile statistics
        """
        self.name = name
        self.args = args
        self.profile_stats = profile_stats
        self.children = []

    def append_child(self, child):
        if not isinstance(child, Operation) or self is child:
            raise Exception("child must be Operation")

        self.children.append(child)
        return self

    def child_count(self):
        return len(self.children)

    def __eq__(self, o: object) -> bool:
        if not isinstance(o, Operation):
            return False

        return self.name == o.name and self.args == o.args

    def __str__(self) -> str:
        args_str = "" if self.args is None else " | " + self.args
        return f"{self.name}{args_str}"


class ExecutionPlan:
    """
    ExecutionPlan, collection of operations.
    """

    def __init__(self, plan):
        """
        Create a new execution plan.

        Args:
            plan: array of strings that represents the collection operations
                  the output from GRAPH.EXPLAIN
        """
        if not isinstance(plan, list):
            raise Exception("plan must be an array")

        self.plan = plan
        self.structured_plan = self._operation_tree()

    def _compare_operations(self, root_a, root_b):
        """
        Compare execution plan operation tree

        Return: True if operation trees are equal, False otherwise
        """

        # compare current root
        if root_a != root_b:
            return False

        # make sure root have the same number of children
        if root_a.child_count() != root_b.child_count():
            return False

        # recursively compare children
        for i in range(root_a.child_count()):
            if not self._compare_operations(root_a.children[i], root_b.children[i]):
                return False

        return True

    def __str__(self) -> str:
        def aggraget_str(str_children):
            return "\n".join(
                [
                    "    " + line
                    for str_child in str_children
                    for line in str_child.splitlines()
                ]
            )

        def combine_str(x, y):
            return f"{x}\n{y}"

        return self._operation_traverse(
            self.structured_plan, str, aggraget_str, combine_str
        )

    def __eq__(self, o: object) -> bool:
        """Compares two execution plans

        Return: True if the two plans are equal False otherwise
        """
        # make sure 'o' is an execution-plan
        if not isinstance(o, ExecutionPlan):
            return False

        # get root for both plans
        root_a = self.structured_plan
        root_b = o.structured_plan

        # compare execution trees
        return self._compare_operations(root_a, root_b)

    def _operation_traverse(self, op, op_f, aggregate_f, combine_f):
        """
        Traverse operation tree recursively applying functions

        Args:
            op: operation to traverse
            op_f: function applied for each operation
            aggregate_f: aggregation function applied for all children of a single operation
            combine_f: combine function applied for the operation result and the children result
        """  # noqa
        # apply op_f for each operation
        op_res = op_f(op)
        if len(op.children) == 0:
            return op_res  # no children return
        else:
            # apply _operation_traverse recursively
            children = [
                self._operation_traverse(child, op_f, aggregate_f, combine_f)
                for child in op.children
            ]
            # combine the operation result with the children aggregated result
            return combine_f(op_res, aggregate_f(children))

    def _operation_tree(self):
        """Build the operation tree from the string representation"""

        # initial state
        i = 0
        level = 0
        stack = []
        current = None

        def _create_operation(args):
            profile_stats = None
            name = args[0].strip()
            args.pop(0)
            if len(args) > 0 and "Records produced" in args[-1]:
                records_produced = int(
                    re.search("Records produced: (\\d+)", args[-1]).group(1)
                )
                execution_time = float(
                    re.search("Execution time: (\\d+.\\d+) ms", args[-1]).group(1)
                )
                profile_stats = ProfileStats(records_produced, execution_time)
                args.pop(-1)
            return Operation(
                name, None if len(args) == 0 else args[0].strip(), profile_stats
            )

        # iterate plan operations
        while i < len(self.plan):
            current_op = self.plan[i]
            op_level = current_op.count("    ")
            if op_level == level:
                # if the operation level equal to the current level
                # set the current operation and move next
                child = _create_operation(current_op.split("|"))
                if current:
                    current = stack.pop()
                    current.append_child(child)
                current = child
                i += 1
            elif op_level == level + 1:
                # if the operation is child of the current operation
                # add it as child and set as current operation
                child = _create_operation(current_op.split("|"))
                current.append_child(child)
                stack.append(current)
                current = child
                level += 1
                i += 1
            elif op_level < level:
                # if the operation is not child of current operation
                # go back to it's parent operation
                levels_back = level - op_level + 1
                for _ in range(levels_back):
                    current = stack.pop()
                level -= levels_back
            else:
                raise Exception("corrupted plan")
        return stack[0]