diff options
author | Jeremy Bettis <jbettis@google.com> | 2022-07-08 10:58:19 -0600 |
---|---|---|
committer | Chromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-07-12 19:13:33 +0000 |
commit | 7540e7b47b55447475bb8191fb3520dd67cf7998 (patch) | |
tree | 13309dbcf1db48e60fa2c2e5aed79f63bce00b5e /extra/stack_analyzer | |
parent | 7c114b8e1a3bb29991da70b9de394ac5d4f6c909 (diff) | |
download | chrome-ec-7540e7b47b55447475bb8191fb3520dd67cf7998.tar.gz |
ec: Format all python files with black and isort
find . \( -path ./private -prune \) -o -name '*.py' -print | xargs black
find . \( -path ./private -prune \) -o -name '*.py' -print | xargs ~/chromiumos/chromite/scripts/isort --settings-file=.isort.cfg
BRANCH=None
BUG=b:238434058
TEST=None
Signed-off-by: Jeremy Bettis <jbettis@google.com>
Change-Id: I63462d6f15d1eaf3db84eb20d1404ee976be8382
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3749242
Commit-Queue: Jeremy Bettis <jbettis@chromium.org>
Reviewed-by: Tom Hughes <tomhughes@chromium.org>
Tested-by: Jeremy Bettis <jbettis@chromium.org>
Commit-Queue: Jack Rosenthal <jrosenth@chromium.org>
Auto-Submit: Jeremy Bettis <jbettis@chromium.org>
Reviewed-by: Jack Rosenthal <jrosenth@chromium.org>
Diffstat (limited to 'extra/stack_analyzer')
-rwxr-xr-x | extra/stack_analyzer/stack_analyzer.py | 3562 | ||||
-rwxr-xr-x | extra/stack_analyzer/stack_analyzer_unittest.py | 1708 |
2 files changed, 2759 insertions, 2511 deletions
diff --git a/extra/stack_analyzer/stack_analyzer.py b/extra/stack_analyzer/stack_analyzer.py index 77d16d5450..17b2651972 100755 --- a/extra/stack_analyzer/stack_analyzer.py +++ b/extra/stack_analyzer/stack_analyzer.py @@ -25,1848 +25,1992 @@ import ctypes import os import re import subprocess -import yaml +import yaml -SECTION_RO = 'RO' -SECTION_RW = 'RW' +SECTION_RO = "RO" +SECTION_RW = "RW" # Default size of extra stack frame needed by exception context switch. # This value is for cortex-m with FPU enabled. DEFAULT_EXCEPTION_FRAME_SIZE = 224 class StackAnalyzerError(Exception): - """Exception class for stack analyzer utility.""" + """Exception class for stack analyzer utility.""" class TaskInfo(ctypes.Structure): - """Taskinfo ctypes structure. + """Taskinfo ctypes structure. - The structure definition is corresponding to the "struct taskinfo" - in "util/export_taskinfo.so.c". - """ - _fields_ = [('name', ctypes.c_char_p), - ('routine', ctypes.c_char_p), - ('stack_size', ctypes.c_uint32)] + The structure definition is corresponding to the "struct taskinfo" + in "util/export_taskinfo.so.c". + """ + _fields_ = [ + ("name", ctypes.c_char_p), + ("routine", ctypes.c_char_p), + ("stack_size", ctypes.c_uint32), + ] -class Task(object): - """Task information. - Attributes: - name: Task name. - routine_name: Routine function name. - stack_max_size: Max stack size. - routine_address: Resolved routine address. None if it hasn't been resolved. - """ - - def __init__(self, name, routine_name, stack_max_size, routine_address=None): - """Constructor. +class Task(object): + """Task information. - Args: + Attributes: name: Task name. routine_name: Routine function name. stack_max_size: Max stack size. - routine_address: Resolved routine address. + routine_address: Resolved routine address. None if it hasn't been resolved. """ - self.name = name - self.routine_name = routine_name - self.stack_max_size = stack_max_size - self.routine_address = routine_address - def __eq__(self, other): - """Task equality. + def __init__(self, name, routine_name, stack_max_size, routine_address=None): + """Constructor. - Args: - other: The compared object. + Args: + name: Task name. + routine_name: Routine function name. + stack_max_size: Max stack size. + routine_address: Resolved routine address. + """ + self.name = name + self.routine_name = routine_name + self.stack_max_size = stack_max_size + self.routine_address = routine_address - Returns: - True if equal, False if not. - """ - if not isinstance(other, Task): - return False + def __eq__(self, other): + """Task equality. - return (self.name == other.name and - self.routine_name == other.routine_name and - self.stack_max_size == other.stack_max_size and - self.routine_address == other.routine_address) + Args: + other: The compared object. + Returns: + True if equal, False if not. + """ + if not isinstance(other, Task): + return False -class Symbol(object): - """Symbol information. + return ( + self.name == other.name + and self.routine_name == other.routine_name + and self.stack_max_size == other.stack_max_size + and self.routine_address == other.routine_address + ) - Attributes: - address: Symbol address. - symtype: Symbol type, 'O' (data, object) or 'F' (function). - size: Symbol size. - name: Symbol name. - """ - def __init__(self, address, symtype, size, name): - """Constructor. +class Symbol(object): + """Symbol information. - Args: + Attributes: address: Symbol address. - symtype: Symbol type. + symtype: Symbol type, 'O' (data, object) or 'F' (function). size: Symbol size. name: Symbol name. """ - assert symtype in ['O', 'F'] - self.address = address - self.symtype = symtype - self.size = size - self.name = name - def __eq__(self, other): - """Symbol equality. - - Args: - other: The compared object. - - Returns: - True if equal, False if not. - """ - if not isinstance(other, Symbol): - return False - - return (self.address == other.address and - self.symtype == other.symtype and - self.size == other.size and - self.name == other.name) + def __init__(self, address, symtype, size, name): + """Constructor. + + Args: + address: Symbol address. + symtype: Symbol type. + size: Symbol size. + name: Symbol name. + """ + assert symtype in ["O", "F"] + self.address = address + self.symtype = symtype + self.size = size + self.name = name + + def __eq__(self, other): + """Symbol equality. + + Args: + other: The compared object. + + Returns: + True if equal, False if not. + """ + if not isinstance(other, Symbol): + return False + + return ( + self.address == other.address + and self.symtype == other.symtype + and self.size == other.size + and self.name == other.name + ) class Callsite(object): - """Function callsite. - - Attributes: - address: Address of callsite location. None if it is unknown. - target: Callee address. None if it is unknown. - is_tail: A bool indicates that it is a tailing call. - callee: Resolved callee function. None if it hasn't been resolved. - """ - - def __init__(self, address, target, is_tail, callee=None): - """Constructor. + """Function callsite. - Args: + Attributes: address: Address of callsite location. None if it is unknown. target: Callee address. None if it is unknown. - is_tail: A bool indicates that it is a tailing call. (function jump to - another function without restoring the stack frame) - callee: Resolved callee function. + is_tail: A bool indicates that it is a tailing call. + callee: Resolved callee function. None if it hasn't been resolved. """ - # It makes no sense that both address and target are unknown. - assert not (address is None and target is None) - self.address = address - self.target = target - self.is_tail = is_tail - self.callee = callee - - def __eq__(self, other): - """Callsite equality. - - Args: - other: The compared object. - Returns: - True if equal, False if not. - """ - if not isinstance(other, Callsite): - return False - - if not (self.address == other.address and - self.target == other.target and - self.is_tail == other.is_tail): - return False - - if self.callee is None: - return other.callee is None - elif other.callee is None: - return False - - # Assume the addresses of functions are unique. - return self.callee.address == other.callee.address + def __init__(self, address, target, is_tail, callee=None): + """Constructor. + + Args: + address: Address of callsite location. None if it is unknown. + target: Callee address. None if it is unknown. + is_tail: A bool indicates that it is a tailing call. (function jump to + another function without restoring the stack frame) + callee: Resolved callee function. + """ + # It makes no sense that both address and target are unknown. + assert not (address is None and target is None) + self.address = address + self.target = target + self.is_tail = is_tail + self.callee = callee + + def __eq__(self, other): + """Callsite equality. + + Args: + other: The compared object. + + Returns: + True if equal, False if not. + """ + if not isinstance(other, Callsite): + return False + + if not ( + self.address == other.address + and self.target == other.target + and self.is_tail == other.is_tail + ): + return False + + if self.callee is None: + return other.callee is None + elif other.callee is None: + return False + + # Assume the addresses of functions are unique. + return self.callee.address == other.callee.address class Function(object): - """Function. - - Attributes: - address: Address of function. - name: Name of function from its symbol. - stack_frame: Size of stack frame. - callsites: Callsite list. - stack_max_usage: Max stack usage. None if it hasn't been analyzed. - stack_max_path: Max stack usage path. None if it hasn't been analyzed. - """ + """Function. - def __init__(self, address, name, stack_frame, callsites): - """Constructor. - - Args: + Attributes: address: Address of function. name: Name of function from its symbol. stack_frame: Size of stack frame. callsites: Callsite list. + stack_max_usage: Max stack usage. None if it hasn't been analyzed. + stack_max_path: Max stack usage path. None if it hasn't been analyzed. """ - self.address = address - self.name = name - self.stack_frame = stack_frame - self.callsites = callsites - self.stack_max_usage = None - self.stack_max_path = None - - def __eq__(self, other): - """Function equality. - - Args: - other: The compared object. - - Returns: - True if equal, False if not. - """ - if not isinstance(other, Function): - return False - - if not (self.address == other.address and - self.name == other.name and - self.stack_frame == other.stack_frame and - self.callsites == other.callsites and - self.stack_max_usage == other.stack_max_usage): - return False - if self.stack_max_path is None: - return other.stack_max_path is None - elif other.stack_max_path is None: - return False + def __init__(self, address, name, stack_frame, callsites): + """Constructor. + + Args: + address: Address of function. + name: Name of function from its symbol. + stack_frame: Size of stack frame. + callsites: Callsite list. + """ + self.address = address + self.name = name + self.stack_frame = stack_frame + self.callsites = callsites + self.stack_max_usage = None + self.stack_max_path = None + + def __eq__(self, other): + """Function equality. + + Args: + other: The compared object. + + Returns: + True if equal, False if not. + """ + if not isinstance(other, Function): + return False + + if not ( + self.address == other.address + and self.name == other.name + and self.stack_frame == other.stack_frame + and self.callsites == other.callsites + and self.stack_max_usage == other.stack_max_usage + ): + return False + + if self.stack_max_path is None: + return other.stack_max_path is None + elif other.stack_max_path is None: + return False + + if len(self.stack_max_path) != len(other.stack_max_path): + return False + + for self_func, other_func in zip(self.stack_max_path, other.stack_max_path): + # Assume the addresses of functions are unique. + if self_func.address != other_func.address: + return False + + return True + + def __hash__(self): + return id(self) - if len(self.stack_max_path) != len(other.stack_max_path): - return False - - for self_func, other_func in zip(self.stack_max_path, other.stack_max_path): - # Assume the addresses of functions are unique. - if self_func.address != other_func.address: - return False - - return True - - def __hash__(self): - return id(self) class AndesAnalyzer(object): - """Disassembly analyzer for Andes architecture. - - Public Methods: - AnalyzeFunction: Analyze stack frame and callsites of the function. - """ - - GENERAL_PURPOSE_REGISTER_SIZE = 4 - - # Possible condition code suffixes. - CONDITION_CODES = [ 'eq', 'eqz', 'gez', 'gtz', 'lez', 'ltz', 'ne', 'nez', - 'eqc', 'nec', 'nezs', 'nes', 'eqs'] - CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES)) - - IMM_ADDRESS_RE = r'([0-9A-Fa-f]+)\s+<([^>]+)>' - # Branch instructions. - JUMP_OPCODE_RE = re.compile(r'^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$' \ - .format(CONDITION_CODES_RE)) - # Call instructions. - CALL_OPCODE_RE = re.compile \ - (r'^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$') - CALL_OPERAND_RE = re.compile(r'^{}$'.format(IMM_ADDRESS_RE)) - # Ignore lp register because it's for return. - INDIRECT_CALL_OPERAND_RE = re.compile \ - (r'^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$') - # TODO: Handle other kinds of store instructions. - PUSH_OPCODE_RE = re.compile(r'^push(\d{1,})$') - PUSH_OPERAND_RE = re.compile(r'^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}') - SMW_OPCODE_RE = re.compile(r'^smw(\.\w\w|\.\w\w\w)$') - SMW_OPERAND_RE = re.compile(r'^(\$r\d{1,}|\$\wp), \[\$\wp\], ' - r'(\$r\d{1,}|\$\wp), \#\d\w\d \! \{([^\]]+)\}') - OPERANDGROUP_RE = re.compile(r'^\$r\d{1,}\~\$r\d{1,}') - - LWI_OPCODE_RE = re.compile(r'^lwi(\.\w\w)$') - LWI_PC_OPERAND_RE = re.compile(r'^\$pc, \[([^\]]+)\]') - # Example: "34280: 3f c8 0f ec addi.gp $fp, #0xfec" - # Assume there is always a "\t" after the hex data. - DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+' - r'(?P<words>[0-9A-Fa-f ]+)' - r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?') - - def ParseInstruction(self, line, function_end): - """Parse the line of instruction. + """Disassembly analyzer for Andes architecture. - Args: - line: Text of disassembly. - function_end: End address of the current function. None if unknown. - - Returns: - (address, words, opcode, operand_text): The instruction address, words, - opcode, and the text of operands. - None if it isn't an instruction line. + Public Methods: + AnalyzeFunction: Analyze stack frame and callsites of the function. """ - result = self.DISASM_REGEX_RE.match(line) - if result is None: - return None - - address = int(result.group('address'), 16) - # Check if it's out of bound. - if function_end is not None and address >= function_end: - return None - - opcode = result.group('opcode').strip() - operand_text = result.group('operand') - words = result.group('words') - if operand_text is None: - operand_text = '' - else: - operand_text = operand_text.strip() - - return (address, words, opcode, operand_text) - - def AnalyzeFunction(self, function_symbol, instructions): - - stack_frame = 0 - callsites = [] - for address, words, opcode, operand_text in instructions: - is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None - is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None - - if is_jump_opcode or is_call_opcode: - is_tail = is_jump_opcode - - result = self.CALL_OPERAND_RE.match(operand_text) + GENERAL_PURPOSE_REGISTER_SIZE = 4 + + # Possible condition code suffixes. + CONDITION_CODES = [ + "eq", + "eqz", + "gez", + "gtz", + "lez", + "ltz", + "ne", + "nez", + "eqc", + "nec", + "nezs", + "nes", + "eqs", + ] + CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES)) + + IMM_ADDRESS_RE = r"([0-9A-Fa-f]+)\s+<([^>]+)>" + # Branch instructions. + JUMP_OPCODE_RE = re.compile( + r"^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$".format(CONDITION_CODES_RE) + ) + # Call instructions. + CALL_OPCODE_RE = re.compile(r"^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$") + CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE)) + # Ignore lp register because it's for return. + INDIRECT_CALL_OPERAND_RE = re.compile(r"^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$") + # TODO: Handle other kinds of store instructions. + PUSH_OPCODE_RE = re.compile(r"^push(\d{1,})$") + PUSH_OPERAND_RE = re.compile(r"^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}") + SMW_OPCODE_RE = re.compile(r"^smw(\.\w\w|\.\w\w\w)$") + SMW_OPERAND_RE = re.compile( + r"^(\$r\d{1,}|\$\wp), \[\$\wp\], " + r"(\$r\d{1,}|\$\wp), \#\d\w\d \! \{([^\]]+)\}" + ) + OPERANDGROUP_RE = re.compile(r"^\$r\d{1,}\~\$r\d{1,}") + + LWI_OPCODE_RE = re.compile(r"^lwi(\.\w\w)$") + LWI_PC_OPERAND_RE = re.compile(r"^\$pc, \[([^\]]+)\]") + # Example: "34280: 3f c8 0f ec addi.gp $fp, #0xfec" + # Assume there is always a "\t" after the hex data. + DISASM_REGEX_RE = re.compile( + r"^(?P<address>[0-9A-Fa-f]+):\s+" + r"(?P<words>[0-9A-Fa-f ]+)" + r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?" + ) + + def ParseInstruction(self, line, function_end): + """Parse the line of instruction. + + Args: + line: Text of disassembly. + function_end: End address of the current function. None if unknown. + + Returns: + (address, words, opcode, operand_text): The instruction address, words, + opcode, and the text of operands. + None if it isn't an instruction line. + """ + result = self.DISASM_REGEX_RE.match(line) if result is None: - if (self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None): - # Found an indirect call. - callsites.append(Callsite(address, None, is_tail)) - + return None + + address = int(result.group("address"), 16) + # Check if it's out of bound. + if function_end is not None and address >= function_end: + return None + + opcode = result.group("opcode").strip() + operand_text = result.group("operand") + words = result.group("words") + if operand_text is None: + operand_text = "" else: - target_address = int(result.group(1), 16) - # Filter out the in-function target (branches and in-function calls, - # which are actually branches). - if not (function_symbol.size > 0 and - function_symbol.address < target_address < - (function_symbol.address + function_symbol.size)): - # Maybe it is a callsite. - callsites.append(Callsite(address, target_address, is_tail)) - - elif self.LWI_OPCODE_RE.match(opcode) is not None: - result = self.LWI_PC_OPERAND_RE.match(operand_text) - if result is not None: - # Ignore "lwi $pc, [$sp], xx" because it's usually a return. - if result.group(1) != '$sp': - # Found an indirect call. - callsites.append(Callsite(address, None, True)) - - elif self.PUSH_OPCODE_RE.match(opcode) is not None: - # Example: fc 20 push25 $r8, #0 ! {$r6~$r8, $fp, $gp, $lp} - if self.PUSH_OPERAND_RE.match(operand_text) is not None: - # capture fc 20 - imm5u = int(words.split(' ')[1], 16) - # sp = sp - (imm5u << 3) - imm8u = (imm5u<<3) & 0xff - stack_frame += imm8u - - result = self.PUSH_OPERAND_RE.match(operand_text) - operandgroup_text = result.group(1) - # capture $rx~$ry - if self.OPERANDGROUP_RE.match(operandgroup_text) is not None: - # capture number & transfer string to integer - oprandgrouphead = operandgroup_text.split(',')[0] - rx=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[0]))) - ry=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[1]))) - - stack_frame += ((len(operandgroup_text.split(','))+ry-rx) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - else: - stack_frame += (len(operandgroup_text.split(',')) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - - elif self.SMW_OPCODE_RE.match(opcode) is not None: - # Example: smw.adm $r6, [$sp], $r10, #0x2 ! {$r6~$r10, $lp} - if self.SMW_OPERAND_RE.match(operand_text) is not None: - result = self.SMW_OPERAND_RE.match(operand_text) - operandgroup_text = result.group(3) - # capture $rx~$ry - if self.OPERANDGROUP_RE.match(operandgroup_text) is not None: - # capture number & transfer string to integer - oprandgrouphead = operandgroup_text.split(',')[0] - rx=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[0]))) - ry=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[1]))) - - stack_frame += ((len(operandgroup_text.split(','))+ry-rx) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - else: - stack_frame += (len(operandgroup_text.split(',')) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - - return (stack_frame, callsites) - -class ArmAnalyzer(object): - """Disassembly analyzer for ARM architecture. - - Public Methods: - AnalyzeFunction: Analyze stack frame and callsites of the function. - """ - - GENERAL_PURPOSE_REGISTER_SIZE = 4 - - # Possible condition code suffixes. - CONDITION_CODES = ['', 'eq', 'ne', 'cs', 'hs', 'cc', 'lo', 'mi', 'pl', 'vs', - 'vc', 'hi', 'ls', 'ge', 'lt', 'gt', 'le'] - CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES)) - # Assume there is no function name containing ">". - IMM_ADDRESS_RE = r'([0-9A-Fa-f]+)\s+<([^>]+)>' - - # Fuzzy regular expressions for instruction and operand parsing. - # Branch instructions. - JUMP_OPCODE_RE = re.compile( - r'^(b{0}|bx{0})(\.\w)?$'.format(CONDITION_CODES_RE)) - # Call instructions. - CALL_OPCODE_RE = re.compile( - r'^(bl{0}|blx{0})(\.\w)?$'.format(CONDITION_CODES_RE)) - CALL_OPERAND_RE = re.compile(r'^{}$'.format(IMM_ADDRESS_RE)) - CBZ_CBNZ_OPCODE_RE = re.compile(r'^(cbz|cbnz)(\.\w)?$') - # Example: "r0, 1009bcbe <host_cmd_motion_sense+0x1d2>" - CBZ_CBNZ_OPERAND_RE = re.compile(r'^[^,]+,\s+{}$'.format(IMM_ADDRESS_RE)) - # Ignore lr register because it's for return. - INDIRECT_CALL_OPERAND_RE = re.compile(r'^r\d+|sb|sl|fp|ip|sp|pc$') - # TODO(cheyuw): Handle conditional versions of following - # instructions. - # TODO(cheyuw): Handle other kinds of pc modifying instructions (e.g. mov pc). - LDR_OPCODE_RE = re.compile(r'^ldr(\.\w)?$') - # Example: "pc, [sp], #4" - LDR_PC_OPERAND_RE = re.compile(r'^pc, \[([^\]]+)\]') - # TODO(cheyuw): Handle other kinds of stm instructions. - PUSH_OPCODE_RE = re.compile(r'^push$') - STM_OPCODE_RE = re.compile(r'^stmdb$') - # Stack subtraction instructions. - SUB_OPCODE_RE = re.compile(r'^sub(s|w)?(\.\w)?$') - SUB_OPERAND_RE = re.compile(r'^sp[^#]+#(\d+)') - # Example: "44d94: f893 0068 ldrb.w r0, [r3, #104] ; 0x68" - # Assume there is always a "\t" after the hex data. - DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+' - r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?') - - def ParseInstruction(self, line, function_end): - """Parse the line of instruction. + operand_text = operand_text.strip() + + return (address, words, opcode, operand_text) + + def AnalyzeFunction(self, function_symbol, instructions): + + stack_frame = 0 + callsites = [] + for address, words, opcode, operand_text in instructions: + is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None + is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None + + if is_jump_opcode or is_call_opcode: + is_tail = is_jump_opcode + + result = self.CALL_OPERAND_RE.match(operand_text) + + if result is None: + if self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None: + # Found an indirect call. + callsites.append(Callsite(address, None, is_tail)) + + else: + target_address = int(result.group(1), 16) + # Filter out the in-function target (branches and in-function calls, + # which are actually branches). + if not ( + function_symbol.size > 0 + and function_symbol.address + < target_address + < (function_symbol.address + function_symbol.size) + ): + # Maybe it is a callsite. + callsites.append(Callsite(address, target_address, is_tail)) + + elif self.LWI_OPCODE_RE.match(opcode) is not None: + result = self.LWI_PC_OPERAND_RE.match(operand_text) + if result is not None: + # Ignore "lwi $pc, [$sp], xx" because it's usually a return. + if result.group(1) != "$sp": + # Found an indirect call. + callsites.append(Callsite(address, None, True)) + + elif self.PUSH_OPCODE_RE.match(opcode) is not None: + # Example: fc 20 push25 $r8, #0 ! {$r6~$r8, $fp, $gp, $lp} + if self.PUSH_OPERAND_RE.match(operand_text) is not None: + # capture fc 20 + imm5u = int(words.split(" ")[1], 16) + # sp = sp - (imm5u << 3) + imm8u = (imm5u << 3) & 0xFF + stack_frame += imm8u + + result = self.PUSH_OPERAND_RE.match(operand_text) + operandgroup_text = result.group(1) + # capture $rx~$ry + if self.OPERANDGROUP_RE.match(operandgroup_text) is not None: + # capture number & transfer string to integer + oprandgrouphead = operandgroup_text.split(",")[0] + rx = int( + "".join(filter(str.isdigit, oprandgrouphead.split("~")[0])) + ) + ry = int( + "".join(filter(str.isdigit, oprandgrouphead.split("~")[1])) + ) + + stack_frame += ( + len(operandgroup_text.split(",")) + ry - rx + ) * self.GENERAL_PURPOSE_REGISTER_SIZE + else: + stack_frame += ( + len(operandgroup_text.split(",")) + * self.GENERAL_PURPOSE_REGISTER_SIZE + ) + + elif self.SMW_OPCODE_RE.match(opcode) is not None: + # Example: smw.adm $r6, [$sp], $r10, #0x2 ! {$r6~$r10, $lp} + if self.SMW_OPERAND_RE.match(operand_text) is not None: + result = self.SMW_OPERAND_RE.match(operand_text) + operandgroup_text = result.group(3) + # capture $rx~$ry + if self.OPERANDGROUP_RE.match(operandgroup_text) is not None: + # capture number & transfer string to integer + oprandgrouphead = operandgroup_text.split(",")[0] + rx = int( + "".join(filter(str.isdigit, oprandgrouphead.split("~")[0])) + ) + ry = int( + "".join(filter(str.isdigit, oprandgrouphead.split("~")[1])) + ) + + stack_frame += ( + len(operandgroup_text.split(",")) + ry - rx + ) * self.GENERAL_PURPOSE_REGISTER_SIZE + else: + stack_frame += ( + len(operandgroup_text.split(",")) + * self.GENERAL_PURPOSE_REGISTER_SIZE + ) + + return (stack_frame, callsites) - Args: - line: Text of disassembly. - function_end: End address of the current function. None if unknown. - Returns: - (address, opcode, operand_text): The instruction address, opcode, - and the text of operands. None if it - isn't an instruction line. - """ - result = self.DISASM_REGEX_RE.match(line) - if result is None: - return None - - address = int(result.group('address'), 16) - # Check if it's out of bound. - if function_end is not None and address >= function_end: - return None - - opcode = result.group('opcode').strip() - operand_text = result.group('operand') - if operand_text is None: - operand_text = '' - else: - operand_text = operand_text.strip() - - return (address, opcode, operand_text) - - def AnalyzeFunction(self, function_symbol, instructions): - """Analyze function, resolve the size of stack frame and callsites. - - Args: - function_symbol: Function symbol. - instructions: Instruction list. +class ArmAnalyzer(object): + """Disassembly analyzer for ARM architecture. - Returns: - (stack_frame, callsites): Size of stack frame, callsite list. + Public Methods: + AnalyzeFunction: Analyze stack frame and callsites of the function. """ - stack_frame = 0 - callsites = [] - for address, opcode, operand_text in instructions: - is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None - is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None - is_cbz_cbnz_opcode = self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None - if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode: - is_tail = is_jump_opcode or is_cbz_cbnz_opcode - - if is_cbz_cbnz_opcode: - result = self.CBZ_CBNZ_OPERAND_RE.match(operand_text) - else: - result = self.CALL_OPERAND_RE.match(operand_text) + GENERAL_PURPOSE_REGISTER_SIZE = 4 + + # Possible condition code suffixes. + CONDITION_CODES = [ + "", + "eq", + "ne", + "cs", + "hs", + "cc", + "lo", + "mi", + "pl", + "vs", + "vc", + "hi", + "ls", + "ge", + "lt", + "gt", + "le", + ] + CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES)) + # Assume there is no function name containing ">". + IMM_ADDRESS_RE = r"([0-9A-Fa-f]+)\s+<([^>]+)>" + + # Fuzzy regular expressions for instruction and operand parsing. + # Branch instructions. + JUMP_OPCODE_RE = re.compile(r"^(b{0}|bx{0})(\.\w)?$".format(CONDITION_CODES_RE)) + # Call instructions. + CALL_OPCODE_RE = re.compile(r"^(bl{0}|blx{0})(\.\w)?$".format(CONDITION_CODES_RE)) + CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE)) + CBZ_CBNZ_OPCODE_RE = re.compile(r"^(cbz|cbnz)(\.\w)?$") + # Example: "r0, 1009bcbe <host_cmd_motion_sense+0x1d2>" + CBZ_CBNZ_OPERAND_RE = re.compile(r"^[^,]+,\s+{}$".format(IMM_ADDRESS_RE)) + # Ignore lr register because it's for return. + INDIRECT_CALL_OPERAND_RE = re.compile(r"^r\d+|sb|sl|fp|ip|sp|pc$") + # TODO(cheyuw): Handle conditional versions of following + # instructions. + # TODO(cheyuw): Handle other kinds of pc modifying instructions (e.g. mov pc). + LDR_OPCODE_RE = re.compile(r"^ldr(\.\w)?$") + # Example: "pc, [sp], #4" + LDR_PC_OPERAND_RE = re.compile(r"^pc, \[([^\]]+)\]") + # TODO(cheyuw): Handle other kinds of stm instructions. + PUSH_OPCODE_RE = re.compile(r"^push$") + STM_OPCODE_RE = re.compile(r"^stmdb$") + # Stack subtraction instructions. + SUB_OPCODE_RE = re.compile(r"^sub(s|w)?(\.\w)?$") + SUB_OPERAND_RE = re.compile(r"^sp[^#]+#(\d+)") + # Example: "44d94: f893 0068 ldrb.w r0, [r3, #104] ; 0x68" + # Assume there is always a "\t" after the hex data. + DISASM_REGEX_RE = re.compile( + r"^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+" + r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?" + ) + + def ParseInstruction(self, line, function_end): + """Parse the line of instruction. + + Args: + line: Text of disassembly. + function_end: End address of the current function. None if unknown. + + Returns: + (address, opcode, operand_text): The instruction address, opcode, + and the text of operands. None if it + isn't an instruction line. + """ + result = self.DISASM_REGEX_RE.match(line) if result is None: - # Failed to match immediate address, maybe it is an indirect call. - # CBZ and CBNZ can't be indirect calls. - if (not is_cbz_cbnz_opcode and - self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None): - # Found an indirect call. - callsites.append(Callsite(address, None, is_tail)) + return None - else: - target_address = int(result.group(1), 16) - # Filter out the in-function target (branches and in-function calls, - # which are actually branches). - if not (function_symbol.size > 0 and - function_symbol.address < target_address < - (function_symbol.address + function_symbol.size)): - # Maybe it is a callsite. - callsites.append(Callsite(address, target_address, is_tail)) - - elif self.LDR_OPCODE_RE.match(opcode) is not None: - result = self.LDR_PC_OPERAND_RE.match(operand_text) - if result is not None: - # Ignore "ldr pc, [sp], xx" because it's usually a return. - if result.group(1) != 'sp': - # Found an indirect call. - callsites.append(Callsite(address, None, True)) - - elif self.PUSH_OPCODE_RE.match(opcode) is not None: - # Example: "{r4, r5, r6, r7, lr}" - stack_frame += (len(operand_text.split(',')) * - self.GENERAL_PURPOSE_REGISTER_SIZE) - elif self.SUB_OPCODE_RE.match(opcode) is not None: - result = self.SUB_OPERAND_RE.match(operand_text) - if result is not None: - stack_frame += int(result.group(1)) - else: - # Unhandled stack register subtraction. - assert not operand_text.startswith('sp') + address = int(result.group("address"), 16) + # Check if it's out of bound. + if function_end is not None and address >= function_end: + return None - elif self.STM_OPCODE_RE.match(opcode) is not None: - if operand_text.startswith('sp!'): - # Subtract and writeback to stack register. - # Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}" - # Get the text of pushed register list. - unused_sp, unused_sep, parameter_text = operand_text.partition(',') - stack_frame += (len(parameter_text.split(',')) * - self.GENERAL_PURPOSE_REGISTER_SIZE) + opcode = result.group("opcode").strip() + operand_text = result.group("operand") + if operand_text is None: + operand_text = "" + else: + operand_text = operand_text.strip() + + return (address, opcode, operand_text) + + def AnalyzeFunction(self, function_symbol, instructions): + """Analyze function, resolve the size of stack frame and callsites. + + Args: + function_symbol: Function symbol. + instructions: Instruction list. + + Returns: + (stack_frame, callsites): Size of stack frame, callsite list. + """ + stack_frame = 0 + callsites = [] + for address, opcode, operand_text in instructions: + is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None + is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None + is_cbz_cbnz_opcode = self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None + if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode: + is_tail = is_jump_opcode or is_cbz_cbnz_opcode + + if is_cbz_cbnz_opcode: + result = self.CBZ_CBNZ_OPERAND_RE.match(operand_text) + else: + result = self.CALL_OPERAND_RE.match(operand_text) + + if result is None: + # Failed to match immediate address, maybe it is an indirect call. + # CBZ and CBNZ can't be indirect calls. + if ( + not is_cbz_cbnz_opcode + and self.INDIRECT_CALL_OPERAND_RE.match(operand_text) + is not None + ): + # Found an indirect call. + callsites.append(Callsite(address, None, is_tail)) + + else: + target_address = int(result.group(1), 16) + # Filter out the in-function target (branches and in-function calls, + # which are actually branches). + if not ( + function_symbol.size > 0 + and function_symbol.address + < target_address + < (function_symbol.address + function_symbol.size) + ): + # Maybe it is a callsite. + callsites.append(Callsite(address, target_address, is_tail)) + + elif self.LDR_OPCODE_RE.match(opcode) is not None: + result = self.LDR_PC_OPERAND_RE.match(operand_text) + if result is not None: + # Ignore "ldr pc, [sp], xx" because it's usually a return. + if result.group(1) != "sp": + # Found an indirect call. + callsites.append(Callsite(address, None, True)) + + elif self.PUSH_OPCODE_RE.match(opcode) is not None: + # Example: "{r4, r5, r6, r7, lr}" + stack_frame += ( + len(operand_text.split(",")) * self.GENERAL_PURPOSE_REGISTER_SIZE + ) + elif self.SUB_OPCODE_RE.match(opcode) is not None: + result = self.SUB_OPERAND_RE.match(operand_text) + if result is not None: + stack_frame += int(result.group(1)) + else: + # Unhandled stack register subtraction. + assert not operand_text.startswith("sp") + + elif self.STM_OPCODE_RE.match(opcode) is not None: + if operand_text.startswith("sp!"): + # Subtract and writeback to stack register. + # Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}" + # Get the text of pushed register list. + unused_sp, unused_sep, parameter_text = operand_text.partition(",") + stack_frame += ( + len(parameter_text.split(",")) + * self.GENERAL_PURPOSE_REGISTER_SIZE + ) + + return (stack_frame, callsites) - return (stack_frame, callsites) class RiscvAnalyzer(object): - """Disassembly analyzer for RISC-V architecture. - - Public Methods: - AnalyzeFunction: Analyze stack frame and callsites of the function. - """ - - # Possible condition code suffixes. - CONDITION_CODES = [ 'eqz', 'nez', 'lez', 'gez', 'ltz', 'gtz', 'gt', 'le', - 'gtu', 'leu', 'eq', 'ne', 'ge', 'lt', 'ltu', 'geu'] - CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES)) - # Branch instructions. - JUMP_OPCODE_RE = re.compile(r'^(b{0}|j|jr)$'.format(CONDITION_CODES_RE)) - # Call instructions. - CALL_OPCODE_RE = re.compile(r'^(jal|jalr)$') - # Example: "j 8009b318 <set_state_prl_hr>" or - # "jal ra,800a4394 <power_get_signals>" or - # "bltu t0,t1,80080300 <data_loop>" - JUMP_ADDRESS_RE = r'((\w(\w|\d\d),){0,2})([0-9A-Fa-f]+)\s+<([^>]+)>' - CALL_OPERAND_RE = re.compile(r'^{}$'.format(JUMP_ADDRESS_RE)) - # Capture address, Example: 800a4394 - CAPTURE_ADDRESS = re.compile(r'[0-9A-Fa-f]{8}') - # Indirect jump, Example: jalr a5 - INDIRECT_CALL_OPERAND_RE = re.compile(r'^t\d+|s\d+|a\d+$') - # Example: addi - ADDI_OPCODE_RE = re.compile(r'^addi$') - # Allocate stack instructions. - ADDI_OPERAND_RE = re.compile(r'^(sp,sp,-\d+)$') - # Example: "800804b6: 1101 addi sp,sp,-32" - DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+' - r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?') - - def ParseInstruction(self, line, function_end): - """Parse the line of instruction. + """Disassembly analyzer for RISC-V architecture. - Args: - line: Text of disassembly. - function_end: End address of the current function. None if unknown. - - Returns: - (address, opcode, operand_text): The instruction address, opcode, - and the text of operands. None if it - isn't an instruction line. + Public Methods: + AnalyzeFunction: Analyze stack frame and callsites of the function. """ - result = self.DISASM_REGEX_RE.match(line) - if result is None: - return None - - address = int(result.group('address'), 16) - # Check if it's out of bound. - if function_end is not None and address >= function_end: - return None - - opcode = result.group('opcode').strip() - operand_text = result.group('operand') - if operand_text is None: - operand_text = '' - else: - operand_text = operand_text.strip() - - return (address, opcode, operand_text) - - def AnalyzeFunction(self, function_symbol, instructions): - - stack_frame = 0 - callsites = [] - for address, opcode, operand_text in instructions: - is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None - is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None - if is_jump_opcode or is_call_opcode: - is_tail = is_jump_opcode - - result = self.CALL_OPERAND_RE.match(operand_text) + # Possible condition code suffixes. + CONDITION_CODES = [ + "eqz", + "nez", + "lez", + "gez", + "ltz", + "gtz", + "gt", + "le", + "gtu", + "leu", + "eq", + "ne", + "ge", + "lt", + "ltu", + "geu", + ] + CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES)) + # Branch instructions. + JUMP_OPCODE_RE = re.compile(r"^(b{0}|j|jr)$".format(CONDITION_CODES_RE)) + # Call instructions. + CALL_OPCODE_RE = re.compile(r"^(jal|jalr)$") + # Example: "j 8009b318 <set_state_prl_hr>" or + # "jal ra,800a4394 <power_get_signals>" or + # "bltu t0,t1,80080300 <data_loop>" + JUMP_ADDRESS_RE = r"((\w(\w|\d\d),){0,2})([0-9A-Fa-f]+)\s+<([^>]+)>" + CALL_OPERAND_RE = re.compile(r"^{}$".format(JUMP_ADDRESS_RE)) + # Capture address, Example: 800a4394 + CAPTURE_ADDRESS = re.compile(r"[0-9A-Fa-f]{8}") + # Indirect jump, Example: jalr a5 + INDIRECT_CALL_OPERAND_RE = re.compile(r"^t\d+|s\d+|a\d+$") + # Example: addi + ADDI_OPCODE_RE = re.compile(r"^addi$") + # Allocate stack instructions. + ADDI_OPERAND_RE = re.compile(r"^(sp,sp,-\d+)$") + # Example: "800804b6: 1101 addi sp,sp,-32" + DISASM_REGEX_RE = re.compile( + r"^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+" + r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?" + ) + + def ParseInstruction(self, line, function_end): + """Parse the line of instruction. + + Args: + line: Text of disassembly. + function_end: End address of the current function. None if unknown. + + Returns: + (address, opcode, operand_text): The instruction address, opcode, + and the text of operands. None if it + isn't an instruction line. + """ + result = self.DISASM_REGEX_RE.match(line) if result is None: - if (self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None): - # Found an indirect call. - callsites.append(Callsite(address, None, is_tail)) - - else: - # Capture address form operand_text and then convert to string - address_str = "".join(self.CAPTURE_ADDRESS.findall(operand_text)) - # String to integer - target_address = int(address_str, 16) - # Filter out the in-function target (branches and in-function calls, - # which are actually branches). - if not (function_symbol.size > 0 and - function_symbol.address < target_address < - (function_symbol.address + function_symbol.size)): - # Maybe it is a callsite. - callsites.append(Callsite(address, target_address, is_tail)) - - elif self.ADDI_OPCODE_RE.match(opcode) is not None: - # Example: sp,sp,-32 - if self.ADDI_OPERAND_RE.match(operand_text) is not None: - stack_frame += abs(int(operand_text.split(",")[2])) - - return (stack_frame, callsites) - -class StackAnalyzer(object): - """Class to analyze stack usage. - - Public Methods: - Analyze: Run the stack analysis. - """ - - C_FUNCTION_NAME = r'_A-Za-z0-9' - - # Assume there is no ":" in the path. - # Example: "driver/accel_kionix.c:321 (discriminator 3)" - ADDRTOLINE_RE = re.compile( - r'^(?P<path>[^:]+):(?P<linenum>\d+)(\s+\(discriminator\s+\d+\))?$') - # To eliminate the suffix appended by compilers, try to extract the - # C function name from the prefix of symbol name. - # Example: "SHA256_transform.constprop.28" - FUNCTION_PREFIX_NAME_RE = re.compile( - r'^(?P<name>[{0}]+)([^{0}].*)?$'.format(C_FUNCTION_NAME)) - - # Errors of annotation resolving. - ANNOTATION_ERROR_INVALID = 'invalid signature' - ANNOTATION_ERROR_NOTFOUND = 'function is not found' - ANNOTATION_ERROR_AMBIGUOUS = 'signature is ambiguous' - - def __init__(self, options, symbols, rodata, tasklist, annotation): - """Constructor. - - Args: - options: Namespace from argparse.parse_args(). - symbols: Symbol list. - rodata: Content of .rodata section (offset, data) - tasklist: Task list. - annotation: Annotation config. - """ - self.options = options - self.symbols = symbols - self.rodata_offset = rodata[0] - self.rodata = rodata[1] - self.tasklist = tasklist - self.annotation = annotation - self.address_to_line_cache = {} - - def AddressToLine(self, address, resolve_inline=False): - """Convert address to line. - - Args: - address: Target address. - resolve_inline: Output the stack of inlining. - - Returns: - lines: List of the corresponding lines. - - Raises: - StackAnalyzerError: If addr2line is failed. - """ - cache_key = (address, resolve_inline) - if cache_key in self.address_to_line_cache: - return self.address_to_line_cache[cache_key] - - try: - args = [self.options.addr2line, - '-f', - '-e', - self.options.elf_path, - '{:x}'.format(address)] - if resolve_inline: - args.append('-i') - - line_text = subprocess.check_output(args, encoding='utf-8') - except subprocess.CalledProcessError: - raise StackAnalyzerError('addr2line failed to resolve lines.') - except OSError: - raise StackAnalyzerError('Failed to run addr2line.') - - lines = [line.strip() for line in line_text.splitlines()] - # Assume the output has at least one pair like "function\nlocation\n", and - # they always show up in pairs. - # Example: "handle_request\n - # common/usb_pd_protocol.c:1191\n" - assert len(lines) >= 2 and len(lines) % 2 == 0 - - line_infos = [] - for index in range(0, len(lines), 2): - (function_name, line_text) = lines[index:index + 2] - if line_text in ['??:0', ':?']: - line_infos.append(None) - else: - result = self.ADDRTOLINE_RE.match(line_text) - # Assume the output is always well-formed. - assert result is not None - line_infos.append((function_name.strip(), - os.path.realpath(result.group('path').strip()), - int(result.group('linenum')))) - - self.address_to_line_cache[cache_key] = line_infos - return line_infos - - def AnalyzeDisassembly(self, disasm_text): - """Parse the disassembly text, analyze, and build a map of all functions. - - Args: - disasm_text: Disassembly text. - - Returns: - function_map: Dict of functions. - """ - disasm_lines = [line.strip() for line in disasm_text.splitlines()] - - if 'nds' in disasm_lines[1]: - analyzer = AndesAnalyzer() - elif 'arm' in disasm_lines[1]: - analyzer = ArmAnalyzer() - elif 'riscv' in disasm_lines[1]: - analyzer = RiscvAnalyzer() - else: - raise StackAnalyzerError('Unsupported architecture.') - - # Example: "08028c8c <motion_lid_calc>:" - function_signature_regex = re.compile( - r'^(?P<address>[0-9A-Fa-f]+)\s+<(?P<name>[^>]+)>:$') - - def DetectFunctionHead(line): - """Check if the line is a function head. - - Args: - line: Text of disassembly. - - Returns: - symbol: Function symbol. None if it isn't a function head. - """ - result = function_signature_regex.match(line) - if result is None: - return None - - address = int(result.group('address'), 16) - symbol = symbol_map.get(address) - - # Check if the function exists and matches. - if symbol is None or symbol.symtype != 'F': - return None - - return symbol - - # Build symbol map, indexed by symbol address. - symbol_map = {} - for symbol in self.symbols: - # If there are multiple symbols with same address, keeping any of them is - # good enough. - symbol_map[symbol.address] = symbol - - # Parse the disassembly text. We update the variable "line" to next line - # when needed. There are two steps of parser: - # - # Step 1: Searching for the function head. Once reach the function head, - # move to the next line, which is the first line of function body. - # - # Step 2: Parsing each instruction line of function body. Once reach a - # non-instruction line, stop parsing and analyze the parsed instructions. - # - # Finally turn back to the step 1 without updating the line, because the - # current non-instruction line can be another function head. - function_map = {} - # The following three variables are the states of the parsing processing. - # They will be initialized properly during the state changes. - function_symbol = None - function_end = None - instructions = [] - - # Remove heading and tailing spaces for each line. - line_index = 0 - while line_index < len(disasm_lines): - # Get the current line. - line = disasm_lines[line_index] - - if function_symbol is None: - # Step 1: Search for the function head. - - function_symbol = DetectFunctionHead(line) - if function_symbol is not None: - # Assume there is no empty function. If the function head is followed - # by EOF, it is an empty function. - assert line_index + 1 < len(disasm_lines) - - # Found the function head, initialize and turn to the step 2. - instructions = [] - # If symbol size exists, use it as a hint of function size. - if function_symbol.size > 0: - function_end = function_symbol.address + function_symbol.size - else: - function_end = None - - else: - # Step 2: Parse the function body. - - instruction = analyzer.ParseInstruction(line, function_end) - if instruction is not None: - instructions.append(instruction) - - if instruction is None or line_index + 1 == len(disasm_lines): - # Either the invalid instruction or EOF indicates the end of the - # function, finalize the function analysis. - - # Assume there is no empty function. - assert len(instructions) > 0 - - (stack_frame, callsites) = analyzer.AnalyzeFunction(function_symbol, - instructions) - # Assume the function addresses are unique in the disassembly. - assert function_symbol.address not in function_map - function_map[function_symbol.address] = Function( - function_symbol.address, - function_symbol.name, - stack_frame, - callsites) - - # Initialize and turn back to the step 1. - function_symbol = None - - # If the current line isn't an instruction, it can be another function - # head, skip moving to the next line. - if instruction is None: - continue - - # Move to the next line. - line_index += 1 + return None - # Resolve callees of functions. - for function in function_map.values(): - for callsite in function.callsites: - if callsite.target is not None: - # Remain the callee as None if we can't resolve it. - callsite.callee = function_map.get(callsite.target) + address = int(result.group("address"), 16) + # Check if it's out of bound. + if function_end is not None and address >= function_end: + return None - return function_map + opcode = result.group("opcode").strip() + operand_text = result.group("operand") + if operand_text is None: + operand_text = "" + else: + operand_text = operand_text.strip() + + return (address, opcode, operand_text) + + def AnalyzeFunction(self, function_symbol, instructions): + + stack_frame = 0 + callsites = [] + for address, opcode, operand_text in instructions: + is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None + is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None + + if is_jump_opcode or is_call_opcode: + is_tail = is_jump_opcode + + result = self.CALL_OPERAND_RE.match(operand_text) + if result is None: + if self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None: + # Found an indirect call. + callsites.append(Callsite(address, None, is_tail)) + + else: + # Capture address form operand_text and then convert to string + address_str = "".join(self.CAPTURE_ADDRESS.findall(operand_text)) + # String to integer + target_address = int(address_str, 16) + # Filter out the in-function target (branches and in-function calls, + # which are actually branches). + if not ( + function_symbol.size > 0 + and function_symbol.address + < target_address + < (function_symbol.address + function_symbol.size) + ): + # Maybe it is a callsite. + callsites.append(Callsite(address, target_address, is_tail)) + + elif self.ADDI_OPCODE_RE.match(opcode) is not None: + # Example: sp,sp,-32 + if self.ADDI_OPERAND_RE.match(operand_text) is not None: + stack_frame += abs(int(operand_text.split(",")[2])) + + return (stack_frame, callsites) - def MapAnnotation(self, function_map, signature_set): - """Map annotation signatures to functions. - Args: - function_map: Function map. - signature_set: Set of annotation signatures. +class StackAnalyzer(object): + """Class to analyze stack usage. - Returns: - Map of signatures to functions, map of signatures which can't be resolved. + Public Methods: + Analyze: Run the stack analysis. """ - # Build the symbol map indexed by symbol name. If there are multiple symbols - # with the same name, add them into a set. (e.g. symbols of static function - # with the same name) - symbol_map = collections.defaultdict(set) - for symbol in self.symbols: - if symbol.symtype == 'F': - # Function symbol. - result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) - if result is not None: - function = function_map.get(symbol.address) - # Ignore the symbol not in disassembly. - if function is not None: - # If there are multiple symbol with the same name and point to the - # same function, the set will deduplicate them. - symbol_map[result.group('name').strip()].add(function) - - # Build the signature map indexed by annotation signature. - signature_map = {} - sig_error_map = {} - symbol_path_map = {} - for sig in signature_set: - (name, path, _) = sig - - functions = symbol_map.get(name) - if functions is None: - sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND - continue - - if name not in symbol_path_map: - # Lazy symbol path resolving. Since the addr2line isn't fast, only - # resolve needed symbol paths. - group_map = collections.defaultdict(list) - for function in functions: - line_info = self.AddressToLine(function.address)[0] - if line_info is None: - continue - (_, symbol_path, _) = line_info + C_FUNCTION_NAME = r"_A-Za-z0-9" - # Group the functions with the same symbol signature (symbol name + - # symbol path). Assume they are the same copies and do the same - # annotation operations of them because we don't know which copy is - # indicated by the users. - group_map[symbol_path].append(function) + # Assume there is no ":" in the path. + # Example: "driver/accel_kionix.c:321 (discriminator 3)" + ADDRTOLINE_RE = re.compile( + r"^(?P<path>[^:]+):(?P<linenum>\d+)(\s+\(discriminator\s+\d+\))?$" + ) + # To eliminate the suffix appended by compilers, try to extract the + # C function name from the prefix of symbol name. + # Example: "SHA256_transform.constprop.28" + FUNCTION_PREFIX_NAME_RE = re.compile( + r"^(?P<name>[{0}]+)([^{0}].*)?$".format(C_FUNCTION_NAME) + ) + + # Errors of annotation resolving. + ANNOTATION_ERROR_INVALID = "invalid signature" + ANNOTATION_ERROR_NOTFOUND = "function is not found" + ANNOTATION_ERROR_AMBIGUOUS = "signature is ambiguous" + + def __init__(self, options, symbols, rodata, tasklist, annotation): + """Constructor. + + Args: + options: Namespace from argparse.parse_args(). + symbols: Symbol list. + rodata: Content of .rodata section (offset, data) + tasklist: Task list. + annotation: Annotation config. + """ + self.options = options + self.symbols = symbols + self.rodata_offset = rodata[0] + self.rodata = rodata[1] + self.tasklist = tasklist + self.annotation = annotation + self.address_to_line_cache = {} + + def AddressToLine(self, address, resolve_inline=False): + """Convert address to line. + + Args: + address: Target address. + resolve_inline: Output the stack of inlining. + + Returns: + lines: List of the corresponding lines. + + Raises: + StackAnalyzerError: If addr2line is failed. + """ + cache_key = (address, resolve_inline) + if cache_key in self.address_to_line_cache: + return self.address_to_line_cache[cache_key] + + try: + args = [ + self.options.addr2line, + "-f", + "-e", + self.options.elf_path, + "{:x}".format(address), + ] + if resolve_inline: + args.append("-i") + + line_text = subprocess.check_output(args, encoding="utf-8") + except subprocess.CalledProcessError: + raise StackAnalyzerError("addr2line failed to resolve lines.") + except OSError: + raise StackAnalyzerError("Failed to run addr2line.") + + lines = [line.strip() for line in line_text.splitlines()] + # Assume the output has at least one pair like "function\nlocation\n", and + # they always show up in pairs. + # Example: "handle_request\n + # common/usb_pd_protocol.c:1191\n" + assert len(lines) >= 2 and len(lines) % 2 == 0 + + line_infos = [] + for index in range(0, len(lines), 2): + (function_name, line_text) = lines[index : index + 2] + if line_text in ["??:0", ":?"]: + line_infos.append(None) + else: + result = self.ADDRTOLINE_RE.match(line_text) + # Assume the output is always well-formed. + assert result is not None + line_infos.append( + ( + function_name.strip(), + os.path.realpath(result.group("path").strip()), + int(result.group("linenum")), + ) + ) + + self.address_to_line_cache[cache_key] = line_infos + return line_infos + + def AnalyzeDisassembly(self, disasm_text): + """Parse the disassembly text, analyze, and build a map of all functions. + + Args: + disasm_text: Disassembly text. + + Returns: + function_map: Dict of functions. + """ + disasm_lines = [line.strip() for line in disasm_text.splitlines()] + + if "nds" in disasm_lines[1]: + analyzer = AndesAnalyzer() + elif "arm" in disasm_lines[1]: + analyzer = ArmAnalyzer() + elif "riscv" in disasm_lines[1]: + analyzer = RiscvAnalyzer() + else: + raise StackAnalyzerError("Unsupported architecture.") - symbol_path_map[name] = group_map + # Example: "08028c8c <motion_lid_calc>:" + function_signature_regex = re.compile( + r"^(?P<address>[0-9A-Fa-f]+)\s+<(?P<name>[^>]+)>:$" + ) - # Symbol matching. - function_group = None - group_map = symbol_path_map[name] - if len(group_map) > 0: - if path is None: - if len(group_map) > 1: - # There is ambiguity but the path isn't specified. - sig_error_map[sig] = self.ANNOTATION_ERROR_AMBIGUOUS - continue + def DetectFunctionHead(line): + """Check if the line is a function head. - # No path signature but all symbol signatures of functions are same. - # Assume they are the same functions, so there is no ambiguity. - (function_group,) = group_map.values() - else: - function_group = group_map.get(path) + Args: + line: Text of disassembly. - if function_group is None: - sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND - continue + Returns: + symbol: Function symbol. None if it isn't a function head. + """ + result = function_signature_regex.match(line) + if result is None: + return None - # The function_group is a list of all the same functions (according to - # our assumption) which should be annotated together. - signature_map[sig] = function_group + address = int(result.group("address"), 16) + symbol = symbol_map.get(address) - return (signature_map, sig_error_map) + # Check if the function exists and matches. + if symbol is None or symbol.symtype != "F": + return None - def LoadAnnotation(self): - """Load annotation rules. + return symbol - Returns: - Map of add rules, set of remove rules, set of text signatures which can't - be parsed. - """ - # Assume there is no ":" in the path. - # Example: "get_range.lto.2501[driver/accel_kionix.c:327]" - annotation_signature_regex = re.compile( - r'^(?P<name>[^\[]+)(\[(?P<path>[^:]+)(:(?P<linenum>\d+))?\])?$') - - def NormalizeSignature(signature_text): - """Parse and normalize the annotation signature. - - Args: - signature_text: Text of the annotation signature. - - Returns: - (function name, path, line number) of the signature. The path and line - number can be None if not exist. None if failed to parse. - """ - result = annotation_signature_regex.match(signature_text.strip()) - if result is None: - return None - - name_result = self.FUNCTION_PREFIX_NAME_RE.match( - result.group('name').strip()) - if name_result is None: - return None - - path = result.group('path') - if path is not None: - path = os.path.realpath(path.strip()) - - linenum = result.group('linenum') - if linenum is not None: - linenum = int(linenum.strip()) - - return (name_result.group('name').strip(), path, linenum) - - def ExpandArray(dic): - """Parse and expand a symbol array - - Args: - dic: Dictionary for the array annotation - - Returns: - array of (symbol name, None, None). - """ - # TODO(drinkcat): This function is quite inefficient, as it goes through - # the symbol table multiple times. - - begin_name = dic['name'] - end_name = dic['name'] + "_end" - offset = dic['offset'] if 'offset' in dic else 0 - stride = dic['stride'] - - begin_address = None - end_address = None - - for symbol in self.symbols: - if (symbol.name == begin_name): - begin_address = symbol.address - if (symbol.name == end_name): - end_address = symbol.address - - if (not begin_address or not end_address): - return None - - output = [] - # TODO(drinkcat): This is inefficient as we go from address to symbol - # object then to symbol name, and later on we'll go back from symbol name - # to symbol object. - for addr in range(begin_address+offset, end_address, stride): - # TODO(drinkcat): Not all architectures need to drop the first bit. - val = self.rodata[(addr-self.rodata_offset) // 4] & 0xfffffffe - name = None + # Build symbol map, indexed by symbol address. + symbol_map = {} for symbol in self.symbols: - if (symbol.address == val): - result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) - name = result.group('name') - break - - if not name: - raise StackAnalyzerError('Cannot find function for address %s.', - hex(val)) - - output.append((name, None, None)) - - return output - - add_rules = collections.defaultdict(set) - remove_rules = list() - invalid_sigtxts = set() - - if 'add' in self.annotation and self.annotation['add'] is not None: - for src_sigtxt, dst_sigtxts in self.annotation['add'].items(): - src_sig = NormalizeSignature(src_sigtxt) - if src_sig is None: - invalid_sigtxts.add(src_sigtxt) - continue - - for dst_sigtxt in dst_sigtxts: - if isinstance(dst_sigtxt, dict): - dst_sig = ExpandArray(dst_sigtxt) - if dst_sig is None: - invalid_sigtxts.add(str(dst_sigtxt)) + # If there are multiple symbols with same address, keeping any of them is + # good enough. + symbol_map[symbol.address] = symbol + + # Parse the disassembly text. We update the variable "line" to next line + # when needed. There are two steps of parser: + # + # Step 1: Searching for the function head. Once reach the function head, + # move to the next line, which is the first line of function body. + # + # Step 2: Parsing each instruction line of function body. Once reach a + # non-instruction line, stop parsing and analyze the parsed instructions. + # + # Finally turn back to the step 1 without updating the line, because the + # current non-instruction line can be another function head. + function_map = {} + # The following three variables are the states of the parsing processing. + # They will be initialized properly during the state changes. + function_symbol = None + function_end = None + instructions = [] + + # Remove heading and tailing spaces for each line. + line_index = 0 + while line_index < len(disasm_lines): + # Get the current line. + line = disasm_lines[line_index] + + if function_symbol is None: + # Step 1: Search for the function head. + + function_symbol = DetectFunctionHead(line) + if function_symbol is not None: + # Assume there is no empty function. If the function head is followed + # by EOF, it is an empty function. + assert line_index + 1 < len(disasm_lines) + + # Found the function head, initialize and turn to the step 2. + instructions = [] + # If symbol size exists, use it as a hint of function size. + if function_symbol.size > 0: + function_end = function_symbol.address + function_symbol.size + else: + function_end = None + else: - add_rules[src_sig].update(dst_sig) - else: - dst_sig = NormalizeSignature(dst_sigtxt) - if dst_sig is None: - invalid_sigtxts.add(dst_sigtxt) + # Step 2: Parse the function body. + + instruction = analyzer.ParseInstruction(line, function_end) + if instruction is not None: + instructions.append(instruction) + + if instruction is None or line_index + 1 == len(disasm_lines): + # Either the invalid instruction or EOF indicates the end of the + # function, finalize the function analysis. + + # Assume there is no empty function. + assert len(instructions) > 0 + + (stack_frame, callsites) = analyzer.AnalyzeFunction( + function_symbol, instructions + ) + # Assume the function addresses are unique in the disassembly. + assert function_symbol.address not in function_map + function_map[function_symbol.address] = Function( + function_symbol.address, + function_symbol.name, + stack_frame, + callsites, + ) + + # Initialize and turn back to the step 1. + function_symbol = None + + # If the current line isn't an instruction, it can be another function + # head, skip moving to the next line. + if instruction is None: + continue + + # Move to the next line. + line_index += 1 + + # Resolve callees of functions. + for function in function_map.values(): + for callsite in function.callsites: + if callsite.target is not None: + # Remain the callee as None if we can't resolve it. + callsite.callee = function_map.get(callsite.target) + + return function_map + + def MapAnnotation(self, function_map, signature_set): + """Map annotation signatures to functions. + + Args: + function_map: Function map. + signature_set: Set of annotation signatures. + + Returns: + Map of signatures to functions, map of signatures which can't be resolved. + """ + # Build the symbol map indexed by symbol name. If there are multiple symbols + # with the same name, add them into a set. (e.g. symbols of static function + # with the same name) + symbol_map = collections.defaultdict(set) + for symbol in self.symbols: + if symbol.symtype == "F": + # Function symbol. + result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) + if result is not None: + function = function_map.get(symbol.address) + # Ignore the symbol not in disassembly. + if function is not None: + # If there are multiple symbol with the same name and point to the + # same function, the set will deduplicate them. + symbol_map[result.group("name").strip()].add(function) + + # Build the signature map indexed by annotation signature. + signature_map = {} + sig_error_map = {} + symbol_path_map = {} + for sig in signature_set: + (name, path, _) = sig + + functions = symbol_map.get(name) + if functions is None: + sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND + continue + + if name not in symbol_path_map: + # Lazy symbol path resolving. Since the addr2line isn't fast, only + # resolve needed symbol paths. + group_map = collections.defaultdict(list) + for function in functions: + line_info = self.AddressToLine(function.address)[0] + if line_info is None: + continue + + (_, symbol_path, _) = line_info + + # Group the functions with the same symbol signature (symbol name + + # symbol path). Assume they are the same copies and do the same + # annotation operations of them because we don't know which copy is + # indicated by the users. + group_map[symbol_path].append(function) + + symbol_path_map[name] = group_map + + # Symbol matching. + function_group = None + group_map = symbol_path_map[name] + if len(group_map) > 0: + if path is None: + if len(group_map) > 1: + # There is ambiguity but the path isn't specified. + sig_error_map[sig] = self.ANNOTATION_ERROR_AMBIGUOUS + continue + + # No path signature but all symbol signatures of functions are same. + # Assume they are the same functions, so there is no ambiguity. + (function_group,) = group_map.values() + else: + function_group = group_map.get(path) + + if function_group is None: + sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND + continue + + # The function_group is a list of all the same functions (according to + # our assumption) which should be annotated together. + signature_map[sig] = function_group + + return (signature_map, sig_error_map) + + def LoadAnnotation(self): + """Load annotation rules. + + Returns: + Map of add rules, set of remove rules, set of text signatures which can't + be parsed. + """ + # Assume there is no ":" in the path. + # Example: "get_range.lto.2501[driver/accel_kionix.c:327]" + annotation_signature_regex = re.compile( + r"^(?P<name>[^\[]+)(\[(?P<path>[^:]+)(:(?P<linenum>\d+))?\])?$" + ) + + def NormalizeSignature(signature_text): + """Parse and normalize the annotation signature. + + Args: + signature_text: Text of the annotation signature. + + Returns: + (function name, path, line number) of the signature. The path and line + number can be None if not exist. None if failed to parse. + """ + result = annotation_signature_regex.match(signature_text.strip()) + if result is None: + return None + + name_result = self.FUNCTION_PREFIX_NAME_RE.match( + result.group("name").strip() + ) + if name_result is None: + return None + + path = result.group("path") + if path is not None: + path = os.path.realpath(path.strip()) + + linenum = result.group("linenum") + if linenum is not None: + linenum = int(linenum.strip()) + + return (name_result.group("name").strip(), path, linenum) + + def ExpandArray(dic): + """Parse and expand a symbol array + + Args: + dic: Dictionary for the array annotation + + Returns: + array of (symbol name, None, None). + """ + # TODO(drinkcat): This function is quite inefficient, as it goes through + # the symbol table multiple times. + + begin_name = dic["name"] + end_name = dic["name"] + "_end" + offset = dic["offset"] if "offset" in dic else 0 + stride = dic["stride"] + + begin_address = None + end_address = None + + for symbol in self.symbols: + if symbol.name == begin_name: + begin_address = symbol.address + if symbol.name == end_name: + end_address = symbol.address + + if not begin_address or not end_address: + return None + + output = [] + # TODO(drinkcat): This is inefficient as we go from address to symbol + # object then to symbol name, and later on we'll go back from symbol name + # to symbol object. + for addr in range(begin_address + offset, end_address, stride): + # TODO(drinkcat): Not all architectures need to drop the first bit. + val = self.rodata[(addr - self.rodata_offset) // 4] & 0xFFFFFFFE + name = None + for symbol in self.symbols: + if symbol.address == val: + result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name) + name = result.group("name") + break + + if not name: + raise StackAnalyzerError( + "Cannot find function for address %s.", hex(val) + ) + + output.append((name, None, None)) + + return output + + add_rules = collections.defaultdict(set) + remove_rules = list() + invalid_sigtxts = set() + + if "add" in self.annotation and self.annotation["add"] is not None: + for src_sigtxt, dst_sigtxts in self.annotation["add"].items(): + src_sig = NormalizeSignature(src_sigtxt) + if src_sig is None: + invalid_sigtxts.add(src_sigtxt) + continue + + for dst_sigtxt in dst_sigtxts: + if isinstance(dst_sigtxt, dict): + dst_sig = ExpandArray(dst_sigtxt) + if dst_sig is None: + invalid_sigtxts.add(str(dst_sigtxt)) + else: + add_rules[src_sig].update(dst_sig) + else: + dst_sig = NormalizeSignature(dst_sigtxt) + if dst_sig is None: + invalid_sigtxts.add(dst_sigtxt) + else: + add_rules[src_sig].add(dst_sig) + + if "remove" in self.annotation and self.annotation["remove"] is not None: + for sigtxt_path in self.annotation["remove"]: + if isinstance(sigtxt_path, str): + # The path has only one vertex. + sigtxt_path = [sigtxt_path] + + if len(sigtxt_path) == 0: + continue + + # Generate multiple remove paths from all the combinations of the + # signatures of each vertex. + sig_paths = [[]] + broken_flag = False + for sigtxt_node in sigtxt_path: + if isinstance(sigtxt_node, str): + # The vertex has only one signature. + sigtxt_set = {sigtxt_node} + elif isinstance(sigtxt_node, list): + # The vertex has multiple signatures. + sigtxt_set = set(sigtxt_node) + else: + # Assume the format of annotation is verified. There should be no + # invalid case. + assert False + + sig_set = set() + for sigtxt in sigtxt_set: + sig = NormalizeSignature(sigtxt) + if sig is None: + invalid_sigtxts.add(sigtxt) + broken_flag = True + elif not broken_flag: + sig_set.add(sig) + + if broken_flag: + continue + + # Append each signature of the current node to the all previous + # remove paths. + sig_paths = [path + [sig] for path in sig_paths for sig in sig_set] + + if not broken_flag: + # All signatures are normalized. The remove path has no error. + remove_rules.extend(sig_paths) + + return (add_rules, remove_rules, invalid_sigtxts) + + def ResolveAnnotation(self, function_map): + """Resolve annotation. + + Args: + function_map: Function map. + + Returns: + Set of added call edges, list of remove paths, set of eliminated + callsite addresses, set of annotation signatures which can't be resolved. + """ + + def StringifySignature(signature): + """Stringify the tupled signature. + + Args: + signature: Tupled signature. + + Returns: + Signature string. + """ + (name, path, linenum) = signature + bracket_text = "" + if path is not None: + path = os.path.relpath(path) + if linenum is None: + bracket_text = "[{}]".format(path) + else: + bracket_text = "[{}:{}]".format(path, linenum) + + return name + bracket_text + + (add_rules, remove_rules, invalid_sigtxts) = self.LoadAnnotation() + + signature_set = set() + for src_sig, dst_sigs in add_rules.items(): + signature_set.add(src_sig) + signature_set.update(dst_sigs) + + for remove_sigs in remove_rules: + signature_set.update(remove_sigs) + + # Map signatures to functions. + (signature_map, sig_error_map) = self.MapAnnotation(function_map, signature_set) + + # Build the indirect callsite map indexed by callsite signature. + indirect_map = collections.defaultdict(set) + for function in function_map.values(): + for callsite in function.callsites: + if callsite.target is not None: + continue + + # Found an indirect callsite. + line_info = self.AddressToLine(callsite.address)[0] + if line_info is None: + continue + + (name, path, linenum) = line_info + result = self.FUNCTION_PREFIX_NAME_RE.match(name) + if result is None: + continue + + indirect_map[(result.group("name").strip(), path, linenum)].add( + (function, callsite.address) + ) + + # Generate the annotation sets. + add_set = set() + remove_list = list() + eliminated_addrs = set() + + for src_sig, dst_sigs in add_rules.items(): + src_funcs = set(signature_map.get(src_sig, [])) + # Try to match the source signature to the indirect callsites. Even if it + # can't be found in disassembly. + indirect_calls = indirect_map.get(src_sig) + if indirect_calls is not None: + for function, callsite_address in indirect_calls: + # Add the caller of the indirect callsite to the source functions. + src_funcs.add(function) + # Assume each callsite can be represented by a unique address. + eliminated_addrs.add(callsite_address) + + if src_sig in sig_error_map: + # Assume the error is always the not found error. Since the signature + # found in indirect callsite map must be a full signature, it can't + # happen the ambiguous error. + assert sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND + # Found in inline stack, remove the not found error. + del sig_error_map[src_sig] + + for dst_sig in dst_sigs: + dst_funcs = signature_map.get(dst_sig) + if dst_funcs is None: + continue + + # Duplicate the call edge for all the same source and destination + # functions. + for src_func in src_funcs: + for dst_func in dst_funcs: + add_set.add((src_func, dst_func)) + + for remove_sigs in remove_rules: + # Since each signature can be mapped to multiple functions, generate + # multiple remove paths from all the combinations of these functions. + remove_paths = [[]] + skip_flag = False + for remove_sig in remove_sigs: + # Transform each signature to the corresponding functions. + remove_funcs = signature_map.get(remove_sig) + if remove_funcs is None: + # There is an unresolved signature in the remove path. Ignore the + # whole broken remove path. + skip_flag = True + break + else: + # Append each function of the current signature to the all previous + # remove paths. + remove_paths = [p + [f] for p in remove_paths for f in remove_funcs] + + if skip_flag: + # Ignore the broken remove path. + continue + + for remove_path in remove_paths: + # Deduplicate the remove paths. + if remove_path not in remove_list: + remove_list.append(remove_path) + + # Format the error messages. + failed_sigtxts = set() + for sigtxt in invalid_sigtxts: + failed_sigtxts.add((sigtxt, self.ANNOTATION_ERROR_INVALID)) + + for sig, error in sig_error_map.items(): + failed_sigtxts.add((StringifySignature(sig), error)) + + return (add_set, remove_list, eliminated_addrs, failed_sigtxts) + + def PreprocessAnnotation( + self, function_map, add_set, remove_list, eliminated_addrs + ): + """Preprocess the annotation and callgraph. + + Add the missing call edges, and delete simple remove paths (the paths have + one or two vertices) from the function_map. + + Eliminate the annotated indirect callsites. + + Return the remaining remove list. + + Args: + function_map: Function map. + add_set: Set of missing call edges. + remove_list: List of remove paths. + eliminated_addrs: Set of eliminated callsite addresses. + + Returns: + List of remaining remove paths. + """ + + def CheckEdge(path): + """Check if all edges of the path are on the callgraph. + + Args: + path: Path. + + Returns: + True or False. + """ + for index in range(len(path) - 1): + if (path[index], path[index + 1]) not in edge_set: + return False + + return True + + for src_func, dst_func in add_set: + # TODO(cheyuw): Support tailing call annotation. + src_func.callsites.append(Callsite(None, dst_func.address, False, dst_func)) + + # Delete simple remove paths. + remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2) + edge_set = set() + for function in function_map.values(): + cleaned_callsites = [] + for callsite in function.callsites: + if (callsite.callee,) in remove_simple or ( + function, + callsite.callee, + ) in remove_simple: + continue + + if callsite.target is None and callsite.address in eliminated_addrs: + continue + + cleaned_callsites.append(callsite) + if callsite.callee is not None: + edge_set.add((function, callsite.callee)) + + function.callsites = cleaned_callsites + + return [p for p in remove_list if len(p) >= 3 and CheckEdge(p)] + + def AnalyzeCallGraph(self, function_map, remove_list): + """Analyze callgraph. + + It will update the max stack size and path for each function. + + Args: + function_map: Function map. + remove_list: List of remove paths. + + Returns: + List of function cycles. + """ + + def Traverse(curr_state): + """Traverse the callgraph and calculate the max stack usages of functions. + + Args: + curr_state: Current state. + + Returns: + SCC lowest link. + """ + scc_index = scc_index_counter[0] + scc_index_counter[0] += 1 + scc_index_map[curr_state] = scc_index + scc_lowlink = scc_index + scc_stack.append(curr_state) + # Push the current state in the stack. We can use a set to maintain this + # because the stacked states are unique; otherwise we will find a cycle + # first. + stacked_states.add(curr_state) + + (curr_address, curr_positions) = curr_state + curr_func = function_map[curr_address] + + invalid_flag = False + new_positions = list(curr_positions) + for index, position in enumerate(curr_positions): + remove_path = remove_list[index] + + # The position of each remove path in the state is the length of the + # longest matching path between the prefix of the remove path and the + # suffix of the current traversing path. We maintain this length when + # appending the next callee to the traversing path. And it can be used + # to check if the remove path appears in the traversing path. + + # TODO(cheyuw): Implement KMP algorithm to match remove paths + # efficiently. + if remove_path[position] is curr_func: + # Matches the current function, extend the length. + new_positions[index] = position + 1 + if new_positions[index] == len(remove_path): + # The length of the longest matching path is equal to the length of + # the remove path, which means the suffix of the current traversing + # path matches the remove path. + invalid_flag = True + break + + else: + # We can't get the new longest matching path by extending the previous + # one directly. Fallback to search the new longest matching path. + + # If we can't find any matching path in the following search, reset + # the matching length to 0. + new_positions[index] = 0 + + # We want to find the new longest matching prefix of remove path with + # the suffix of the current traversing path. Because the new longest + # matching path won't be longer than the prevous one now, and part of + # the suffix matches the prefix of remove path, we can get the needed + # suffix from the previous matching prefix of the invalid path. + suffix = remove_path[:position] + [curr_func] + for offset in range(1, len(suffix)): + length = position - offset + if remove_path[:length] == suffix[offset:]: + new_positions[index] = length + break + + new_positions = tuple(new_positions) + + # If the current suffix is invalid, set the max stack usage to 0. + max_stack_usage = 0 + max_callee_state = None + self_loop = False + + if not invalid_flag: + # Max stack usage is at least equal to the stack frame. + max_stack_usage = curr_func.stack_frame + for callsite in curr_func.callsites: + callee = callsite.callee + if callee is None: + continue + + callee_state = (callee.address, new_positions) + if callee_state not in scc_index_map: + # Unvisited state. + scc_lowlink = min(scc_lowlink, Traverse(callee_state)) + elif callee_state in stacked_states: + # The state is shown in the stack. There is a cycle. + sub_stack_usage = 0 + scc_lowlink = min(scc_lowlink, scc_index_map[callee_state]) + if callee_state == curr_state: + self_loop = True + + done_result = done_states.get(callee_state) + if done_result is not None: + # Already done this state and use its result. If the state reaches a + # cycle, reusing the result will cause inaccuracy (the stack usage + # of cycle depends on where the entrance is). But it's fine since we + # can't get accurate stack usage under this situation, and we rely + # on user-provided annotations to break the cycle, after which the + # result will be accurate again. + (sub_stack_usage, _) = done_result + + if callsite.is_tail: + # For tailing call, since the callee reuses the stack frame of the + # caller, choose the larger one directly. + stack_usage = max(curr_func.stack_frame, sub_stack_usage) + else: + stack_usage = curr_func.stack_frame + sub_stack_usage + + if stack_usage > max_stack_usage: + max_stack_usage = stack_usage + max_callee_state = callee_state + + if scc_lowlink == scc_index: + group = [] + while scc_stack[-1] != curr_state: + scc_state = scc_stack.pop() + stacked_states.remove(scc_state) + group.append(scc_state) + + scc_stack.pop() + stacked_states.remove(curr_state) + + # If the cycle is not empty, record it. + if len(group) > 0 or self_loop: + group.append(curr_state) + cycle_groups.append(group) + + # Store the done result. + done_states[curr_state] = (max_stack_usage, max_callee_state) + + if curr_positions == initial_positions: + # If the current state is initial state, we traversed the callgraph by + # using the current function as start point. Update the stack usage of + # the function. + # If the function matches a single vertex remove path, this will set its + # max stack usage to 0, which is not expected (we still calculate its + # max stack usage, but prevent any function from calling it). However, + # all the single vertex remove paths have been preprocessed and removed. + curr_func.stack_max_usage = max_stack_usage + + # Reconstruct the max stack path by traversing the state transitions. + max_stack_path = [curr_func] + callee_state = max_callee_state + while callee_state is not None: + # The first element of state tuple is function address. + max_stack_path.append(function_map[callee_state[0]]) + done_result = done_states.get(callee_state) + # All of the descendants should be done. + assert done_result is not None + (_, callee_state) = done_result + + curr_func.stack_max_path = max_stack_path + + return scc_lowlink + + # The state is the concatenation of the current function address and the + # state of matching position. + initial_positions = (0,) * len(remove_list) + done_states = {} + stacked_states = set() + scc_index_counter = [0] + scc_index_map = {} + scc_stack = [] + cycle_groups = [] + for function in function_map.values(): + if function.stack_max_usage is None: + Traverse((function.address, initial_positions)) + + cycle_functions = [] + for group in cycle_groups: + cycle = set(function_map[state[0]] for state in group) + if cycle not in cycle_functions: + cycle_functions.append(cycle) + + return cycle_functions + + def Analyze(self): + """Run the stack analysis. + + Raises: + StackAnalyzerError: If disassembly fails. + """ + + def OutputInlineStack(address, prefix=""): + """Output beautiful inline stack. + + Args: + address: Address. + prefix: Prefix of each line. + + Returns: + Key for sorting, output text + """ + line_infos = self.AddressToLine(address, True) + + if line_infos[0] is None: + order_key = (None, None) else: - add_rules[src_sig].add(dst_sig) - - if 'remove' in self.annotation and self.annotation['remove'] is not None: - for sigtxt_path in self.annotation['remove']: - if isinstance(sigtxt_path, str): - # The path has only one vertex. - sigtxt_path = [sigtxt_path] - - if len(sigtxt_path) == 0: - continue - - # Generate multiple remove paths from all the combinations of the - # signatures of each vertex. - sig_paths = [[]] - broken_flag = False - for sigtxt_node in sigtxt_path: - if isinstance(sigtxt_node, str): - # The vertex has only one signature. - sigtxt_set = {sigtxt_node} - elif isinstance(sigtxt_node, list): - # The vertex has multiple signatures. - sigtxt_set = set(sigtxt_node) - else: - # Assume the format of annotation is verified. There should be no - # invalid case. - assert False - - sig_set = set() - for sigtxt in sigtxt_set: - sig = NormalizeSignature(sigtxt) - if sig is None: - invalid_sigtxts.add(sigtxt) - broken_flag = True - elif not broken_flag: - sig_set.add(sig) - - if broken_flag: - continue - - # Append each signature of the current node to the all previous - # remove paths. - sig_paths = [path + [sig] for path in sig_paths for sig in sig_set] - - if not broken_flag: - # All signatures are normalized. The remove path has no error. - remove_rules.extend(sig_paths) - - return (add_rules, remove_rules, invalid_sigtxts) + (_, path, linenum) = line_infos[0] + order_key = (linenum, path) + + line_texts = [] + for line_info in reversed(line_infos): + if line_info is None: + (function_name, path, linenum) = ("??", "??", 0) + else: + (function_name, path, linenum) = line_info + + line_texts.append( + "{}[{}:{}]".format(function_name, os.path.relpath(path), linenum) + ) + + output = "{}-> {} {:x}\n".format(prefix, line_texts[0], address) + for depth, line_text in enumerate(line_texts[1:]): + output += "{} {}- {}\n".format(prefix, " " * depth, line_text) + + # Remove the last newline character. + return (order_key, output.rstrip("\n")) + + # Analyze disassembly. + try: + disasm_text = subprocess.check_output( + [self.options.objdump, "-d", self.options.elf_path], encoding="utf-8" + ) + except subprocess.CalledProcessError: + raise StackAnalyzerError("objdump failed to disassemble.") + except OSError: + raise StackAnalyzerError("Failed to run objdump.") + + function_map = self.AnalyzeDisassembly(disasm_text) + result = self.ResolveAnnotation(function_map) + (add_set, remove_list, eliminated_addrs, failed_sigtxts) = result + remove_list = self.PreprocessAnnotation( + function_map, add_set, remove_list, eliminated_addrs + ) + cycle_functions = self.AnalyzeCallGraph(function_map, remove_list) + + # Print the results of task-aware stack analysis. + extra_stack_frame = self.annotation.get( + "exception_frame_size", DEFAULT_EXCEPTION_FRAME_SIZE + ) + for task in self.tasklist: + routine_func = function_map[task.routine_address] + print( + "Task: {}, Max size: {} ({} + {}), Allocated size: {}".format( + task.name, + routine_func.stack_max_usage + extra_stack_frame, + routine_func.stack_max_usage, + extra_stack_frame, + task.stack_max_size, + ) + ) + + print("Call Trace:") + max_stack_path = routine_func.stack_max_path + # Assume the routine function is resolved. + assert max_stack_path is not None + for depth, curr_func in enumerate(max_stack_path): + line_info = self.AddressToLine(curr_func.address)[0] + if line_info is None: + (path, linenum) = ("??", 0) + else: + (_, path, linenum) = line_info + + print( + " {} ({}) [{}:{}] {:x}".format( + curr_func.name, + curr_func.stack_frame, + os.path.relpath(path), + linenum, + curr_func.address, + ) + ) + + if depth + 1 < len(max_stack_path): + succ_func = max_stack_path[depth + 1] + text_list = [] + for callsite in curr_func.callsites: + if callsite.callee is succ_func: + indent_prefix = " " + if callsite.address is None: + order_text = ( + None, + "{}-> [annotation]".format(indent_prefix), + ) + else: + order_text = OutputInlineStack( + callsite.address, indent_prefix + ) + + text_list.append(order_text) + + for _, text in sorted(text_list, key=lambda item: item[0]): + print(text) + + print("Unresolved indirect callsites:") + for function in function_map.values(): + indirect_callsites = [] + for callsite in function.callsites: + if callsite.target is None: + indirect_callsites.append(callsite.address) + + if len(indirect_callsites) > 0: + print(" In function {}:".format(function.name)) + text_list = [] + for address in indirect_callsites: + text_list.append(OutputInlineStack(address, " ")) + + for _, text in sorted(text_list, key=lambda item: item[0]): + print(text) + + print("Unresolved annotation signatures:") + for sigtxt, error in failed_sigtxts: + print(" {}: {}".format(sigtxt, error)) + + if len(cycle_functions) > 0: + print("There are cycles in the following function sets:") + for functions in cycle_functions: + print("[{}]".format(", ".join(function.name for function in functions))) - def ResolveAnnotation(self, function_map): - """Resolve annotation. - Args: - function_map: Function map. +def ParseArgs(): + """Parse commandline arguments. Returns: - Set of added call edges, list of remove paths, set of eliminated - callsite addresses, set of annotation signatures which can't be resolved. + options: Namespace from argparse.parse_args(). """ - def StringifySignature(signature): - """Stringify the tupled signature. - - Args: - signature: Tupled signature. - - Returns: - Signature string. - """ - (name, path, linenum) = signature - bracket_text = '' - if path is not None: - path = os.path.relpath(path) - if linenum is None: - bracket_text = '[{}]'.format(path) - else: - bracket_text = '[{}:{}]'.format(path, linenum) - - return name + bracket_text - - (add_rules, remove_rules, invalid_sigtxts) = self.LoadAnnotation() - - signature_set = set() - for src_sig, dst_sigs in add_rules.items(): - signature_set.add(src_sig) - signature_set.update(dst_sigs) - - for remove_sigs in remove_rules: - signature_set.update(remove_sigs) - - # Map signatures to functions. - (signature_map, sig_error_map) = self.MapAnnotation(function_map, - signature_set) - - # Build the indirect callsite map indexed by callsite signature. - indirect_map = collections.defaultdict(set) - for function in function_map.values(): - for callsite in function.callsites: - if callsite.target is not None: - continue - - # Found an indirect callsite. - line_info = self.AddressToLine(callsite.address)[0] - if line_info is None: - continue - - (name, path, linenum) = line_info - result = self.FUNCTION_PREFIX_NAME_RE.match(name) - if result is None: - continue - - indirect_map[(result.group('name').strip(), path, linenum)].add( - (function, callsite.address)) - - # Generate the annotation sets. - add_set = set() - remove_list = list() - eliminated_addrs = set() - - for src_sig, dst_sigs in add_rules.items(): - src_funcs = set(signature_map.get(src_sig, [])) - # Try to match the source signature to the indirect callsites. Even if it - # can't be found in disassembly. - indirect_calls = indirect_map.get(src_sig) - if indirect_calls is not None: - for function, callsite_address in indirect_calls: - # Add the caller of the indirect callsite to the source functions. - src_funcs.add(function) - # Assume each callsite can be represented by a unique address. - eliminated_addrs.add(callsite_address) - - if src_sig in sig_error_map: - # Assume the error is always the not found error. Since the signature - # found in indirect callsite map must be a full signature, it can't - # happen the ambiguous error. - assert sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND - # Found in inline stack, remove the not found error. - del sig_error_map[src_sig] - - for dst_sig in dst_sigs: - dst_funcs = signature_map.get(dst_sig) - if dst_funcs is None: - continue - - # Duplicate the call edge for all the same source and destination - # functions. - for src_func in src_funcs: - for dst_func in dst_funcs: - add_set.add((src_func, dst_func)) - - for remove_sigs in remove_rules: - # Since each signature can be mapped to multiple functions, generate - # multiple remove paths from all the combinations of these functions. - remove_paths = [[]] - skip_flag = False - for remove_sig in remove_sigs: - # Transform each signature to the corresponding functions. - remove_funcs = signature_map.get(remove_sig) - if remove_funcs is None: - # There is an unresolved signature in the remove path. Ignore the - # whole broken remove path. - skip_flag = True - break - else: - # Append each function of the current signature to the all previous - # remove paths. - remove_paths = [p + [f] for p in remove_paths for f in remove_funcs] - - if skip_flag: - # Ignore the broken remove path. - continue - - for remove_path in remove_paths: - # Deduplicate the remove paths. - if remove_path not in remove_list: - remove_list.append(remove_path) - - # Format the error messages. - failed_sigtxts = set() - for sigtxt in invalid_sigtxts: - failed_sigtxts.add((sigtxt, self.ANNOTATION_ERROR_INVALID)) - - for sig, error in sig_error_map.items(): - failed_sigtxts.add((StringifySignature(sig), error)) - - return (add_set, remove_list, eliminated_addrs, failed_sigtxts) - - def PreprocessAnnotation(self, function_map, add_set, remove_list, - eliminated_addrs): - """Preprocess the annotation and callgraph. + parser = argparse.ArgumentParser(description="EC firmware stack analyzer.") + parser.add_argument("elf_path", help="the path of EC firmware ELF") + parser.add_argument( + "--export_taskinfo", + required=True, + help="the path of export_taskinfo.so utility", + ) + parser.add_argument( + "--section", + required=True, + help="the section.", + choices=[SECTION_RO, SECTION_RW], + ) + parser.add_argument("--objdump", default="objdump", help="the path of objdump") + parser.add_argument( + "--addr2line", default="addr2line", help="the path of addr2line" + ) + parser.add_argument( + "--annotation", default=None, help="the path of annotation file" + ) + + # TODO(cheyuw): Add an option for dumping stack usage of all functions. + + return parser.parse_args() - Add the missing call edges, and delete simple remove paths (the paths have - one or two vertices) from the function_map. - Eliminate the annotated indirect callsites. - - Return the remaining remove list. +def ParseSymbolText(symbol_text): + """Parse the content of the symbol text. Args: - function_map: Function map. - add_set: Set of missing call edges. - remove_list: List of remove paths. - eliminated_addrs: Set of eliminated callsite addresses. + symbol_text: Text of the symbols. Returns: - List of remaining remove paths. + symbols: Symbol list. """ - def CheckEdge(path): - """Check if all edges of the path are on the callgraph. - - Args: - path: Path. - - Returns: - True or False. - """ - for index in range(len(path) - 1): - if (path[index], path[index + 1]) not in edge_set: - return False - - return True - - for src_func, dst_func in add_set: - # TODO(cheyuw): Support tailing call annotation. - src_func.callsites.append( - Callsite(None, dst_func.address, False, dst_func)) - - # Delete simple remove paths. - remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2) - edge_set = set() - for function in function_map.values(): - cleaned_callsites = [] - for callsite in function.callsites: - if ((callsite.callee,) in remove_simple or - (function, callsite.callee) in remove_simple): - continue - - if callsite.target is None and callsite.address in eliminated_addrs: - continue - - cleaned_callsites.append(callsite) - if callsite.callee is not None: - edge_set.add((function, callsite.callee)) + # Example: "10093064 g F .text 0000015c .hidden hook_task" + symbol_regex = re.compile( + r"^(?P<address>[0-9A-Fa-f]+)\s+[lwg]\s+" + r"((?P<type>[OF])\s+)?\S+\s+" + r"(?P<size>[0-9A-Fa-f]+)\s+" + r"(\S+\s+)?(?P<name>\S+)$" + ) + + symbols = [] + for line in symbol_text.splitlines(): + line = line.strip() + result = symbol_regex.match(line) + if result is not None: + address = int(result.group("address"), 16) + symtype = result.group("type") + if symtype is None: + symtype = "O" - function.callsites = cleaned_callsites + size = int(result.group("size"), 16) + name = result.group("name") + symbols.append(Symbol(address, symtype, size, name)) - return [p for p in remove_list if len(p) >= 3 and CheckEdge(p)] + return symbols - def AnalyzeCallGraph(self, function_map, remove_list): - """Analyze callgraph. - It will update the max stack size and path for each function. +def ParseRoDataText(rodata_text): + """Parse the content of rodata Args: - function_map: Function map. - remove_list: List of remove paths. + symbol_text: Text of the rodata dump. Returns: - List of function cycles. + symbols: Symbol list. """ - def Traverse(curr_state): - """Traverse the callgraph and calculate the max stack usages of functions. - - Args: - curr_state: Current state. - - Returns: - SCC lowest link. - """ - scc_index = scc_index_counter[0] - scc_index_counter[0] += 1 - scc_index_map[curr_state] = scc_index - scc_lowlink = scc_index - scc_stack.append(curr_state) - # Push the current state in the stack. We can use a set to maintain this - # because the stacked states are unique; otherwise we will find a cycle - # first. - stacked_states.add(curr_state) - - (curr_address, curr_positions) = curr_state - curr_func = function_map[curr_address] - - invalid_flag = False - new_positions = list(curr_positions) - for index, position in enumerate(curr_positions): - remove_path = remove_list[index] - - # The position of each remove path in the state is the length of the - # longest matching path between the prefix of the remove path and the - # suffix of the current traversing path. We maintain this length when - # appending the next callee to the traversing path. And it can be used - # to check if the remove path appears in the traversing path. - - # TODO(cheyuw): Implement KMP algorithm to match remove paths - # efficiently. - if remove_path[position] is curr_func: - # Matches the current function, extend the length. - new_positions[index] = position + 1 - if new_positions[index] == len(remove_path): - # The length of the longest matching path is equal to the length of - # the remove path, which means the suffix of the current traversing - # path matches the remove path. - invalid_flag = True - break - - else: - # We can't get the new longest matching path by extending the previous - # one directly. Fallback to search the new longest matching path. - - # If we can't find any matching path in the following search, reset - # the matching length to 0. - new_positions[index] = 0 - - # We want to find the new longest matching prefix of remove path with - # the suffix of the current traversing path. Because the new longest - # matching path won't be longer than the prevous one now, and part of - # the suffix matches the prefix of remove path, we can get the needed - # suffix from the previous matching prefix of the invalid path. - suffix = remove_path[:position] + [curr_func] - for offset in range(1, len(suffix)): - length = position - offset - if remove_path[:length] == suffix[offset:]: - new_positions[index] = length - break - - new_positions = tuple(new_positions) - - # If the current suffix is invalid, set the max stack usage to 0. - max_stack_usage = 0 - max_callee_state = None - self_loop = False - - if not invalid_flag: - # Max stack usage is at least equal to the stack frame. - max_stack_usage = curr_func.stack_frame - for callsite in curr_func.callsites: - callee = callsite.callee - if callee is None: + # Examples: 8018ab0 00040048 00010000 10020000 4b8e0108 ...H........K... + # 100a7294 00000000 00000000 01000000 ............ + + base_offset = None + offset = None + rodata = [] + for line in rodata_text.splitlines(): + line = line.strip() + space = line.find(" ") + if space < 0: + continue + try: + address = int(line[0:space], 16) + except ValueError: continue - callee_state = (callee.address, new_positions) - if callee_state not in scc_index_map: - # Unvisited state. - scc_lowlink = min(scc_lowlink, Traverse(callee_state)) - elif callee_state in stacked_states: - # The state is shown in the stack. There is a cycle. - sub_stack_usage = 0 - scc_lowlink = min(scc_lowlink, scc_index_map[callee_state]) - if callee_state == curr_state: - self_loop = True - - done_result = done_states.get(callee_state) - if done_result is not None: - # Already done this state and use its result. If the state reaches a - # cycle, reusing the result will cause inaccuracy (the stack usage - # of cycle depends on where the entrance is). But it's fine since we - # can't get accurate stack usage under this situation, and we rely - # on user-provided annotations to break the cycle, after which the - # result will be accurate again. - (sub_stack_usage, _) = done_result - - if callsite.is_tail: - # For tailing call, since the callee reuses the stack frame of the - # caller, choose the larger one directly. - stack_usage = max(curr_func.stack_frame, sub_stack_usage) - else: - stack_usage = curr_func.stack_frame + sub_stack_usage - - if stack_usage > max_stack_usage: - max_stack_usage = stack_usage - max_callee_state = callee_state - - if scc_lowlink == scc_index: - group = [] - while scc_stack[-1] != curr_state: - scc_state = scc_stack.pop() - stacked_states.remove(scc_state) - group.append(scc_state) - - scc_stack.pop() - stacked_states.remove(curr_state) - - # If the cycle is not empty, record it. - if len(group) > 0 or self_loop: - group.append(curr_state) - cycle_groups.append(group) - - # Store the done result. - done_states[curr_state] = (max_stack_usage, max_callee_state) - - if curr_positions == initial_positions: - # If the current state is initial state, we traversed the callgraph by - # using the current function as start point. Update the stack usage of - # the function. - # If the function matches a single vertex remove path, this will set its - # max stack usage to 0, which is not expected (we still calculate its - # max stack usage, but prevent any function from calling it). However, - # all the single vertex remove paths have been preprocessed and removed. - curr_func.stack_max_usage = max_stack_usage - - # Reconstruct the max stack path by traversing the state transitions. - max_stack_path = [curr_func] - callee_state = max_callee_state - while callee_state is not None: - # The first element of state tuple is function address. - max_stack_path.append(function_map[callee_state[0]]) - done_result = done_states.get(callee_state) - # All of the descendants should be done. - assert done_result is not None - (_, callee_state) = done_result - - curr_func.stack_max_path = max_stack_path - - return scc_lowlink - - # The state is the concatenation of the current function address and the - # state of matching position. - initial_positions = (0,) * len(remove_list) - done_states = {} - stacked_states = set() - scc_index_counter = [0] - scc_index_map = {} - scc_stack = [] - cycle_groups = [] - for function in function_map.values(): - if function.stack_max_usage is None: - Traverse((function.address, initial_positions)) - - cycle_functions = [] - for group in cycle_groups: - cycle = set(function_map[state[0]] for state in group) - if cycle not in cycle_functions: - cycle_functions.append(cycle) - - return cycle_functions - - def Analyze(self): - """Run the stack analysis. - - Raises: - StackAnalyzerError: If disassembly fails. - """ - def OutputInlineStack(address, prefix=''): - """Output beautiful inline stack. - - Args: - address: Address. - prefix: Prefix of each line. - - Returns: - Key for sorting, output text - """ - line_infos = self.AddressToLine(address, True) - - if line_infos[0] is None: - order_key = (None, None) - else: - (_, path, linenum) = line_infos[0] - order_key = (linenum, path) - - line_texts = [] - for line_info in reversed(line_infos): - if line_info is None: - (function_name, path, linenum) = ('??', '??', 0) - else: - (function_name, path, linenum) = line_info - - line_texts.append('{}[{}:{}]'.format(function_name, - os.path.relpath(path), - linenum)) - - output = '{}-> {} {:x}\n'.format(prefix, line_texts[0], address) - for depth, line_text in enumerate(line_texts[1:]): - output += '{} {}- {}\n'.format(prefix, ' ' * depth, line_text) - - # Remove the last newline character. - return (order_key, output.rstrip('\n')) - - # Analyze disassembly. - try: - disasm_text = subprocess.check_output([self.options.objdump, - '-d', - self.options.elf_path], - encoding='utf-8') - except subprocess.CalledProcessError: - raise StackAnalyzerError('objdump failed to disassemble.') - except OSError: - raise StackAnalyzerError('Failed to run objdump.') - - function_map = self.AnalyzeDisassembly(disasm_text) - result = self.ResolveAnnotation(function_map) - (add_set, remove_list, eliminated_addrs, failed_sigtxts) = result - remove_list = self.PreprocessAnnotation(function_map, - add_set, - remove_list, - eliminated_addrs) - cycle_functions = self.AnalyzeCallGraph(function_map, remove_list) - - # Print the results of task-aware stack analysis. - extra_stack_frame = self.annotation.get('exception_frame_size', - DEFAULT_EXCEPTION_FRAME_SIZE) - for task in self.tasklist: - routine_func = function_map[task.routine_address] - print('Task: {}, Max size: {} ({} + {}), Allocated size: {}'.format( - task.name, - routine_func.stack_max_usage + extra_stack_frame, - routine_func.stack_max_usage, - extra_stack_frame, - task.stack_max_size)) - - print('Call Trace:') - max_stack_path = routine_func.stack_max_path - # Assume the routine function is resolved. - assert max_stack_path is not None - for depth, curr_func in enumerate(max_stack_path): - line_info = self.AddressToLine(curr_func.address)[0] - if line_info is None: - (path, linenum) = ('??', 0) - else: - (_, path, linenum) = line_info - - print(' {} ({}) [{}:{}] {:x}'.format(curr_func.name, - curr_func.stack_frame, - os.path.relpath(path), - linenum, - curr_func.address)) - - if depth + 1 < len(max_stack_path): - succ_func = max_stack_path[depth + 1] - text_list = [] - for callsite in curr_func.callsites: - if callsite.callee is succ_func: - indent_prefix = ' ' - if callsite.address is None: - order_text = (None, '{}-> [annotation]'.format(indent_prefix)) - else: - order_text = OutputInlineStack(callsite.address, indent_prefix) - - text_list.append(order_text) - - for _, text in sorted(text_list, key=lambda item: item[0]): - print(text) - - print('Unresolved indirect callsites:') - for function in function_map.values(): - indirect_callsites = [] - for callsite in function.callsites: - if callsite.target is None: - indirect_callsites.append(callsite.address) - - if len(indirect_callsites) > 0: - print(' In function {}:'.format(function.name)) - text_list = [] - for address in indirect_callsites: - text_list.append(OutputInlineStack(address, ' ')) - - for _, text in sorted(text_list, key=lambda item: item[0]): - print(text) - - print('Unresolved annotation signatures:') - for sigtxt, error in failed_sigtxts: - print(' {}: {}'.format(sigtxt, error)) - - if len(cycle_functions) > 0: - print('There are cycles in the following function sets:') - for functions in cycle_functions: - print('[{}]'.format(', '.join(function.name for function in functions))) - - -def ParseArgs(): - """Parse commandline arguments. - - Returns: - options: Namespace from argparse.parse_args(). - """ - parser = argparse.ArgumentParser(description="EC firmware stack analyzer.") - parser.add_argument('elf_path', help="the path of EC firmware ELF") - parser.add_argument('--export_taskinfo', required=True, - help="the path of export_taskinfo.so utility") - parser.add_argument('--section', required=True, help='the section.', - choices=[SECTION_RO, SECTION_RW]) - parser.add_argument('--objdump', default='objdump', - help='the path of objdump') - parser.add_argument('--addr2line', default='addr2line', - help='the path of addr2line') - parser.add_argument('--annotation', default=None, - help='the path of annotation file') - - # TODO(cheyuw): Add an option for dumping stack usage of all functions. - - return parser.parse_args() - - -def ParseSymbolText(symbol_text): - """Parse the content of the symbol text. - - Args: - symbol_text: Text of the symbols. - - Returns: - symbols: Symbol list. - """ - # Example: "10093064 g F .text 0000015c .hidden hook_task" - symbol_regex = re.compile(r'^(?P<address>[0-9A-Fa-f]+)\s+[lwg]\s+' - r'((?P<type>[OF])\s+)?\S+\s+' - r'(?P<size>[0-9A-Fa-f]+)\s+' - r'(\S+\s+)?(?P<name>\S+)$') - - symbols = [] - for line in symbol_text.splitlines(): - line = line.strip() - result = symbol_regex.match(line) - if result is not None: - address = int(result.group('address'), 16) - symtype = result.group('type') - if symtype is None: - symtype = 'O' - - size = int(result.group('size'), 16) - name = result.group('name') - symbols.append(Symbol(address, symtype, size, name)) - - return symbols - - -def ParseRoDataText(rodata_text): - """Parse the content of rodata - - Args: - symbol_text: Text of the rodata dump. - - Returns: - symbols: Symbol list. - """ - # Examples: 8018ab0 00040048 00010000 10020000 4b8e0108 ...H........K... - # 100a7294 00000000 00000000 01000000 ............ - - base_offset = None - offset = None - rodata = [] - for line in rodata_text.splitlines(): - line = line.strip() - space = line.find(' ') - if space < 0: - continue - try: - address = int(line[0:space], 16) - except ValueError: - continue - - if not base_offset: - base_offset = address - offset = address - elif address != offset: - raise StackAnalyzerError('objdump of rodata not contiguous.') + if not base_offset: + base_offset = address + offset = address + elif address != offset: + raise StackAnalyzerError("objdump of rodata not contiguous.") - for i in range(0, 4): - num = line[(space + 1 + i*9):(space + 9 + i*9)] - if len(num.strip()) > 0: - val = int(num, 16) - else: - val = 0 - # TODO(drinkcat): Not all platforms are necessarily big-endian - rodata.append((val & 0x000000ff) << 24 | - (val & 0x0000ff00) << 8 | - (val & 0x00ff0000) >> 8 | - (val & 0xff000000) >> 24) + for i in range(0, 4): + num = line[(space + 1 + i * 9) : (space + 9 + i * 9)] + if len(num.strip()) > 0: + val = int(num, 16) + else: + val = 0 + # TODO(drinkcat): Not all platforms are necessarily big-endian + rodata.append( + (val & 0x000000FF) << 24 + | (val & 0x0000FF00) << 8 + | (val & 0x00FF0000) >> 8 + | (val & 0xFF000000) >> 24 + ) - offset = offset + 4*4 + offset = offset + 4 * 4 - return (base_offset, rodata) + return (base_offset, rodata) def LoadTasklist(section, export_taskinfo, symbols): - """Load the task information. + """Load the task information. - Args: - section: Section (RO | RW). - export_taskinfo: Handle of export_taskinfo.so. - symbols: Symbol list. + Args: + section: Section (RO | RW). + export_taskinfo: Handle of export_taskinfo.so. + symbols: Symbol list. - Returns: - tasklist: Task list. - """ + Returns: + tasklist: Task list. + """ - TaskInfoPointer = ctypes.POINTER(TaskInfo) - taskinfos = TaskInfoPointer() - if section == SECTION_RO: - get_taskinfos_func = export_taskinfo.get_ro_taskinfos - else: - get_taskinfos_func = export_taskinfo.get_rw_taskinfos + TaskInfoPointer = ctypes.POINTER(TaskInfo) + taskinfos = TaskInfoPointer() + if section == SECTION_RO: + get_taskinfos_func = export_taskinfo.get_ro_taskinfos + else: + get_taskinfos_func = export_taskinfo.get_rw_taskinfos - taskinfo_num = get_taskinfos_func(ctypes.pointer(taskinfos)) + taskinfo_num = get_taskinfos_func(ctypes.pointer(taskinfos)) - tasklist = [] - for index in range(taskinfo_num): - taskinfo = taskinfos[index] - tasklist.append(Task(taskinfo.name.decode('utf-8'), - taskinfo.routine.decode('utf-8'), - taskinfo.stack_size)) + tasklist = [] + for index in range(taskinfo_num): + taskinfo = taskinfos[index] + tasklist.append( + Task( + taskinfo.name.decode("utf-8"), + taskinfo.routine.decode("utf-8"), + taskinfo.stack_size, + ) + ) - # Resolve routine address for each task. It's more efficient to resolve all - # routine addresses of tasks together. - routine_map = dict((task.routine_name, None) for task in tasklist) + # Resolve routine address for each task. It's more efficient to resolve all + # routine addresses of tasks together. + routine_map = dict((task.routine_name, None) for task in tasklist) - for symbol in symbols: - # Resolve task routine address. - if symbol.name in routine_map: - # Assume the symbol of routine is unique. - assert routine_map[symbol.name] is None - routine_map[symbol.name] = symbol.address + for symbol in symbols: + # Resolve task routine address. + if symbol.name in routine_map: + # Assume the symbol of routine is unique. + assert routine_map[symbol.name] is None + routine_map[symbol.name] = symbol.address - for task in tasklist: - address = routine_map[task.routine_name] - # Assume we have resolved all routine addresses. - assert address is not None - task.routine_address = address + for task in tasklist: + address = routine_map[task.routine_name] + # Assume we have resolved all routine addresses. + assert address is not None + task.routine_address = address - return tasklist + return tasklist def main(): - """Main function.""" - try: - options = ParseArgs() - - # Load annotation config. - if options.annotation is None: - annotation = {} - elif not os.path.exists(options.annotation): - print('Warning: Annotation file {} does not exist.' - .format(options.annotation)) - annotation = {} - else: - try: - with open(options.annotation, 'r') as annotation_file: - annotation = yaml.safe_load(annotation_file) - - except yaml.YAMLError: - raise StackAnalyzerError('Failed to parse annotation file {}.' - .format(options.annotation)) - except IOError: - raise StackAnalyzerError('Failed to open annotation file {}.' - .format(options.annotation)) - - # TODO(cheyuw): Do complete annotation format verification. - if not isinstance(annotation, dict): - raise StackAnalyzerError('Invalid annotation file {}.' - .format(options.annotation)) - - # Generate and parse the symbols. + """Main function.""" try: - symbol_text = subprocess.check_output([options.objdump, - '-t', - options.elf_path], - encoding='utf-8') - rodata_text = subprocess.check_output([options.objdump, - '-s', - '-j', '.rodata', - options.elf_path], - encoding='utf-8') - except subprocess.CalledProcessError: - raise StackAnalyzerError('objdump failed to dump symbol table or rodata.') - except OSError: - raise StackAnalyzerError('Failed to run objdump.') - - symbols = ParseSymbolText(symbol_text) - rodata = ParseRoDataText(rodata_text) - - # Load the tasklist. - try: - export_taskinfo = ctypes.CDLL(options.export_taskinfo) - except OSError: - raise StackAnalyzerError('Failed to load export_taskinfo.') - - tasklist = LoadTasklist(options.section, export_taskinfo, symbols) - - analyzer = StackAnalyzer(options, symbols, rodata, tasklist, annotation) - analyzer.Analyze() - except StackAnalyzerError as e: - print('Error: {}'.format(e)) - - -if __name__ == '__main__': - main() + options = ParseArgs() + + # Load annotation config. + if options.annotation is None: + annotation = {} + elif not os.path.exists(options.annotation): + print( + "Warning: Annotation file {} does not exist.".format(options.annotation) + ) + annotation = {} + else: + try: + with open(options.annotation, "r") as annotation_file: + annotation = yaml.safe_load(annotation_file) + + except yaml.YAMLError: + raise StackAnalyzerError( + "Failed to parse annotation file {}.".format(options.annotation) + ) + except IOError: + raise StackAnalyzerError( + "Failed to open annotation file {}.".format(options.annotation) + ) + + # TODO(cheyuw): Do complete annotation format verification. + if not isinstance(annotation, dict): + raise StackAnalyzerError( + "Invalid annotation file {}.".format(options.annotation) + ) + + # Generate and parse the symbols. + try: + symbol_text = subprocess.check_output( + [options.objdump, "-t", options.elf_path], encoding="utf-8" + ) + rodata_text = subprocess.check_output( + [options.objdump, "-s", "-j", ".rodata", options.elf_path], + encoding="utf-8", + ) + except subprocess.CalledProcessError: + raise StackAnalyzerError("objdump failed to dump symbol table or rodata.") + except OSError: + raise StackAnalyzerError("Failed to run objdump.") + + symbols = ParseSymbolText(symbol_text) + rodata = ParseRoDataText(rodata_text) + + # Load the tasklist. + try: + export_taskinfo = ctypes.CDLL(options.export_taskinfo) + except OSError: + raise StackAnalyzerError("Failed to load export_taskinfo.") + + tasklist = LoadTasklist(options.section, export_taskinfo, symbols) + + analyzer = StackAnalyzer(options, symbols, rodata, tasklist, annotation) + analyzer.Analyze() + except StackAnalyzerError as e: + print("Error: {}".format(e)) + + +if __name__ == "__main__": + main() diff --git a/extra/stack_analyzer/stack_analyzer_unittest.py b/extra/stack_analyzer/stack_analyzer_unittest.py index c36fa9da45..ad2837a8a4 100755 --- a/extra/stack_analyzer/stack_analyzer_unittest.py +++ b/extra/stack_analyzer/stack_analyzer_unittest.py @@ -11,820 +11,924 @@ from __future__ import print_function -import mock import os import subprocess import unittest +import mock import stack_analyzer as sa class ObjectTest(unittest.TestCase): - """Tests for classes of basic objects.""" - - def testTask(self): - task_a = sa.Task('a', 'a_task', 1234) - task_b = sa.Task('b', 'b_task', 5678, 0x1000) - self.assertEqual(task_a, task_a) - self.assertNotEqual(task_a, task_b) - self.assertNotEqual(task_a, None) - - def testSymbol(self): - symbol_a = sa.Symbol(0x1234, 'F', 32, 'a') - symbol_b = sa.Symbol(0x234, 'O', 42, 'b') - self.assertEqual(symbol_a, symbol_a) - self.assertNotEqual(symbol_a, symbol_b) - self.assertNotEqual(symbol_a, None) - - def testCallsite(self): - callsite_a = sa.Callsite(0x1002, 0x3000, False) - callsite_b = sa.Callsite(0x1002, 0x3000, True) - self.assertEqual(callsite_a, callsite_a) - self.assertNotEqual(callsite_a, callsite_b) - self.assertNotEqual(callsite_a, None) - - def testFunction(self): - func_a = sa.Function(0x100, 'a', 0, []) - func_b = sa.Function(0x200, 'b', 0, []) - self.assertEqual(func_a, func_a) - self.assertNotEqual(func_a, func_b) - self.assertNotEqual(func_a, None) + """Tests for classes of basic objects.""" + + def testTask(self): + task_a = sa.Task("a", "a_task", 1234) + task_b = sa.Task("b", "b_task", 5678, 0x1000) + self.assertEqual(task_a, task_a) + self.assertNotEqual(task_a, task_b) + self.assertNotEqual(task_a, None) + + def testSymbol(self): + symbol_a = sa.Symbol(0x1234, "F", 32, "a") + symbol_b = sa.Symbol(0x234, "O", 42, "b") + self.assertEqual(symbol_a, symbol_a) + self.assertNotEqual(symbol_a, symbol_b) + self.assertNotEqual(symbol_a, None) + + def testCallsite(self): + callsite_a = sa.Callsite(0x1002, 0x3000, False) + callsite_b = sa.Callsite(0x1002, 0x3000, True) + self.assertEqual(callsite_a, callsite_a) + self.assertNotEqual(callsite_a, callsite_b) + self.assertNotEqual(callsite_a, None) + + def testFunction(self): + func_a = sa.Function(0x100, "a", 0, []) + func_b = sa.Function(0x200, "b", 0, []) + self.assertEqual(func_a, func_a) + self.assertNotEqual(func_a, func_b) + self.assertNotEqual(func_a, None) class ArmAnalyzerTest(unittest.TestCase): - """Tests for class ArmAnalyzer.""" - - def AppendConditionCode(self, opcodes): - rets = [] - for opcode in opcodes: - rets.extend(opcode + cc for cc in sa.ArmAnalyzer.CONDITION_CODES) - - return rets - - def testInstructionMatching(self): - jump_list = self.AppendConditionCode(['b', 'bx']) - jump_list += (list(opcode + '.n' for opcode in jump_list) + - list(opcode + '.w' for opcode in jump_list)) - for opcode in jump_list: - self.assertIsNotNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match(opcode)) - - self.assertIsNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match('bl')) - self.assertIsNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match('blx')) - - cbz_list = ['cbz', 'cbnz', 'cbz.n', 'cbnz.n', 'cbz.w', 'cbnz.w'] - for opcode in cbz_list: - self.assertIsNotNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match(opcode)) - - self.assertIsNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match('cbn')) - - call_list = self.AppendConditionCode(['bl', 'blx']) - call_list += list(opcode + '.n' for opcode in call_list) - for opcode in call_list: - self.assertIsNotNone(sa.ArmAnalyzer.CALL_OPCODE_RE.match(opcode)) - - self.assertIsNone(sa.ArmAnalyzer.CALL_OPCODE_RE.match('ble')) - - result = sa.ArmAnalyzer.CALL_OPERAND_RE.match('53f90 <get_time+0x18>') - self.assertIsNotNone(result) - self.assertEqual(result.group(1), '53f90') - self.assertEqual(result.group(2), 'get_time+0x18') - - result = sa.ArmAnalyzer.CBZ_CBNZ_OPERAND_RE.match('r6, 53f90 <get+0x0>') - self.assertIsNotNone(result) - self.assertEqual(result.group(1), '53f90') - self.assertEqual(result.group(2), 'get+0x0') - - self.assertIsNotNone(sa.ArmAnalyzer.PUSH_OPCODE_RE.match('push')) - self.assertIsNone(sa.ArmAnalyzer.PUSH_OPCODE_RE.match('pushal')) - self.assertIsNotNone(sa.ArmAnalyzer.STM_OPCODE_RE.match('stmdb')) - self.assertIsNone(sa.ArmAnalyzer.STM_OPCODE_RE.match('lstm')) - self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('sub')) - self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('subs')) - self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('subw')) - self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('sub.w')) - self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('subs.w')) - - result = sa.ArmAnalyzer.SUB_OPERAND_RE.match('sp, sp, #1668 ; 0x684') - self.assertIsNotNone(result) - self.assertEqual(result.group(1), '1668') - result = sa.ArmAnalyzer.SUB_OPERAND_RE.match('sp, #1668') - self.assertIsNotNone(result) - self.assertEqual(result.group(1), '1668') - self.assertIsNone(sa.ArmAnalyzer.SUB_OPERAND_RE.match('sl, #1668')) - - def testAnalyzeFunction(self): - analyzer = sa.ArmAnalyzer() - symbol = sa.Symbol(0x10, 'F', 0x100, 'foo') - instructions = [ - (0x10, 'push', '{r4, r5, r6, r7, lr}'), - (0x12, 'subw', 'sp, sp, #16 ; 0x10'), - (0x16, 'movs', 'lr, r1'), - (0x18, 'beq.n', '26 <foo+0x26>'), - (0x1a, 'bl', '30 <foo+0x30>'), - (0x1e, 'bl', 'deadbeef <bar>'), - (0x22, 'blx', '0 <woo>'), - (0x26, 'push', '{r1}'), - (0x28, 'stmdb', 'sp!, {r4, r5, r6, r7, r8, r9, lr}'), - (0x2c, 'stmdb', 'sp!, {r4}'), - (0x30, 'stmdb', 'sp, {r4}'), - (0x34, 'bx.n', '10 <foo>'), - (0x36, 'bx.n', 'r3'), - (0x38, 'ldr', 'pc, [r10]'), - ] - (size, callsites) = analyzer.AnalyzeFunction(symbol, instructions) - self.assertEqual(size, 72) - expect_callsites = [sa.Callsite(0x1e, 0xdeadbeef, False), - sa.Callsite(0x22, 0x0, False), - sa.Callsite(0x34, 0x10, True), - sa.Callsite(0x36, None, True), - sa.Callsite(0x38, None, True)] - self.assertEqual(callsites, expect_callsites) + """Tests for class ArmAnalyzer.""" + + def AppendConditionCode(self, opcodes): + rets = [] + for opcode in opcodes: + rets.extend(opcode + cc for cc in sa.ArmAnalyzer.CONDITION_CODES) + + return rets + + def testInstructionMatching(self): + jump_list = self.AppendConditionCode(["b", "bx"]) + jump_list += list(opcode + ".n" for opcode in jump_list) + list( + opcode + ".w" for opcode in jump_list + ) + for opcode in jump_list: + self.assertIsNotNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match(opcode)) + + self.assertIsNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match("bl")) + self.assertIsNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match("blx")) + + cbz_list = ["cbz", "cbnz", "cbz.n", "cbnz.n", "cbz.w", "cbnz.w"] + for opcode in cbz_list: + self.assertIsNotNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match(opcode)) + + self.assertIsNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match("cbn")) + + call_list = self.AppendConditionCode(["bl", "blx"]) + call_list += list(opcode + ".n" for opcode in call_list) + for opcode in call_list: + self.assertIsNotNone(sa.ArmAnalyzer.CALL_OPCODE_RE.match(opcode)) + + self.assertIsNone(sa.ArmAnalyzer.CALL_OPCODE_RE.match("ble")) + + result = sa.ArmAnalyzer.CALL_OPERAND_RE.match("53f90 <get_time+0x18>") + self.assertIsNotNone(result) + self.assertEqual(result.group(1), "53f90") + self.assertEqual(result.group(2), "get_time+0x18") + + result = sa.ArmAnalyzer.CBZ_CBNZ_OPERAND_RE.match("r6, 53f90 <get+0x0>") + self.assertIsNotNone(result) + self.assertEqual(result.group(1), "53f90") + self.assertEqual(result.group(2), "get+0x0") + + self.assertIsNotNone(sa.ArmAnalyzer.PUSH_OPCODE_RE.match("push")) + self.assertIsNone(sa.ArmAnalyzer.PUSH_OPCODE_RE.match("pushal")) + self.assertIsNotNone(sa.ArmAnalyzer.STM_OPCODE_RE.match("stmdb")) + self.assertIsNone(sa.ArmAnalyzer.STM_OPCODE_RE.match("lstm")) + self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("sub")) + self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("subs")) + self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("subw")) + self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("sub.w")) + self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("subs.w")) + + result = sa.ArmAnalyzer.SUB_OPERAND_RE.match("sp, sp, #1668 ; 0x684") + self.assertIsNotNone(result) + self.assertEqual(result.group(1), "1668") + result = sa.ArmAnalyzer.SUB_OPERAND_RE.match("sp, #1668") + self.assertIsNotNone(result) + self.assertEqual(result.group(1), "1668") + self.assertIsNone(sa.ArmAnalyzer.SUB_OPERAND_RE.match("sl, #1668")) + + def testAnalyzeFunction(self): + analyzer = sa.ArmAnalyzer() + symbol = sa.Symbol(0x10, "F", 0x100, "foo") + instructions = [ + (0x10, "push", "{r4, r5, r6, r7, lr}"), + (0x12, "subw", "sp, sp, #16 ; 0x10"), + (0x16, "movs", "lr, r1"), + (0x18, "beq.n", "26 <foo+0x26>"), + (0x1A, "bl", "30 <foo+0x30>"), + (0x1E, "bl", "deadbeef <bar>"), + (0x22, "blx", "0 <woo>"), + (0x26, "push", "{r1}"), + (0x28, "stmdb", "sp!, {r4, r5, r6, r7, r8, r9, lr}"), + (0x2C, "stmdb", "sp!, {r4}"), + (0x30, "stmdb", "sp, {r4}"), + (0x34, "bx.n", "10 <foo>"), + (0x36, "bx.n", "r3"), + (0x38, "ldr", "pc, [r10]"), + ] + (size, callsites) = analyzer.AnalyzeFunction(symbol, instructions) + self.assertEqual(size, 72) + expect_callsites = [ + sa.Callsite(0x1E, 0xDEADBEEF, False), + sa.Callsite(0x22, 0x0, False), + sa.Callsite(0x34, 0x10, True), + sa.Callsite(0x36, None, True), + sa.Callsite(0x38, None, True), + ] + self.assertEqual(callsites, expect_callsites) class StackAnalyzerTest(unittest.TestCase): - """Tests for class StackAnalyzer.""" - - def setUp(self): - symbols = [sa.Symbol(0x1000, 'F', 0x15C, 'hook_task'), - sa.Symbol(0x2000, 'F', 0x51C, 'console_task'), - sa.Symbol(0x3200, 'O', 0x124, '__just_data'), - sa.Symbol(0x4000, 'F', 0x11C, 'touchpad_calc'), - sa.Symbol(0x5000, 'F', 0x12C, 'touchpad_calc.constprop.42'), - sa.Symbol(0x12000, 'F', 0x13C, 'trackpad_range'), - sa.Symbol(0x13000, 'F', 0x200, 'inlined_mul'), - sa.Symbol(0x13100, 'F', 0x200, 'inlined_mul'), - sa.Symbol(0x13100, 'F', 0x200, 'inlined_mul_alias'), - sa.Symbol(0x20000, 'O', 0x0, '__array'), - sa.Symbol(0x20010, 'O', 0x0, '__array_end'), - ] - tasklist = [sa.Task('HOOKS', 'hook_task', 2048, 0x1000), - sa.Task('CONSOLE', 'console_task', 460, 0x2000)] - # Array at 0x20000 that contains pointers to hook_task and console_task, - # with stride=8, offset=4 - rodata = (0x20000, [ 0xDEAD1000, 0x00001000, 0xDEAD2000, 0x00002000 ]) - options = mock.MagicMock(elf_path='./ec.RW.elf', - export_taskinfo='fake', - section='RW', - objdump='objdump', - addr2line='addr2line', - annotation=None) - self.analyzer = sa.StackAnalyzer(options, symbols, rodata, tasklist, {}) - - def testParseSymbolText(self): - symbol_text = ( - '0 g F .text e8 Foo\n' - '0000dead w F .text 000000e8 .hidden Bar\n' - 'deadbeef l O .bss 00000004 .hidden Woooo\n' - 'deadbee g O .rodata 00000008 __Hooo_ooo\n' - 'deadbee g .rodata 00000000 __foo_doo_coo_end\n' - ) - symbols = sa.ParseSymbolText(symbol_text) - expect_symbols = [sa.Symbol(0x0, 'F', 0xe8, 'Foo'), - sa.Symbol(0xdead, 'F', 0xe8, 'Bar'), - sa.Symbol(0xdeadbeef, 'O', 0x4, 'Woooo'), - sa.Symbol(0xdeadbee, 'O', 0x8, '__Hooo_ooo'), - sa.Symbol(0xdeadbee, 'O', 0x0, '__foo_doo_coo_end')] - self.assertEqual(symbols, expect_symbols) - - def testParseRoData(self): - rodata_text = ( - '\n' - 'Contents of section .rodata:\n' - ' 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n' - ) - rodata = sa.ParseRoDataText(rodata_text) - expect_rodata = (0x20000, - [ 0x0010adde, 0x00001000, 0x0020adde, 0x00002000 ]) - self.assertEqual(rodata, expect_rodata) - - def testLoadTasklist(self): - def tasklist_to_taskinfos(pointer, tasklist): - taskinfos = [] - for task in tasklist: - taskinfos.append(sa.TaskInfo(name=task.name.encode('utf-8'), - routine=task.routine_name.encode('utf-8'), - stack_size=task.stack_max_size)) - - TaskInfoArray = sa.TaskInfo * len(taskinfos) - pointer.contents.contents = TaskInfoArray(*taskinfos) - return len(taskinfos) - - def ro_taskinfos(pointer): - return tasklist_to_taskinfos(pointer, expect_ro_tasklist) - - def rw_taskinfos(pointer): - return tasklist_to_taskinfos(pointer, expect_rw_tasklist) - - expect_ro_tasklist = [ - sa.Task('HOOKS', 'hook_task', 2048, 0x1000), - ] - - expect_rw_tasklist = [ - sa.Task('HOOKS', 'hook_task', 2048, 0x1000), - sa.Task('WOOKS', 'hook_task', 4096, 0x1000), - sa.Task('CONSOLE', 'console_task', 460, 0x2000), - ] - - export_taskinfo = mock.MagicMock( - get_ro_taskinfos=mock.MagicMock(side_effect=ro_taskinfos), - get_rw_taskinfos=mock.MagicMock(side_effect=rw_taskinfos)) - - tasklist = sa.LoadTasklist('RO', export_taskinfo, self.analyzer.symbols) - self.assertEqual(tasklist, expect_ro_tasklist) - tasklist = sa.LoadTasklist('RW', export_taskinfo, self.analyzer.symbols) - self.assertEqual(tasklist, expect_rw_tasklist) - - def testResolveAnnotation(self): - self.analyzer.annotation = {} - (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() - self.assertEqual(add_rules, {}) - self.assertEqual(remove_rules, []) - self.assertEqual(invalid_sigtxts, set()) - - self.analyzer.annotation = {'add': None, 'remove': None} - (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() - self.assertEqual(add_rules, {}) - self.assertEqual(remove_rules, []) - self.assertEqual(invalid_sigtxts, set()) - - self.analyzer.annotation = { - 'add': None, - 'remove': [ - [['a', 'b'], ['0', '[', '2'], 'x'], - [['a', 'b[x:3]'], ['0', '1', '2'], 'x'], - ], - } - (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() - self.assertEqual(add_rules, {}) - self.assertEqual(list.sort(remove_rules), list.sort([ - [('a', None, None), ('1', None, None), ('x', None, None)], - [('a', None, None), ('0', None, None), ('x', None, None)], - [('a', None, None), ('2', None, None), ('x', None, None)], - [('b', os.path.abspath('x'), 3), ('1', None, None), ('x', None, None)], - [('b', os.path.abspath('x'), 3), ('0', None, None), ('x', None, None)], - [('b', os.path.abspath('x'), 3), ('2', None, None), ('x', None, None)], - ])) - self.assertEqual(invalid_sigtxts, {'['}) - - self.analyzer.annotation = { - 'add': { - 'touchpad_calc': [ dict(name='__array', stride=8, offset=4) ], + """Tests for class StackAnalyzer.""" + + def setUp(self): + symbols = [ + sa.Symbol(0x1000, "F", 0x15C, "hook_task"), + sa.Symbol(0x2000, "F", 0x51C, "console_task"), + sa.Symbol(0x3200, "O", 0x124, "__just_data"), + sa.Symbol(0x4000, "F", 0x11C, "touchpad_calc"), + sa.Symbol(0x5000, "F", 0x12C, "touchpad_calc.constprop.42"), + sa.Symbol(0x12000, "F", 0x13C, "trackpad_range"), + sa.Symbol(0x13000, "F", 0x200, "inlined_mul"), + sa.Symbol(0x13100, "F", 0x200, "inlined_mul"), + sa.Symbol(0x13100, "F", 0x200, "inlined_mul_alias"), + sa.Symbol(0x20000, "O", 0x0, "__array"), + sa.Symbol(0x20010, "O", 0x0, "__array_end"), + ] + tasklist = [ + sa.Task("HOOKS", "hook_task", 2048, 0x1000), + sa.Task("CONSOLE", "console_task", 460, 0x2000), + ] + # Array at 0x20000 that contains pointers to hook_task and console_task, + # with stride=8, offset=4 + rodata = (0x20000, [0xDEAD1000, 0x00001000, 0xDEAD2000, 0x00002000]) + options = mock.MagicMock( + elf_path="./ec.RW.elf", + export_taskinfo="fake", + section="RW", + objdump="objdump", + addr2line="addr2line", + annotation=None, + ) + self.analyzer = sa.StackAnalyzer(options, symbols, rodata, tasklist, {}) + + def testParseSymbolText(self): + symbol_text = ( + "0 g F .text e8 Foo\n" + "0000dead w F .text 000000e8 .hidden Bar\n" + "deadbeef l O .bss 00000004 .hidden Woooo\n" + "deadbee g O .rodata 00000008 __Hooo_ooo\n" + "deadbee g .rodata 00000000 __foo_doo_coo_end\n" + ) + symbols = sa.ParseSymbolText(symbol_text) + expect_symbols = [ + sa.Symbol(0x0, "F", 0xE8, "Foo"), + sa.Symbol(0xDEAD, "F", 0xE8, "Bar"), + sa.Symbol(0xDEADBEEF, "O", 0x4, "Woooo"), + sa.Symbol(0xDEADBEE, "O", 0x8, "__Hooo_ooo"), + sa.Symbol(0xDEADBEE, "O", 0x0, "__foo_doo_coo_end"), + ] + self.assertEqual(symbols, expect_symbols) + + def testParseRoData(self): + rodata_text = ( + "\n" + "Contents of section .rodata:\n" + " 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n" + ) + rodata = sa.ParseRoDataText(rodata_text) + expect_rodata = (0x20000, [0x0010ADDE, 0x00001000, 0x0020ADDE, 0x00002000]) + self.assertEqual(rodata, expect_rodata) + + def testLoadTasklist(self): + def tasklist_to_taskinfos(pointer, tasklist): + taskinfos = [] + for task in tasklist: + taskinfos.append( + sa.TaskInfo( + name=task.name.encode("utf-8"), + routine=task.routine_name.encode("utf-8"), + stack_size=task.stack_max_size, + ) + ) + + TaskInfoArray = sa.TaskInfo * len(taskinfos) + pointer.contents.contents = TaskInfoArray(*taskinfos) + return len(taskinfos) + + def ro_taskinfos(pointer): + return tasklist_to_taskinfos(pointer, expect_ro_tasklist) + + def rw_taskinfos(pointer): + return tasklist_to_taskinfos(pointer, expect_rw_tasklist) + + expect_ro_tasklist = [ + sa.Task("HOOKS", "hook_task", 2048, 0x1000), + ] + + expect_rw_tasklist = [ + sa.Task("HOOKS", "hook_task", 2048, 0x1000), + sa.Task("WOOKS", "hook_task", 4096, 0x1000), + sa.Task("CONSOLE", "console_task", 460, 0x2000), + ] + + export_taskinfo = mock.MagicMock( + get_ro_taskinfos=mock.MagicMock(side_effect=ro_taskinfos), + get_rw_taskinfos=mock.MagicMock(side_effect=rw_taskinfos), + ) + + tasklist = sa.LoadTasklist("RO", export_taskinfo, self.analyzer.symbols) + self.assertEqual(tasklist, expect_ro_tasklist) + tasklist = sa.LoadTasklist("RW", export_taskinfo, self.analyzer.symbols) + self.assertEqual(tasklist, expect_rw_tasklist) + + def testResolveAnnotation(self): + self.analyzer.annotation = {} + (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() + self.assertEqual(add_rules, {}) + self.assertEqual(remove_rules, []) + self.assertEqual(invalid_sigtxts, set()) + + self.analyzer.annotation = {"add": None, "remove": None} + (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() + self.assertEqual(add_rules, {}) + self.assertEqual(remove_rules, []) + self.assertEqual(invalid_sigtxts, set()) + + self.analyzer.annotation = { + "add": None, + "remove": [ + [["a", "b"], ["0", "[", "2"], "x"], + [["a", "b[x:3]"], ["0", "1", "2"], "x"], + ], + } + (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() + self.assertEqual(add_rules, {}) + self.assertEqual( + list.sort(remove_rules), + list.sort( + [ + [("a", None, None), ("1", None, None), ("x", None, None)], + [("a", None, None), ("0", None, None), ("x", None, None)], + [("a", None, None), ("2", None, None), ("x", None, None)], + [ + ("b", os.path.abspath("x"), 3), + ("1", None, None), + ("x", None, None), + ], + [ + ("b", os.path.abspath("x"), 3), + ("0", None, None), + ("x", None, None), + ], + [ + ("b", os.path.abspath("x"), 3), + ("2", None, None), + ("x", None, None), + ], + ] + ), + ) + self.assertEqual(invalid_sigtxts, {"["}) + + self.analyzer.annotation = { + "add": { + "touchpad_calc": [dict(name="__array", stride=8, offset=4)], + } + } + (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() + self.assertEqual( + add_rules, + { + ("touchpad_calc", None, None): set( + [("console_task", None, None), ("hook_task", None, None)] + ) + }, + ) + + funcs = { + 0x1000: sa.Function(0x1000, "hook_task", 0, []), + 0x2000: sa.Function(0x2000, "console_task", 0, []), + 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []), + 0x5000: sa.Function(0x5000, "touchpad_calc.constprop.42", 0, []), + 0x13000: sa.Function(0x13000, "inlined_mul", 0, []), + 0x13100: sa.Function(0x13100, "inlined_mul", 0, []), + } + funcs[0x1000].callsites = [sa.Callsite(0x1002, None, False, None)] + # Set address_to_line_cache to fake the results of addr2line. + self.analyzer.address_to_line_cache = { + (0x1000, False): [("hook_task", os.path.abspath("a.c"), 10)], + (0x1002, False): [("toot_calc", os.path.abspath("t.c"), 1234)], + (0x2000, False): [("console_task", os.path.abspath("b.c"), 20)], + (0x4000, False): [("toudhpad_calc", os.path.abspath("a.c"), 20)], + (0x5000, False): [ + ("touchpad_calc.constprop.42", os.path.abspath("b.c"), 40) + ], + (0x12000, False): [("trackpad_range", os.path.abspath("t.c"), 10)], + (0x13000, False): [("inlined_mul", os.path.abspath("x.c"), 12)], + (0x13100, False): [("inlined_mul", os.path.abspath("x.c"), 12)], + } + self.analyzer.annotation = { + "add": { + "hook_task.lto.573": ["touchpad_calc.lto.2501[a.c]"], + "console_task": ["touchpad_calc[b.c]", "inlined_mul_alias"], + "hook_task[q.c]": ["hook_task"], + "inlined_mul[x.c]": ["inlined_mul"], + "toot_calc[t.c:1234]": ["hook_task"], + }, + "remove": [ + ["touchpad?calc["], + "touchpad_calc", + ["touchpad_calc[a.c]"], + ["task_unk[a.c]"], + ["touchpad_calc[x/a.c]"], + ["trackpad_range"], + ["inlined_mul"], + ["inlined_mul", "console_task", "touchpad_calc[a.c]"], + ["inlined_mul", "inlined_mul_alias", "console_task"], + ["inlined_mul", "inlined_mul_alias", "console_task"], + ], + } + (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() + self.assertEqual(invalid_sigtxts, {"touchpad?calc["}) + + signature_set = set() + for src_sig, dst_sigs in add_rules.items(): + signature_set.add(src_sig) + signature_set.update(dst_sigs) + + for remove_sigs in remove_rules: + signature_set.update(remove_sigs) + + (signature_map, failed_sigs) = self.analyzer.MapAnnotation(funcs, signature_set) + result = self.analyzer.ResolveAnnotation(funcs) + (add_set, remove_list, eliminated_addrs, failed_sigs) = result + + expect_signature_map = { + ("hook_task", None, None): {funcs[0x1000]}, + ("touchpad_calc", os.path.abspath("a.c"), None): {funcs[0x4000]}, + ("touchpad_calc", os.path.abspath("b.c"), None): {funcs[0x5000]}, + ("console_task", None, None): {funcs[0x2000]}, + ("inlined_mul_alias", None, None): {funcs[0x13100]}, + ("inlined_mul", os.path.abspath("x.c"), None): { + funcs[0x13000], + funcs[0x13100], + }, + ("inlined_mul", None, None): {funcs[0x13000], funcs[0x13100]}, + } + self.assertEqual(len(signature_map), len(expect_signature_map)) + for sig, funclist in signature_map.items(): + self.assertEqual(set(funclist), expect_signature_map[sig]) + + self.assertEqual( + add_set, + { + (funcs[0x1000], funcs[0x4000]), + (funcs[0x1000], funcs[0x1000]), + (funcs[0x2000], funcs[0x5000]), + (funcs[0x2000], funcs[0x13100]), + (funcs[0x13000], funcs[0x13000]), + (funcs[0x13000], funcs[0x13100]), + (funcs[0x13100], funcs[0x13000]), + (funcs[0x13100], funcs[0x13100]), + }, + ) + expect_remove_list = [ + [funcs[0x4000]], + [funcs[0x13000]], + [funcs[0x13100]], + [funcs[0x13000], funcs[0x2000], funcs[0x4000]], + [funcs[0x13100], funcs[0x2000], funcs[0x4000]], + [funcs[0x13000], funcs[0x13100], funcs[0x2000]], + [funcs[0x13100], funcs[0x13100], funcs[0x2000]], + ] + self.assertEqual(len(remove_list), len(expect_remove_list)) + for remove_path in remove_list: + self.assertTrue(remove_path in expect_remove_list) + + self.assertEqual(eliminated_addrs, {0x1002}) + self.assertEqual( + failed_sigs, + { + ("touchpad?calc[", sa.StackAnalyzer.ANNOTATION_ERROR_INVALID), + ("touchpad_calc", sa.StackAnalyzer.ANNOTATION_ERROR_AMBIGUOUS), + ("hook_task[q.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), + ("task_unk[a.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), + ("touchpad_calc[x/a.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), + ("trackpad_range", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), + }, + ) + + def testPreprocessAnnotation(self): + funcs = { + 0x1000: sa.Function(0x1000, "hook_task", 0, []), + 0x2000: sa.Function(0x2000, "console_task", 0, []), + 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []), + } + funcs[0x1000].callsites = [sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])] + funcs[0x2000].callsites = [ + sa.Callsite(0x2002, 0x1000, False, funcs[0x1000]), + sa.Callsite(0x2006, None, True, None), + ] + add_set = { + (funcs[0x2000], funcs[0x2000]), + (funcs[0x2000], funcs[0x4000]), + (funcs[0x4000], funcs[0x1000]), + (funcs[0x4000], funcs[0x2000]), } - } - (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() - self.assertEqual(add_rules, { - ('touchpad_calc', None, None): - set([('console_task', None, None), ('hook_task', None, None)])}) - - funcs = { - 0x1000: sa.Function(0x1000, 'hook_task', 0, []), - 0x2000: sa.Function(0x2000, 'console_task', 0, []), - 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []), - 0x5000: sa.Function(0x5000, 'touchpad_calc.constprop.42', 0, []), - 0x13000: sa.Function(0x13000, 'inlined_mul', 0, []), - 0x13100: sa.Function(0x13100, 'inlined_mul', 0, []), - } - funcs[0x1000].callsites = [ - sa.Callsite(0x1002, None, False, None)] - # Set address_to_line_cache to fake the results of addr2line. - self.analyzer.address_to_line_cache = { - (0x1000, False): [('hook_task', os.path.abspath('a.c'), 10)], - (0x1002, False): [('toot_calc', os.path.abspath('t.c'), 1234)], - (0x2000, False): [('console_task', os.path.abspath('b.c'), 20)], - (0x4000, False): [('toudhpad_calc', os.path.abspath('a.c'), 20)], - (0x5000, False): [ - ('touchpad_calc.constprop.42', os.path.abspath('b.c'), 40)], - (0x12000, False): [('trackpad_range', os.path.abspath('t.c'), 10)], - (0x13000, False): [('inlined_mul', os.path.abspath('x.c'), 12)], - (0x13100, False): [('inlined_mul', os.path.abspath('x.c'), 12)], - } - self.analyzer.annotation = { - 'add': { - 'hook_task.lto.573': ['touchpad_calc.lto.2501[a.c]'], - 'console_task': ['touchpad_calc[b.c]', 'inlined_mul_alias'], - 'hook_task[q.c]': ['hook_task'], - 'inlined_mul[x.c]': ['inlined_mul'], - 'toot_calc[t.c:1234]': ['hook_task'], - }, - 'remove': [ - ['touchpad?calc['], - 'touchpad_calc', - ['touchpad_calc[a.c]'], - ['task_unk[a.c]'], - ['touchpad_calc[x/a.c]'], - ['trackpad_range'], - ['inlined_mul'], - ['inlined_mul', 'console_task', 'touchpad_calc[a.c]'], - ['inlined_mul', 'inlined_mul_alias', 'console_task'], - ['inlined_mul', 'inlined_mul_alias', 'console_task'], - ], - } - (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation() - self.assertEqual(invalid_sigtxts, {'touchpad?calc['}) - - signature_set = set() - for src_sig, dst_sigs in add_rules.items(): - signature_set.add(src_sig) - signature_set.update(dst_sigs) - - for remove_sigs in remove_rules: - signature_set.update(remove_sigs) - - (signature_map, failed_sigs) = self.analyzer.MapAnnotation(funcs, - signature_set) - result = self.analyzer.ResolveAnnotation(funcs) - (add_set, remove_list, eliminated_addrs, failed_sigs) = result - - expect_signature_map = { - ('hook_task', None, None): {funcs[0x1000]}, - ('touchpad_calc', os.path.abspath('a.c'), None): {funcs[0x4000]}, - ('touchpad_calc', os.path.abspath('b.c'), None): {funcs[0x5000]}, - ('console_task', None, None): {funcs[0x2000]}, - ('inlined_mul_alias', None, None): {funcs[0x13100]}, - ('inlined_mul', os.path.abspath('x.c'), None): {funcs[0x13000], - funcs[0x13100]}, - ('inlined_mul', None, None): {funcs[0x13000], funcs[0x13100]}, - } - self.assertEqual(len(signature_map), len(expect_signature_map)) - for sig, funclist in signature_map.items(): - self.assertEqual(set(funclist), expect_signature_map[sig]) - - self.assertEqual(add_set, { - (funcs[0x1000], funcs[0x4000]), - (funcs[0x1000], funcs[0x1000]), - (funcs[0x2000], funcs[0x5000]), - (funcs[0x2000], funcs[0x13100]), - (funcs[0x13000], funcs[0x13000]), - (funcs[0x13000], funcs[0x13100]), - (funcs[0x13100], funcs[0x13000]), - (funcs[0x13100], funcs[0x13100]), - }) - expect_remove_list = [ - [funcs[0x4000]], - [funcs[0x13000]], - [funcs[0x13100]], - [funcs[0x13000], funcs[0x2000], funcs[0x4000]], - [funcs[0x13100], funcs[0x2000], funcs[0x4000]], - [funcs[0x13000], funcs[0x13100], funcs[0x2000]], - [funcs[0x13100], funcs[0x13100], funcs[0x2000]], - ] - self.assertEqual(len(remove_list), len(expect_remove_list)) - for remove_path in remove_list: - self.assertTrue(remove_path in expect_remove_list) - - self.assertEqual(eliminated_addrs, {0x1002}) - self.assertEqual(failed_sigs, { - ('touchpad?calc[', sa.StackAnalyzer.ANNOTATION_ERROR_INVALID), - ('touchpad_calc', sa.StackAnalyzer.ANNOTATION_ERROR_AMBIGUOUS), - ('hook_task[q.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), - ('task_unk[a.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), - ('touchpad_calc[x/a.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), - ('trackpad_range', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND), - }) - - def testPreprocessAnnotation(self): - funcs = { - 0x1000: sa.Function(0x1000, 'hook_task', 0, []), - 0x2000: sa.Function(0x2000, 'console_task', 0, []), - 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []), - } - funcs[0x1000].callsites = [ - sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])] - funcs[0x2000].callsites = [ - sa.Callsite(0x2002, 0x1000, False, funcs[0x1000]), - sa.Callsite(0x2006, None, True, None), - ] - add_set = { - (funcs[0x2000], funcs[0x2000]), - (funcs[0x2000], funcs[0x4000]), - (funcs[0x4000], funcs[0x1000]), - (funcs[0x4000], funcs[0x2000]), - } - remove_list = [ - [funcs[0x1000]], - [funcs[0x2000], funcs[0x2000]], - [funcs[0x4000], funcs[0x1000]], - [funcs[0x2000], funcs[0x4000], funcs[0x2000]], - [funcs[0x4000], funcs[0x1000], funcs[0x4000]], - ] - eliminated_addrs = {0x2006} - - remaining_remove_list = self.analyzer.PreprocessAnnotation(funcs, - add_set, - remove_list, - eliminated_addrs) - - expect_funcs = { - 0x1000: sa.Function(0x1000, 'hook_task', 0, []), - 0x2000: sa.Function(0x2000, 'console_task', 0, []), - 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []), - } - expect_funcs[0x2000].callsites = [ - sa.Callsite(None, 0x4000, False, expect_funcs[0x4000])] - expect_funcs[0x4000].callsites = [ - sa.Callsite(None, 0x2000, False, expect_funcs[0x2000])] - self.assertEqual(funcs, expect_funcs) - self.assertEqual(remaining_remove_list, [ - [funcs[0x2000], funcs[0x4000], funcs[0x2000]], - ]) - - def testAndesAnalyzeDisassembly(self): - disasm_text = ( - '\n' - 'build/{BOARD}/RW/ec.RW.elf: file format elf32-nds32le' - '\n' - 'Disassembly of section .text:\n' - '\n' - '00000900 <wook_task>:\n' - ' ...\n' - '00001000 <hook_task>:\n' - ' 1000: fc 42\tpush25 $r10, #16 ! {$r6~$r10, $fp, $gp, $lp}\n' - ' 1004: 47 70\t\tmovi55 $r0, #1\n' - ' 1006: b1 13\tbnezs8 100929de <flash_command_write>\n' - ' 1008: 00 01 5c fc\tbne $r6, $r0, 2af6a\n' - '00002000 <console_task>:\n' - ' 2000: fc 00\t\tpush25 $r6, #0 ! {$r6, $fp, $gp, $lp} \n' - ' 2002: f0 0e fc c5\tjal 1000 <hook_task>\n' - ' 2006: f0 0e bd 3b\tj 53968 <get_program_memory_addr>\n' - ' 200a: de ad be ef\tswi.gp $r0, [ + #-11036]\n' - '00004000 <touchpad_calc>:\n' - ' 4000: 47 70\t\tmovi55 $r0, #1\n' - '00010000 <look_task>:' - ) - function_map = self.analyzer.AnalyzeDisassembly(disasm_text) - func_hook_task = sa.Function(0x1000, 'hook_task', 48, [ - sa.Callsite(0x1006, 0x100929de, True, None)]) - expect_funcmap = { - 0x1000: func_hook_task, - 0x2000: sa.Function(0x2000, 'console_task', 16, - [sa.Callsite(0x2002, 0x1000, False, func_hook_task), - sa.Callsite(0x2006, 0x53968, True, None)]), - 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []), - } - self.assertEqual(function_map, expect_funcmap) - - def testArmAnalyzeDisassembly(self): - disasm_text = ( - '\n' - 'build/{BOARD}/RW/ec.RW.elf: file format elf32-littlearm' - '\n' - 'Disassembly of section .text:\n' - '\n' - '00000900 <wook_task>:\n' - ' ...\n' - '00001000 <hook_task>:\n' - ' 1000: dead beef\tfake\n' - ' 1004: 4770\t\tbx lr\n' - ' 1006: b113\tcbz r3, 100929de <flash_command_write>\n' - ' 1008: 00015cfc\t.word 0x00015cfc\n' - '00002000 <console_task>:\n' - ' 2000: b508\t\tpush {r3, lr} ; malformed comments,; r0, r1 \n' - ' 2002: f00e fcc5\tbl 1000 <hook_task>\n' - ' 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n' - ' 200a: dead beef\tfake\n' - '00004000 <touchpad_calc>:\n' - ' 4000: 4770\t\tbx lr\n' - '00010000 <look_task>:' - ) - function_map = self.analyzer.AnalyzeDisassembly(disasm_text) - func_hook_task = sa.Function(0x1000, 'hook_task', 0, [ - sa.Callsite(0x1006, 0x100929de, True, None)]) - expect_funcmap = { - 0x1000: func_hook_task, - 0x2000: sa.Function(0x2000, 'console_task', 8, - [sa.Callsite(0x2002, 0x1000, False, func_hook_task), - sa.Callsite(0x2006, 0x53968, True, None)]), - 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []), - } - self.assertEqual(function_map, expect_funcmap) - - def testAnalyzeCallGraph(self): - funcs = { - 0x1000: sa.Function(0x1000, 'hook_task', 0, []), - 0x2000: sa.Function(0x2000, 'console_task', 8, []), - 0x3000: sa.Function(0x3000, 'task_a', 12, []), - 0x4000: sa.Function(0x4000, 'task_b', 96, []), - 0x5000: sa.Function(0x5000, 'task_c', 32, []), - 0x6000: sa.Function(0x6000, 'task_d', 100, []), - 0x7000: sa.Function(0x7000, 'task_e', 24, []), - 0x8000: sa.Function(0x8000, 'task_f', 20, []), - 0x9000: sa.Function(0x9000, 'task_g', 20, []), - 0x10000: sa.Function(0x10000, 'task_x', 16, []), - } - funcs[0x1000].callsites = [ - sa.Callsite(0x1002, 0x3000, False, funcs[0x3000]), - sa.Callsite(0x1006, 0x4000, False, funcs[0x4000])] - funcs[0x2000].callsites = [ - sa.Callsite(0x2002, 0x5000, False, funcs[0x5000]), - sa.Callsite(0x2006, 0x2000, False, funcs[0x2000]), - sa.Callsite(0x200a, 0x10000, False, funcs[0x10000])] - funcs[0x3000].callsites = [ - sa.Callsite(0x3002, 0x4000, False, funcs[0x4000]), - sa.Callsite(0x3006, 0x1000, False, funcs[0x1000])] - funcs[0x4000].callsites = [ - sa.Callsite(0x4002, 0x6000, True, funcs[0x6000]), - sa.Callsite(0x4006, 0x7000, False, funcs[0x7000]), - sa.Callsite(0x400a, 0x8000, False, funcs[0x8000])] - funcs[0x5000].callsites = [ - sa.Callsite(0x5002, 0x4000, False, funcs[0x4000])] - funcs[0x7000].callsites = [ - sa.Callsite(0x7002, 0x7000, False, funcs[0x7000])] - funcs[0x8000].callsites = [ - sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])] - funcs[0x9000].callsites = [ - sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])] - funcs[0x10000].callsites = [ - sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])] - - cycles = self.analyzer.AnalyzeCallGraph(funcs, [ - [funcs[0x2000]] * 2, - [funcs[0x10000], funcs[0x2000]] * 3, - [funcs[0x1000], funcs[0x3000], funcs[0x1000]] - ]) - - expect_func_stack = { - 0x1000: (268, [funcs[0x1000], - funcs[0x3000], - funcs[0x4000], - funcs[0x8000], - funcs[0x9000], - funcs[0x4000], - funcs[0x7000]]), - 0x2000: (208, [funcs[0x2000], - funcs[0x10000], - funcs[0x2000], - funcs[0x10000], - funcs[0x2000], - funcs[0x5000], - funcs[0x4000], - funcs[0x7000]]), - 0x3000: (280, [funcs[0x3000], - funcs[0x1000], - funcs[0x3000], - funcs[0x4000], - funcs[0x8000], - funcs[0x9000], - funcs[0x4000], - funcs[0x7000]]), - 0x4000: (120, [funcs[0x4000], funcs[0x7000]]), - 0x5000: (152, [funcs[0x5000], funcs[0x4000], funcs[0x7000]]), - 0x6000: (100, [funcs[0x6000]]), - 0x7000: (24, [funcs[0x7000]]), - 0x8000: (160, [funcs[0x8000], - funcs[0x9000], - funcs[0x4000], - funcs[0x7000]]), - 0x9000: (140, [funcs[0x9000], funcs[0x4000], funcs[0x7000]]), - 0x10000: (200, [funcs[0x10000], - funcs[0x2000], - funcs[0x10000], - funcs[0x2000], - funcs[0x5000], - funcs[0x4000], - funcs[0x7000]]), - } - expect_cycles = [ - {funcs[0x4000], funcs[0x8000], funcs[0x9000]}, - {funcs[0x7000]}, - ] - for func in funcs.values(): - (stack_max_usage, stack_max_path) = expect_func_stack[func.address] - self.assertEqual(func.stack_max_usage, stack_max_usage) - self.assertEqual(func.stack_max_path, stack_max_path) - - self.assertEqual(len(cycles), len(expect_cycles)) - for cycle in cycles: - self.assertTrue(cycle in expect_cycles) - - @mock.patch('subprocess.check_output') - def testAddressToLine(self, checkoutput_mock): - checkoutput_mock.return_value = 'fake_func\n/test.c:1' - self.assertEqual(self.analyzer.AddressToLine(0x1234), - [('fake_func', '/test.c', 1)]) - checkoutput_mock.assert_called_once_with( - ['addr2line', '-f', '-e', './ec.RW.elf', '1234'], encoding='utf-8') - checkoutput_mock.reset_mock() - - checkoutput_mock.return_value = 'fake_func\n/a.c:1\nbake_func\n/b.c:2\n' - self.assertEqual(self.analyzer.AddressToLine(0x1234, True), - [('fake_func', '/a.c', 1), ('bake_func', '/b.c', 2)]) - checkoutput_mock.assert_called_once_with( - ['addr2line', '-f', '-e', './ec.RW.elf', '1234', '-i'], - encoding='utf-8') - checkoutput_mock.reset_mock() - - checkoutput_mock.return_value = 'fake_func\n/test.c:1 (discriminator 128)' - self.assertEqual(self.analyzer.AddressToLine(0x12345), - [('fake_func', '/test.c', 1)]) - checkoutput_mock.assert_called_once_with( - ['addr2line', '-f', '-e', './ec.RW.elf', '12345'], encoding='utf-8') - checkoutput_mock.reset_mock() - - checkoutput_mock.return_value = '??\n:?\nbake_func\n/b.c:2\n' - self.assertEqual(self.analyzer.AddressToLine(0x123456), - [None, ('bake_func', '/b.c', 2)]) - checkoutput_mock.assert_called_once_with( - ['addr2line', '-f', '-e', './ec.RW.elf', '123456'], encoding='utf-8') - checkoutput_mock.reset_mock() - - with self.assertRaisesRegexp(sa.StackAnalyzerError, - 'addr2line failed to resolve lines.'): - checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '') - self.analyzer.AddressToLine(0x5678) - - with self.assertRaisesRegexp(sa.StackAnalyzerError, - 'Failed to run addr2line.'): - checkoutput_mock.side_effect = OSError() - self.analyzer.AddressToLine(0x9012) - - @mock.patch('subprocess.check_output') - @mock.patch('stack_analyzer.StackAnalyzer.AddressToLine') - def testAndesAnalyze(self, addrtoline_mock, checkoutput_mock): - disasm_text = ( - '\n' - 'build/{BOARD}/RW/ec.RW.elf: file format elf32-nds32le' - '\n' - 'Disassembly of section .text:\n' - '\n' - '00000900 <wook_task>:\n' - ' ...\n' - '00001000 <hook_task>:\n' - ' 1000: fc 00\t\tpush25 $r10, #16 ! {$r6~$r10, $fp, $gp, $lp}\n' - ' 1002: 47 70\t\tmovi55 $r0, #1\n' - ' 1006: 00 01 5c fc\tbne $r6, $r0, 2af6a\n' - '00002000 <console_task>:\n' - ' 2000: fc 00\t\tpush25 $r6, #0 ! {$r6, $fp, $gp, $lp} \n' - ' 2002: f0 0e fc c5\tjal 1000 <hook_task>\n' - ' 2006: f0 0e bd 3b\tj 53968 <get_program_memory_addr>\n' - ' 200a: 12 34 56 78\tjral5 $r0\n' - ) - - addrtoline_mock.return_value = [('??', '??', 0)] - self.analyzer.annotation = { - 'exception_frame_size': 64, - 'remove': [['fake_func']], - } - - with mock.patch('builtins.print') as print_mock: - checkoutput_mock.return_value = disasm_text - self.analyzer.Analyze() - print_mock.assert_has_calls([ - mock.call( - 'Task: HOOKS, Max size: 96 (32 + 64), Allocated size: 2048'), - mock.call('Call Trace:'), - mock.call(' hook_task (32) [??:0] 1000'), - mock.call( - 'Task: CONSOLE, Max size: 112 (48 + 64), Allocated size: 460'), - mock.call('Call Trace:'), - mock.call(' console_task (16) [??:0] 2000'), - mock.call(' -> ??[??:0] 2002'), - mock.call(' hook_task (32) [??:0] 1000'), - mock.call('Unresolved indirect callsites:'), - mock.call(' In function console_task:'), - mock.call(' -> ??[??:0] 200a'), - mock.call('Unresolved annotation signatures:'), - mock.call(' fake_func: function is not found'), - ]) - - with self.assertRaisesRegexp(sa.StackAnalyzerError, - 'Failed to run objdump.'): - checkoutput_mock.side_effect = OSError() - self.analyzer.Analyze() - - with self.assertRaisesRegexp(sa.StackAnalyzerError, - 'objdump failed to disassemble.'): - checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '') - self.analyzer.Analyze() - - @mock.patch('subprocess.check_output') - @mock.patch('stack_analyzer.StackAnalyzer.AddressToLine') - def testArmAnalyze(self, addrtoline_mock, checkoutput_mock): - disasm_text = ( - '\n' - 'build/{BOARD}/RW/ec.RW.elf: file format elf32-littlearm' - '\n' - 'Disassembly of section .text:\n' - '\n' - '00000900 <wook_task>:\n' - ' ...\n' - '00001000 <hook_task>:\n' - ' 1000: b508\t\tpush {r3, lr}\n' - ' 1002: 4770\t\tbx lr\n' - ' 1006: 00015cfc\t.word 0x00015cfc\n' - '00002000 <console_task>:\n' - ' 2000: b508\t\tpush {r3, lr}\n' - ' 2002: f00e fcc5\tbl 1000 <hook_task>\n' - ' 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n' - ' 200a: 1234 5678\tb.w sl\n' - ) - - addrtoline_mock.return_value = [('??', '??', 0)] - self.analyzer.annotation = { - 'exception_frame_size': 64, - 'remove': [['fake_func']], - } - - with mock.patch('builtins.print') as print_mock: - checkoutput_mock.return_value = disasm_text - self.analyzer.Analyze() - print_mock.assert_has_calls([ - mock.call( - 'Task: HOOKS, Max size: 72 (8 + 64), Allocated size: 2048'), - mock.call('Call Trace:'), - mock.call(' hook_task (8) [??:0] 1000'), - mock.call( - 'Task: CONSOLE, Max size: 80 (16 + 64), Allocated size: 460'), - mock.call('Call Trace:'), - mock.call(' console_task (8) [??:0] 2000'), - mock.call(' -> ??[??:0] 2002'), - mock.call(' hook_task (8) [??:0] 1000'), - mock.call('Unresolved indirect callsites:'), - mock.call(' In function console_task:'), - mock.call(' -> ??[??:0] 200a'), - mock.call('Unresolved annotation signatures:'), - mock.call(' fake_func: function is not found'), - ]) - - with self.assertRaisesRegexp(sa.StackAnalyzerError, - 'Failed to run objdump.'): - checkoutput_mock.side_effect = OSError() - self.analyzer.Analyze() - - with self.assertRaisesRegexp(sa.StackAnalyzerError, - 'objdump failed to disassemble.'): - checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '') - self.analyzer.Analyze() - - @mock.patch('subprocess.check_output') - @mock.patch('stack_analyzer.ParseArgs') - def testMain(self, parseargs_mock, checkoutput_mock): - symbol_text = ('1000 g F .text 0000015c .hidden hook_task\n' - '2000 g F .text 0000051c .hidden console_task\n') - rodata_text = ( - '\n' - 'Contents of section .rodata:\n' - ' 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n' - ) - - args = mock.MagicMock(elf_path='./ec.RW.elf', - export_taskinfo='fake', - section='RW', - objdump='objdump', - addr2line='addr2line', - annotation='fake') - parseargs_mock.return_value = args - - with mock.patch('os.path.exists') as path_mock: - path_mock.return_value = False - with mock.patch('builtins.print') as print_mock: - with mock.patch('builtins.open', mock.mock_open()) as open_mock: - sa.main() - print_mock.assert_any_call( - 'Warning: Annotation file fake does not exist.') - - with mock.patch('os.path.exists') as path_mock: - path_mock.return_value = True - with mock.patch('builtins.print') as print_mock: - with mock.patch('builtins.open', mock.mock_open()) as open_mock: - open_mock.side_effect = IOError() - sa.main() - print_mock.assert_called_once_with( - 'Error: Failed to open annotation file fake.') - - with mock.patch('builtins.print') as print_mock: - with mock.patch('builtins.open', mock.mock_open()) as open_mock: - open_mock.return_value.read.side_effect = ['{', ''] - sa.main() - open_mock.assert_called_once_with('fake', 'r') - print_mock.assert_called_once_with( - 'Error: Failed to parse annotation file fake.') - - with mock.patch('builtins.print') as print_mock: - with mock.patch('builtins.open', - mock.mock_open(read_data='')) as open_mock: - sa.main() - print_mock.assert_called_once_with( - 'Error: Invalid annotation file fake.') - - args.annotation = None - - with mock.patch('builtins.print') as print_mock: - checkoutput_mock.side_effect = [symbol_text, rodata_text] - sa.main() - print_mock.assert_called_once_with( - 'Error: Failed to load export_taskinfo.') - - with mock.patch('builtins.print') as print_mock: - checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '') - sa.main() - print_mock.assert_called_once_with( - 'Error: objdump failed to dump symbol table or rodata.') - - with mock.patch('builtins.print') as print_mock: - checkoutput_mock.side_effect = OSError() - sa.main() - print_mock.assert_called_once_with('Error: Failed to run objdump.') - - -if __name__ == '__main__': - unittest.main() + remove_list = [ + [funcs[0x1000]], + [funcs[0x2000], funcs[0x2000]], + [funcs[0x4000], funcs[0x1000]], + [funcs[0x2000], funcs[0x4000], funcs[0x2000]], + [funcs[0x4000], funcs[0x1000], funcs[0x4000]], + ] + eliminated_addrs = {0x2006} + + remaining_remove_list = self.analyzer.PreprocessAnnotation( + funcs, add_set, remove_list, eliminated_addrs + ) + + expect_funcs = { + 0x1000: sa.Function(0x1000, "hook_task", 0, []), + 0x2000: sa.Function(0x2000, "console_task", 0, []), + 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []), + } + expect_funcs[0x2000].callsites = [ + sa.Callsite(None, 0x4000, False, expect_funcs[0x4000]) + ] + expect_funcs[0x4000].callsites = [ + sa.Callsite(None, 0x2000, False, expect_funcs[0x2000]) + ] + self.assertEqual(funcs, expect_funcs) + self.assertEqual( + remaining_remove_list, + [ + [funcs[0x2000], funcs[0x4000], funcs[0x2000]], + ], + ) + + def testAndesAnalyzeDisassembly(self): + disasm_text = ( + "\n" + "build/{BOARD}/RW/ec.RW.elf: file format elf32-nds32le" + "\n" + "Disassembly of section .text:\n" + "\n" + "00000900 <wook_task>:\n" + " ...\n" + "00001000 <hook_task>:\n" + " 1000: fc 42\tpush25 $r10, #16 ! {$r6~$r10, $fp, $gp, $lp}\n" + " 1004: 47 70\t\tmovi55 $r0, #1\n" + " 1006: b1 13\tbnezs8 100929de <flash_command_write>\n" + " 1008: 00 01 5c fc\tbne $r6, $r0, 2af6a\n" + "00002000 <console_task>:\n" + " 2000: fc 00\t\tpush25 $r6, #0 ! {$r6, $fp, $gp, $lp} \n" + " 2002: f0 0e fc c5\tjal 1000 <hook_task>\n" + " 2006: f0 0e bd 3b\tj 53968 <get_program_memory_addr>\n" + " 200a: de ad be ef\tswi.gp $r0, [ + #-11036]\n" + "00004000 <touchpad_calc>:\n" + " 4000: 47 70\t\tmovi55 $r0, #1\n" + "00010000 <look_task>:" + ) + function_map = self.analyzer.AnalyzeDisassembly(disasm_text) + func_hook_task = sa.Function( + 0x1000, "hook_task", 48, [sa.Callsite(0x1006, 0x100929DE, True, None)] + ) + expect_funcmap = { + 0x1000: func_hook_task, + 0x2000: sa.Function( + 0x2000, + "console_task", + 16, + [ + sa.Callsite(0x2002, 0x1000, False, func_hook_task), + sa.Callsite(0x2006, 0x53968, True, None), + ], + ), + 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []), + } + self.assertEqual(function_map, expect_funcmap) + + def testArmAnalyzeDisassembly(self): + disasm_text = ( + "\n" + "build/{BOARD}/RW/ec.RW.elf: file format elf32-littlearm" + "\n" + "Disassembly of section .text:\n" + "\n" + "00000900 <wook_task>:\n" + " ...\n" + "00001000 <hook_task>:\n" + " 1000: dead beef\tfake\n" + " 1004: 4770\t\tbx lr\n" + " 1006: b113\tcbz r3, 100929de <flash_command_write>\n" + " 1008: 00015cfc\t.word 0x00015cfc\n" + "00002000 <console_task>:\n" + " 2000: b508\t\tpush {r3, lr} ; malformed comments,; r0, r1 \n" + " 2002: f00e fcc5\tbl 1000 <hook_task>\n" + " 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n" + " 200a: dead beef\tfake\n" + "00004000 <touchpad_calc>:\n" + " 4000: 4770\t\tbx lr\n" + "00010000 <look_task>:" + ) + function_map = self.analyzer.AnalyzeDisassembly(disasm_text) + func_hook_task = sa.Function( + 0x1000, "hook_task", 0, [sa.Callsite(0x1006, 0x100929DE, True, None)] + ) + expect_funcmap = { + 0x1000: func_hook_task, + 0x2000: sa.Function( + 0x2000, + "console_task", + 8, + [ + sa.Callsite(0x2002, 0x1000, False, func_hook_task), + sa.Callsite(0x2006, 0x53968, True, None), + ], + ), + 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []), + } + self.assertEqual(function_map, expect_funcmap) + + def testAnalyzeCallGraph(self): + funcs = { + 0x1000: sa.Function(0x1000, "hook_task", 0, []), + 0x2000: sa.Function(0x2000, "console_task", 8, []), + 0x3000: sa.Function(0x3000, "task_a", 12, []), + 0x4000: sa.Function(0x4000, "task_b", 96, []), + 0x5000: sa.Function(0x5000, "task_c", 32, []), + 0x6000: sa.Function(0x6000, "task_d", 100, []), + 0x7000: sa.Function(0x7000, "task_e", 24, []), + 0x8000: sa.Function(0x8000, "task_f", 20, []), + 0x9000: sa.Function(0x9000, "task_g", 20, []), + 0x10000: sa.Function(0x10000, "task_x", 16, []), + } + funcs[0x1000].callsites = [ + sa.Callsite(0x1002, 0x3000, False, funcs[0x3000]), + sa.Callsite(0x1006, 0x4000, False, funcs[0x4000]), + ] + funcs[0x2000].callsites = [ + sa.Callsite(0x2002, 0x5000, False, funcs[0x5000]), + sa.Callsite(0x2006, 0x2000, False, funcs[0x2000]), + sa.Callsite(0x200A, 0x10000, False, funcs[0x10000]), + ] + funcs[0x3000].callsites = [ + sa.Callsite(0x3002, 0x4000, False, funcs[0x4000]), + sa.Callsite(0x3006, 0x1000, False, funcs[0x1000]), + ] + funcs[0x4000].callsites = [ + sa.Callsite(0x4002, 0x6000, True, funcs[0x6000]), + sa.Callsite(0x4006, 0x7000, False, funcs[0x7000]), + sa.Callsite(0x400A, 0x8000, False, funcs[0x8000]), + ] + funcs[0x5000].callsites = [sa.Callsite(0x5002, 0x4000, False, funcs[0x4000])] + funcs[0x7000].callsites = [sa.Callsite(0x7002, 0x7000, False, funcs[0x7000])] + funcs[0x8000].callsites = [sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])] + funcs[0x9000].callsites = [sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])] + funcs[0x10000].callsites = [sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])] + + cycles = self.analyzer.AnalyzeCallGraph( + funcs, + [ + [funcs[0x2000]] * 2, + [funcs[0x10000], funcs[0x2000]] * 3, + [funcs[0x1000], funcs[0x3000], funcs[0x1000]], + ], + ) + + expect_func_stack = { + 0x1000: ( + 268, + [ + funcs[0x1000], + funcs[0x3000], + funcs[0x4000], + funcs[0x8000], + funcs[0x9000], + funcs[0x4000], + funcs[0x7000], + ], + ), + 0x2000: ( + 208, + [ + funcs[0x2000], + funcs[0x10000], + funcs[0x2000], + funcs[0x10000], + funcs[0x2000], + funcs[0x5000], + funcs[0x4000], + funcs[0x7000], + ], + ), + 0x3000: ( + 280, + [ + funcs[0x3000], + funcs[0x1000], + funcs[0x3000], + funcs[0x4000], + funcs[0x8000], + funcs[0x9000], + funcs[0x4000], + funcs[0x7000], + ], + ), + 0x4000: (120, [funcs[0x4000], funcs[0x7000]]), + 0x5000: (152, [funcs[0x5000], funcs[0x4000], funcs[0x7000]]), + 0x6000: (100, [funcs[0x6000]]), + 0x7000: (24, [funcs[0x7000]]), + 0x8000: (160, [funcs[0x8000], funcs[0x9000], funcs[0x4000], funcs[0x7000]]), + 0x9000: (140, [funcs[0x9000], funcs[0x4000], funcs[0x7000]]), + 0x10000: ( + 200, + [ + funcs[0x10000], + funcs[0x2000], + funcs[0x10000], + funcs[0x2000], + funcs[0x5000], + funcs[0x4000], + funcs[0x7000], + ], + ), + } + expect_cycles = [ + {funcs[0x4000], funcs[0x8000], funcs[0x9000]}, + {funcs[0x7000]}, + ] + for func in funcs.values(): + (stack_max_usage, stack_max_path) = expect_func_stack[func.address] + self.assertEqual(func.stack_max_usage, stack_max_usage) + self.assertEqual(func.stack_max_path, stack_max_path) + + self.assertEqual(len(cycles), len(expect_cycles)) + for cycle in cycles: + self.assertTrue(cycle in expect_cycles) + + @mock.patch("subprocess.check_output") + def testAddressToLine(self, checkoutput_mock): + checkoutput_mock.return_value = "fake_func\n/test.c:1" + self.assertEqual( + self.analyzer.AddressToLine(0x1234), [("fake_func", "/test.c", 1)] + ) + checkoutput_mock.assert_called_once_with( + ["addr2line", "-f", "-e", "./ec.RW.elf", "1234"], encoding="utf-8" + ) + checkoutput_mock.reset_mock() + + checkoutput_mock.return_value = "fake_func\n/a.c:1\nbake_func\n/b.c:2\n" + self.assertEqual( + self.analyzer.AddressToLine(0x1234, True), + [("fake_func", "/a.c", 1), ("bake_func", "/b.c", 2)], + ) + checkoutput_mock.assert_called_once_with( + ["addr2line", "-f", "-e", "./ec.RW.elf", "1234", "-i"], encoding="utf-8" + ) + checkoutput_mock.reset_mock() + + checkoutput_mock.return_value = "fake_func\n/test.c:1 (discriminator 128)" + self.assertEqual( + self.analyzer.AddressToLine(0x12345), [("fake_func", "/test.c", 1)] + ) + checkoutput_mock.assert_called_once_with( + ["addr2line", "-f", "-e", "./ec.RW.elf", "12345"], encoding="utf-8" + ) + checkoutput_mock.reset_mock() + + checkoutput_mock.return_value = "??\n:?\nbake_func\n/b.c:2\n" + self.assertEqual( + self.analyzer.AddressToLine(0x123456), [None, ("bake_func", "/b.c", 2)] + ) + checkoutput_mock.assert_called_once_with( + ["addr2line", "-f", "-e", "./ec.RW.elf", "123456"], encoding="utf-8" + ) + checkoutput_mock.reset_mock() + + with self.assertRaisesRegexp( + sa.StackAnalyzerError, "addr2line failed to resolve lines." + ): + checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "") + self.analyzer.AddressToLine(0x5678) + + with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run addr2line."): + checkoutput_mock.side_effect = OSError() + self.analyzer.AddressToLine(0x9012) + + @mock.patch("subprocess.check_output") + @mock.patch("stack_analyzer.StackAnalyzer.AddressToLine") + def testAndesAnalyze(self, addrtoline_mock, checkoutput_mock): + disasm_text = ( + "\n" + "build/{BOARD}/RW/ec.RW.elf: file format elf32-nds32le" + "\n" + "Disassembly of section .text:\n" + "\n" + "00000900 <wook_task>:\n" + " ...\n" + "00001000 <hook_task>:\n" + " 1000: fc 00\t\tpush25 $r10, #16 ! {$r6~$r10, $fp, $gp, $lp}\n" + " 1002: 47 70\t\tmovi55 $r0, #1\n" + " 1006: 00 01 5c fc\tbne $r6, $r0, 2af6a\n" + "00002000 <console_task>:\n" + " 2000: fc 00\t\tpush25 $r6, #0 ! {$r6, $fp, $gp, $lp} \n" + " 2002: f0 0e fc c5\tjal 1000 <hook_task>\n" + " 2006: f0 0e bd 3b\tj 53968 <get_program_memory_addr>\n" + " 200a: 12 34 56 78\tjral5 $r0\n" + ) + + addrtoline_mock.return_value = [("??", "??", 0)] + self.analyzer.annotation = { + "exception_frame_size": 64, + "remove": [["fake_func"]], + } + + with mock.patch("builtins.print") as print_mock: + checkoutput_mock.return_value = disasm_text + self.analyzer.Analyze() + print_mock.assert_has_calls( + [ + mock.call( + "Task: HOOKS, Max size: 96 (32 + 64), Allocated size: 2048" + ), + mock.call("Call Trace:"), + mock.call(" hook_task (32) [??:0] 1000"), + mock.call( + "Task: CONSOLE, Max size: 112 (48 + 64), Allocated size: 460" + ), + mock.call("Call Trace:"), + mock.call(" console_task (16) [??:0] 2000"), + mock.call(" -> ??[??:0] 2002"), + mock.call(" hook_task (32) [??:0] 1000"), + mock.call("Unresolved indirect callsites:"), + mock.call(" In function console_task:"), + mock.call(" -> ??[??:0] 200a"), + mock.call("Unresolved annotation signatures:"), + mock.call(" fake_func: function is not found"), + ] + ) + + with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run objdump."): + checkoutput_mock.side_effect = OSError() + self.analyzer.Analyze() + + with self.assertRaisesRegexp( + sa.StackAnalyzerError, "objdump failed to disassemble." + ): + checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "") + self.analyzer.Analyze() + + @mock.patch("subprocess.check_output") + @mock.patch("stack_analyzer.StackAnalyzer.AddressToLine") + def testArmAnalyze(self, addrtoline_mock, checkoutput_mock): + disasm_text = ( + "\n" + "build/{BOARD}/RW/ec.RW.elf: file format elf32-littlearm" + "\n" + "Disassembly of section .text:\n" + "\n" + "00000900 <wook_task>:\n" + " ...\n" + "00001000 <hook_task>:\n" + " 1000: b508\t\tpush {r3, lr}\n" + " 1002: 4770\t\tbx lr\n" + " 1006: 00015cfc\t.word 0x00015cfc\n" + "00002000 <console_task>:\n" + " 2000: b508\t\tpush {r3, lr}\n" + " 2002: f00e fcc5\tbl 1000 <hook_task>\n" + " 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n" + " 200a: 1234 5678\tb.w sl\n" + ) + + addrtoline_mock.return_value = [("??", "??", 0)] + self.analyzer.annotation = { + "exception_frame_size": 64, + "remove": [["fake_func"]], + } + + with mock.patch("builtins.print") as print_mock: + checkoutput_mock.return_value = disasm_text + self.analyzer.Analyze() + print_mock.assert_has_calls( + [ + mock.call( + "Task: HOOKS, Max size: 72 (8 + 64), Allocated size: 2048" + ), + mock.call("Call Trace:"), + mock.call(" hook_task (8) [??:0] 1000"), + mock.call( + "Task: CONSOLE, Max size: 80 (16 + 64), Allocated size: 460" + ), + mock.call("Call Trace:"), + mock.call(" console_task (8) [??:0] 2000"), + mock.call(" -> ??[??:0] 2002"), + mock.call(" hook_task (8) [??:0] 1000"), + mock.call("Unresolved indirect callsites:"), + mock.call(" In function console_task:"), + mock.call(" -> ??[??:0] 200a"), + mock.call("Unresolved annotation signatures:"), + mock.call(" fake_func: function is not found"), + ] + ) + + with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run objdump."): + checkoutput_mock.side_effect = OSError() + self.analyzer.Analyze() + + with self.assertRaisesRegexp( + sa.StackAnalyzerError, "objdump failed to disassemble." + ): + checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "") + self.analyzer.Analyze() + + @mock.patch("subprocess.check_output") + @mock.patch("stack_analyzer.ParseArgs") + def testMain(self, parseargs_mock, checkoutput_mock): + symbol_text = ( + "1000 g F .text 0000015c .hidden hook_task\n" + "2000 g F .text 0000051c .hidden console_task\n" + ) + rodata_text = ( + "\n" + "Contents of section .rodata:\n" + " 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n" + ) + + args = mock.MagicMock( + elf_path="./ec.RW.elf", + export_taskinfo="fake", + section="RW", + objdump="objdump", + addr2line="addr2line", + annotation="fake", + ) + parseargs_mock.return_value = args + + with mock.patch("os.path.exists") as path_mock: + path_mock.return_value = False + with mock.patch("builtins.print") as print_mock: + with mock.patch("builtins.open", mock.mock_open()) as open_mock: + sa.main() + print_mock.assert_any_call( + "Warning: Annotation file fake does not exist." + ) + + with mock.patch("os.path.exists") as path_mock: + path_mock.return_value = True + with mock.patch("builtins.print") as print_mock: + with mock.patch("builtins.open", mock.mock_open()) as open_mock: + open_mock.side_effect = IOError() + sa.main() + print_mock.assert_called_once_with( + "Error: Failed to open annotation file fake." + ) + + with mock.patch("builtins.print") as print_mock: + with mock.patch("builtins.open", mock.mock_open()) as open_mock: + open_mock.return_value.read.side_effect = ["{", ""] + sa.main() + open_mock.assert_called_once_with("fake", "r") + print_mock.assert_called_once_with( + "Error: Failed to parse annotation file fake." + ) + + with mock.patch("builtins.print") as print_mock: + with mock.patch( + "builtins.open", mock.mock_open(read_data="") + ) as open_mock: + sa.main() + print_mock.assert_called_once_with( + "Error: Invalid annotation file fake." + ) + + args.annotation = None + + with mock.patch("builtins.print") as print_mock: + checkoutput_mock.side_effect = [symbol_text, rodata_text] + sa.main() + print_mock.assert_called_once_with("Error: Failed to load export_taskinfo.") + + with mock.patch("builtins.print") as print_mock: + checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "") + sa.main() + print_mock.assert_called_once_with( + "Error: objdump failed to dump symbol table or rodata." + ) + + with mock.patch("builtins.print") as print_mock: + checkoutput_mock.side_effect = OSError() + sa.main() + print_mock.assert_called_once_with("Error: Failed to run objdump.") + + +if __name__ == "__main__": + unittest.main() |