diff options
Diffstat (limited to 'extra/stack_analyzer/stack_analyzer.py')
-rwxr-xr-x | extra/stack_analyzer/stack_analyzer.py | 3674 |
1 files changed, 1956 insertions, 1718 deletions
diff --git a/extra/stack_analyzer/stack_analyzer.py b/extra/stack_analyzer/stack_analyzer.py index 77d16d5450..2431545c6a 100755 --- a/extra/stack_analyzer/stack_analyzer.py +++ b/extra/stack_analyzer/stack_analyzer.py @@ -1,11 +1,7 @@ #!/usr/bin/env python3 -# Copyright 2017 The Chromium OS Authors. All rights reserved. +# Copyright 2017 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """Statically analyze stack usage of EC firmware. @@ -25,1848 +21,2090 @@ import ctypes import os import re import subprocess -import yaml +import yaml # pylint:disable=import-error -SECTION_RO = 'RO' -SECTION_RW = 'RW' +SECTION_RO = "RO" +SECTION_RW = "RW" # Default size of extra stack frame needed by exception context switch. # This value is for cortex-m with FPU enabled. DEFAULT_EXCEPTION_FRAME_SIZE = 224 class StackAnalyzerError(Exception): - """Exception class for stack analyzer utility.""" + """Exception class for stack analyzer utility.""" class TaskInfo(ctypes.Structure): - """Taskinfo ctypes structure. - - The structure definition is corresponding to the "struct taskinfo" - in "util/export_taskinfo.so.c". - """ - _fields_ = [('name', ctypes.c_char_p), - ('routine', ctypes.c_char_p), - ('stack_size', ctypes.c_uint32)] + """Taskinfo ctypes structure. + The structure definition is corresponding to the "struct taskinfo" + in "util/export_taskinfo.so.c". + """ -class Task(object): - """Task information. + _fields_ = [ + ("name", ctypes.c_char_p), + ("routine", ctypes.c_char_p), + ("stack_size", ctypes.c_uint32), + ] - Attributes: - name: Task name. - routine_name: Routine function name. - stack_max_size: Max stack size. - routine_address: Resolved routine address. None if it hasn't been resolved. - """ - def __init__(self, name, routine_name, stack_max_size, routine_address=None): - """Constructor. +class Task(object): + """Task information. - Args: + Attributes: name: Task name. routine_name: Routine function name. stack_max_size: Max stack size. - routine_address: Resolved routine address. - """ - self.name = name - self.routine_name = routine_name - self.stack_max_size = stack_max_size - self.routine_address = routine_address - - def __eq__(self, other): - """Task equality. - - Args: - other: The compared object. - - Returns: - True if equal, False if not. + routine_address: Resolved routine address. None if it hasn't been resolved. """ - if not isinstance(other, Task): - return False - return (self.name == other.name and - self.routine_name == other.routine_name and - self.stack_max_size == other.stack_max_size and - self.routine_address == other.routine_address) + def __init__( + self, name, routine_name, stack_max_size, routine_address=None + ): + """Constructor. + + Args: + name: Task name. + routine_name: Routine function name. + stack_max_size: Max stack size. + routine_address: Resolved routine address. + """ + self.name = name + self.routine_name = routine_name + self.stack_max_size = stack_max_size + self.routine_address = routine_address + + def __eq__(self, other): + """Task equality. + + Args: + other: The compared object. + + Returns: + True if equal, False if not. + """ + if not isinstance(other, Task): + return False + + return ( + self.name == other.name + and self.routine_name == other.routine_name + and self.stack_max_size == other.stack_max_size + and self.routine_address == other.routine_address + ) class Symbol(object): - """Symbol information. + """Symbol information. - Attributes: - address: Symbol address. - symtype: Symbol type, 'O' (data, object) or 'F' (function). - size: Symbol size. - name: Symbol name. - """ - - def __init__(self, address, symtype, size, name): - """Constructor. - - Args: + Attributes: address: Symbol address. - symtype: Symbol type. + symtype: Symbol type, 'O' (data, object) or 'F' (function). size: Symbol size. name: Symbol name. """ - assert symtype in ['O', 'F'] - self.address = address - self.symtype = symtype - self.size = size - self.name = name - def __eq__(self, other): - """Symbol equality. - - Args: - other: The compared object. - - Returns: - True if equal, False if not. - """ - if not isinstance(other, Symbol): - return False - - return (self.address == other.address and - self.symtype == other.symtype and - self.size == other.size and - self.name == other.name) + def __init__(self, address, symtype, size, name): + """Constructor. + + Args: + address: Symbol address. + symtype: Symbol type. + size: Symbol size. + name: Symbol name. + """ + assert symtype in ["O", "F"] + self.address = address + self.symtype = symtype + self.size = size + self.name = name + + def __eq__(self, other): + """Symbol equality. + + Args: + other: The compared object. + + Returns: + True if equal, False if not. + """ + if not isinstance(other, Symbol): + return False + + return ( + self.address == other.address + and self.symtype == other.symtype + and self.size == other.size + and self.name == other.name + ) class Callsite(object): - """Function callsite. - - Attributes: - address: Address of callsite location. None if it is unknown. - target: Callee address. None if it is unknown. - is_tail: A bool indicates that it is a tailing call. - callee: Resolved callee function. None if it hasn't been resolved. - """ + """Function callsite. - def __init__(self, address, target, is_tail, callee=None): - """Constructor. - - Args: + Attributes: address: Address of callsite location. None if it is unknown. target: Callee address. None if it is unknown. - is_tail: A bool indicates that it is a tailing call. (function jump to - another function without restoring the stack frame) - callee: Resolved callee function. + is_tail: A bool indicates that it is a tailing call. + callee: Resolved callee function. None if it hasn't been resolved. """ - # It makes no sense that both address and target are unknown. - assert not (address is None and target is None) - self.address = address - self.target = target - self.is_tail = is_tail - self.callee = callee - - def __eq__(self, other): - """Callsite equality. - Args: - other: The compared object. - - Returns: - True if equal, False if not. - """ - if not isinstance(other, Callsite): - return False - - if not (self.address == other.address and - self.target == other.target and - self.is_tail == other.is_tail): - return False - - if self.callee is None: - return other.callee is None - elif other.callee is None: - return False - - # Assume the addresses of functions are unique. - return self.callee.address == other.callee.address + def __init__(self, address, target, is_tail, callee=None): + """Constructor. + + Args: + address: Address of callsite location. None if it is unknown. + target: Callee address. None if it is unknown. + is_tail: A bool indicates that it is a tailing call. (function jump to + another function without restoring the stack frame) + callee: Resolved callee function. + """ + # It makes no sense that both address and target are unknown. + assert not (address is None and target is None) + self.address = address + self.target = target + self.is_tail = is_tail + self.callee = callee + + def __eq__(self, other): + """Callsite equality. + + Args: + other: The compared object. + + Returns: + True if equal, False if not. + """ + if not isinstance(other, Callsite): + return False + + if not ( + self.address == other.address + and self.target == other.target + and self.is_tail == other.is_tail + ): + return False + + if self.callee is None: + return other.callee is None + elif other.callee is None: + return False + + # Assume the addresses of functions are unique. + return self.callee.address == other.callee.address class Function(object): - """Function. + """Function. - Attributes: - address: Address of function. - name: Name of function from its symbol. - stack_frame: Size of stack frame. - callsites: Callsite list. - stack_max_usage: Max stack usage. None if it hasn't been analyzed. - stack_max_path: Max stack usage path. None if it hasn't been analyzed. - """ - - def __init__(self, address, name, stack_frame, callsites): - """Constructor. - - Args: + Attributes: address: Address of function. name: Name of function from its symbol. stack_frame: Size of stack frame. callsites: Callsite list. + stack_max_usage: Max stack usage. None if it hasn't been analyzed. + stack_max_path: Max stack usage path. None if it hasn't been analyzed. """ - self.address = address - self.name = name - self.stack_frame = stack_frame - self.callsites = callsites - self.stack_max_usage = None - self.stack_max_path = None - - def __eq__(self, other): - """Function equality. - - Args: - other: The compared object. - - Returns: - True if equal, False if not. - """ - if not isinstance(other, Function): - return False - if not (self.address == other.address and - self.name == other.name and - self.stack_frame == other.stack_frame and - self.callsites == other.callsites and - self.stack_max_usage == other.stack_max_usage): - return False + def __init__(self, address, name, stack_frame, callsites): + """Constructor. + + Args: + address: Address of function. + name: Name of function from its symbol. + stack_frame: Size of stack frame. + callsites: Callsite list. + """ + self.address = address + self.name = name + self.stack_frame = stack_frame + self.callsites = callsites + self.stack_max_usage = None + self.stack_max_path = None + + def __eq__(self, other): + """Function equality. + + Args: + other: The compared object. + + Returns: + True if equal, False if not. + """ + if not isinstance(other, Function): + return False + + if not ( + self.address == other.address + and self.name == other.name + and self.stack_frame == other.stack_frame + and self.callsites == other.callsites + and self.stack_max_usage == other.stack_max_usage + ): + return False + + if self.stack_max_path is None: + return other.stack_max_path is None + elif other.stack_max_path is None: + return False + + if len(self.stack_max_path) != len(other.stack_max_path): + return False + + for self_func, other_func in zip( + self.stack_max_path, other.stack_max_path + ): + # Assume the addresses of functions are unique. + if self_func.address != other_func.address: + return False + + return True + + def __hash__(self): + return id(self) - if self.stack_max_path is None: - return other.stack_max_path is None - elif other.stack_max_path is None: - return False - - if len(self.stack_max_path) != len(other.stack_max_path): - return False - - for self_func, other_func in zip(self.stack_max_path, other.stack_max_path): - # Assume the addresses of functions are unique. - if self_func.address != other_func.address: - return False - - return True - - def __hash__(self): - return id(self) class AndesAnalyzer(object): - """Disassembly analyzer for Andes architecture. - - Public Methods: - AnalyzeFunction: Analyze stack frame and callsites of the function. - """ - - GENERAL_PURPOSE_REGISTER_SIZE = 4 - - # Possible condition code suffixes. - CONDITION_CODES = [ 'eq', 'eqz', 'gez', 'gtz', 'lez', 'ltz', 'ne', 'nez', - 'eqc', 'nec', 'nezs', 'nes', 'eqs'] - CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES)) - - IMM_ADDRESS_RE = r'([0-9A-Fa-f]+)\s+<([^>]+)>' - # Branch instructions. - JUMP_OPCODE_RE = re.compile(r'^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$' \ - .format(CONDITION_CODES_RE)) - # Call instructions. - CALL_OPCODE_RE = re.compile \ - (r'^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$') - CALL_OPERAND_RE = re.compile(r'^{}$'.format(IMM_ADDRESS_RE)) - # Ignore lp register because it's for return. - INDIRECT_CALL_OPERAND_RE = re.compile \ - (r'^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$') - # TODO: Handle other kinds of store instructions. - PUSH_OPCODE_RE = re.compile(r'^push(\d{1,})$') - PUSH_OPERAND_RE = re.compile(r'^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}') - SMW_OPCODE_RE = re.compile(r'^smw(\.\w\w|\.\w\w\w)$') - SMW_OPERAND_RE = re.compile(r'^(\$r\d{1,}|\$\wp), \[\$\wp\], ' - r'(\$r\d{1,}|\$\wp), \#\d\w\d \! \{([^\]]+)\}') - OPERANDGROUP_RE = re.compile(r'^\$r\d{1,}\~\$r\d{1,}') - - LWI_OPCODE_RE = re.compile(r'^lwi(\.\w\w)$') - LWI_PC_OPERAND_RE = re.compile(r'^\$pc, \[([^\]]+)\]') - # Example: "34280: 3f c8 0f ec addi.gp $fp, #0xfec" - # Assume there is always a "\t" after the hex data. - DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+' - r'(?P<words>[0-9A-Fa-f ]+)' - r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?') - - def ParseInstruction(self, line, function_end): - """Parse the line of instruction. + """Disassembly analyzer for Andes architecture. - Args: - line: Text of disassembly. - function_end: End address of the current function. None if unknown. - - Returns: - (address, words, opcode, operand_text): The instruction address, words, - opcode, and the text of operands. - None if it isn't an instruction line. + Public Methods: + AnalyzeFunction: Analyze stack frame and callsites of the function. """ - result = self.DISASM_REGEX_RE.match(line) - if result is None: - return None - - address = int(result.group('address'), 16) - # Check if it's out of bound. - if function_end is not None and address >= function_end: - return None - - opcode = result.group('opcode').strip() - operand_text = result.group('operand') - words = result.group('words') - if operand_text is None: - operand_text = '' - else: - operand_text = operand_text.strip() - - return (address, words, opcode, operand_text) - - def AnalyzeFunction(self, function_symbol, instructions): - - stack_frame = 0 - callsites = [] - for address, words, opcode, operand_text in instructions: - is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None - is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None - - if is_jump_opcode or is_call_opcode: - is_tail = is_jump_opcode - - result = self.CALL_OPERAND_RE.match(operand_text) + GENERAL_PURPOSE_REGISTER_SIZE = 4 + + # Possible condition code suffixes. + CONDITION_CODES = [ + "eq", + "eqz", + "gez", + "gtz", + "lez", + "ltz", + "ne", + "nez", + "eqc", + "nec", + "nezs", + "nes", + "eqs", + ] + CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES)) + + IMM_ADDRESS_RE = r"([0-9A-Fa-f]+)\s+<([^>]+)>" + # Branch instructions. + JUMP_OPCODE_RE = re.compile( + r"^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$".format(CONDITION_CODES_RE) + ) + # Call instructions. + CALL_OPCODE_RE = re.compile( + r"^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$" + ) + CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE)) + # Ignore lp register because it's for return. + INDIRECT_CALL_OPERAND_RE = re.compile( + r"^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$" + ) + # TODO: Handle other kinds of store instructions. + PUSH_OPCODE_RE = re.compile(r"^push(\d{1,})$") + PUSH_OPERAND_RE = re.compile(r"^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}") + SMW_OPCODE_RE = re.compile(r"^smw(\.\w\w|\.\w\w\w)$") + SMW_OPERAND_RE = re.compile( + r"^(\$r\d{1,}|\$\wp), \[\$\wp\], " + r"(\$r\d{1,}|\$\wp), \#\d\w\d \! \{([^\]]+)\}" + ) + OPERANDGROUP_RE = re.compile(r"^\$r\d{1,}\~\$r\d{1,}") + + LWI_OPCODE_RE = re.compile(r"^lwi(\.\w\w)$") + LWI_PC_OPERAND_RE = re.compile(r"^\$pc, \[([^\]]+)\]") + # Example: "34280: 3f c8 0f ec addi.gp $fp, #0xfec" + # Assume there is always a "\t" after the hex data. + DISASM_REGEX_RE = re.compile( + r"^(?P<address>[0-9A-Fa-f]+):\s+" + r"(?P<words>[0-9A-Fa-f ]+)" + r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?" + ) + + def ParseInstruction(self, line, function_end): + """Parse the line of instruction. + + Args: + line: Text of disassembly. + function_end: End address of the current function. None if unknown. + + Returns: + (address, words, opcode, operand_text): The instruction address, words, + opcode, and the text of operands. + None if it isn't an instruction line. + """ + result = self.DISASM_REGEX_RE.match(line) if result is None: - if (self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None): - # Found an indirect call. - callsites.append(Callsite(address, None, is_tail)) - + return None + + address = int(result.group("address"), 16) + # Check if it's out of bound. + if function_end is not None and address >= function_end: + return None + + opcode = result.group("opcode").strip() + operand_text = result.group("operand") + words = result.group("words") + if operand_text is None: + operand_text = "" else: - target_address = int(result.group(1), 16) - # Filter out the in-function target (branches and in-function calls, - # which are actually branches). - if not (function_symbol.size > 0 and - function_symbol.address < target_address < - (function_symbol.address + function_symbol.size)): - # Maybe it is a callsite. - callsites.append(Callsite(address, target_address, is_tail)) - - elif self.LWI_OPCODE_RE.match(opcode) is not None: - result = self.LWI_PC_OPERAND_RE.match(operand_text) - if result is not None: - # Ignore "lwi $pc, [$sp], xx" because it's usually a return. - if result.group(1) != '$sp': - # Found an indirect call. - callsites.append(Callsite(address, None, True)) - - elif self.PUSH_OPCODE_RE.match(opcode) is not None: - # Example: fc 20 push25 $r8, #0 ! {$r6~$r8, $fp, $gp, $lp} - if self.PUSH_OPERAND_RE.match(operand_text) is not None: - # capture fc 20 - imm5u = int(words.split(' ')[1], 16) - # sp = sp - (imm5u << 3) - imm8u = (imm5u<<3) & 0xff - stack_frame += imm8u - - result = self.PUSH_OPERAND_RE.match(operand_text) - operandgroup_text = result.group(1) - # capture $rx~$ry - if self.OPERANDGROUP_RE.match(operandgroup_text) is not None: - # capture number & transfer string to integer - oprandgrouphead = operandgroup_text.split(',')[0] - rx=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[0]))) - ry=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[1]))) - - stack_frame += ((len(operandgroup_text.split(','))+ry-rx) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - else: - stack_frame += (len(operandgroup_text.split(',')) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - - elif self.SMW_OPCODE_RE.match(opcode) is not None: - # Example: smw.adm $r6, [$sp], $r10, #0x2 ! {$r6~$r10, $lp} - if self.SMW_OPERAND_RE.match(operand_text) is not None: - result = self.SMW_OPERAND_RE.match(operand_text) - operandgroup_text = result.group(3) - # capture $rx~$ry - if self.OPERANDGROUP_RE.match(operandgroup_text) is not None: - # capture number & transfer string to integer - oprandgrouphead = operandgroup_text.split(',')[0] - rx=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[0]))) - ry=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[1]))) - - stack_frame += ((len(operandgroup_text.split(','))+ry-rx) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - else: - stack_frame += (len(operandgroup_text.split(',')) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - - return (stack_frame, callsites) + operand_text = operand_text.strip() + + return (address, words, opcode, operand_text) + + def AnalyzeFunction(self, function_symbol, instructions): + + stack_frame = 0 + callsites = [] + for address, words, opcode, operand_text in instructions: + is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None + is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None + + if is_jump_opcode or is_call_opcode: + is_tail = is_jump_opcode + + result = self.CALL_OPERAND_RE.match(operand_text) + + if result is None: + if ( + self.INDIRECT_CALL_OPERAND_RE.match(operand_text) + is not None + ): + # Found an indirect call. + callsites.append(Callsite(address, None, is_tail)) + + else: + target_address = int(result.group(1), 16) + # Filter out the in-function target (branches and in-function calls, + # which are actually branches). + if not ( + function_symbol.size > 0 + and function_symbol.address + < target_address + < (function_symbol.address + function_symbol.size) + ): + # Maybe it is a callsite. + callsites.append( + Callsite(address, target_address, is_tail) + ) + + elif self.LWI_OPCODE_RE.match(opcode) is not None: + result = self.LWI_PC_OPERAND_RE.match(operand_text) + if result is not None: + # Ignore "lwi $pc, [$sp], xx" because it's usually a return. + if result.group(1) != "$sp": + # Found an indirect call. + callsites.append(Callsite(address, None, True)) + + elif self.PUSH_OPCODE_RE.match(opcode) is not None: + # Example: fc 20 push25 $r8, #0 ! {$r6~$r8, $fp, $gp, $lp} + if self.PUSH_OPERAND_RE.match(operand_text) is not None: + # capture fc 20 + imm5u = int(words.split(" ")[1], 16) + # sp = sp - (imm5u << 3) + imm8u = (imm5u << 3) & 0xFF + stack_frame += imm8u + + result = self.PUSH_OPERAND_RE.match(operand_text) + operandgroup_text = result.group(1) + # capture $rx~$ry + if ( + self.OPERANDGROUP_RE.match(operandgroup_text) + is not None + ): + # capture number & transfer string to integer + oprandgrouphead = operandgroup_text.split(",")[0] + rx = int( + "".join( + filter( + str.isdigit, oprandgrouphead.split("~")[0] + ) + ) + ) + ry = int( + "".join( + filter( + str.isdigit, oprandgrouphead.split("~")[1] + ) + ) + ) + + stack_frame += ( + len(operandgroup_text.split(",")) + ry - rx + ) * self.GENERAL_PURPOSE_REGISTER_SIZE + else: + stack_frame += ( + len(operandgroup_text.split(",")) + * self.GENERAL_PURPOSE_REGISTER_SIZE + ) + + elif self.SMW_OPCODE_RE.match(opcode) is not None: + # Example: smw.adm $r6, [$sp], $r10, #0x2 ! {$r6~$r10, $lp} + if self.SMW_OPERAND_RE.match(operand_text) is not None: + result = self.SMW_OPERAND_RE.match(operand_text) + operandgroup_text = result.group(3) + # capture $rx~$ry + if ( + self.OPERANDGROUP_RE.match(operandgroup_text) + is not None + ): + # capture number & transfer string to integer + oprandgrouphead = operandgroup_text.split(",")[0] + rx = int( + "".join( + filter( + str.isdigit, oprandgrouphead.split("~")[0] + ) + ) + ) + ry = int( + "".join( + filter( + str.isdigit, oprandgrouphead.split("~")[1] + ) + ) + ) + + stack_frame += ( + len(operandgroup_text.split(",")) + ry - rx + ) * self.GENERAL_PURPOSE_REGISTER_SIZE + else: + stack_frame += ( + len(operandgroup_text.split(",")) + * self.GENERAL_PURPOSE_REGISTER_SIZE + ) + + return (stack_frame, callsites) -class ArmAnalyzer(object): - """Disassembly analyzer for ARM architecture. - - Public Methods: - AnalyzeFunction: Analyze stack frame and callsites of the function. - """ - - GENERAL_PURPOSE_REGISTER_SIZE = 4 - - # Possible condition code suffixes. - CONDITION_CODES = ['', 'eq', 'ne', 'cs', 'hs', 'cc', 'lo', 'mi', 'pl', 'vs', - 'vc', 'hi', 'ls', 'ge', 'lt', 'gt', 'le'] - CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES)) - # Assume there is no function name containing ">". - IMM_ADDRESS_RE = r'([0-9A-Fa-f]+)\s+<([^>]+)>' - - # Fuzzy regular expressions for instruction and operand parsing. - # Branch instructions. - JUMP_OPCODE_RE = re.compile( - r'^(b{0}|bx{0})(\.\w)?$'.format(CONDITION_CODES_RE)) - # Call instructions. - CALL_OPCODE_RE = re.compile( - r'^(bl{0}|blx{0})(\.\w)?$'.format(CONDITION_CODES_RE)) - CALL_OPERAND_RE = re.compile(r'^{}$'.format(IMM_ADDRESS_RE)) - CBZ_CBNZ_OPCODE_RE = re.compile(r'^(cbz|cbnz)(\.\w)?$') - # Example: "r0, 1009bcbe <host_cmd_motion_sense+0x1d2>" - CBZ_CBNZ_OPERAND_RE = re.compile(r'^[^,]+,\s+{}$'.format(IMM_ADDRESS_RE)) - # Ignore lr register because it's for return. - INDIRECT_CALL_OPERAND_RE = re.compile(r'^r\d+|sb|sl|fp|ip|sp|pc$') - # TODO(cheyuw): Handle conditional versions of following - # instructions. - # TODO(cheyuw): Handle other kinds of pc modifying instructions (e.g. mov pc). - LDR_OPCODE_RE = re.compile(r'^ldr(\.\w)?$') - # Example: "pc, [sp], #4" - LDR_PC_OPERAND_RE = re.compile(r'^pc, \[([^\]]+)\]') - # TODO(cheyuw): Handle other kinds of stm instructions. - PUSH_OPCODE_RE = re.compile(r'^push$') - STM_OPCODE_RE = re.compile(r'^stmdb$') - # Stack subtraction instructions. - SUB_OPCODE_RE = re.compile(r'^sub(s|w)?(\.\w)?$') - SUB_OPERAND_RE = re.compile(r'^sp[^#]+#(\d+)') - # Example: "44d94: f893 0068 ldrb.w r0, [r3, #104] ; 0x68" - # Assume there is always a "\t" after the hex data. - DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+' - r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?') - - def ParseInstruction(self, line, function_end): - """Parse the line of instruction. - Args: - line: Text of disassembly. - function_end: End address of the current function. None if unknown. +class ArmAnalyzer(object): + """Disassembly analyzer for ARM architecture. - Returns: - (address, opcode, operand_text): The instruction address, opcode, - and the text of operands. None if it - isn't an instruction line. + Public Methods: + AnalyzeFunction: Analyze stack frame and callsites of the function. """ - result = self.DISASM_REGEX_RE.match(line) - if result is None: - return None - - address = int(result.group('address'), 16) - # Check if it's out of bound. - if function_end is not None and address >= function_end: - return None - - opcode = result.group('opcode').strip() - operand_text = result.group('operand') - if operand_text is None: - operand_text = '' - else: - operand_text = operand_text.strip() - - return (address, opcode, operand_text) - - def AnalyzeFunction(self, function_symbol, instructions): - """Analyze function, resolve the size of stack frame and callsites. - - Args: - function_symbol: Function symbol. - instructions: Instruction list. - - Returns: - (stack_frame, callsites): Size of stack frame, callsite list. - """ - stack_frame = 0 - callsites = [] - for address, opcode, operand_text in instructions: - is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None - is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None - is_cbz_cbnz_opcode = self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None - if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode: - is_tail = is_jump_opcode or is_cbz_cbnz_opcode - - if is_cbz_cbnz_opcode: - result = self.CBZ_CBNZ_OPERAND_RE.match(operand_text) - else: - result = self.CALL_OPERAND_RE.match(operand_text) + GENERAL_PURPOSE_REGISTER_SIZE = 4 + + # Possible condition code suffixes. + CONDITION_CODES = [ + "", + "eq", + "ne", + "cs", + "hs", + "cc", + "lo", + "mi", + "pl", + "vs", + "vc", + "hi", + "ls", + "ge", + "lt", + "gt", + "le", + ] + CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES)) + # Assume there is no function name containing ">". + IMM_ADDRESS_RE = r"([0-9A-Fa-f]+)\s+<([^>]+)>" + + # Fuzzy regular expressions for instruction and operand parsing. + # Branch instructions. + JUMP_OPCODE_RE = re.compile( + r"^(b{0}|bx{0})(\.\w)?$".format(CONDITION_CODES_RE) + ) + # Call instructions. + CALL_OPCODE_RE = re.compile( + r"^(bl{0}|blx{0})(\.\w)?$".format(CONDITION_CODES_RE) + ) + CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE)) + CBZ_CBNZ_OPCODE_RE = re.compile(r"^(cbz|cbnz)(\.\w)?$") + # Example: "r0, 1009bcbe <host_cmd_motion_sense+0x1d2>" + CBZ_CBNZ_OPERAND_RE = re.compile(r"^[^,]+,\s+{}$".format(IMM_ADDRESS_RE)) + # Ignore lr register because it's for return. + INDIRECT_CALL_OPERAND_RE = re.compile(r"^r\d+|sb|sl|fp|ip|sp|pc$") + # TODO(cheyuw): Handle conditional versions of following + # instructions. + # TODO(cheyuw): Handle other kinds of pc modifying instructions (e.g. mov pc). + LDR_OPCODE_RE = re.compile(r"^ldr(\.\w)?$") + # Example: "pc, [sp], #4" + LDR_PC_OPERAND_RE = re.compile(r"^pc, \[([^\]]+)\]") + # TODO(cheyuw): Handle other kinds of stm instructions. + PUSH_OPCODE_RE = re.compile(r"^push$") + STM_OPCODE_RE = re.compile(r"^stmdb$") + # Stack subtraction instructions. + SUB_OPCODE_RE = re.compile(r"^sub(s|w)?(\.\w)?$") + SUB_OPERAND_RE = re.compile(r"^sp[^#]+#(\d+)") + # Example: "44d94: f893 0068 ldrb.w r0, [r3, #104] ; 0x68" + # Assume there is always a "\t" after the hex data. + DISASM_REGEX_RE = re.compile( + r"^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+" + r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?" + ) + + def ParseInstruction(self, line, function_end): + """Parse the line of instruction. + + Args: + line: Text of disassembly. + function_end: End address of the current function. None if unknown. + + Returns: + (address, opcode, operand_text): The instruction address, opcode, + and the text of operands. None if it + isn't an instruction line. + """ + result = self.DISASM_REGEX_RE.match(line) if result is None: - # Failed to match immediate address, maybe it is an indirect call. - # CBZ and CBNZ can't be indirect calls. - if (not is_cbz_cbnz_opcode and - self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None): - # Found an indirect call. - callsites.append(Callsite(address, None, is_tail)) + return None - else: - target_address = int(result.group(1), 16) - # Filter out the in-function target (branches and in-function calls, - # which are actually branches). - if not (function_symbol.size > 0 and - function_symbol.address < target_address < - (function_symbol.address + function_symbol.size)): - # Maybe it is a callsite. - callsites.append(Callsite(address, target_address, is_tail)) - - elif self.LDR_OPCODE_RE.match(opcode) is not None: - result = self.LDR_PC_OPERAND_RE.match(operand_text) - if result is not None: - # Ignore "ldr pc, [sp], xx" because it's usually a return. - if result.group(1) != 'sp': - # Found an indirect call. - callsites.append(Callsite(address, None, True)) - - elif self.PUSH_OPCODE_RE.match(opcode) is not None: - # Example: "{r4, r5, r6, r7, lr}" - stack_frame += (len(operand_text.split(',')) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - elif self.SUB_OPCODE_RE.match(opcode) is not None: - result = self.SUB_OPERAND_RE.match(operand_text) - if result is not None: - stack_frame += int(result.group(1)) - else: - # Unhandled stack register subtraction. - assert not operand_text.startswith('sp') + address = int(result.group("address"), 16) + # Check if it's out of bound. + if function_end is not None and address >= function_end: + return None - elif self.STM_OPCODE_RE.match(opcode) is not None: - if operand_text.startswith('sp!'): - # Subtract and writeback to stack register. - # Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}" - # Get the text of pushed register list. - unused_sp, unused_sep, parameter_text = operand_text.partition(',') - stack_frame += (len(parameter_text.split(',')) * - self.GENERAL_PURPOSE_REGISTER_SIZE) + opcode = result.group("opcode").strip() + operand_text = result.group("operand") + if operand_text is None: + operand_text = "" + else: + operand_text = operand_text.strip() + + return (address, opcode, operand_text) + + def AnalyzeFunction(self, function_symbol, instructions): + """Analyze function, resolve the size of stack frame and callsites. + + Args: + function_symbol: Function symbol. + instructions: Instruction list. + + Returns: + (stack_frame, callsites): Size of stack frame, callsite list. + """ + stack_frame = 0 + callsites = [] + for address, opcode, operand_text in instructions: + is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None + is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None + is_cbz_cbnz_opcode = ( + self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None + ) + if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode: + is_tail = is_jump_opcode or is_cbz_cbnz_opcode + + if is_cbz_cbnz_opcode: + result = self.CBZ_CBNZ_OPERAND_RE.match(operand_text) + else: + result = self.CALL_OPERAND_RE.match(operand_text) + + if result is None: + # Failed to match immediate address, maybe it is an indirect call. + # CBZ and CBNZ can't be indirect calls. + if ( + not is_cbz_cbnz_opcode + and self.INDIRECT_CALL_OPERAND_RE.match(operand_text) + is not None + ): + # Found an indirect call. + callsites.append(Callsite(address, None, is_tail)) + + else: + target_address = int(result.group(1), 16) + # Filter out the in-function target (branches and in-function calls, + # which are actually branches). + if not ( + function_symbol.size > 0 + and function_symbol.address + < target_address + < (function_symbol.address + function_symbol.size) + ): + # Maybe it is a callsite. + callsites.append( + Callsite(address, target_address, is_tail) + ) + + elif self.LDR_OPCODE_RE.match(opcode) is not None: + result = self.LDR_PC_OPERAND_RE.match(operand_text) + if result is not None: + # Ignore "ldr pc, [sp], xx" because it's usually a return. + if result.group(1) != "sp": + # Found an indirect call. + callsites.append(Callsite(address, None, True)) + + elif self.PUSH_OPCODE_RE.match(opcode) is not None: + # Example: "{r4, r5, r6, r7, lr}" + stack_frame += ( + len(operand_text.split(",")) + * self.GENERAL_PURPOSE_REGISTER_SIZE + ) + elif self.SUB_OPCODE_RE.match(opcode) is not None: + result = self.SUB_OPERAND_RE.match(operand_text) + if result is not None: + stack_frame += int(result.group(1)) + else: + # Unhandled stack register subtraction. + assert not operand_text.startswith("sp") + + elif self.STM_OPCODE_RE.match(opcode) is not None: + if operand_text.startswith("sp!"): + # Subtract and writeback to stack register. + # Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}" + # Get the text of pushed register list. + ( + unused_sp, + unused_sep, + parameter_text, + ) = operand_text.partition(",") + stack_frame += ( + len(parameter_text.split(",")) + * self.GENERAL_PURPOSE_REGISTER_SIZE + ) + + return (stack_frame, callsites) - return (stack_frame, callsites) class RiscvAnalyzer(object): - """Disassembly analyzer for RISC-V architecture. - - Public Methods: - AnalyzeFunction: Analyze stack frame and callsites of the function. - """ - - # Possible condition code suffixes. - CONDITION_CODES = [ 'eqz', 'nez', 'lez', 'gez', 'ltz', 'gtz', 'gt', 'le', - 'gtu', 'leu', 'eq', 'ne', 'ge', 'lt', 'ltu', 'geu'] - CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES)) - # Branch instructions. - JUMP_OPCODE_RE = re.compile(r'^(b{0}|j|jr)$'.format(CONDITION_CODES_RE)) - # Call instructions. - CALL_OPCODE_RE = re.compile(r'^(jal|jalr)$') - # Example: "j 8009b318 <set_state_prl_hr>" or - # "jal ra,800a4394 <power_get_signals>" or - # "bltu t0,t1,80080300 <data_loop>" - JUMP_ADDRESS_RE = r'((\w(\w|\d\d),){0,2})([0-9A-Fa-f]+)\s+<([^>]+)>' - CALL_OPERAND_RE = re.compile(r'^{}$'.format(JUMP_ADDRESS_RE)) - # Capture address, Example: 800a4394 - CAPTURE_ADDRESS = re.compile(r'[0-9A-Fa-f]{8}') - # Indirect jump, Example: jalr a5 - INDIRECT_CALL_OPERAND_RE = re.compile(r'^t\d+|s\d+|a\d+$') - # Example: addi - ADDI_OPCODE_RE = re.compile(r'^addi$') - # Allocate stack instructions. - ADDI_OPERAND_RE = re.compile(r'^(sp,sp,-\d+)$') - # Example: "800804b6: 1101 addi sp,sp,-32" - DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+' - r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?') - - def ParseInstruction(self, line, function_end): - """Parse the line of instruction. + """Disassembly analyzer for RISC-V architecture. - Args: - line: Text of disassembly. - function_end: End address of the current function. None if unknown. - - Returns: - (address, opcode, operand_text): The instruction address, opcode, - and the text of operands. None if it - isn't an instruction line. + Public Methods: + AnalyzeFunction: Analyze stack frame and callsites of the function. """ - result = self.DISASM_REGEX_RE.match(line) - if result is None: - return None - - address = int(result.group('address'), 16) - # Check if it's out of bound. - if function_end is not None and address >= function_end: - return None - - opcode = result.group('opcode').strip() - operand_text = result.group('operand') - if operand_text is None: - operand_text = '' - else: - operand_text = operand_text.strip() - - return (address, opcode, operand_text) - - def AnalyzeFunction(self, function_symbol, instructions): - - stack_frame = 0 - callsites = [] - for address, opcode, operand_text in instructions: - is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None - is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None - if is_jump_opcode or is_call_opcode: - is_tail = is_jump_opcode - - result = self.CALL_OPERAND_RE.match(operand_text) + # Possible condition code suffixes. + CONDITION_CODES = [ + "eqz", + "nez", + "lez", + "gez", + "ltz", + "gtz", + "gt", + "le", + "gtu", + "leu", + "eq", + "ne", + "ge", + "lt", + "ltu", + "geu", + ] + CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES)) + # Branch instructions. + JUMP_OPCODE_RE = re.compile(r"^(b{0}|j|jr)$".format(CONDITION_CODES_RE)) + # Call instructions. + CALL_OPCODE_RE = re.compile(r"^(jal|jalr)$") + # Example: "j 8009b318 <set_state_prl_hr>" or + # "jal ra,800a4394 <power_get_signals>" or + # "bltu t0,t1,80080300 <data_loop>" + JUMP_ADDRESS_RE = r"((\w(\w|\d\d),){0,2})([0-9A-Fa-f]+)\s+<([^>]+)>" + CALL_OPERAND_RE = re.compile(r"^{}$".format(JUMP_ADDRESS_RE)) + # Capture address, Example: 800a4394 + CAPTURE_ADDRESS = re.compile(r"[0-9A-Fa-f]{8}") + # Indirect jump, Example: jalr a5 + INDIRECT_CALL_OPERAND_RE = re.compile(r"^t\d+|s\d+|a\d+$") + # Example: addi + ADDI_OPCODE_RE = re.compile(r"^addi$") + # Allocate stack instructions. + ADDI_OPERAND_RE = re.compile(r"^(sp,sp,-\d+)$") + # Example: "800804b6: 1101 addi sp,sp,-32" + DISASM_REGEX_RE = re.compile( + r"^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+" + r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?" + ) + + def ParseInstruction(self, line, function_end): + """Parse the line of instruction. + + Args: + line: Text of disassembly. + function_end: End address of the current function. None if unknown. + + Returns: + (address, opcode, operand_text): The instruction address, opcode, + and the text of operands. None if it + isn't an instruction line. + """ + result = self.DISASM_REGEX_RE.match(line) if result is None: - if (self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None): - # Found an indirect call. - callsites.append(Callsite(address, None, is_tail)) - - else: - # Capture address form operand_text and then convert to string - address_str = "".join(self.CAPTURE_ADDRESS.findall(operand_text)) - # String to integer - target_address = int(address_str, 16) - # Filter out the in-function target (branches and in-function calls, - # which are actually branches). - if not (function_symbol.size > 0 and - function_symbol.address < target_address < - (function_symbol.address + function_symbol.size)): - # Maybe it is a callsite. - callsites.append(Callsite(address, target_address, is_tail)) - - elif self.ADDI_OPCODE_RE.match(opcode) is not None: - # Example: sp,sp,-32 - if self.ADDI_OPERAND_RE.match(operand_text) is not None: - stack_frame += abs(int(operand_text.split(",")[2])) - - return (stack_frame, callsites) - -class StackAnalyzer(object): - """Class to analyze stack usage. - - Public Methods: - Analyze: Run the stack analysis. - """ - - C_FUNCTION_NAME = r'_A-Za-z0-9' - - # Assume there is no ":" in the path. - # Example: "driver/accel_kionix.c:321 (discriminator 3)" - ADDRTOLINE_RE = re.compile( - r'^(?P<path>[^:]+):(?P<linenum>\d+)(\s+\(discriminator\s+\d+\))?$') - # To eliminate the suffix appended by compilers, try to extract the - # C function name from the prefix of symbol name. - # Example: "SHA256_transform.constprop.28" - FUNCTION_PREFIX_NAME_RE = re.compile( - r'^(?P<name>[{0}]+)([^{0}].*)?$'.format(C_FUNCTION_NAME)) - - # Errors of annotation resolving. - ANNOTATION_ERROR_INVALID = 'invalid signature' - ANNOTATION_ERROR_NOTFOUND = 'function is not found' - ANNOTATION_ERROR_AMBIGUOUS = 'signature is ambiguous' - - def __init__(self, options, symbols, rodata, tasklist, annotation): - """Constructor. - - Args: - options: Namespace from argparse.parse_args(). - symbols: Symbol list. - rodata: Content of .rodata section (offset, data) - tasklist: Task list. - annotation: Annotation config. - """ - self.options = options - self.symbols = symbols - self.rodata_offset = rodata[0] - self.rodata = rodata[1] - self.tasklist = tasklist - self.annotation = annotation - self.address_to_line_cache = {} - - def AddressToLine(self, address, resolve_inline=False): - """Convert address to line. + return None - Args: - address: Target address. - resolve_inline: Output the stack of inlining. - - Returns: - lines: List of the corresponding lines. - - Raises: - StackAnalyzerError: If addr2line is failed. - """ - cache_key = (address, resolve_inline) - if cache_key in self.address_to_line_cache: - return self.address_to_line_cache[cache_key] - - try: - args = [self.options.addr2line, - '-f', - '-e', - self.options.elf_path, - '{:x}'.format(address)] - if resolve_inline: - args.append('-i') - - line_text = subprocess.check_output(args, encoding='utf-8') - except subprocess.CalledProcessError: - raise StackAnalyzerError('addr2line failed to resolve lines.') - except OSError: - raise StackAnalyzerError('Failed to run addr2line.') - - lines = [line.strip() for line in line_text.splitlines()] - # Assume the output has at least one pair like "function\nlocation\n", and - # they always show up in pairs. - # Example: "handle_request\n - # common/usb_pd_protocol.c:1191\n" - assert len(lines) >= 2 and len(lines) % 2 == 0 - - line_infos = [] - for index in range(0, len(lines), 2): - (function_name, line_text) = lines[index:index + 2] - if line_text in ['??:0', ':?']: - line_infos.append(None) - else: - result = self.ADDRTOLINE_RE.match(line_text) - # Assume the output is always well-formed. - assert result is not None - line_infos.append((function_name.strip(), - os.path.realpath(result.group('path').strip()), - int(result.group('linenum')))) - - self.address_to_line_cache[cache_key] = line_infos - return line_infos - - def AnalyzeDisassembly(self, disasm_text): - """Parse the disassembly text, analyze, and build a map of all functions. - - Args: - disasm_text: Disassembly text. + address = int(result.group("address"), 16) + # Check if it's out of bound. + if function_end is not None and address >= function_end: + return None - Returns: - function_map: Dict of functions. - """ - disasm_lines = [line.strip() for line in disasm_text.splitlines()] - - if 'nds' in disasm_lines[1]: - analyzer = AndesAnalyzer() - elif 'arm' in disasm_lines[1]: - analyzer = ArmAnalyzer() - elif 'riscv' in disasm_lines[1]: - analyzer = RiscvAnalyzer() - else: - raise StackAnalyzerError('Unsupported architecture.') - - # Example: "08028c8c <motion_lid_calc>:" - function_signature_regex = re.compile( - r'^(?P<address>[0-9A-Fa-f]+)\s+<(?P<name>[^>]+)>:$') - - def DetectFunctionHead(line): - """Check if the line is a function head. - - Args: - line: Text of disassembly. - - Returns: - symbol: Function symbol. None if it isn't a function head. - """ - result = function_signature_regex.match(line) - if result is None: - return None - - address = int(result.group('address'), 16) - symbol = symbol_map.get(address) - - # Check if the function exists and matches. - if symbol is None or symbol.symtype != 'F': - return None - - return symbol - - # Build symbol map, indexed by symbol address. - symbol_map = {} - for symbol in self.symbols: - # If there are multiple symbols with same address, keeping any of them is - # good enough. - symbol_map[symbol.address] = symbol - - # Parse the disassembly text. We update the variable "line" to next line - # when needed. There are two steps of parser: - # - # Step 1: Searching for the function head. Once reach the function head, - # move to the next line, which is the first line of function body. - # - # Step 2: Parsing each instruction line of function body. Once reach a - # non-instruction line, stop parsing and analyze the parsed instructions. - # - # Finally turn back to the step 1 without updating the line, because the - # current non-instruction line can be another function head. - function_map = {} - # The following three variables are the states of the parsing processing. - # They will be initialized properly during the state changes. - function_symbol = None - function_end = None - instructions = [] - - # Remove heading and tailing spaces for each line. - line_index = 0 - while line_index < len(disasm_lines): - # Get the current line. - line = disasm_lines[line_index] - - if function_symbol is None: - # Step 1: Search for the function head. - - function_symbol = DetectFunctionHead(line) - if function_symbol is not None: - # Assume there is no empty function. If the function head is followed - # by EOF, it is an empty function. - assert line_index + 1 < len(disasm_lines) - - # Found the function head, initialize and turn to the step 2. - instructions = [] - # If symbol size exists, use it as a hint of function size. - if function_symbol.size > 0: - function_end = function_symbol.address + function_symbol.size - else: - function_end = None - - else: - # Step 2: Parse the function body. - - instruction = analyzer.ParseInstruction(line, function_end) - if instruction is not None: - instructions.append(instruction) - - if instruction is None or line_index + 1 == len(disasm_lines): - # Either the invalid instruction or EOF indicates the end of the - # function, finalize the function analysis. - - # Assume there is no empty function. - assert len(instructions) > 0 - - (stack_frame, callsites) = analyzer.AnalyzeFunction(function_symbol, - instructions) - # Assume the function addresses are unique in the disassembly. - assert function_symbol.address not in function_map - function_map[function_symbol.address] = Function( - function_symbol.address, - function_symbol.name, - stack_frame, - callsites) - - # Initialize and turn back to the step 1. - function_symbol = None - - # If the current line isn't an instruction, it can be another function - # head, skip moving to the next line. - if instruction is None: - continue - - # Move to the next line. - line_index += 1 - - # Resolve callees of functions. - for function in function_map.values(): - for callsite in function.callsites: - if callsite.target is not None: - # Remain the callee as None if we can't resolve it. - callsite.callee = function_map.get(callsite.target) - - return function_map + opcode = result.group("opcode").strip() + operand_text = result.group("operand") + if operand_text is None: + operand_text = "" + else: + operand_text = operand_text.strip() + + return (address, opcode, operand_text) + + def AnalyzeFunction(self, function_symbol, instructions): + + stack_frame = 0 + callsites = [] + for address, opcode, operand_text in instructions: + is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None + is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None + + if is_jump_opcode or is_call_opcode: + is_tail = is_jump_opcode + + result = self.CALL_OPERAND_RE.match(operand_text) + if result is None: + if ( + self.INDIRECT_CALL_OPERAND_RE.match(operand_text) + is not None + ): + # Found an indirect call. + callsites.append(Callsite(address, None, is_tail)) + + else: + # Capture address form operand_text and then convert to string + address_str = "".join( + self.CAPTURE_ADDRESS.findall(operand_text) + ) + # String to integer + target_address = int(address_str, 16) + # Filter out the in-function target (branches and in-function calls, + # which are actually branches). + if not ( + function_symbol.size > 0 + and function_symbol.address + < target_address + < (function_symbol.address + function_symbol.size) + ): + # Maybe it is a callsite. + callsites.append( + Callsite(address, target_address, is_tail) + ) + + elif self.ADDI_OPCODE_RE.match(opcode) is not None: + # Example: sp,sp,-32 + if self.ADDI_OPERAND_RE.match(operand_text) is not None: + stack_frame += abs(int(operand_text.split(",")[2])) + + return (stack_frame, callsites) - def MapAnnotation(self, function_map, signature_set): - """Map annotation signatures to functions. - Args: - function_map: Function map. - signature_set: Set of annotation signatures. +class StackAnalyzer(object): + """Class to analyze stack usage. - Returns: - Map of signatures to functions, map of signatures which can't be resolved. + Public Methods: + Analyze: Run the stack analysis. """ - # Build the symbol map indexed by symbol name. If there are multiple symbols - # with the same name, add them into a set. (e.g. symbols of static function - # with the same name) - symbol_map = collections.defaultdict(set) - for symbol in self.symbols: - if symbol.symtype == 'F': - # Function symbol. - result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) - if result is not None: - function = function_map.get(symbol.address) - # Ignore the symbol not in disassembly. - if function is not None: - # If there are multiple symbol with the same name and point to the - # same function, the set will deduplicate them. - symbol_map[result.group('name').strip()].add(function) - - # Build the signature map indexed by annotation signature. - signature_map = {} - sig_error_map = {} - symbol_path_map = {} - for sig in signature_set: - (name, path, _) = sig - - functions = symbol_map.get(name) - if functions is None: - sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND - continue - - if name not in symbol_path_map: - # Lazy symbol path resolving. Since the addr2line isn't fast, only - # resolve needed symbol paths. - group_map = collections.defaultdict(list) - for function in functions: - line_info = self.AddressToLine(function.address)[0] - if line_info is None: - continue - (_, symbol_path, _) = line_info + C_FUNCTION_NAME = r"_A-Za-z0-9" - # Group the functions with the same symbol signature (symbol name + - # symbol path). Assume they are the same copies and do the same - # annotation operations of them because we don't know which copy is - # indicated by the users. - group_map[symbol_path].append(function) + # Assume there is no ":" in the path. + # Example: "driver/accel_kionix.c:321 (discriminator 3)" + ADDRTOLINE_RE = re.compile( + r"^(?P<path>[^:]+):(?P<linenum>\d+)(\s+\(discriminator\s+\d+\))?$" + ) + # To eliminate the suffix appended by compilers, try to extract the + # C function name from the prefix of symbol name. + # Example: "SHA256_transform.constprop.28" + FUNCTION_PREFIX_NAME_RE = re.compile( + r"^(?P<name>[{0}]+)([^{0}].*)?$".format(C_FUNCTION_NAME) + ) + + # Errors of annotation resolving. + ANNOTATION_ERROR_INVALID = "invalid signature" + ANNOTATION_ERROR_NOTFOUND = "function is not found" + ANNOTATION_ERROR_AMBIGUOUS = "signature is ambiguous" + + def __init__(self, options, symbols, rodata, tasklist, annotation): + """Constructor. + + Args: + options: Namespace from argparse.parse_args(). + symbols: Symbol list. + rodata: Content of .rodata section (offset, data) + tasklist: Task list. + annotation: Annotation config. + """ + self.options = options + self.symbols = symbols + self.rodata_offset = rodata[0] + self.rodata = rodata[1] + self.tasklist = tasklist + self.annotation = annotation + self.address_to_line_cache = {} + + def AddressToLine(self, address, resolve_inline=False): + """Convert address to line. + + Args: + address: Target address. + resolve_inline: Output the stack of inlining. + + Returns: + lines: List of the corresponding lines. + + Raises: + StackAnalyzerError: If addr2line is failed. + """ + cache_key = (address, resolve_inline) + if cache_key in self.address_to_line_cache: + return self.address_to_line_cache[cache_key] + + try: + args = [ + self.options.addr2line, + "-f", + "-e", + self.options.elf_path, + "{:x}".format(address), + ] + if resolve_inline: + args.append("-i") + + line_text = subprocess.check_output(args, encoding="utf-8") + except subprocess.CalledProcessError: + raise StackAnalyzerError("addr2line failed to resolve lines.") + except OSError: + raise StackAnalyzerError("Failed to run addr2line.") + + lines = [line.strip() for line in line_text.splitlines()] + # Assume the output has at least one pair like "function\nlocation\n", and + # they always show up in pairs. + # Example: "handle_request\n + # common/usb_pd_protocol.c:1191\n" + assert len(lines) >= 2 and len(lines) % 2 == 0 + + line_infos = [] + for index in range(0, len(lines), 2): + (function_name, line_text) = lines[index : index + 2] + if line_text in ["??:0", ":?"]: + line_infos.append(None) + else: + result = self.ADDRTOLINE_RE.match(line_text) + # Assume the output is always well-formed. + assert result is not None + line_infos.append( + ( + function_name.strip(), + os.path.realpath(result.group("path").strip()), + int(result.group("linenum")), + ) + ) + + self.address_to_line_cache[cache_key] = line_infos + return line_infos + + def AnalyzeDisassembly(self, disasm_text): + """Parse the disassembly text, analyze, and build a map of all functions. + + Args: + disasm_text: Disassembly text. + + Returns: + function_map: Dict of functions. + """ + disasm_lines = [line.strip() for line in disasm_text.splitlines()] + + if "nds" in disasm_lines[1]: + analyzer = AndesAnalyzer() + elif "arm" in disasm_lines[1]: + analyzer = ArmAnalyzer() + elif "riscv" in disasm_lines[1]: + analyzer = RiscvAnalyzer() + else: + raise StackAnalyzerError("Unsupported architecture.") - symbol_path_map[name] = group_map + # Example: "08028c8c <motion_lid_calc>:" + function_signature_regex = re.compile( + r"^(?P<address>[0-9A-Fa-f]+)\s+<(?P<name>[^>]+)>:$" + ) - # Symbol matching. - function_group = None - group_map = symbol_path_map[name] - if len(group_map) > 0: - if path is None: - if len(group_map) > 1: - # There is ambiguity but the path isn't specified. - sig_error_map[sig] = self.ANNOTATION_ERROR_AMBIGUOUS - continue + def DetectFunctionHead(line): + """Check if the line is a function head. - # No path signature but all symbol signatures of functions are same. - # Assume they are the same functions, so there is no ambiguity. - (function_group,) = group_map.values() - else: - function_group = group_map.get(path) + Args: + line: Text of disassembly. - if function_group is None: - sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND - continue + Returns: + symbol: Function symbol. None if it isn't a function head. + """ + result = function_signature_regex.match(line) + if result is None: + return None - # The function_group is a list of all the same functions (according to - # our assumption) which should be annotated together. - signature_map[sig] = function_group + address = int(result.group("address"), 16) + symbol = symbol_map.get(address) - return (signature_map, sig_error_map) + # Check if the function exists and matches. + if symbol is None or symbol.symtype != "F": + return None - def LoadAnnotation(self): - """Load annotation rules. + return symbol - Returns: - Map of add rules, set of remove rules, set of text signatures which can't - be parsed. - """ - # Assume there is no ":" in the path. - # Example: "get_range.lto.2501[driver/accel_kionix.c:327]" - annotation_signature_regex = re.compile( - r'^(?P<name>[^\[]+)(\[(?P<path>[^:]+)(:(?P<linenum>\d+))?\])?$') - - def NormalizeSignature(signature_text): - """Parse and normalize the annotation signature. - - Args: - signature_text: Text of the annotation signature. - - Returns: - (function name, path, line number) of the signature. The path and line - number can be None if not exist. None if failed to parse. - """ - result = annotation_signature_regex.match(signature_text.strip()) - if result is None: - return None - - name_result = self.FUNCTION_PREFIX_NAME_RE.match( - result.group('name').strip()) - if name_result is None: - return None - - path = result.group('path') - if path is not None: - path = os.path.realpath(path.strip()) - - linenum = result.group('linenum') - if linenum is not None: - linenum = int(linenum.strip()) - - return (name_result.group('name').strip(), path, linenum) - - def ExpandArray(dic): - """Parse and expand a symbol array - - Args: - dic: Dictionary for the array annotation - - Returns: - array of (symbol name, None, None). - """ - # TODO(drinkcat): This function is quite inefficient, as it goes through - # the symbol table multiple times. - - begin_name = dic['name'] - end_name = dic['name'] + "_end" - offset = dic['offset'] if 'offset' in dic else 0 - stride = dic['stride'] - - begin_address = None - end_address = None - - for symbol in self.symbols: - if (symbol.name == begin_name): - begin_address = symbol.address - if (symbol.name == end_name): - end_address = symbol.address - - if (not begin_address or not end_address): - return None - - output = [] - # TODO(drinkcat): This is inefficient as we go from address to symbol - # object then to symbol name, and later on we'll go back from symbol name - # to symbol object. - for addr in range(begin_address+offset, end_address, stride): - # TODO(drinkcat): Not all architectures need to drop the first bit. - val = self.rodata[(addr-self.rodata_offset) // 4] & 0xfffffffe - name = None + # Build symbol map, indexed by symbol address. + symbol_map = {} for symbol in self.symbols: - if (symbol.address == val): - result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) - name = result.group('name') - break - - if not name: - raise StackAnalyzerError('Cannot find function for address %s.', - hex(val)) - - output.append((name, None, None)) - - return output - - add_rules = collections.defaultdict(set) - remove_rules = list() - invalid_sigtxts = set() - - if 'add' in self.annotation and self.annotation['add'] is not None: - for src_sigtxt, dst_sigtxts in self.annotation['add'].items(): - src_sig = NormalizeSignature(src_sigtxt) - if src_sig is None: - invalid_sigtxts.add(src_sigtxt) - continue - - for dst_sigtxt in dst_sigtxts: - if isinstance(dst_sigtxt, dict): - dst_sig = ExpandArray(dst_sigtxt) - if dst_sig is None: - invalid_sigtxts.add(str(dst_sigtxt)) + # If there are multiple symbols with same address, keeping any of them is + # good enough. + symbol_map[symbol.address] = symbol + + # Parse the disassembly text. We update the variable "line" to next line + # when needed. There are two steps of parser: + # + # Step 1: Searching for the function head. Once reach the function head, + # move to the next line, which is the first line of function body. + # + # Step 2: Parsing each instruction line of function body. Once reach a + # non-instruction line, stop parsing and analyze the parsed instructions. + # + # Finally turn back to the step 1 without updating the line, because the + # current non-instruction line can be another function head. + function_map = {} + # The following three variables are the states of the parsing processing. + # They will be initialized properly during the state changes. + function_symbol = None + function_end = None + instructions = [] + + # Remove heading and tailing spaces for each line. + line_index = 0 + while line_index < len(disasm_lines): + # Get the current line. + line = disasm_lines[line_index] + + if function_symbol is None: + # Step 1: Search for the function head. + + function_symbol = DetectFunctionHead(line) + if function_symbol is not None: + # Assume there is no empty function. If the function head is followed + # by EOF, it is an empty function. + assert line_index + 1 < len(disasm_lines) + + # Found the function head, initialize and turn to the step 2. + instructions = [] + # If symbol size exists, use it as a hint of function size. + if function_symbol.size > 0: + function_end = ( + function_symbol.address + function_symbol.size + ) + else: + function_end = None + else: - add_rules[src_sig].update(dst_sig) - else: - dst_sig = NormalizeSignature(dst_sigtxt) - if dst_sig is None: - invalid_sigtxts.add(dst_sigtxt) + # Step 2: Parse the function body. + + instruction = analyzer.ParseInstruction(line, function_end) + if instruction is not None: + instructions.append(instruction) + + if instruction is None or line_index + 1 == len(disasm_lines): + # Either the invalid instruction or EOF indicates the end of the + # function, finalize the function analysis. + + # Assume there is no empty function. + assert len(instructions) > 0 + + (stack_frame, callsites) = analyzer.AnalyzeFunction( + function_symbol, instructions + ) + # Assume the function addresses are unique in the disassembly. + assert function_symbol.address not in function_map + function_map[function_symbol.address] = Function( + function_symbol.address, + function_symbol.name, + stack_frame, + callsites, + ) + + # Initialize and turn back to the step 1. + function_symbol = None + + # If the current line isn't an instruction, it can be another function + # head, skip moving to the next line. + if instruction is None: + continue + + # Move to the next line. + line_index += 1 + + # Resolve callees of functions. + for function in function_map.values(): + for callsite in function.callsites: + if callsite.target is not None: + # Remain the callee as None if we can't resolve it. + callsite.callee = function_map.get(callsite.target) + + return function_map + + def MapAnnotation(self, function_map, signature_set): + """Map annotation signatures to functions. + + Args: + function_map: Function map. + signature_set: Set of annotation signatures. + + Returns: + Map of signatures to functions, map of signatures which can't be resolved. + """ + # Build the symbol map indexed by symbol name. If there are multiple symbols + # with the same name, add them into a set. (e.g. symbols of static function + # with the same name) + symbol_map = collections.defaultdict(set) + for symbol in self.symbols: + if symbol.symtype == "F": + # Function symbol. + result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) + if result is not None: + function = function_map.get(symbol.address) + # Ignore the symbol not in disassembly. + if function is not None: + # If there are multiple symbol with the same name and point to the + # same function, the set will deduplicate them. + symbol_map[result.group("name").strip()].add(function) + + # Build the signature map indexed by annotation signature. + signature_map = {} + sig_error_map = {} + symbol_path_map = {} + for sig in signature_set: + (name, path, _) = sig + + functions = symbol_map.get(name) + if functions is None: + sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND + continue + + if name not in symbol_path_map: + # Lazy symbol path resolving. Since the addr2line isn't fast, only + # resolve needed symbol paths. + group_map = collections.defaultdict(list) + for function in functions: + line_info = self.AddressToLine(function.address)[0] + if line_info is None: + continue + + (_, symbol_path, _) = line_info + + # Group the functions with the same symbol signature (symbol name + + # symbol path). Assume they are the same copies and do the same + # annotation operations of them because we don't know which copy is + # indicated by the users. + group_map[symbol_path].append(function) + + symbol_path_map[name] = group_map + + # Symbol matching. + function_group = None + group_map = symbol_path_map[name] + if len(group_map) > 0: + if path is None: + if len(group_map) > 1: + # There is ambiguity but the path isn't specified. + sig_error_map[sig] = self.ANNOTATION_ERROR_AMBIGUOUS + continue + + # No path signature but all symbol signatures of functions are same. + # Assume they are the same functions, so there is no ambiguity. + (function_group,) = group_map.values() + else: + function_group = group_map.get(path) + + if function_group is None: + sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND + continue + + # The function_group is a list of all the same functions (according to + # our assumption) which should be annotated together. + signature_map[sig] = function_group + + return (signature_map, sig_error_map) + + def LoadAnnotation(self): + """Load annotation rules. + + Returns: + Map of add rules, set of remove rules, set of text signatures which can't + be parsed. + """ + # Assume there is no ":" in the path. + # Example: "get_range.lto.2501[driver/accel_kionix.c:327]" + annotation_signature_regex = re.compile( + r"^(?P<name>[^\[]+)(\[(?P<path>[^:]+)(:(?P<linenum>\d+))?\])?$" + ) + + def NormalizeSignature(signature_text): + """Parse and normalize the annotation signature. + + Args: + signature_text: Text of the annotation signature. + + Returns: + (function name, path, line number) of the signature. The path and line + number can be None if not exist. None if failed to parse. + """ + result = annotation_signature_regex.match(signature_text.strip()) + if result is None: + return None + + name_result = self.FUNCTION_PREFIX_NAME_RE.match( + result.group("name").strip() + ) + if name_result is None: + return None + + path = result.group("path") + if path is not None: + path = os.path.realpath(path.strip()) + + linenum = result.group("linenum") + if linenum is not None: + linenum = int(linenum.strip()) + + return (name_result.group("name").strip(), path, linenum) + + def ExpandArray(dic): + """Parse and expand a symbol array + + Args: + dic: Dictionary for the array annotation + + Returns: + array of (symbol name, None, None). + """ + # TODO(drinkcat): This function is quite inefficient, as it goes through + # the symbol table multiple times. + + begin_name = dic["name"] + end_name = dic["name"] + "_end" + offset = dic["offset"] if "offset" in dic else 0 + stride = dic["stride"] + + begin_address = None + end_address = None + + for symbol in self.symbols: + if symbol.name == begin_name: + begin_address = symbol.address + if symbol.name == end_name: + end_address = symbol.address + + if not begin_address or not end_address: + return None + + output = [] + # TODO(drinkcat): This is inefficient as we go from address to symbol + # object then to symbol name, and later on we'll go back from symbol name + # to symbol object. + for addr in range(begin_address + offset, end_address, stride): + # TODO(drinkcat): Not all architectures need to drop the first bit. + val = self.rodata[(addr - self.rodata_offset) // 4] & 0xFFFFFFFE + name = None + for symbol in self.symbols: + if symbol.address == val: + result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) + name = result.group("name") + break + + if not name: + raise StackAnalyzerError( + "Cannot find function for address %s." % hex(val) + ) + + output.append((name, None, None)) + + return output + + add_rules = collections.defaultdict(set) + remove_rules = list() + invalid_sigtxts = set() + + if "add" in self.annotation and self.annotation["add"] is not None: + for src_sigtxt, dst_sigtxts in self.annotation["add"].items(): + src_sig = NormalizeSignature(src_sigtxt) + if src_sig is None: + invalid_sigtxts.add(src_sigtxt) + continue + + for dst_sigtxt in dst_sigtxts: + if isinstance(dst_sigtxt, dict): + dst_sig = ExpandArray(dst_sigtxt) + if dst_sig is None: + invalid_sigtxts.add(str(dst_sigtxt)) + else: + add_rules[src_sig].update(dst_sig) + else: + dst_sig = NormalizeSignature(dst_sigtxt) + if dst_sig is None: + invalid_sigtxts.add(dst_sigtxt) + else: + add_rules[src_sig].add(dst_sig) + + if ( + "remove" in self.annotation + and self.annotation["remove"] is not None + ): + for sigtxt_path in self.annotation["remove"]: + if isinstance(sigtxt_path, str): + # The path has only one vertex. + sigtxt_path = [sigtxt_path] + + if len(sigtxt_path) == 0: + continue + + # Generate multiple remove paths from all the combinations of the + # signatures of each vertex. + sig_paths = [[]] + broken_flag = False + for sigtxt_node in sigtxt_path: + if isinstance(sigtxt_node, str): + # The vertex has only one signature. + sigtxt_set = {sigtxt_node} + elif isinstance(sigtxt_node, list): + # The vertex has multiple signatures. + sigtxt_set = set(sigtxt_node) + else: + # Assume the format of annotation is verified. There should be no + # invalid case. + assert False + + sig_set = set() + for sigtxt in sigtxt_set: + sig = NormalizeSignature(sigtxt) + if sig is None: + invalid_sigtxts.add(sigtxt) + broken_flag = True + elif not broken_flag: + sig_set.add(sig) + + if broken_flag: + continue + + # Append each signature of the current node to the all previous + # remove paths. + sig_paths = [ + path + [sig] for path in sig_paths for sig in sig_set + ] + + if not broken_flag: + # All signatures are normalized. The remove path has no error. + remove_rules.extend(sig_paths) + + return (add_rules, remove_rules, invalid_sigtxts) + + def ResolveAnnotation(self, function_map): + """Resolve annotation. + + Args: + function_map: Function map. + + Returns: + Set of added call edges, list of remove paths, set of eliminated + callsite addresses, set of annotation signatures which can't be resolved. + """ + + def StringifySignature(signature): + """Stringify the tupled signature. + + Args: + signature: Tupled signature. + + Returns: + Signature string. + """ + (name, path, linenum) = signature + bracket_text = "" + if path is not None: + path = os.path.relpath(path) + if linenum is None: + bracket_text = "[{}]".format(path) + else: + bracket_text = "[{}:{}]".format(path, linenum) + + return name + bracket_text + + (add_rules, remove_rules, invalid_sigtxts) = self.LoadAnnotation() + + signature_set = set() + for src_sig, dst_sigs in add_rules.items(): + signature_set.add(src_sig) + signature_set.update(dst_sigs) + + for remove_sigs in remove_rules: + signature_set.update(remove_sigs) + + # Map signatures to functions. + (signature_map, sig_error_map) = self.MapAnnotation( + function_map, signature_set + ) + + # Build the indirect callsite map indexed by callsite signature. + indirect_map = collections.defaultdict(set) + for function in function_map.values(): + for callsite in function.callsites: + if callsite.target is not None: + continue + + # Found an indirect callsite. + line_info = self.AddressToLine(callsite.address)[0] + if line_info is None: + continue + + (name, path, linenum) = line_info + result = self.FUNCTION_PREFIX_NAME_RE.match(name) + if result is None: + continue + + indirect_map[(result.group("name").strip(), path, linenum)].add( + (function, callsite.address) + ) + + # Generate the annotation sets. + add_set = set() + remove_list = list() + eliminated_addrs = set() + + for src_sig, dst_sigs in add_rules.items(): + src_funcs = set(signature_map.get(src_sig, [])) + # Try to match the source signature to the indirect callsites. Even if it + # can't be found in disassembly. + indirect_calls = indirect_map.get(src_sig) + if indirect_calls is not None: + for function, callsite_address in indirect_calls: + # Add the caller of the indirect callsite to the source functions. + src_funcs.add(function) + # Assume each callsite can be represented by a unique address. + eliminated_addrs.add(callsite_address) + + if src_sig in sig_error_map: + # Assume the error is always the not found error. Since the signature + # found in indirect callsite map must be a full signature, it can't + # happen the ambiguous error. + assert ( + sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND + ) + # Found in inline stack, remove the not found error. + del sig_error_map[src_sig] + + for dst_sig in dst_sigs: + dst_funcs = signature_map.get(dst_sig) + if dst_funcs is None: + continue + + # Duplicate the call edge for all the same source and destination + # functions. + for src_func in src_funcs: + for dst_func in dst_funcs: + add_set.add((src_func, dst_func)) + + for remove_sigs in remove_rules: + # Since each signature can be mapped to multiple functions, generate + # multiple remove paths from all the combinations of these functions. + remove_paths = [[]] + skip_flag = False + for remove_sig in remove_sigs: + # Transform each signature to the corresponding functions. + remove_funcs = signature_map.get(remove_sig) + if remove_funcs is None: + # There is an unresolved signature in the remove path. Ignore the + # whole broken remove path. + skip_flag = True + break + else: + # Append each function of the current signature to the all previous + # remove paths. + remove_paths = [ + p + [f] for p in remove_paths for f in remove_funcs + ] + + if skip_flag: + # Ignore the broken remove path. + continue + + for remove_path in remove_paths: + # Deduplicate the remove paths. + if remove_path not in remove_list: + remove_list.append(remove_path) + + # Format the error messages. + failed_sigtxts = set() + for sigtxt in invalid_sigtxts: + failed_sigtxts.add((sigtxt, self.ANNOTATION_ERROR_INVALID)) + + for sig, error in sig_error_map.items(): + failed_sigtxts.add((StringifySignature(sig), error)) + + return (add_set, remove_list, eliminated_addrs, failed_sigtxts) + + def PreprocessAnnotation( + self, function_map, add_set, remove_list, eliminated_addrs + ): + """Preprocess the annotation and callgraph. + + Add the missing call edges, and delete simple remove paths (the paths have + one or two vertices) from the function_map. + + Eliminate the annotated indirect callsites. + + Return the remaining remove list. + + Args: + function_map: Function map. + add_set: Set of missing call edges. + remove_list: List of remove paths. + eliminated_addrs: Set of eliminated callsite addresses. + + Returns: + List of remaining remove paths. + """ + + def CheckEdge(path): + """Check if all edges of the path are on the callgraph. + + Args: + path: Path. + + Returns: + True or False. + """ + for index in range(len(path) - 1): + if (path[index], path[index + 1]) not in edge_set: + return False + + return True + + for src_func, dst_func in add_set: + # TODO(cheyuw): Support tailing call annotation. + src_func.callsites.append( + Callsite(None, dst_func.address, False, dst_func) + ) + + # Delete simple remove paths. + remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2) + edge_set = set() + for function in function_map.values(): + cleaned_callsites = [] + for callsite in function.callsites: + if (callsite.callee,) in remove_simple or ( + function, + callsite.callee, + ) in remove_simple: + continue + + if ( + callsite.target is None + and callsite.address in eliminated_addrs + ): + continue + + cleaned_callsites.append(callsite) + if callsite.callee is not None: + edge_set.add((function, callsite.callee)) + + function.callsites = cleaned_callsites + + return [p for p in remove_list if len(p) >= 3 and CheckEdge(p)] + + def AnalyzeCallGraph(self, function_map, remove_list): + """Analyze callgraph. + + It will update the max stack size and path for each function. + + Args: + function_map: Function map. + remove_list: List of remove paths. + + Returns: + List of function cycles. + """ + + def Traverse(curr_state): + """Traverse the callgraph and calculate the max stack usages of functions. + + Args: + curr_state: Current state. + + Returns: + SCC lowest link. + """ + scc_index = scc_index_counter[0] + scc_index_counter[0] += 1 + scc_index_map[curr_state] = scc_index + scc_lowlink = scc_index + scc_stack.append(curr_state) + # Push the current state in the stack. We can use a set to maintain this + # because the stacked states are unique; otherwise we will find a cycle + # first. + stacked_states.add(curr_state) + + (curr_address, curr_positions) = curr_state + curr_func = function_map[curr_address] + + invalid_flag = False + new_positions = list(curr_positions) + for index, position in enumerate(curr_positions): + remove_path = remove_list[index] + + # The position of each remove path in the state is the length of the + # longest matching path between the prefix of the remove path and the + # suffix of the current traversing path. We maintain this length when + # appending the next callee to the traversing path. And it can be used + # to check if the remove path appears in the traversing path. + + # TODO(cheyuw): Implement KMP algorithm to match remove paths + # efficiently. + if remove_path[position] is curr_func: + # Matches the current function, extend the length. + new_positions[index] = position + 1 + if new_positions[index] == len(remove_path): + # The length of the longest matching path is equal to the length of + # the remove path, which means the suffix of the current traversing + # path matches the remove path. + invalid_flag = True + break + + else: + # We can't get the new longest matching path by extending the previous + # one directly. Fallback to search the new longest matching path. + + # If we can't find any matching path in the following search, reset + # the matching length to 0. + new_positions[index] = 0 + + # We want to find the new longest matching prefix of remove path with + # the suffix of the current traversing path. Because the new longest + # matching path won't be longer than the prevous one now, and part of + # the suffix matches the prefix of remove path, we can get the needed + # suffix from the previous matching prefix of the invalid path. + suffix = remove_path[:position] + [curr_func] + for offset in range(1, len(suffix)): + length = position - offset + if remove_path[:length] == suffix[offset:]: + new_positions[index] = length + break + + new_positions = tuple(new_positions) + + # If the current suffix is invalid, set the max stack usage to 0. + max_stack_usage = 0 + max_callee_state = None + self_loop = False + + if not invalid_flag: + # Max stack usage is at least equal to the stack frame. + max_stack_usage = curr_func.stack_frame + for callsite in curr_func.callsites: + callee = callsite.callee + if callee is None: + continue + + callee_state = (callee.address, new_positions) + if callee_state not in scc_index_map: + # Unvisited state. + scc_lowlink = min(scc_lowlink, Traverse(callee_state)) + elif callee_state in stacked_states: + # The state is shown in the stack. There is a cycle. + sub_stack_usage = 0 + scc_lowlink = min( + scc_lowlink, scc_index_map[callee_state] + ) + if callee_state == curr_state: + self_loop = True + + done_result = done_states.get(callee_state) + if done_result is not None: + # Already done this state and use its result. If the state reaches a + # cycle, reusing the result will cause inaccuracy (the stack usage + # of cycle depends on where the entrance is). But it's fine since we + # can't get accurate stack usage under this situation, and we rely + # on user-provided annotations to break the cycle, after which the + # result will be accurate again. + (sub_stack_usage, _) = done_result + + if callsite.is_tail: + # For tailing call, since the callee reuses the stack frame of the + # caller, choose the larger one directly. + stack_usage = max( + curr_func.stack_frame, sub_stack_usage + ) + else: + stack_usage = ( + curr_func.stack_frame + sub_stack_usage + ) + + if stack_usage > max_stack_usage: + max_stack_usage = stack_usage + max_callee_state = callee_state + + if scc_lowlink == scc_index: + group = [] + while scc_stack[-1] != curr_state: + scc_state = scc_stack.pop() + stacked_states.remove(scc_state) + group.append(scc_state) + + scc_stack.pop() + stacked_states.remove(curr_state) + + # If the cycle is not empty, record it. + if len(group) > 0 or self_loop: + group.append(curr_state) + cycle_groups.append(group) + + # Store the done result. + done_states[curr_state] = (max_stack_usage, max_callee_state) + + if curr_positions == initial_positions: + # If the current state is initial state, we traversed the callgraph by + # using the current function as start point. Update the stack usage of + # the function. + # If the function matches a single vertex remove path, this will set its + # max stack usage to 0, which is not expected (we still calculate its + # max stack usage, but prevent any function from calling it). However, + # all the single vertex remove paths have been preprocessed and removed. + curr_func.stack_max_usage = max_stack_usage + + # Reconstruct the max stack path by traversing the state transitions. + max_stack_path = [curr_func] + callee_state = max_callee_state + while callee_state is not None: + # The first element of state tuple is function address. + max_stack_path.append(function_map[callee_state[0]]) + done_result = done_states.get(callee_state) + # All of the descendants should be done. + assert done_result is not None + (_, callee_state) = done_result + + curr_func.stack_max_path = max_stack_path + + return scc_lowlink + + # The state is the concatenation of the current function address and the + # state of matching position. + initial_positions = (0,) * len(remove_list) + done_states = {} + stacked_states = set() + scc_index_counter = [0] + scc_index_map = {} + scc_stack = [] + cycle_groups = [] + for function in function_map.values(): + if function.stack_max_usage is None: + Traverse((function.address, initial_positions)) + + cycle_functions = [] + for group in cycle_groups: + cycle = set(function_map[state[0]] for state in group) + if cycle not in cycle_functions: + cycle_functions.append(cycle) + + return cycle_functions + + def Analyze(self): + """Run the stack analysis. + + Raises: + StackAnalyzerError: If disassembly fails. + """ + + def OutputInlineStack(address, prefix=""): + """Output beautiful inline stack. + + Args: + address: Address. + prefix: Prefix of each line. + + Returns: + Key for sorting, output text + """ + line_infos = self.AddressToLine(address, True) + + if line_infos[0] is None: + order_key = (None, None) else: - add_rules[src_sig].add(dst_sig) - - if 'remove' in self.annotation and self.annotation['remove'] is not None: - for sigtxt_path in self.annotation['remove']: - if isinstance(sigtxt_path, str): - # The path has only one vertex. - sigtxt_path = [sigtxt_path] - - if len(sigtxt_path) == 0: - continue - - # Generate multiple remove paths from all the combinations of the - # signatures of each vertex. - sig_paths = [[]] - broken_flag = False - for sigtxt_node in sigtxt_path: - if isinstance(sigtxt_node, str): - # The vertex has only one signature. - sigtxt_set = {sigtxt_node} - elif isinstance(sigtxt_node, list): - # The vertex has multiple signatures. - sigtxt_set = set(sigtxt_node) - else: - # Assume the format of annotation is verified. There should be no - # invalid case. - assert False - - sig_set = set() - for sigtxt in sigtxt_set: - sig = NormalizeSignature(sigtxt) - if sig is None: - invalid_sigtxts.add(sigtxt) - broken_flag = True - elif not broken_flag: - sig_set.add(sig) - - if broken_flag: - continue - - # Append each signature of the current node to the all previous - # remove paths. - sig_paths = [path + [sig] for path in sig_paths for sig in sig_set] - - if not broken_flag: - # All signatures are normalized. The remove path has no error. - remove_rules.extend(sig_paths) - - return (add_rules, remove_rules, invalid_sigtxts) + (_, path, linenum) = line_infos[0] + order_key = (linenum, path) + + line_texts = [] + for line_info in reversed(line_infos): + if line_info is None: + (function_name, path, linenum) = ("??", "??", 0) + else: + (function_name, path, linenum) = line_info + + line_texts.append( + "{}[{}:{}]".format( + function_name, os.path.relpath(path), linenum + ) + ) + + output = "{}-> {} {:x}\n".format(prefix, line_texts[0], address) + for depth, line_text in enumerate(line_texts[1:]): + output += "{} {}- {}\n".format( + prefix, " " * depth, line_text + ) + + # Remove the last newline character. + return (order_key, output.rstrip("\n")) + + # Analyze disassembly. + try: + disasm_text = subprocess.check_output( + [self.options.objdump, "-d", self.options.elf_path], + encoding="utf-8", + ) + except subprocess.CalledProcessError: + raise StackAnalyzerError("objdump failed to disassemble.") + except OSError: + raise StackAnalyzerError("Failed to run objdump.") + + function_map = self.AnalyzeDisassembly(disasm_text) + result = self.ResolveAnnotation(function_map) + (add_set, remove_list, eliminated_addrs, failed_sigtxts) = result + remove_list = self.PreprocessAnnotation( + function_map, add_set, remove_list, eliminated_addrs + ) + cycle_functions = self.AnalyzeCallGraph(function_map, remove_list) + + # Print the results of task-aware stack analysis. + extra_stack_frame = self.annotation.get( + "exception_frame_size", DEFAULT_EXCEPTION_FRAME_SIZE + ) + for task in self.tasklist: + routine_func = function_map[task.routine_address] + print( + "Task: {}, Max size: {} ({} + {}), Allocated size: {}".format( + task.name, + routine_func.stack_max_usage + extra_stack_frame, + routine_func.stack_max_usage, + extra_stack_frame, + task.stack_max_size, + ) + ) + + print("Call Trace:") + max_stack_path = routine_func.stack_max_path + # Assume the routine function is resolved. + assert max_stack_path is not None + for depth, curr_func in enumerate(max_stack_path): + line_info = self.AddressToLine(curr_func.address)[0] + if line_info is None: + (path, linenum) = ("??", 0) + else: + (_, path, linenum) = line_info + + print( + " {} ({}) [{}:{}] {:x}".format( + curr_func.name, + curr_func.stack_frame, + os.path.relpath(path), + linenum, + curr_func.address, + ) + ) + + if depth + 1 < len(max_stack_path): + succ_func = max_stack_path[depth + 1] + text_list = [] + for callsite in curr_func.callsites: + if callsite.callee is succ_func: + indent_prefix = " " + if callsite.address is None: + order_text = ( + None, + "{}-> [annotation]".format(indent_prefix), + ) + else: + order_text = OutputInlineStack( + callsite.address, indent_prefix + ) + + text_list.append(order_text) + + for _, text in sorted(text_list, key=lambda item: item[0]): + print(text) + + print("Unresolved indirect callsites:") + for function in function_map.values(): + indirect_callsites = [] + for callsite in function.callsites: + if callsite.target is None: + indirect_callsites.append(callsite.address) + + if len(indirect_callsites) > 0: + print(" In function {}:".format(function.name)) + text_list = [] + for address in indirect_callsites: + text_list.append(OutputInlineStack(address, " ")) + + for _, text in sorted(text_list, key=lambda item: item[0]): + print(text) + + print("Unresolved annotation signatures:") + for sigtxt, error in failed_sigtxts: + print(" {}: {}".format(sigtxt, error)) + + if len(cycle_functions) > 0: + print("There are cycles in the following function sets:") + for functions in cycle_functions: + print( + "[{}]".format( + ", ".join(function.name for function in functions) + ) + ) - def ResolveAnnotation(self, function_map): - """Resolve annotation. - Args: - function_map: Function map. +def ParseArgs(): + """Parse commandline arguments. Returns: - Set of added call edges, list of remove paths, set of eliminated - callsite addresses, set of annotation signatures which can't be resolved. + options: Namespace from argparse.parse_args(). """ - def StringifySignature(signature): - """Stringify the tupled signature. - - Args: - signature: Tupled signature. - - Returns: - Signature string. - """ - (name, path, linenum) = signature - bracket_text = '' - if path is not None: - path = os.path.relpath(path) - if linenum is None: - bracket_text = '[{}]'.format(path) - else: - bracket_text = '[{}:{}]'.format(path, linenum) - - return name + bracket_text - - (add_rules, remove_rules, invalid_sigtxts) = self.LoadAnnotation() - - signature_set = set() - for src_sig, dst_sigs in add_rules.items(): - signature_set.add(src_sig) - signature_set.update(dst_sigs) - - for remove_sigs in remove_rules: - signature_set.update(remove_sigs) - - # Map signatures to functions. - (signature_map, sig_error_map) = self.MapAnnotation(function_map, - signature_set) - - # Build the indirect callsite map indexed by callsite signature. - indirect_map = collections.defaultdict(set) - for function in function_map.values(): - for callsite in function.callsites: - if callsite.target is not None: - continue - - # Found an indirect callsite. - line_info = self.AddressToLine(callsite.address)[0] - if line_info is None: - continue - - (name, path, linenum) = line_info - result = self.FUNCTION_PREFIX_NAME_RE.match(name) - if result is None: - continue - - indirect_map[(result.group('name').strip(), path, linenum)].add( - (function, callsite.address)) - - # Generate the annotation sets. - add_set = set() - remove_list = list() - eliminated_addrs = set() - - for src_sig, dst_sigs in add_rules.items(): - src_funcs = set(signature_map.get(src_sig, [])) - # Try to match the source signature to the indirect callsites. Even if it - # can't be found in disassembly. - indirect_calls = indirect_map.get(src_sig) - if indirect_calls is not None: - for function, callsite_address in indirect_calls: - # Add the caller of the indirect callsite to the source functions. - src_funcs.add(function) - # Assume each callsite can be represented by a unique address. - eliminated_addrs.add(callsite_address) - - if src_sig in sig_error_map: - # Assume the error is always the not found error. Since the signature - # found in indirect callsite map must be a full signature, it can't - # happen the ambiguous error. - assert sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND - # Found in inline stack, remove the not found error. - del sig_error_map[src_sig] - - for dst_sig in dst_sigs: - dst_funcs = signature_map.get(dst_sig) - if dst_funcs is None: - continue - - # Duplicate the call edge for all the same source and destination - # functions. - for src_func in src_funcs: - for dst_func in dst_funcs: - add_set.add((src_func, dst_func)) - - for remove_sigs in remove_rules: - # Since each signature can be mapped to multiple functions, generate - # multiple remove paths from all the combinations of these functions. - remove_paths = [[]] - skip_flag = False - for remove_sig in remove_sigs: - # Transform each signature to the corresponding functions. - remove_funcs = signature_map.get(remove_sig) - if remove_funcs is None: - # There is an unresolved signature in the remove path. Ignore the - # whole broken remove path. - skip_flag = True - break - else: - # Append each function of the current signature to the all previous - # remove paths. - remove_paths = [p + [f] for p in remove_paths for f in remove_funcs] - - if skip_flag: - # Ignore the broken remove path. - continue - - for remove_path in remove_paths: - # Deduplicate the remove paths. - if remove_path not in remove_list: - remove_list.append(remove_path) - - # Format the error messages. - failed_sigtxts = set() - for sigtxt in invalid_sigtxts: - failed_sigtxts.add((sigtxt, self.ANNOTATION_ERROR_INVALID)) - - for sig, error in sig_error_map.items(): - failed_sigtxts.add((StringifySignature(sig), error)) - - return (add_set, remove_list, eliminated_addrs, failed_sigtxts) - - def PreprocessAnnotation(self, function_map, add_set, remove_list, - eliminated_addrs): - """Preprocess the annotation and callgraph. + parser = argparse.ArgumentParser(description="EC firmware stack analyzer.") + parser.add_argument("elf_path", help="the path of EC firmware ELF") + parser.add_argument( + "--export_taskinfo", + required=True, + help="the path of export_taskinfo.so utility", + ) + parser.add_argument( + "--section", + required=True, + help="the section.", + choices=[SECTION_RO, SECTION_RW], + ) + parser.add_argument( + "--objdump", default="objdump", help="the path of objdump" + ) + parser.add_argument( + "--addr2line", default="addr2line", help="the path of addr2line" + ) + parser.add_argument( + "--annotation", default=None, help="the path of annotation file" + ) + + # TODO(cheyuw): Add an option for dumping stack usage of all functions. + + return parser.parse_args() - Add the missing call edges, and delete simple remove paths (the paths have - one or two vertices) from the function_map. - Eliminate the annotated indirect callsites. - - Return the remaining remove list. +def ParseSymbolText(symbol_text): + """Parse the content of the symbol text. Args: - function_map: Function map. - add_set: Set of missing call edges. - remove_list: List of remove paths. - eliminated_addrs: Set of eliminated callsite addresses. + symbol_text: Text of the symbols. Returns: - List of remaining remove paths. + symbols: Symbol list. """ - def CheckEdge(path): - """Check if all edges of the path are on the callgraph. - - Args: - path: Path. - - Returns: - True or False. - """ - for index in range(len(path) - 1): - if (path[index], path[index + 1]) not in edge_set: - return False - - return True - - for src_func, dst_func in add_set: - # TODO(cheyuw): Support tailing call annotation. - src_func.callsites.append( - Callsite(None, dst_func.address, False, dst_func)) - - # Delete simple remove paths. - remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2) - edge_set = set() - for function in function_map.values(): - cleaned_callsites = [] - for callsite in function.callsites: - if ((callsite.callee,) in remove_simple or - (function, callsite.callee) in remove_simple): - continue - - if callsite.target is None and callsite.address in eliminated_addrs: - continue - - cleaned_callsites.append(callsite) - if callsite.callee is not None: - edge_set.add((function, callsite.callee)) + # Example: "10093064 g F .text 0000015c .hidden hook_task" + symbol_regex = re.compile( + r"^(?P<address>[0-9A-Fa-f]+)\s+[lwg]\s+" + r"((?P<type>[OF])\s+)?\S+\s+" + r"(?P<size>[0-9A-Fa-f]+)\s+" + r"(\S+\s+)?(?P<name>\S+)$" + ) + + symbols = [] + for line in symbol_text.splitlines(): + line = line.strip() + result = symbol_regex.match(line) + if result is not None: + address = int(result.group("address"), 16) + symtype = result.group("type") + if symtype is None: + symtype = "O" - function.callsites = cleaned_callsites + size = int(result.group("size"), 16) + name = result.group("name") + symbols.append(Symbol(address, symtype, size, name)) - return [p for p in remove_list if len(p) >= 3 and CheckEdge(p)] + return symbols - def AnalyzeCallGraph(self, function_map, remove_list): - """Analyze callgraph. - It will update the max stack size and path for each function. +def ParseRoDataText(rodata_text): + """Parse the content of rodata Args: - function_map: Function map. - remove_list: List of remove paths. + symbol_text: Text of the rodata dump. Returns: - List of function cycles. + symbols: Symbol list. """ - def Traverse(curr_state): - """Traverse the callgraph and calculate the max stack usages of functions. - - Args: - curr_state: Current state. - - Returns: - SCC lowest link. - """ - scc_index = scc_index_counter[0] - scc_index_counter[0] += 1 - scc_index_map[curr_state] = scc_index - scc_lowlink = scc_index - scc_stack.append(curr_state) - # Push the current state in the stack. We can use a set to maintain this - # because the stacked states are unique; otherwise we will find a cycle - # first. - stacked_states.add(curr_state) - - (curr_address, curr_positions) = curr_state - curr_func = function_map[curr_address] - - invalid_flag = False - new_positions = list(curr_positions) - for index, position in enumerate(curr_positions): - remove_path = remove_list[index] - - # The position of each remove path in the state is the length of the - # longest matching path between the prefix of the remove path and the - # suffix of the current traversing path. We maintain this length when - # appending the next callee to the traversing path. And it can be used - # to check if the remove path appears in the traversing path. - - # TODO(cheyuw): Implement KMP algorithm to match remove paths - # efficiently. - if remove_path[position] is curr_func: - # Matches the current function, extend the length. - new_positions[index] = position + 1 - if new_positions[index] == len(remove_path): - # The length of the longest matching path is equal to the length of - # the remove path, which means the suffix of the current traversing - # path matches the remove path. - invalid_flag = True - break - - else: - # We can't get the new longest matching path by extending the previous - # one directly. Fallback to search the new longest matching path. - - # If we can't find any matching path in the following search, reset - # the matching length to 0. - new_positions[index] = 0 - - # We want to find the new longest matching prefix of remove path with - # the suffix of the current traversing path. Because the new longest - # matching path won't be longer than the prevous one now, and part of - # the suffix matches the prefix of remove path, we can get the needed - # suffix from the previous matching prefix of the invalid path. - suffix = remove_path[:position] + [curr_func] - for offset in range(1, len(suffix)): - length = position - offset - if remove_path[:length] == suffix[offset:]: - new_positions[index] = length - break - - new_positions = tuple(new_positions) - - # If the current suffix is invalid, set the max stack usage to 0. - max_stack_usage = 0 - max_callee_state = None - self_loop = False - - if not invalid_flag: - # Max stack usage is at least equal to the stack frame. - max_stack_usage = curr_func.stack_frame - for callsite in curr_func.callsites: - callee = callsite.callee - if callee is None: + # Examples: 8018ab0 00040048 00010000 10020000 4b8e0108 ...H........K... + # 100a7294 00000000 00000000 01000000 ............ + + base_offset = None + offset = None + rodata = [] + for line in rodata_text.splitlines(): + line = line.strip() + space = line.find(" ") + if space < 0: + continue + try: + address = int(line[0:space], 16) + except ValueError: continue - callee_state = (callee.address, new_positions) - if callee_state not in scc_index_map: - # Unvisited state. - scc_lowlink = min(scc_lowlink, Traverse(callee_state)) - elif callee_state in stacked_states: - # The state is shown in the stack. There is a cycle. - sub_stack_usage = 0 - scc_lowlink = min(scc_lowlink, scc_index_map[callee_state]) - if callee_state == curr_state: - self_loop = True - - done_result = done_states.get(callee_state) - if done_result is not None: - # Already done this state and use its result. If the state reaches a - # cycle, reusing the result will cause inaccuracy (the stack usage - # of cycle depends on where the entrance is). But it's fine since we - # can't get accurate stack usage under this situation, and we rely - # on user-provided annotations to break the cycle, after which the - # result will be accurate again. - (sub_stack_usage, _) = done_result - - if callsite.is_tail: - # For tailing call, since the callee reuses the stack frame of the - # caller, choose the larger one directly. - stack_usage = max(curr_func.stack_frame, sub_stack_usage) - else: - stack_usage = curr_func.stack_frame + sub_stack_usage - - if stack_usage > max_stack_usage: - max_stack_usage = stack_usage - max_callee_state = callee_state - - if scc_lowlink == scc_index: - group = [] - while scc_stack[-1] != curr_state: - scc_state = scc_stack.pop() - stacked_states.remove(scc_state) - group.append(scc_state) - - scc_stack.pop() - stacked_states.remove(curr_state) - - # If the cycle is not empty, record it. - if len(group) > 0 or self_loop: - group.append(curr_state) - cycle_groups.append(group) - - # Store the done result. - done_states[curr_state] = (max_stack_usage, max_callee_state) - - if curr_positions == initial_positions: - # If the current state is initial state, we traversed the callgraph by - # using the current function as start point. Update the stack usage of - # the function. - # If the function matches a single vertex remove path, this will set its - # max stack usage to 0, which is not expected (we still calculate its - # max stack usage, but prevent any function from calling it). However, - # all the single vertex remove paths have been preprocessed and removed. - curr_func.stack_max_usage = max_stack_usage - - # Reconstruct the max stack path by traversing the state transitions. - max_stack_path = [curr_func] - callee_state = max_callee_state - while callee_state is not None: - # The first element of state tuple is function address. - max_stack_path.append(function_map[callee_state[0]]) - done_result = done_states.get(callee_state) - # All of the descendants should be done. - assert done_result is not None - (_, callee_state) = done_result - - curr_func.stack_max_path = max_stack_path - - return scc_lowlink - - # The state is the concatenation of the current function address and the - # state of matching position. - initial_positions = (0,) * len(remove_list) - done_states = {} - stacked_states = set() - scc_index_counter = [0] - scc_index_map = {} - scc_stack = [] - cycle_groups = [] - for function in function_map.values(): - if function.stack_max_usage is None: - Traverse((function.address, initial_positions)) - - cycle_functions = [] - for group in cycle_groups: - cycle = set(function_map[state[0]] for state in group) - if cycle not in cycle_functions: - cycle_functions.append(cycle) - - return cycle_functions - - def Analyze(self): - """Run the stack analysis. - - Raises: - StackAnalyzerError: If disassembly fails. - """ - def OutputInlineStack(address, prefix=''): - """Output beautiful inline stack. - - Args: - address: Address. - prefix: Prefix of each line. - - Returns: - Key for sorting, output text - """ - line_infos = self.AddressToLine(address, True) - - if line_infos[0] is None: - order_key = (None, None) - else: - (_, path, linenum) = line_infos[0] - order_key = (linenum, path) - - line_texts = [] - for line_info in reversed(line_infos): - if line_info is None: - (function_name, path, linenum) = ('??', '??', 0) - else: - (function_name, path, linenum) = line_info - - line_texts.append('{}[{}:{}]'.format(function_name, - os.path.relpath(path), - linenum)) - - output = '{}-> {} {:x}\n'.format(prefix, line_texts[0], address) - for depth, line_text in enumerate(line_texts[1:]): - output += '{} {}- {}\n'.format(prefix, ' ' * depth, line_text) - - # Remove the last newline character. - return (order_key, output.rstrip('\n')) - - # Analyze disassembly. - try: - disasm_text = subprocess.check_output([self.options.objdump, - '-d', - self.options.elf_path], - encoding='utf-8') - except subprocess.CalledProcessError: - raise StackAnalyzerError('objdump failed to disassemble.') - except OSError: - raise StackAnalyzerError('Failed to run objdump.') - - function_map = self.AnalyzeDisassembly(disasm_text) - result = self.ResolveAnnotation(function_map) - (add_set, remove_list, eliminated_addrs, failed_sigtxts) = result - remove_list = self.PreprocessAnnotation(function_map, - add_set, - remove_list, - eliminated_addrs) - cycle_functions = self.AnalyzeCallGraph(function_map, remove_list) - - # Print the results of task-aware stack analysis. - extra_stack_frame = self.annotation.get('exception_frame_size', - DEFAULT_EXCEPTION_FRAME_SIZE) - for task in self.tasklist: - routine_func = function_map[task.routine_address] - print('Task: {}, Max size: {} ({} + {}), Allocated size: {}'.format( - task.name, - routine_func.stack_max_usage + extra_stack_frame, - routine_func.stack_max_usage, - extra_stack_frame, - task.stack_max_size)) - - print('Call Trace:') - max_stack_path = routine_func.stack_max_path - # Assume the routine function is resolved. - assert max_stack_path is not None - for depth, curr_func in enumerate(max_stack_path): - line_info = self.AddressToLine(curr_func.address)[0] - if line_info is None: - (path, linenum) = ('??', 0) - else: - (_, path, linenum) = line_info - - print(' {} ({}) [{}:{}] {:x}'.format(curr_func.name, - curr_func.stack_frame, - os.path.relpath(path), - linenum, - curr_func.address)) - - if depth + 1 < len(max_stack_path): - succ_func = max_stack_path[depth + 1] - text_list = [] - for callsite in curr_func.callsites: - if callsite.callee is succ_func: - indent_prefix = ' ' - if callsite.address is None: - order_text = (None, '{}-> [annotation]'.format(indent_prefix)) - else: - order_text = OutputInlineStack(callsite.address, indent_prefix) - - text_list.append(order_text) - - for _, text in sorted(text_list, key=lambda item: item[0]): - print(text) - - print('Unresolved indirect callsites:') - for function in function_map.values(): - indirect_callsites = [] - for callsite in function.callsites: - if callsite.target is None: - indirect_callsites.append(callsite.address) - - if len(indirect_callsites) > 0: - print(' In function {}:'.format(function.name)) - text_list = [] - for address in indirect_callsites: - text_list.append(OutputInlineStack(address, ' ')) - - for _, text in sorted(text_list, key=lambda item: item[0]): - print(text) - - print('Unresolved annotation signatures:') - for sigtxt, error in failed_sigtxts: - print(' {}: {}'.format(sigtxt, error)) - - if len(cycle_functions) > 0: - print('There are cycles in the following function sets:') - for functions in cycle_functions: - print('[{}]'.format(', '.join(function.name for function in functions))) - - -def ParseArgs(): - """Parse commandline arguments. - - Returns: - options: Namespace from argparse.parse_args(). - """ - parser = argparse.ArgumentParser(description="EC firmware stack analyzer.") - parser.add_argument('elf_path', help="the path of EC firmware ELF") - parser.add_argument('--export_taskinfo', required=True, - help="the path of export_taskinfo.so utility") - parser.add_argument('--section', required=True, help='the section.', - choices=[SECTION_RO, SECTION_RW]) - parser.add_argument('--objdump', default='objdump', - help='the path of objdump') - parser.add_argument('--addr2line', default='addr2line', - help='the path of addr2line') - parser.add_argument('--annotation', default=None, - help='the path of annotation file') - - # TODO(cheyuw): Add an option for dumping stack usage of all functions. - - return parser.parse_args() - - -def ParseSymbolText(symbol_text): - """Parse the content of the symbol text. - - Args: - symbol_text: Text of the symbols. - - Returns: - symbols: Symbol list. - """ - # Example: "10093064 g F .text 0000015c .hidden hook_task" - symbol_regex = re.compile(r'^(?P<address>[0-9A-Fa-f]+)\s+[lwg]\s+' - r'((?P<type>[OF])\s+)?\S+\s+' - r'(?P<size>[0-9A-Fa-f]+)\s+' - r'(\S+\s+)?(?P<name>\S+)$') - - symbols = [] - for line in symbol_text.splitlines(): - line = line.strip() - result = symbol_regex.match(line) - if result is not None: - address = int(result.group('address'), 16) - symtype = result.group('type') - if symtype is None: - symtype = 'O' - - size = int(result.group('size'), 16) - name = result.group('name') - symbols.append(Symbol(address, symtype, size, name)) - - return symbols - - -def ParseRoDataText(rodata_text): - """Parse the content of rodata - - Args: - symbol_text: Text of the rodata dump. - - Returns: - symbols: Symbol list. - """ - # Examples: 8018ab0 00040048 00010000 10020000 4b8e0108 ...H........K... - # 100a7294 00000000 00000000 01000000 ............ - - base_offset = None - offset = None - rodata = [] - for line in rodata_text.splitlines(): - line = line.strip() - space = line.find(' ') - if space < 0: - continue - try: - address = int(line[0:space], 16) - except ValueError: - continue - - if not base_offset: - base_offset = address - offset = address - elif address != offset: - raise StackAnalyzerError('objdump of rodata not contiguous.') + if not base_offset: + base_offset = address + offset = address + elif address != offset: + raise StackAnalyzerError("objdump of rodata not contiguous.") - for i in range(0, 4): - num = line[(space + 1 + i*9):(space + 9 + i*9)] - if len(num.strip()) > 0: - val = int(num, 16) - else: - val = 0 - # TODO(drinkcat): Not all platforms are necessarily big-endian - rodata.append((val & 0x000000ff) << 24 | - (val & 0x0000ff00) << 8 | - (val & 0x00ff0000) >> 8 | - (val & 0xff000000) >> 24) + for i in range(0, 4): + num = line[(space + 1 + i * 9) : (space + 9 + i * 9)] + if len(num.strip()) > 0: + val = int(num, 16) + else: + val = 0 + # TODO(drinkcat): Not all platforms are necessarily big-endian + rodata.append( + (val & 0x000000FF) << 24 + | (val & 0x0000FF00) << 8 + | (val & 0x00FF0000) >> 8 + | (val & 0xFF000000) >> 24 + ) - offset = offset + 4*4 + offset = offset + 4 * 4 - return (base_offset, rodata) + return (base_offset, rodata) def LoadTasklist(section, export_taskinfo, symbols): - """Load the task information. + """Load the task information. - Args: - section: Section (RO | RW). - export_taskinfo: Handle of export_taskinfo.so. - symbols: Symbol list. + Args: + section: Section (RO | RW). + export_taskinfo: Handle of export_taskinfo.so. + symbols: Symbol list. - Returns: - tasklist: Task list. - """ + Returns: + tasklist: Task list. + """ - TaskInfoPointer = ctypes.POINTER(TaskInfo) - taskinfos = TaskInfoPointer() - if section == SECTION_RO: - get_taskinfos_func = export_taskinfo.get_ro_taskinfos - else: - get_taskinfos_func = export_taskinfo.get_rw_taskinfos + TaskInfoPointer = ctypes.POINTER(TaskInfo) + taskinfos = TaskInfoPointer() + if section == SECTION_RO: + get_taskinfos_func = export_taskinfo.get_ro_taskinfos + else: + get_taskinfos_func = export_taskinfo.get_rw_taskinfos - taskinfo_num = get_taskinfos_func(ctypes.pointer(taskinfos)) + taskinfo_num = get_taskinfos_func(ctypes.pointer(taskinfos)) - tasklist = [] - for index in range(taskinfo_num): - taskinfo = taskinfos[index] - tasklist.append(Task(taskinfo.name.decode('utf-8'), - taskinfo.routine.decode('utf-8'), - taskinfo.stack_size)) + tasklist = [] + for index in range(taskinfo_num): + taskinfo = taskinfos[index] + tasklist.append( + Task( + taskinfo.name.decode("utf-8"), + taskinfo.routine.decode("utf-8"), + taskinfo.stack_size, + ) + ) - # Resolve routine address for each task. It's more efficient to resolve all - # routine addresses of tasks together. - routine_map = dict((task.routine_name, None) for task in tasklist) + # Resolve routine address for each task. It's more efficient to resolve all + # routine addresses of tasks together. + routine_map = dict((task.routine_name, None) for task in tasklist) - for symbol in symbols: - # Resolve task routine address. - if symbol.name in routine_map: - # Assume the symbol of routine is unique. - assert routine_map[symbol.name] is None - routine_map[symbol.name] = symbol.address + for symbol in symbols: + # Resolve task routine address. + if symbol.name in routine_map: + # Assume the symbol of routine is unique. + assert routine_map[symbol.name] is None + routine_map[symbol.name] = symbol.address - for task in tasklist: - address = routine_map[task.routine_name] - # Assume we have resolved all routine addresses. - assert address is not None - task.routine_address = address + for task in tasklist: + address = routine_map[task.routine_name] + # Assume we have resolved all routine addresses. + assert address is not None + task.routine_address = address - return tasklist + return tasklist def main(): - """Main function.""" - try: - options = ParseArgs() - - # Load annotation config. - if options.annotation is None: - annotation = {} - elif not os.path.exists(options.annotation): - print('Warning: Annotation file {} does not exist.' - .format(options.annotation)) - annotation = {} - else: - try: - with open(options.annotation, 'r') as annotation_file: - annotation = yaml.safe_load(annotation_file) - - except yaml.YAMLError: - raise StackAnalyzerError('Failed to parse annotation file {}.' - .format(options.annotation)) - except IOError: - raise StackAnalyzerError('Failed to open annotation file {}.' - .format(options.annotation)) - - # TODO(cheyuw): Do complete annotation format verification. - if not isinstance(annotation, dict): - raise StackAnalyzerError('Invalid annotation file {}.' - .format(options.annotation)) - - # Generate and parse the symbols. + """Main function.""" try: - symbol_text = subprocess.check_output([options.objdump, - '-t', - options.elf_path], - encoding='utf-8') - rodata_text = subprocess.check_output([options.objdump, - '-s', - '-j', '.rodata', - options.elf_path], - encoding='utf-8') - except subprocess.CalledProcessError: - raise StackAnalyzerError('objdump failed to dump symbol table or rodata.') - except OSError: - raise StackAnalyzerError('Failed to run objdump.') - - symbols = ParseSymbolText(symbol_text) - rodata = ParseRoDataText(rodata_text) - - # Load the tasklist. - try: - export_taskinfo = ctypes.CDLL(options.export_taskinfo) - except OSError: - raise StackAnalyzerError('Failed to load export_taskinfo.') - - tasklist = LoadTasklist(options.section, export_taskinfo, symbols) - - analyzer = StackAnalyzer(options, symbols, rodata, tasklist, annotation) - analyzer.Analyze() - except StackAnalyzerError as e: - print('Error: {}'.format(e)) - - -if __name__ == '__main__': - main() + options = ParseArgs() + + # Load annotation config. + if options.annotation is None: + annotation = {} + elif not os.path.exists(options.annotation): + print( + "Warning: Annotation file {} does not exist.".format( + options.annotation + ) + ) + annotation = {} + else: + try: + with open(options.annotation, "r") as annotation_file: + annotation = yaml.safe_load(annotation_file) + + except yaml.YAMLError: + raise StackAnalyzerError( + "Failed to parse annotation file {}.".format( + options.annotation + ) + ) + except IOError: + raise StackAnalyzerError( + "Failed to open annotation file {}.".format( + options.annotation + ) + ) + + # TODO(cheyuw): Do complete annotation format verification. + if not isinstance(annotation, dict): + raise StackAnalyzerError( + "Invalid annotation file {}.".format(options.annotation) + ) + + # Generate and parse the symbols. + try: + symbol_text = subprocess.check_output( + [options.objdump, "-t", options.elf_path], encoding="utf-8" + ) + rodata_text = subprocess.check_output( + [options.objdump, "-s", "-j", ".rodata", options.elf_path], + encoding="utf-8", + ) + except subprocess.CalledProcessError: + raise StackAnalyzerError( + "objdump failed to dump symbol table or rodata." + ) + except OSError: + raise StackAnalyzerError("Failed to run objdump.") + + symbols = ParseSymbolText(symbol_text) + rodata = ParseRoDataText(rodata_text) + + # Load the tasklist. + try: + export_taskinfo = ctypes.CDLL(options.export_taskinfo) + except OSError: + raise StackAnalyzerError("Failed to load export_taskinfo.") + + tasklist = LoadTasklist(options.section, export_taskinfo, symbols) + + analyzer = StackAnalyzer(options, symbols, rodata, tasklist, annotation) + analyzer.Analyze() + except StackAnalyzerError as e: + print("Error: {}".format(e)) + + +if __name__ == "__main__": + main() |