diff options
-rw-r--r-- | Makefile.rules | 1 | ||||
-rw-r--r-- | extra/stack_analyzer/README.md | 5 | ||||
-rw-r--r-- | extra/stack_analyzer/example_annotation.yaml | 18 | ||||
-rwxr-xr-x | extra/stack_analyzer/stack_analyzer.py | 235 | ||||
-rwxr-xr-x | extra/stack_analyzer/stack_analyzer_unittest.py | 181 |
5 files changed, 413 insertions, 27 deletions
diff --git a/Makefile.rules b/Makefile.rules index 4256f921ee..7e66d2ef77 100644 --- a/Makefile.rules +++ b/Makefile.rules @@ -554,6 +554,7 @@ analyzestack: $(out)/util/export_taskinfo.so fi; \ extra/stack_analyzer/stack_analyzer.py --objdump "$(OBJDUMP)" \ --addr2line "$(ADDR2LINE)" --section "$$SECTION" \ + $(if $(ANNOTATION),--annotation $(ANNOTATION),) \ --export_taskinfo "$$EXPORT_TASKINFO" "$$ELF" .SECONDARY: diff --git a/extra/stack_analyzer/README.md b/extra/stack_analyzer/README.md index 472b4a91ed..e1be72ebbe 100644 --- a/extra/stack_analyzer/README.md +++ b/extra/stack_analyzer/README.md @@ -12,6 +12,7 @@ Make sure the firmware of your target board has been built. In `src/platform/ec`, run ``` -make BOARD=${BOARD} SECTION=${SECTION} analyzestack +make BOARD=${BOARD} SECTION=${SECTION} ANNOTATION=${ANNOTATION} analyzestack ``` -The `${SECTION}` can be `RO` or `RW`. +The `${SECTION}` can be `RO` or `RW`. The `${ANNOTATION}` is a optional +annotation file, see the example_annotation.yaml. diff --git a/extra/stack_analyzer/example_annotation.yaml b/extra/stack_analyzer/example_annotation.yaml new file mode 100644 index 0000000000..f4726d55c4 --- /dev/null +++ b/extra/stack_analyzer/example_annotation.yaml @@ -0,0 +1,18 @@ +# Add some missing calls. +add: + # console_task also calls command_display_accel_info and command_accel_init. + console_task: + - command_display_accel_info + - command_accel_init + + # Function name can be followed by [source code path] to indicate where is it + # declared (there may be several functions with the same name). + motion_lid_calc[common/motion_lid.c]: + - get_range[driver/accel_kionix.c] + +# Remove some call paths. +remove: +# Remove all callsites pointing to panic_assert_fail. +- panic_assert_fail +- panic +- software_panic diff --git a/extra/stack_analyzer/stack_analyzer.py b/extra/stack_analyzer/stack_analyzer.py index cc9543cc8c..5d97d15ef1 100755 --- a/extra/stack_analyzer/stack_analyzer.py +++ b/extra/stack_analyzer/stack_analyzer.py @@ -16,9 +16,12 @@ from __future__ import print_function import argparse +import collections import ctypes +import os import re import subprocess +import yaml SECTION_RO = 'RO' @@ -132,7 +135,7 @@ class Callsite(object): """Function callsite. Attributes: - address: Address of callsite location. + address: Address of callsite location. None if it is unknown. target: Callee address. is_tail: A bool indicates that it is a tailing call. callee: Resolved callee function. None if it hasn't been resolved. @@ -359,17 +362,25 @@ class StackAnalyzer(object): Analyze: Run the stack analysis. """ - def __init__(self, options, symbols, tasklist): + # Errors of annotation resolving. + ANNOTATION_ERROR_INVALID = 'invalid signature' + ANNOTATION_ERROR_NOTFOUND = 'function is not found' + ANNOTATION_ERROR_AMBIGUOUS = 'signature is ambiguous' + + def __init__(self, options, symbols, tasklist, annotation): """Constructor. Args: options: Namespace from argparse.parse_args(). symbols: Symbol list. tasklist: Task list. + annotation: Annotation config. """ self.options = options self.symbols = symbols self.tasklist = tasklist + self.annotation = annotation + self.address_to_line_cache = {} def AddressToLine(self, address): """Convert address to line. @@ -383,6 +394,9 @@ class StackAnalyzer(object): Raises: StackAnalyzerError: If addr2line is failed. """ + if address in self.address_to_line_cache: + return self.address_to_line_cache[address] + try: line_text = subprocess.check_output([self.options.addr2line, '-e', @@ -393,7 +407,9 @@ class StackAnalyzer(object): except OSError: raise StackAnalyzerError('Failed to run addr2line.') - return line_text.strip() + line_text = line_text.strip() + self.address_to_line_cache[address] = line_text + return line_text def AnalyzeDisassembly(self, disasm_text): """Parse the disassembly text, analyze, and build a map of all functions. @@ -559,6 +575,186 @@ class StackAnalyzer(object): return function_map + def MappingAnnotation(self, function_map, signature_set): + """Map annotation signatures to functions. + + Args: + function_map: Function map. + signature_set: Set of annotation signatures. + + Returns: + Map of signatures to functions, set of signatures which can't be resolved. + """ + C_FUNCTION_NAME = r'_A-Za-z0-9' + ADDRTOLINE_FAILED_SYMBOL = '??' + # To eliminate the suffix appended by compilers, try to extract the + # C function name from the prefix of symbol name. + # Example: SHA256_transform.constprop.28 + prefix_name_regex = re.compile( + r'^(?P<name>[{0}]+)([^{0}].*)?$'.format(C_FUNCTION_NAME)) + # Example: get_range[driver/accel_kionix.c] + annotation_signature_regex = re.compile( + r'^(?P<name>[{}]+)(\[(?P<path>.+)\])?$'.format(C_FUNCTION_NAME)) + # Example: driver/accel_kionix.c:321 and ??:0 + addrtoline_regex = re.compile(r'^(?P<path>.+):\d+$') + + # Build the symbol map indexed by symbol name. If there are multiple symbols + # with the same name, add them into a set. (e.g. symbols of static function + # with the same name) + symbol_map = collections.defaultdict(set) + for symbol in self.symbols: + if symbol.symtype == 'F': + # Function symbol. + result = prefix_name_regex.match(symbol.name) + if result is not None: + function = function_map.get(symbol.address) + # Ignore the symbol not in disassembly. + if function is not None: + # If there are multiple symbol with the same name and point to the + # same function, the set will deduplicate them. + symbol_map[result.group('name').strip()].add(function) + + # Build the signature map indexed by annotation signature. + signature_map = {} + failed_sigs = set() + symbol_path_map = {} + for sig in signature_set: + result = annotation_signature_regex.match(sig) + if result is None: + failed_sigs.add((sig, self.ANNOTATION_ERROR_INVALID)) + continue + + name = result.group('name').strip() + path = result.group('path') + + functions = symbol_map.get(name) + if functions is None: + failed_sigs.add((sig, self.ANNOTATION_ERROR_NOTFOUND)) + continue + + if name not in symbol_path_map: + # Lazy symbol path resolving. Since the addr2line isn't fast, only + # resolve needed symbol paths. + group_map = collections.defaultdict(list) + for function in functions: + result = addrtoline_regex.match(self.AddressToLine(function.address)) + # Assume the output of addr2line is always well-formed. + assert result is not None + symbol_path = result.group('path').strip() + if symbol_path == ADDRTOLINE_FAILED_SYMBOL: + continue + + # Group the functions with the same symbol signature (symbol name + + # symbol path). Assume they are the same copies and do the same + # annotation operations of them because we don't know which copy is + # indicated by the users. + group_map[os.path.realpath(symbol_path)].append(function) + + symbol_path_map[name] = group_map + + # Symbol matching. + function_group = None + group_map = symbol_path_map[name] + if len(group_map) > 0: + if path is None: + if len(group_map) > 1: + # There is ambiguity but the path isn't specified. + failed_sigs.add((sig, self.ANNOTATION_ERROR_AMBIGUOUS)) + continue + + # No path signature but all symbol signatures of functions are same. + # Assume they are the same functions, so there is no ambiguity. + (function_group,) = group_map.values() + else: + function_group = group_map.get(os.path.realpath(path.strip())) + + if function_group is None: + failed_sigs.add((sig, self.ANNOTATION_ERROR_NOTFOUND)) + continue + + # The function_group is a list of all the same functions (according to + # our assumption) which should be annotated together. + signature_map[sig] = function_group + + return (signature_map, failed_sigs) + + def ResolveAnnotation(self, function_map): + """Resolve annotation. + + Args: + function_map: Function map. + + Returns: + Set of added call edges, set of invalid paths, set of annotation + signatures which can't be resolved. + """ + # Collect annotation signatures. + annotation_add_map = self.annotation.get('add', {}) + annotation_remove_list = self.annotation.get('remove', []) + + signature_set = set(annotation_remove_list) + for src_sig, dst_sigs in annotation_add_map.items(): + signature_set.add(src_sig) + signature_set.update(dst_sigs) + + signature_set = {sig.strip() for sig in signature_set} + + # Map signatures to functions. + (signature_map, failed_sigs) = self.MappingAnnotation(function_map, + signature_set) + + # Generate the annotation sets. + add_set = set() + remove_set = set() + + for src_sig, dst_sigs in annotation_add_map.items(): + src_funcs = signature_map.get(src_sig) + if src_funcs is None: + continue + + for dst_sig in dst_sigs: + dst_funcs = signature_map.get(dst_sig) + if dst_funcs is None: + continue + + # Duplicate the call edge for all the same source and destination + # functions. + for src_func in src_funcs: + for dst_func in dst_funcs: + add_set.add((src_func, dst_func)) + + for remove_sig in annotation_remove_list: + remove_funcs = signature_map.get(remove_sig) + if remove_funcs is not None: + # Add all the same functions. + remove_set.update(remove_funcs) + + return add_set, remove_set, failed_sigs + + def PreprocessCallGraph(self, function_map, add_set, remove_set): + """Preprocess the callgraph. + + It will add the missing call edges, and remove simple invalid paths (the + paths only have one vertex) from the function_map. + + Args: + function_map: Function map. + add_set: Set of missing call edges. + remove_set: Set of invalid paths. + """ + for src_func, dst_func in add_set: + # TODO(cheyuw): Support tailing call annotation. + src_func.callsites.append( + Callsite(None, dst_func.address, False, dst_func)) + + for function in function_map.values(): + cleaned_callsites = [] + for callsite in function.callsites: + if callsite.callee not in remove_set: + cleaned_callsites.append(callsite) + + function.callsites = cleaned_callsites + def AnalyzeCallGraph(self, function_map): """Analyze call graph. @@ -654,7 +850,11 @@ class StackAnalyzer(object): return cycle_groups def Analyze(self): - """Run the stack analysis.""" + """Run the stack analysis. + + Raises: + StackAnalyzerError: If disassembly fails. + """ # Analyze disassembly. try: disasm_text = subprocess.check_output([self.options.objdump, @@ -666,6 +866,8 @@ class StackAnalyzer(object): raise StackAnalyzerError('Failed to run objdump.') function_map = self.AnalyzeDisassembly(disasm_text) + (add_set, remove_set, failed_sigs) = self.ResolveAnnotation(function_map) + self.PreprocessCallGraph(function_map, add_set, remove_set) cycle_groups = self.AnalyzeCallGraph(function_map) # Print the results of task-aware stack analysis. @@ -693,6 +895,11 @@ class StackAnalyzer(object): print(output) curr_func = curr_func.stack_successor + if len(failed_sigs) > 0: + print('Failed to resolve some annotation signatures:') + for sig, error in failed_sigs: + print('\t{}: {}'.format(sig, error)) + def ParseArgs(): """Parse commandline arguments. @@ -710,6 +917,8 @@ def ParseArgs(): help='the path of objdump') parser.add_argument('--addr2line', default='addr2line', help='the path of addr2line') + parser.add_argument('--annotation', default=None, + help='the path of annotation file') # TODO(cheyuw): Add an option for dumping stack usage of all functions. @@ -799,6 +1008,22 @@ def main(): try: options = ParseArgs() + # Load annotation config. + if options.annotation is None: + annotation = {} + else: + try: + with open(options.annotation, 'r') as annotation_file: + annotation = yaml.safe_load(annotation_file) + + except yaml.YAMLError: + raise StackAnalyzerError('Failed to parse annotation file.') + except IOError: + raise StackAnalyzerError('Failed to open annotation file.') + + if not isinstance(annotation, dict): + raise StackAnalyzerError('Invalid annotation file.') + # Generate and parse the symbols. try: symbol_text = subprocess.check_output([options.objdump, @@ -819,7 +1044,7 @@ def main(): tasklist = LoadTasklist(options.section, export_taskinfo, symbols) - analyzer = StackAnalyzer(options, symbols, tasklist) + analyzer = StackAnalyzer(options, symbols, tasklist, annotation) analyzer.Analyze() except StackAnalyzerError as e: print('Error: {}'.format(e)) diff --git a/extra/stack_analyzer/stack_analyzer_unittest.py b/extra/stack_analyzer/stack_analyzer_unittest.py index 390072fd8b..2c0a4ae8f1 100755 --- a/extra/stack_analyzer/stack_analyzer_unittest.py +++ b/extra/stack_analyzer/stack_analyzer_unittest.py @@ -128,15 +128,21 @@ class StackAnalyzerTest(unittest.TestCase): symbols = [sa.Symbol(0x1000, 'F', 0x15C, 'hook_task'), sa.Symbol(0x2000, 'F', 0x51C, 'console_task'), sa.Symbol(0x3200, 'O', 0x124, '__just_data'), - sa.Symbol(0x4000, 'F', 0x11C, 'touchpad_calc')] + sa.Symbol(0x4000, 'F', 0x11C, 'touchpad_calc'), + sa.Symbol(0x5000, 'F', 0x12C, 'touchpad_calc.constprop.42'), + sa.Symbol(0x12000, 'F', 0x13C, 'trackpad_range'), + sa.Symbol(0x13000, 'F', 0x200, 'inlined_mul'), + sa.Symbol(0x13100, 'F', 0x200, 'inlined_mul'), + sa.Symbol(0x13100, 'F', 0x200, 'inlined_mul_alias')] tasklist = [sa.Task('HOOKS', 'hook_task', 2048, 0x1000), sa.Task('CONSOLE', 'console_task', 460, 0x2000)] options = mock.MagicMock(elf_path='./ec.RW.elf', - export_taskinfo='none', + export_taskinfo='fake', section='RW', objdump='objdump', - addr2line='addr2line') - self.analyzer = sa.StackAnalyzer(options, symbols, tasklist) + addr2line='addr2line', + annotation=None) + self.analyzer = sa.StackAnalyzer(options, symbols, tasklist, {}) def testParseSymbolText(self): symbol_text = ( @@ -191,6 +197,112 @@ class StackAnalyzerTest(unittest.TestCase): tasklist = sa.LoadTasklist('RW', export_taskinfo, self.analyzer.symbols) self.assertEqual(tasklist, expect_rw_tasklist) + def testResolveAnnotation(self): + funcs = { + 0x1000: sa.Function(0x1000, 'hook_task', 0, []), + 0x2000: sa.Function(0x2000, 'console_task', 0, []), + 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []), + 0x5000: sa.Function(0x5000, 'touchpad_calc.constprop.42', 0, []), + 0x13000: sa.Function(0x13000, 'inlined_mul', 0, []), + 0x13100: sa.Function(0x13100, 'inlined_mul', 0, []), + } + # Set address_to_line_cache to fake the results of addr2line. + self.analyzer.address_to_line_cache = { + 0x1000: 'a.c:10', + 0x2000: 'b.c:20', + 0x4000: './a.c:30', + 0x5000: 'b.c:40', + 0x12000: 't.c:10', + 0x13000: 'x.c:12', + 0x13100: 'x.c:12', + } + self.analyzer.annotation = { + 'add': { + 'hook_task': ['touchpad_calc[a.c]', 'hook_task'], + 'console_task': ['touchpad_calc[b.c]', 'inlined_mul_alias'], + 'hook_task[q.c]': ['hook_task'], + 'inlined_mul[x.c]': ['inlined_mul'], + }, + 'remove': { + 'touchpad?calc', + 'touchpad_calc', + 'touchpad_calc[a.c]', + 'task_unk[a.c]', + 'touchpad_calc[../a.c]', + 'trackpad_range', + 'inlined_mul', + }, + } + signature_set = set(self.analyzer.annotation['remove']) + for src_sig, dst_sigs in self.analyzer.annotation['add'].items(): + signature_set.add(src_sig) + signature_set.update(dst_sigs) + + (signature_map, failed_sigs) = self.analyzer.MappingAnnotation( + funcs, signature_set) + (add_set, remove_set, failed_sigs) = self.analyzer.ResolveAnnotation(funcs) + + expect_signature_map = { + 'hook_task': {funcs[0x1000]}, + 'touchpad_calc[a.c]': {funcs[0x4000]}, + 'touchpad_calc[b.c]': {funcs[0x5000]}, + 'console_task': {funcs[0x2000]}, + 'inlined_mul_alias': {funcs[0x13100]}, + 'inlined_mul[x.c]': {funcs[0x13000], funcs[0x13100]}, + 'inlined_mul': {funcs[0x13000], funcs[0x13100]}, + } + self.assertEqual(len(signature_map), len(expect_signature_map)) + for sig, funclist in signature_map.items(): + self.assertEqual(set(funclist), expect_signature_map[sig]) + + self.assertEqual(add_set, { + (funcs[0x1000], funcs[0x4000]), + (funcs[0x1000], funcs[0x1000]), + (funcs[0x2000], funcs[0x5000]), + (funcs[0x2000], funcs[0x13100]), + (funcs[0x13000], funcs[0x13000]), + (funcs[0x13000], funcs[0x13100]), + (funcs[0x13100], funcs[0x13000]), + (funcs[0x13100], funcs[0x13100]), + }) + self.assertEqual(remove_set, { + funcs[0x4000], + funcs[0x13000], + funcs[0x13100] + }) + self.assertEqual(failed_sigs, { + ('touchpad?calc', sa.StackAnalyzer.ANNOTATION_ERROR_INVALID), + ('touchpad_calc', sa.StackAnalyzer.ANNOTATION_ERROR_AMBIGUOUS), + ('hook_task[q.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), + ('task_unk[a.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), + ('touchpad_calc[../a.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), + ('trackpad_range', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), + }) + + def testPreprocessCallGraph(self): + funcs = { + 0x1000: sa.Function(0x1000, 'hook_task', 0, []), + 0x2000: sa.Function(0x2000, 'console_task', 0, []), + 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []), + } + funcs[0x1000].callsites = [ + sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])] + funcs[0x2000].callsites = [ + sa.Callsite(0x2002, 0x1000, False, funcs[0x1000])] + add_set = {(funcs[0x2000], funcs[0x4000]), (funcs[0x4000], funcs[0x1000])} + remove_set = {funcs[0x1000]} + + self.analyzer.PreprocessCallGraph(funcs, add_set, remove_set) + + expect_funcs = { + 0x1000: sa.Function(0x1000, 'hook_task', 0, []), + 0x2000: sa.Function(0x2000, 'console_task', 0, []), + 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []), + } + expect_funcs[0x2000].callsites = [ + sa.Callsite(None, 0x4000, False, expect_funcs[0x4000])] + self.assertEqual(funcs, expect_funcs) + def testAnalyzeDisassembly(self): disasm_text = ( '\n' @@ -280,22 +392,23 @@ class StackAnalyzerTest(unittest.TestCase): @mock.patch('subprocess.check_output') def testAddressToLine(self, checkoutput_mock): checkoutput_mock.return_value = 'test.c [1]' - self.assertEqual(self.analyzer.AddressToLine(0x1000), 'test.c [1]') + self.assertEqual(self.analyzer.AddressToLine(0x1234), 'test.c [1]') checkoutput_mock.assert_called_once_with( - ['addr2line', '-e', './ec.RW.elf', '1000']) + ['addr2line', '-e', './ec.RW.elf', '1234']) with self.assertRaisesRegexp(sa.StackAnalyzerError, 'addr2line failed to resolve lines.'): checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '') - self.analyzer.AddressToLine(0x1000) + self.analyzer.AddressToLine(0x5678) with self.assertRaisesRegexp(sa.StackAnalyzerError, 'Failed to run addr2line.'): checkoutput_mock.side_effect = OSError() - self.analyzer.AddressToLine(0x1000) + self.analyzer.AddressToLine(0x9012) @mock.patch('subprocess.check_output') - def testAnalyze(self, checkoutput_mock): + @mock.patch('stack_analyzer.StackAnalyzer.AddressToLine') + def testAnalyze(self, addrtoline_mock, checkoutput_mock): disasm_text = ( '\n' 'Disassembly of section .text:\n' @@ -311,29 +424,33 @@ class StackAnalyzerTest(unittest.TestCase): ' 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n' ) + addrtoline_mock.return_value = '??:0' + self.analyzer.annotation = {'remove': ['fake_func']} + with mock.patch('__builtin__.print') as print_mock: - checkoutput_mock.side_effect = [disasm_text, '?', '?', '?'] + checkoutput_mock.return_value = disasm_text self.analyzer.Analyze() print_mock.assert_has_calls([ mock.call( 'Task: HOOKS, Max size: 224 (0 + 224), Allocated size: 2048'), mock.call('Call Trace:'), - mock.call('\thook_task (0) 1000 [?]'), + mock.call('\thook_task (0) 1000 [??:0]'), mock.call( 'Task: CONSOLE, Max size: 232 (8 + 224), Allocated size: 460'), mock.call('Call Trace:'), - mock.call('\tconsole_task (8) 2000 [?]'), + mock.call('\tconsole_task (8) 2000 [??:0]'), + mock.call('Failed to resolve some annotation signatures:'), + mock.call('\tfake_func: function is not found'), ]) with self.assertRaisesRegexp(sa.StackAnalyzerError, 'Failed to run objdump.'): - checkoutput_mock.side_effect = [OSError(), '?', '?', '?'] + checkoutput_mock.side_effect = OSError() self.analyzer.Analyze() with self.assertRaisesRegexp(sa.StackAnalyzerError, 'objdump failed to disassemble.'): - checkoutput_mock.side_effect = [subprocess.CalledProcessError(1, ''), '?', - '?', '?'] + checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '') self.analyzer.Analyze() @mock.patch('subprocess.check_output') @@ -342,11 +459,35 @@ class StackAnalyzerTest(unittest.TestCase): symbol_text = ('1000 g F .text 0000015c .hidden hook_task\n' '2000 g F .text 0000051c .hidden console_task\n') - parseargs_mock.return_value = mock.MagicMock(elf_path='./ec.RW.elf', - export_taskinfo='none', - section='RW', - objdump='objdump', - addr2line='addr2line') + args = mock.MagicMock(elf_path='./ec.RW.elf', + export_taskinfo='fake', + section='RW', + objdump='objdump', + addr2line='addr2line', + annotation='fake') + parseargs_mock.return_value = args + + with mock.patch('__builtin__.print') as print_mock: + sa.main() + print_mock.assert_called_once_with( + 'Error: Failed to open annotation file.') + + with mock.patch('__builtin__.print') as print_mock: + with mock.patch('__builtin__.open', mock.mock_open()) as open_mock: + open_mock.return_value.read.side_effect = ['{', ''] + sa.main() + open_mock.assert_called_once_with('fake', 'r') + print_mock.assert_called_once_with( + 'Error: Failed to parse annotation file.') + + with mock.patch('__builtin__.print') as print_mock: + with mock.patch('__builtin__.open', + mock.mock_open(read_data='')) as open_mock: + sa.main() + print_mock.assert_called_once_with( + 'Error: Invalid annotation file.') + + args.annotation = None with mock.patch('__builtin__.print') as print_mock: checkoutput_mock.return_value = symbol_text |