diff options
Diffstat (limited to 'extra/stack_analyzer')
-rw-r--r-- | extra/stack_analyzer/example_annotation.yaml | 8 | ||||
-rwxr-xr-x | extra/stack_analyzer/stack_analyzer.py | 500 | ||||
-rwxr-xr-x | extra/stack_analyzer/stack_analyzer_unittest.py | 161 |
3 files changed, 468 insertions, 201 deletions
diff --git a/extra/stack_analyzer/example_annotation.yaml b/extra/stack_analyzer/example_annotation.yaml index 906342e762..3b96d90b89 100644 --- a/extra/stack_analyzer/example_annotation.yaml +++ b/extra/stack_analyzer/example_annotation.yaml @@ -20,4 +20,10 @@ remove: # Remove all callsites pointing to panic_assert_fail. - panic_assert_fail - panic -- software_panic +- [software_panic] +# Remove some invalid paths. +- [pd_send_request_msg, set_state, pd_power_supply_reset] +- [__tx_char, __tx_char] +- [pd_send_request_msg, set_state, set_state] +- [pd_send_request_msg, set_state, set_state, pd_power_supply_reset] +- [set_state, set_state, set_state] diff --git a/extra/stack_analyzer/stack_analyzer.py b/extra/stack_analyzer/stack_analyzer.py index 58677eac80..30aac28728 100755 --- a/extra/stack_analyzer/stack_analyzer.py +++ b/extra/stack_analyzer/stack_analyzer.py @@ -177,12 +177,11 @@ class Callsite(object): if self.callee is None: return other.callee is None - else: - if other.callee is None: - return False + elif other.callee is None: + return False - # Assume the addresses of functions are unique. - return self.callee.address == other.callee.address + # Assume the addresses of functions are unique. + return self.callee.address == other.callee.address class Function(object): @@ -194,9 +193,7 @@ class Function(object): stack_frame: Size of stack frame. callsites: Callsite list. stack_max_usage: Max stack usage. None if it hasn't been analyzed. - stack_successor: Successor on the max stack usage path. None if it hasn't - been analyzed or it's the end. - cycle_index: Index of the cycle group. None if it hasn't been analyzed. + stack_max_path: Max stack usage path. None if it hasn't been analyzed. """ def __init__(self, address, name, stack_frame, callsites): @@ -213,14 +210,7 @@ class Function(object): self.stack_frame = stack_frame self.callsites = callsites self.stack_max_usage = None - self.stack_successor = None - # Node attributes for Tarjan's strongly connected components algorithm. - # TODO(cheyuw): The SCC node attributes should be moved out from the - # Function class. - self.scc_index = None - self.scc_lowlink = None - self.scc_onstack = False - self.cycle_index = None + self.stack_max_path = None def __eq__(self, other): """Function equality. @@ -234,26 +224,27 @@ class Function(object): if not isinstance(other, Function): return False - # TODO(cheyuw): Don't compare SCC node attributes here. if not (self.address == other.address and self.name == other.name and self.stack_frame == other.stack_frame and self.callsites == other.callsites and - self.stack_max_usage == other.stack_max_usage and - self.scc_index == other.scc_index and - self.scc_lowlink == other.scc_lowlink and - self.scc_onstack == other.scc_onstack and - self.cycle_index == other.cycle_index): + self.stack_max_usage == other.stack_max_usage): return False - if self.stack_successor is None: - return other.stack_successor is None - else: - if other.stack_successor is None: - return False + if self.stack_max_path is None: + return other.stack_max_path is None + elif other.stack_max_path is None: + return False + if len(self.stack_max_path) != len(other.stack_max_path): + return False + + for self_func, other_func in zip(self.stack_max_path, other.stack_max_path): # Assume the addresses of functions are unique. - return self.stack_successor.address == other.stack_successor.address + if self_func.address != other_func.address: + return False + + return True class ArmAnalyzer(object): @@ -756,7 +747,7 @@ class StackAnalyzer(object): return (name_result.group('name').strip(), path, linenum) add_rules = collections.defaultdict(set) - remove_rules = set() + remove_rules = list() invalid_sigtxts = set() if 'add' in self.annotation and self.annotation['add'] is not None: @@ -774,12 +765,21 @@ class StackAnalyzer(object): add_rules[src_sig].add(dst_sig) if 'remove' in self.annotation and self.annotation['remove'] is not None: - for remove_sigtxt in self.annotation['remove']: - remove_sig = NormalizeSignature(remove_sigtxt) - if remove_sig is None: - invalid_sigtxts.add(remove_sigtxt) - else: - remove_rules.add(remove_sig) + for remove_sigtxts in self.annotation['remove']: + if isinstance(remove_sigtxts, str): + remove_sigtxts = [remove_sigtxts] + + remove_path = [] + for remove_sigtxt in remove_sigtxts: + remove_sig = NormalizeSignature(remove_sigtxt) + if remove_sig is None: + invalid_sigtxts.add(remove_sigtxt) + else: + remove_path.append(remove_sig) + + if len(remove_path) == len(remove_sigtxts): + # All signatures are normalized. The remove path has no error. + remove_rules.append(remove_path) return (add_rules, remove_rules, invalid_sigtxts) @@ -790,16 +790,39 @@ class StackAnalyzer(object): function_map: Function map. Returns: - Set of added call edges, set of invalid paths, set of eliminated + Set of added call edges, list of remove paths, set of eliminated callsite addresses, set of annotation signatures which can't be resolved. """ + def StringifySignature(signature): + """Stringify the tupled signature. + + Args: + signature: Tupled signature. + + Returns: + Signature string. + """ + (name, path, linenum) = signature + bracket_text = '' + if path is not None: + path = os.path.relpath(path) + if linenum is None: + bracket_text = '[{}]'.format(path) + else: + bracket_text = '[{}:{}]'.format(path, linenum) + + return name + bracket_text + (add_rules, remove_rules, invalid_sigtxts) = self.LoadAnnotation() - signature_set = set(remove_rules) + signature_set = set() for src_sig, dst_sigs in add_rules.items(): signature_set.add(src_sig) signature_set.update(dst_sigs) + for remove_sigs in remove_rules: + signature_set.update(remove_sigs) + # Map signatures to functions. (signature_map, sig_error_map) = self.MapAnnotation(function_map, signature_set) @@ -826,7 +849,7 @@ class StackAnalyzer(object): # Generate the annotation sets. add_set = set() - remove_set = set() + remove_list = list() eliminated_addrs = set() for src_sig, dst_sigs in add_rules.items(): @@ -860,157 +883,297 @@ class StackAnalyzer(object): for dst_func in dst_funcs: add_set.add((src_func, dst_func)) - for remove_sig in remove_rules: - remove_funcs = signature_map.get(remove_sig) - if remove_funcs is not None: - # Add all the same functions. - remove_set.update(remove_funcs) + for remove_sigs in remove_rules: + # Since each signature can be mapped to multiple functions, generate + # multiple remove paths from all the combinations of these functions. + remove_paths = [[]] + skip_flag = False + for remove_sig in remove_sigs: + # Transform each signature to the corresponding functions. + remove_funcs = signature_map.get(remove_sig) + if remove_funcs is None: + # There is an unresolved signature in the remove path. Ignore the + # whole broken remove path. + skip_flag = True + break + else: + paths = [] + for remove_path in remove_paths: + # Append each function of the current signature to the all previous + # remove paths. + for remove_func in remove_funcs: + paths.append(remove_path + [remove_func]) + remove_paths = paths + + if skip_flag: + # Ignore the broken remove path. + continue + + for remove_path in remove_paths: + skip_flag = False + # Deduplicate the remove paths. + if remove_path not in remove_list: + remove_list.append(remove_path) + + # Format the error messages. failed_sigtxts = set() for sigtxt in invalid_sigtxts: failed_sigtxts.add((sigtxt, self.ANNOTATION_ERROR_INVALID)) - # Translate the tupled failed signatures to text signatures. for sig, error in sig_error_map.items(): - (name, path, linenum) = sig - bracket_text = '' - if path is not None: - path = os.path.relpath(path) - if linenum is None: - bracket_text = '[{}]'.format(path) - else: - bracket_text = '[{}:{}]'.format(path, linenum) - - failed_sigtxts.add((name + bracket_text, error)) + failed_sigtxts.add((StringifySignature(sig), error)) - return (add_set, remove_set, eliminated_addrs, failed_sigtxts) + return (add_set, remove_list, eliminated_addrs, failed_sigtxts) - def PreprocessAnnotation(self, function_map, add_set, remove_set, + def PreprocessAnnotation(self, function_map, add_set, remove_list, eliminated_addrs): """Preprocess the annotation and callgraph. - Add the missing call edges, and remove simple invalid paths (the paths only - have one vertex) from the function_map. + Add the missing call edges, and delete simple remove paths (the paths have + one or two vertices) from the function_map. Eliminate the annotated indirect callsites. + Return the remaining remove list. + Args: function_map: Function map. add_set: Set of missing call edges. - remove_set: Set of invalid paths. + remove_list: List of remove paths. eliminated_addrs: Set of eliminated callsite addresses. + + Returns: + List of remaining remove paths. """ + def CheckEdge(path): + """Check if all edges of the path are on the callgraph. + + Args: + path: Path. + + Returns: + True or False. + """ + for index in range(len(path) - 1): + if (path[index], path[index + 1]) not in edge_set: + return False + + return True + for src_func, dst_func in add_set: # TODO(cheyuw): Support tailing call annotation. src_func.callsites.append( Callsite(None, dst_func.address, False, dst_func)) + # Delete simple remove paths. + remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2) + edge_set = set() for function in function_map.values(): cleaned_callsites = [] for callsite in function.callsites: - if callsite.callee in remove_set: + if ((callsite.callee,) in remove_simple or + (function, callsite.callee) in remove_simple): continue if callsite.target is None and callsite.address in eliminated_addrs: continue cleaned_callsites.append(callsite) + if callsite.callee is not None: + edge_set.add((function, callsite.callee)) function.callsites = cleaned_callsites - def AnalyzeCallGraph(self, function_map): + return [p for p in remove_list if len(p) >= 3 and CheckEdge(p)] + + def AnalyzeCallGraph(self, function_map, remove_list): """Analyze callgraph. It will update the max stack size and path for each function. Args: function_map: Function map. + remove_list: List of remove paths. Returns: - SCC groups of the callgraph. + List of function cycles. """ - def BuildSCC(function): - """Tarjan's strongly connected components algorithm. - - It also calculates the max stack size and path for the function. - For cycle, we only count the stack size following the traversal order. + def Traverse(curr_state): + """Traverse the callgraph and calculate the max stack usages of functions. Args: - function: Current function. + curr_state: Current state. + + Returns: + SCC lowest link. """ - function.scc_index = scc_index_counter[0] - function.scc_lowlink = function.scc_index + scc_index = scc_index_counter[0] scc_index_counter[0] += 1 - scc_stack.append(function) - function.scc_onstack = True + scc_index_map[curr_state] = scc_index + scc_lowlink = scc_index + scc_stack.append(curr_state) + # Push the current state in the stack. We can use a set to maintain this + # because the stacked states are unique; otherwise we will find a cycle + # first. + stacked_states.add(curr_state) + + (curr_address, curr_positions) = curr_state + curr_func = function_map[curr_address] + + invalid_flag = False + new_positions = list(curr_positions) + for index, position in enumerate(curr_positions): + remove_path = remove_list[index] + + # The position of each remove path in the state is the length of the + # longest matching path between the prefix of the remove path and the + # suffix of the current traversing path. We maintain this length when + # appending the next callee to the traversing path. And it can be used + # to check if the remove path appears in the traversing path. + + # TODO(cheyuw): Implement KMP algorithm to match remove paths + # efficiently. + if remove_path[position] is curr_func: + # Matches the current function, extend the length. + new_positions[index] = position + 1 + if new_positions[index] == len(remove_path): + # The length of the longest matching path is equal to the length of + # the remove path, which means the suffix of the current traversing + # path matches the remove path. + invalid_flag = True + break - # Max stack usage is at least equal to the stack frame. - max_stack_usage = function.stack_frame - max_callee = None + else: + # We can't get the new longest matching path by extending the previous + # one directly. Fallback to search the new longest matching path. + + # If we can't find any matching path in the following search, reset + # the matching length to 0. + new_positions[index] = 0 + + # We want to find the new longest matching prefix of remove path with + # the suffix of the current traversing path. Because the new longest + # matching path won't be longer than the prevous one now, and part of + # the suffix matches the prefix of remove path, we can get the needed + # suffix from the previous matching prefix of the invalid path. + suffix = remove_path[:position] + [curr_func] + for offset in range(1, len(suffix)): + length = position - offset + if remove_path[:length] == suffix[offset:]: + new_positions[index] = length + break + + new_positions = tuple(new_positions) + + # If the current suffix is invalid, set the max stack usage to 0. + max_stack_usage = 0 + max_callee_state = None self_loop = False - for callsite in function.callsites: - callee = callsite.callee - if callee is None: - continue - if callee.scc_lowlink is None: - # Unvisited descendant. - BuildSCC(callee) - function.scc_lowlink = min(function.scc_lowlink, callee.scc_lowlink) - elif callee.scc_onstack: - # Reaches a parent node or self. - function.scc_lowlink = min(function.scc_lowlink, callee.scc_index) - if callee is function: - self_loop = True - - # If the callee is a parent or itself, stack_max_usage will be None. - callee_stack_usage = callee.stack_max_usage - if callee_stack_usage is not None: - if callsite.is_tail: - # For tailing call, since the callee reuses the stack frame of the - # caller, choose which one is larger directly. - stack_usage = max(function.stack_frame, callee_stack_usage) - else: - stack_usage = function.stack_frame + callee_stack_usage - - if stack_usage > max_stack_usage: - max_stack_usage = stack_usage - max_callee = callee + if not invalid_flag: + # Max stack usage is at least equal to the stack frame. + max_stack_usage = curr_func.stack_frame + for callsite in curr_func.callsites: + callee = callsite.callee + if callee is None: + continue - if function.scc_lowlink == function.scc_index: - # Group the functions to a new cycle group. - group_index = len(cycle_groups) + callee_state = (callee.address, new_positions) + if callee_state not in scc_index_map: + # Unvisited state. + scc_lowlink = min(scc_lowlink, Traverse(callee_state)) + elif callee_state in stacked_states: + # The state is shown in the stack. There is a cycle. + sub_stack_usage = 0 + scc_lowlink = min(scc_lowlink, scc_index_map[callee_state]) + if callee_state == curr_state: + self_loop = True + + done_result = done_states.get(callee_state) + if done_result is not None: + # Already done this state and use its result. If the state reaches a + # cycle, reusing the result will cause inaccuracy (the stack usage + # of cycle depends on where the entrance is). But it's fine since we + # can't get accurate stack usage under this situation, and we rely + # on user-provided annotations to break the cycle, after which the + # result will be accurate again. + (sub_stack_usage, _) = done_result + + if callsite.is_tail: + # For tailing call, since the callee reuses the stack frame of the + # caller, choose the larger one directly. + stack_usage = max(curr_func.stack_frame, sub_stack_usage) + else: + stack_usage = curr_func.stack_frame + sub_stack_usage + + if stack_usage > max_stack_usage: + max_stack_usage = stack_usage + max_callee_state = callee_state + + if scc_lowlink == scc_index: group = [] - while scc_stack[-1] is not function: - scc_func = scc_stack.pop() - scc_func.scc_onstack = False - scc_func.cycle_index = group_index - group.append(scc_func) + while scc_stack[-1] != curr_state: + scc_state = scc_stack.pop() + stacked_states.remove(scc_state) + group.append(scc_state) scc_stack.pop() - function.scc_onstack = False - function.cycle_index = group_index + stacked_states.remove(curr_state) - # If the function is in any cycle (include self loop), add itself to - # the cycle group. Otherwise its cycle group is empty. + # If the cycle is not empty, record it. if len(group) > 0 or self_loop: - # The function is in a cycle. - group.append(function) - - cycle_groups.append(group) - - # Update stack analysis result. - function.stack_max_usage = max_stack_usage - function.stack_successor = max_callee - - cycle_groups = [] + group.append(curr_state) + cycle_groups.append(group) + + # Store the done result. + done_states[curr_state] = (max_stack_usage, max_callee_state) + + if curr_positions == initial_positions: + # If the current state is initial state, we traversed the callgraph by + # using the current function as start point. Update the stack usage of + # the function. + # If the function matches a single vertex remove path, this will set its + # max stack usage to 0, which is not expected (we still calculate its + # max stack usage, but prevent any function from calling it). However, + # all the single vertex remove paths have been preprocessed and removed. + curr_func.stack_max_usage = max_stack_usage + + # Reconstruct the max stack path by traversing the state transitions. + max_stack_path = [curr_func] + callee_state = max_callee_state + while callee_state is not None: + # The first element of state tuple is function address. + max_stack_path.append(function_map[callee_state[0]]) + done_result = done_states.get(callee_state) + # All of the descendants should be done. + assert done_result is not None + (_, callee_state) = done_result + + curr_func.stack_max_path = max_stack_path + + return scc_lowlink + + # The state is the concatenation of the current function address and the + # state of matching position. + initial_positions = (0,) * len(remove_list) + done_states = {} + stacked_states = set() scc_index_counter = [0] + scc_index_map = {} scc_stack = [] + cycle_groups = [] for function in function_map.values(): - if function.scc_lowlink is None: - BuildSCC(function) + if function.stack_max_usage is None: + Traverse((function.address, initial_positions)) - return cycle_groups + cycle_functions = [] + for group in cycle_groups: + cycle = set(function_map[state[0]] for state in group) + if cycle not in cycle_functions: + cycle_functions.append(cycle) + + return cycle_functions def Analyze(self): """Run the stack analysis. @@ -1018,15 +1181,26 @@ class StackAnalyzer(object): Raises: StackAnalyzerError: If disassembly fails. """ - def PrintInlineStack(address, prefix=''): - """Print beautiful inline stack. + def OutputInlineStack(address, prefix=''): + """Output beautiful inline stack. Args: address: Address. prefix: Prefix of each line. + + Returns: + Key for sorting, output text """ + line_infos = self.AddressToLine(address, True) + + if line_infos[0] is None: + order_key = (None, None) + else: + (_, path, linenum) = line_infos[0] + order_key = (linenum, path) + line_texts = [] - for line_info in reversed(self.AddressToLine(address, True)): + for line_info in reversed(line_infos): if line_info is None: (function_name, path, linenum) = ('??', '??', 0) else: @@ -1036,9 +1210,12 @@ class StackAnalyzer(object): os.path.relpath(path), linenum)) - print('{}-> {} {:x}'.format(prefix, line_texts[0], address)) + output = '{}-> {} {:x}\n'.format(prefix, line_texts[0], address) for depth, line_text in enumerate(line_texts[1:]): - print('{} {}- {}'.format(prefix, ' ' * depth, line_text)) + output += '{} {}- {}\n'.format(prefix, ' ' * depth, line_text) + + # Remove the last newline character. + return (order_key, output.rstrip('\n')) # Analyze disassembly. try: @@ -1052,11 +1229,12 @@ class StackAnalyzer(object): function_map = self.AnalyzeDisassembly(disasm_text) result = self.ResolveAnnotation(function_map) - (add_set, remove_set, eliminated_addrs, failed_sigtxts) = result - self.PreprocessAnnotation(function_map, - add_set, remove_set, - eliminated_addrs) - cycle_groups = self.AnalyzeCallGraph(function_map) + (add_set, remove_list, eliminated_addrs, failed_sigtxts) = result + remove_list = self.PreprocessAnnotation(function_map, + add_set, + remove_list, + eliminated_addrs) + cycle_functions = self.AnalyzeCallGraph(function_map, remove_list) # Print the results of task-aware stack analysis. for task in self.tasklist: @@ -1069,36 +1247,37 @@ class StackAnalyzer(object): task.stack_max_size)) print('Call Trace:') - curr_func = routine_func - while curr_func is not None: + max_stack_path = routine_func.stack_max_path + # Assume the routine function is resolved. + assert max_stack_path is not None + for depth, curr_func in enumerate(max_stack_path): line_info = self.AddressToLine(curr_func.address)[0] if line_info is None: (path, linenum) = ('??', 0) else: (_, path, linenum) = line_info - output = ' {} ({}) [{}:{}] {:x}'.format(curr_func.name, - curr_func.stack_frame, - os.path.relpath(path), - linenum, - curr_func.address) - if len(cycle_groups[curr_func.cycle_index]) > 0: - # If its cycle group isn't empty, it is in a cycle. - output += ' [cycle]' + print(' {} ({}) [{}:{}] {:x}'.format(curr_func.name, + curr_func.stack_frame, + os.path.relpath(path), + linenum, + curr_func.address)) - print(output) - - succ_func = curr_func.stack_successor - if succ_func is not None: + if depth + 1 < len(max_stack_path): + succ_func = max_stack_path[depth + 1] + text_list = [] for callsite in curr_func.callsites: if callsite.callee is succ_func: indent_prefix = ' ' if callsite.address is None: - print('{}-> [annotation]'.format(indent_prefix)) + order_text = (None, '{}-> [annotation]'.format(indent_prefix)) else: - PrintInlineStack(callsite.address, indent_prefix) + order_text = OutputInlineStack(callsite.address, indent_prefix) + + text_list.append(order_text) - curr_func = succ_func + for _, text in sorted(text_list, key=lambda (k, _): k): + print(text) print('Unresolved indirect callsites:') for function in function_map.values(): @@ -1108,14 +1287,23 @@ class StackAnalyzer(object): indirect_callsites.append(callsite.address) if len(indirect_callsites) > 0: - print(' {}'.format(function.name)) + print(' In function {}:'.format(function.name)) + text_list = [] for address in indirect_callsites: - PrintInlineStack(address, ' ') + text_list.append(OutputInlineStack(address, ' ')) + + for _, text in sorted(text_list, key=lambda (k, _): k): + print(text) print('Unresolved annotation signatures:') for sigtxt, error in failed_sigtxts: print(' {}: {}'.format(sigtxt, error)) + if len(cycle_functions) > 0: + print('There are cycles in the following function sets:') + for functions in cycle_functions: + print('[{}]'.format(', '.join(function.name for function in functions))) + def ParseArgs(): """Parse commandline arguments. diff --git a/extra/stack_analyzer/stack_analyzer_unittest.py b/extra/stack_analyzer/stack_analyzer_unittest.py index ce12b83552..f6036a65e9 100755 --- a/extra/stack_analyzer/stack_analyzer_unittest.py +++ b/extra/stack_analyzer/stack_analyzer_unittest.py @@ -217,13 +217,13 @@ class StackAnalyzerTest(unittest.TestCase): self.analyzer.annotation = {} (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() self.assertEqual(add_rules, {}) - self.assertEqual(remove_rules, set()) + self.assertEqual(remove_rules, []) self.assertEqual(invalid_sigtxts, set()) self.analyzer.annotation = {'add': None, 'remove': None} (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() self.assertEqual(add_rules, {}) - self.assertEqual(remove_rules, set()) + self.assertEqual(remove_rules, []) self.assertEqual(invalid_sigtxts, set()) funcs = { @@ -256,29 +256,35 @@ class StackAnalyzerTest(unittest.TestCase): 'inlined_mul[x.c]': ['inlined_mul'], 'toot_calc[t.c:1234]': ['hook_task'], }, - 'remove': { - 'touchpad?calc[', + 'remove': [ + ['touchpad?calc['], 'touchpad_calc', - 'touchpad_calc[a.c]', - 'task_unk[a.c]', - 'touchpad_calc[x/a.c]', - 'trackpad_range', - 'inlined_mul', - }, + ['touchpad_calc[a.c]'], + ['task_unk[a.c]'], + ['touchpad_calc[x/a.c]'], + ['trackpad_range'], + ['inlined_mul'], + ['inlined_mul', 'console_task', 'touchpad_calc[a.c]'], + ['inlined_mul', 'inlined_mul_alias', 'console_task'], + ['inlined_mul', 'inlined_mul_alias', 'console_task'], + ], } (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() self.assertEqual(invalid_sigtxts, {'touchpad?calc['}) - signature_set = set(remove_rules) + signature_set = set() for src_sig, dst_sigs in add_rules.items(): signature_set.add(src_sig) signature_set.update(dst_sigs) + for remove_sigs in remove_rules: + signature_set.update(remove_sigs) + (signature_map, failed_sigs) = self.analyzer.MapAnnotation(funcs, signature_set) result = self.analyzer.ResolveAnnotation(funcs) - (add_set, remove_set, eliminated_addrs, failed_sigs) = result + (add_set, remove_list, eliminated_addrs, failed_sigs) = result expect_signature_map = { ('hook_task', None, None): {funcs[0x1000]}, @@ -304,11 +310,19 @@ class StackAnalyzerTest(unittest.TestCase): (funcs[0x13100], funcs[0x13000]), (funcs[0x13100], funcs[0x13100]), }) - self.assertEqual(remove_set, { - funcs[0x4000], - funcs[0x13000], - funcs[0x13100] - }) + expect_remove_list = [ + [funcs[0x4000]], + [funcs[0x13000]], + [funcs[0x13100]], + [funcs[0x13000], funcs[0x2000], funcs[0x4000]], + [funcs[0x13100], funcs[0x2000], funcs[0x4000]], + [funcs[0x13000], funcs[0x13100], funcs[0x2000]], + [funcs[0x13100], funcs[0x13100], funcs[0x2000]], + ] + self.assertEqual(len(remove_list), len(expect_remove_list)) + for remove_path in remove_list: + self.assertTrue(remove_path in expect_remove_list) + self.assertEqual(eliminated_addrs, {0x1002}) self.assertEqual(failed_sigs, { ('touchpad?calc[', sa.StackAnalyzer.ANNOTATION_ERROR_INVALID), @@ -331,14 +345,25 @@ class StackAnalyzerTest(unittest.TestCase): sa.Callsite(0x2002, 0x1000, False, funcs[0x1000]), sa.Callsite(0x2006, None, True, None), ] - add_set = {(funcs[0x2000], funcs[0x4000]), (funcs[0x4000], funcs[0x1000])} - remove_set = {funcs[0x1000]} + add_set = { + (funcs[0x2000], funcs[0x2000]), + (funcs[0x2000], funcs[0x4000]), + (funcs[0x4000], funcs[0x1000]), + (funcs[0x4000], funcs[0x2000]), + } + remove_list = [ + [funcs[0x1000]], + [funcs[0x2000], funcs[0x2000]], + [funcs[0x4000], funcs[0x1000]], + [funcs[0x2000], funcs[0x4000], funcs[0x2000]], + [funcs[0x4000], funcs[0x1000], funcs[0x4000]], + ] eliminated_addrs = {0x2006} - self.analyzer.PreprocessAnnotation(funcs, - add_set, - remove_set, - eliminated_addrs) + remaining_remove_list = self.analyzer.PreprocessAnnotation(funcs, + add_set, + remove_list, + eliminated_addrs) expect_funcs = { 0x1000: sa.Function(0x1000, 'hook_task', 0, []), @@ -347,7 +372,12 @@ class StackAnalyzerTest(unittest.TestCase): } expect_funcs[0x2000].callsites = [ sa.Callsite(None, 0x4000, False, expect_funcs[0x4000])] + expect_funcs[0x4000].callsites = [ + sa.Callsite(None, 0x2000, False, expect_funcs[0x2000])] self.assertEqual(funcs, expect_funcs) + self.assertEqual(remaining_remove_list, [ + [funcs[0x2000], funcs[0x4000], funcs[0x2000]], + ]) def testAnalyzeDisassembly(self): disasm_text = ( @@ -393,14 +423,18 @@ class StackAnalyzerTest(unittest.TestCase): 0x7000: sa.Function(0x7000, 'task_e', 24, []), 0x8000: sa.Function(0x8000, 'task_f', 20, []), 0x9000: sa.Function(0x9000, 'task_g', 20, []), + 0x10000: sa.Function(0x10000, 'task_x', 16, []), } funcs[0x1000].callsites = [ sa.Callsite(0x1002, 0x3000, False, funcs[0x3000]), sa.Callsite(0x1006, 0x4000, False, funcs[0x4000])] funcs[0x2000].callsites = [ - sa.Callsite(0x2002, 0x5000, False, funcs[0x5000])] + sa.Callsite(0x2002, 0x5000, False, funcs[0x5000]), + sa.Callsite(0x2006, 0x2000, False, funcs[0x2000]), + sa.Callsite(0x200a, 0x10000, False, funcs[0x10000])] funcs[0x3000].callsites = [ - sa.Callsite(0x3002, 0x4000, False, funcs[0x4000])] + sa.Callsite(0x3002, 0x4000, False, funcs[0x4000]), + sa.Callsite(0x3006, 0x1000, False, funcs[0x1000])] funcs[0x4000].callsites = [ sa.Callsite(0x4002, 0x6000, True, funcs[0x6000]), sa.Callsite(0x4006, 0x7000, False, funcs[0x7000]), @@ -413,29 +447,68 @@ class StackAnalyzerTest(unittest.TestCase): sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])] funcs[0x9000].callsites = [ sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])] + funcs[0x10000].callsites = [ + sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])] - scc_group = self.analyzer.AnalyzeCallGraph(funcs) + cycles = self.analyzer.AnalyzeCallGraph(funcs, [ + [funcs[0x2000]] * 2, + [funcs[0x10000], funcs[0x2000]] * 3, + [funcs[0x1000], funcs[0x3000], funcs[0x1000]] + ]) expect_func_stack = { - 0x1000: (148, funcs[0x3000], set()), - 0x2000: (176, funcs[0x5000], set()), - 0x3000: (148, funcs[0x4000], set()), - 0x4000: (136, funcs[0x8000], {funcs[0x4000], - funcs[0x8000], - funcs[0x9000]}), - 0x5000: (168, funcs[0x4000], set()), - 0x6000: (100, None, set()), - 0x7000: (24, None, {funcs[0x7000]}), - 0x8000: (40, funcs[0x9000], {funcs[0x4000], - funcs[0x8000], - funcs[0x9000]}), - 0x9000: (20, None, {funcs[0x4000], funcs[0x8000], funcs[0x9000]}), + 0x1000: (268, [funcs[0x1000], + funcs[0x3000], + funcs[0x4000], + funcs[0x8000], + funcs[0x9000], + funcs[0x4000], + funcs[0x7000]]), + 0x2000: (208, [funcs[0x2000], + funcs[0x10000], + funcs[0x2000], + funcs[0x10000], + funcs[0x2000], + funcs[0x5000], + funcs[0x4000], + funcs[0x7000]]), + 0x3000: (280, [funcs[0x3000], + funcs[0x1000], + funcs[0x3000], + funcs[0x4000], + funcs[0x8000], + funcs[0x9000], + funcs[0x4000], + funcs[0x7000]]), + 0x4000: (120, [funcs[0x4000], funcs[0x7000]]), + 0x5000: (152, [funcs[0x5000], funcs[0x4000], funcs[0x7000]]), + 0x6000: (100, [funcs[0x6000]]), + 0x7000: (24, [funcs[0x7000]]), + 0x8000: (160, [funcs[0x8000], + funcs[0x9000], + funcs[0x4000], + funcs[0x7000]]), + 0x9000: (140, [funcs[0x9000], funcs[0x4000], funcs[0x7000]]), + 0x10000: (200, [funcs[0x10000], + funcs[0x2000], + funcs[0x10000], + funcs[0x2000], + funcs[0x5000], + funcs[0x4000], + funcs[0x7000]]), } + expect_cycles = [ + {funcs[0x4000], funcs[0x8000], funcs[0x9000]}, + {funcs[0x7000]}, + ] for func in funcs.values(): - (stack_max_usage, stack_successor, scc) = expect_func_stack[func.address] + (stack_max_usage, stack_max_path) = expect_func_stack[func.address] self.assertEqual(func.stack_max_usage, stack_max_usage) - self.assertEqual(func.stack_successor, stack_successor) - self.assertEqual(set(scc_group[func.cycle_index]), scc) + self.assertEqual(func.stack_max_path, stack_max_path) + + self.assertEqual(len(cycles), len(expect_cycles)) + for cycle in cycles: + self.assertTrue(cycle in expect_cycles) @mock.patch('subprocess.check_output') def testAddressToLine(self, checkoutput_mock): @@ -498,7 +571,7 @@ class StackAnalyzerTest(unittest.TestCase): ) addrtoline_mock.return_value = [('??', '??', 0)] - self.analyzer.annotation = {'remove': ['fake_func']} + self.analyzer.annotation = {'remove': [['fake_func']]} with mock.patch('__builtin__.print') as print_mock: checkoutput_mock.return_value = disasm_text @@ -515,7 +588,7 @@ class StackAnalyzerTest(unittest.TestCase): mock.call(' -> ??[??:0] 2002'), mock.call(' hook_task (8) [??:0] 1000'), mock.call('Unresolved indirect callsites:'), - mock.call(' console_task'), + mock.call(' In function console_task:'), mock.call(' -> ??[??:0] 200a'), mock.call('Unresolved annotation signatures:'), mock.call(' fake_func: function is not found'), |