summaryrefslogtreecommitdiff
path: root/extra/stack_analyzer
diff options
context:
space:
mode:
Diffstat (limited to 'extra/stack_analyzer')
-rwxr-xr-xextra/stack_analyzer/run_tests.sh2
-rwxr-xr-xextra/stack_analyzer/stack_analyzer.py3674
-rwxr-xr-xextra/stack_analyzer/stack_analyzer_unittest.py1777
3 files changed, 2927 insertions, 2526 deletions
diff --git a/extra/stack_analyzer/run_tests.sh b/extra/stack_analyzer/run_tests.sh
index 5662f60b8b..d5e65045c3 100755
--- a/extra/stack_analyzer/run_tests.sh
+++ b/extra/stack_analyzer/run_tests.sh
@@ -1,6 +1,6 @@
#!/bin/bash
#
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Copyright 2017 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
diff --git a/extra/stack_analyzer/stack_analyzer.py b/extra/stack_analyzer/stack_analyzer.py
index 77d16d5450..2431545c6a 100755
--- a/extra/stack_analyzer/stack_analyzer.py
+++ b/extra/stack_analyzer/stack_analyzer.py
@@ -1,11 +1,7 @@
#!/usr/bin/env python3
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Copyright 2017 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""Statically analyze stack usage of EC firmware.
@@ -25,1848 +21,2090 @@ import ctypes
import os
import re
import subprocess
-import yaml
+import yaml # pylint:disable=import-error
-SECTION_RO = 'RO'
-SECTION_RW = 'RW'
+SECTION_RO = "RO"
+SECTION_RW = "RW"
# Default size of extra stack frame needed by exception context switch.
# This value is for cortex-m with FPU enabled.
DEFAULT_EXCEPTION_FRAME_SIZE = 224
class StackAnalyzerError(Exception):
- """Exception class for stack analyzer utility."""
+ """Exception class for stack analyzer utility."""
class TaskInfo(ctypes.Structure):
- """Taskinfo ctypes structure.
-
- The structure definition is corresponding to the "struct taskinfo"
- in "util/export_taskinfo.so.c".
- """
- _fields_ = [('name', ctypes.c_char_p),
- ('routine', ctypes.c_char_p),
- ('stack_size', ctypes.c_uint32)]
+ """Taskinfo ctypes structure.
+ The structure definition is corresponding to the "struct taskinfo"
+ in "util/export_taskinfo.so.c".
+ """
-class Task(object):
- """Task information.
+ _fields_ = [
+ ("name", ctypes.c_char_p),
+ ("routine", ctypes.c_char_p),
+ ("stack_size", ctypes.c_uint32),
+ ]
- Attributes:
- name: Task name.
- routine_name: Routine function name.
- stack_max_size: Max stack size.
- routine_address: Resolved routine address. None if it hasn't been resolved.
- """
- def __init__(self, name, routine_name, stack_max_size, routine_address=None):
- """Constructor.
+class Task(object):
+ """Task information.
- Args:
+ Attributes:
name: Task name.
routine_name: Routine function name.
stack_max_size: Max stack size.
- routine_address: Resolved routine address.
- """
- self.name = name
- self.routine_name = routine_name
- self.stack_max_size = stack_max_size
- self.routine_address = routine_address
-
- def __eq__(self, other):
- """Task equality.
-
- Args:
- other: The compared object.
-
- Returns:
- True if equal, False if not.
+ routine_address: Resolved routine address. None if it hasn't been resolved.
"""
- if not isinstance(other, Task):
- return False
- return (self.name == other.name and
- self.routine_name == other.routine_name and
- self.stack_max_size == other.stack_max_size and
- self.routine_address == other.routine_address)
+ def __init__(
+ self, name, routine_name, stack_max_size, routine_address=None
+ ):
+ """Constructor.
+
+ Args:
+ name: Task name.
+ routine_name: Routine function name.
+ stack_max_size: Max stack size.
+ routine_address: Resolved routine address.
+ """
+ self.name = name
+ self.routine_name = routine_name
+ self.stack_max_size = stack_max_size
+ self.routine_address = routine_address
+
+ def __eq__(self, other):
+ """Task equality.
+
+ Args:
+ other: The compared object.
+
+ Returns:
+ True if equal, False if not.
+ """
+ if not isinstance(other, Task):
+ return False
+
+ return (
+ self.name == other.name
+ and self.routine_name == other.routine_name
+ and self.stack_max_size == other.stack_max_size
+ and self.routine_address == other.routine_address
+ )
class Symbol(object):
- """Symbol information.
+ """Symbol information.
- Attributes:
- address: Symbol address.
- symtype: Symbol type, 'O' (data, object) or 'F' (function).
- size: Symbol size.
- name: Symbol name.
- """
-
- def __init__(self, address, symtype, size, name):
- """Constructor.
-
- Args:
+ Attributes:
address: Symbol address.
- symtype: Symbol type.
+ symtype: Symbol type, 'O' (data, object) or 'F' (function).
size: Symbol size.
name: Symbol name.
"""
- assert symtype in ['O', 'F']
- self.address = address
- self.symtype = symtype
- self.size = size
- self.name = name
- def __eq__(self, other):
- """Symbol equality.
-
- Args:
- other: The compared object.
-
- Returns:
- True if equal, False if not.
- """
- if not isinstance(other, Symbol):
- return False
-
- return (self.address == other.address and
- self.symtype == other.symtype and
- self.size == other.size and
- self.name == other.name)
+ def __init__(self, address, symtype, size, name):
+ """Constructor.
+
+ Args:
+ address: Symbol address.
+ symtype: Symbol type.
+ size: Symbol size.
+ name: Symbol name.
+ """
+ assert symtype in ["O", "F"]
+ self.address = address
+ self.symtype = symtype
+ self.size = size
+ self.name = name
+
+ def __eq__(self, other):
+ """Symbol equality.
+
+ Args:
+ other: The compared object.
+
+ Returns:
+ True if equal, False if not.
+ """
+ if not isinstance(other, Symbol):
+ return False
+
+ return (
+ self.address == other.address
+ and self.symtype == other.symtype
+ and self.size == other.size
+ and self.name == other.name
+ )
class Callsite(object):
- """Function callsite.
-
- Attributes:
- address: Address of callsite location. None if it is unknown.
- target: Callee address. None if it is unknown.
- is_tail: A bool indicates that it is a tailing call.
- callee: Resolved callee function. None if it hasn't been resolved.
- """
+ """Function callsite.
- def __init__(self, address, target, is_tail, callee=None):
- """Constructor.
-
- Args:
+ Attributes:
address: Address of callsite location. None if it is unknown.
target: Callee address. None if it is unknown.
- is_tail: A bool indicates that it is a tailing call. (function jump to
- another function without restoring the stack frame)
- callee: Resolved callee function.
+ is_tail: A bool indicates that it is a tailing call.
+ callee: Resolved callee function. None if it hasn't been resolved.
"""
- # It makes no sense that both address and target are unknown.
- assert not (address is None and target is None)
- self.address = address
- self.target = target
- self.is_tail = is_tail
- self.callee = callee
-
- def __eq__(self, other):
- """Callsite equality.
- Args:
- other: The compared object.
-
- Returns:
- True if equal, False if not.
- """
- if not isinstance(other, Callsite):
- return False
-
- if not (self.address == other.address and
- self.target == other.target and
- self.is_tail == other.is_tail):
- return False
-
- if self.callee is None:
- return other.callee is None
- elif other.callee is None:
- return False
-
- # Assume the addresses of functions are unique.
- return self.callee.address == other.callee.address
+ def __init__(self, address, target, is_tail, callee=None):
+ """Constructor.
+
+ Args:
+ address: Address of callsite location. None if it is unknown.
+ target: Callee address. None if it is unknown.
+ is_tail: A bool indicates that it is a tailing call. (function jump to
+ another function without restoring the stack frame)
+ callee: Resolved callee function.
+ """
+ # It makes no sense that both address and target are unknown.
+ assert not (address is None and target is None)
+ self.address = address
+ self.target = target
+ self.is_tail = is_tail
+ self.callee = callee
+
+ def __eq__(self, other):
+ """Callsite equality.
+
+ Args:
+ other: The compared object.
+
+ Returns:
+ True if equal, False if not.
+ """
+ if not isinstance(other, Callsite):
+ return False
+
+ if not (
+ self.address == other.address
+ and self.target == other.target
+ and self.is_tail == other.is_tail
+ ):
+ return False
+
+ if self.callee is None:
+ return other.callee is None
+ elif other.callee is None:
+ return False
+
+ # Assume the addresses of functions are unique.
+ return self.callee.address == other.callee.address
class Function(object):
- """Function.
+ """Function.
- Attributes:
- address: Address of function.
- name: Name of function from its symbol.
- stack_frame: Size of stack frame.
- callsites: Callsite list.
- stack_max_usage: Max stack usage. None if it hasn't been analyzed.
- stack_max_path: Max stack usage path. None if it hasn't been analyzed.
- """
-
- def __init__(self, address, name, stack_frame, callsites):
- """Constructor.
-
- Args:
+ Attributes:
address: Address of function.
name: Name of function from its symbol.
stack_frame: Size of stack frame.
callsites: Callsite list.
+ stack_max_usage: Max stack usage. None if it hasn't been analyzed.
+ stack_max_path: Max stack usage path. None if it hasn't been analyzed.
"""
- self.address = address
- self.name = name
- self.stack_frame = stack_frame
- self.callsites = callsites
- self.stack_max_usage = None
- self.stack_max_path = None
-
- def __eq__(self, other):
- """Function equality.
-
- Args:
- other: The compared object.
-
- Returns:
- True if equal, False if not.
- """
- if not isinstance(other, Function):
- return False
- if not (self.address == other.address and
- self.name == other.name and
- self.stack_frame == other.stack_frame and
- self.callsites == other.callsites and
- self.stack_max_usage == other.stack_max_usage):
- return False
+ def __init__(self, address, name, stack_frame, callsites):
+ """Constructor.
+
+ Args:
+ address: Address of function.
+ name: Name of function from its symbol.
+ stack_frame: Size of stack frame.
+ callsites: Callsite list.
+ """
+ self.address = address
+ self.name = name
+ self.stack_frame = stack_frame
+ self.callsites = callsites
+ self.stack_max_usage = None
+ self.stack_max_path = None
+
+ def __eq__(self, other):
+ """Function equality.
+
+ Args:
+ other: The compared object.
+
+ Returns:
+ True if equal, False if not.
+ """
+ if not isinstance(other, Function):
+ return False
+
+ if not (
+ self.address == other.address
+ and self.name == other.name
+ and self.stack_frame == other.stack_frame
+ and self.callsites == other.callsites
+ and self.stack_max_usage == other.stack_max_usage
+ ):
+ return False
+
+ if self.stack_max_path is None:
+ return other.stack_max_path is None
+ elif other.stack_max_path is None:
+ return False
+
+ if len(self.stack_max_path) != len(other.stack_max_path):
+ return False
+
+ for self_func, other_func in zip(
+ self.stack_max_path, other.stack_max_path
+ ):
+ # Assume the addresses of functions are unique.
+ if self_func.address != other_func.address:
+ return False
+
+ return True
+
+ def __hash__(self):
+ return id(self)
- if self.stack_max_path is None:
- return other.stack_max_path is None
- elif other.stack_max_path is None:
- return False
-
- if len(self.stack_max_path) != len(other.stack_max_path):
- return False
-
- for self_func, other_func in zip(self.stack_max_path, other.stack_max_path):
- # Assume the addresses of functions are unique.
- if self_func.address != other_func.address:
- return False
-
- return True
-
- def __hash__(self):
- return id(self)
class AndesAnalyzer(object):
- """Disassembly analyzer for Andes architecture.
-
- Public Methods:
- AnalyzeFunction: Analyze stack frame and callsites of the function.
- """
-
- GENERAL_PURPOSE_REGISTER_SIZE = 4
-
- # Possible condition code suffixes.
- CONDITION_CODES = [ 'eq', 'eqz', 'gez', 'gtz', 'lez', 'ltz', 'ne', 'nez',
- 'eqc', 'nec', 'nezs', 'nes', 'eqs']
- CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES))
-
- IMM_ADDRESS_RE = r'([0-9A-Fa-f]+)\s+<([^>]+)>'
- # Branch instructions.
- JUMP_OPCODE_RE = re.compile(r'^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$' \
- .format(CONDITION_CODES_RE))
- # Call instructions.
- CALL_OPCODE_RE = re.compile \
- (r'^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$')
- CALL_OPERAND_RE = re.compile(r'^{}$'.format(IMM_ADDRESS_RE))
- # Ignore lp register because it's for return.
- INDIRECT_CALL_OPERAND_RE = re.compile \
- (r'^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$')
- # TODO: Handle other kinds of store instructions.
- PUSH_OPCODE_RE = re.compile(r'^push(\d{1,})$')
- PUSH_OPERAND_RE = re.compile(r'^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}')
- SMW_OPCODE_RE = re.compile(r'^smw(\.\w\w|\.\w\w\w)$')
- SMW_OPERAND_RE = re.compile(r'^(\$r\d{1,}|\$\wp), \[\$\wp\], '
- r'(\$r\d{1,}|\$\wp), \#\d\w\d \! \{([^\]]+)\}')
- OPERANDGROUP_RE = re.compile(r'^\$r\d{1,}\~\$r\d{1,}')
-
- LWI_OPCODE_RE = re.compile(r'^lwi(\.\w\w)$')
- LWI_PC_OPERAND_RE = re.compile(r'^\$pc, \[([^\]]+)\]')
- # Example: "34280: 3f c8 0f ec addi.gp $fp, #0xfec"
- # Assume there is always a "\t" after the hex data.
- DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+'
- r'(?P<words>[0-9A-Fa-f ]+)'
- r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?')
-
- def ParseInstruction(self, line, function_end):
- """Parse the line of instruction.
+ """Disassembly analyzer for Andes architecture.
- Args:
- line: Text of disassembly.
- function_end: End address of the current function. None if unknown.
-
- Returns:
- (address, words, opcode, operand_text): The instruction address, words,
- opcode, and the text of operands.
- None if it isn't an instruction line.
+ Public Methods:
+ AnalyzeFunction: Analyze stack frame and callsites of the function.
"""
- result = self.DISASM_REGEX_RE.match(line)
- if result is None:
- return None
-
- address = int(result.group('address'), 16)
- # Check if it's out of bound.
- if function_end is not None and address >= function_end:
- return None
-
- opcode = result.group('opcode').strip()
- operand_text = result.group('operand')
- words = result.group('words')
- if operand_text is None:
- operand_text = ''
- else:
- operand_text = operand_text.strip()
-
- return (address, words, opcode, operand_text)
-
- def AnalyzeFunction(self, function_symbol, instructions):
-
- stack_frame = 0
- callsites = []
- for address, words, opcode, operand_text in instructions:
- is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None
- is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None
-
- if is_jump_opcode or is_call_opcode:
- is_tail = is_jump_opcode
-
- result = self.CALL_OPERAND_RE.match(operand_text)
+ GENERAL_PURPOSE_REGISTER_SIZE = 4
+
+ # Possible condition code suffixes.
+ CONDITION_CODES = [
+ "eq",
+ "eqz",
+ "gez",
+ "gtz",
+ "lez",
+ "ltz",
+ "ne",
+ "nez",
+ "eqc",
+ "nec",
+ "nezs",
+ "nes",
+ "eqs",
+ ]
+ CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES))
+
+ IMM_ADDRESS_RE = r"([0-9A-Fa-f]+)\s+<([^>]+)>"
+ # Branch instructions.
+ JUMP_OPCODE_RE = re.compile(
+ r"^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$".format(CONDITION_CODES_RE)
+ )
+ # Call instructions.
+ CALL_OPCODE_RE = re.compile(
+ r"^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$"
+ )
+ CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE))
+ # Ignore lp register because it's for return.
+ INDIRECT_CALL_OPERAND_RE = re.compile(
+ r"^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$"
+ )
+ # TODO: Handle other kinds of store instructions.
+ PUSH_OPCODE_RE = re.compile(r"^push(\d{1,})$")
+ PUSH_OPERAND_RE = re.compile(r"^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}")
+ SMW_OPCODE_RE = re.compile(r"^smw(\.\w\w|\.\w\w\w)$")
+ SMW_OPERAND_RE = re.compile(
+ r"^(\$r\d{1,}|\$\wp), \[\$\wp\], "
+ r"(\$r\d{1,}|\$\wp), \#\d\w\d \! \{([^\]]+)\}"
+ )
+ OPERANDGROUP_RE = re.compile(r"^\$r\d{1,}\~\$r\d{1,}")
+
+ LWI_OPCODE_RE = re.compile(r"^lwi(\.\w\w)$")
+ LWI_PC_OPERAND_RE = re.compile(r"^\$pc, \[([^\]]+)\]")
+ # Example: "34280: 3f c8 0f ec addi.gp $fp, #0xfec"
+ # Assume there is always a "\t" after the hex data.
+ DISASM_REGEX_RE = re.compile(
+ r"^(?P<address>[0-9A-Fa-f]+):\s+"
+ r"(?P<words>[0-9A-Fa-f ]+)"
+ r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?"
+ )
+
+ def ParseInstruction(self, line, function_end):
+ """Parse the line of instruction.
+
+ Args:
+ line: Text of disassembly.
+ function_end: End address of the current function. None if unknown.
+
+ Returns:
+ (address, words, opcode, operand_text): The instruction address, words,
+ opcode, and the text of operands.
+ None if it isn't an instruction line.
+ """
+ result = self.DISASM_REGEX_RE.match(line)
if result is None:
- if (self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None):
- # Found an indirect call.
- callsites.append(Callsite(address, None, is_tail))
-
+ return None
+
+ address = int(result.group("address"), 16)
+ # Check if it's out of bound.
+ if function_end is not None and address >= function_end:
+ return None
+
+ opcode = result.group("opcode").strip()
+ operand_text = result.group("operand")
+ words = result.group("words")
+ if operand_text is None:
+ operand_text = ""
else:
- target_address = int(result.group(1), 16)
- # Filter out the in-function target (branches and in-function calls,
- # which are actually branches).
- if not (function_symbol.size > 0 and
- function_symbol.address < target_address <
- (function_symbol.address + function_symbol.size)):
- # Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
-
- elif self.LWI_OPCODE_RE.match(opcode) is not None:
- result = self.LWI_PC_OPERAND_RE.match(operand_text)
- if result is not None:
- # Ignore "lwi $pc, [$sp], xx" because it's usually a return.
- if result.group(1) != '$sp':
- # Found an indirect call.
- callsites.append(Callsite(address, None, True))
-
- elif self.PUSH_OPCODE_RE.match(opcode) is not None:
- # Example: fc 20 push25 $r8, #0 ! {$r6~$r8, $fp, $gp, $lp}
- if self.PUSH_OPERAND_RE.match(operand_text) is not None:
- # capture fc 20
- imm5u = int(words.split(' ')[1], 16)
- # sp = sp - (imm5u << 3)
- imm8u = (imm5u<<3) & 0xff
- stack_frame += imm8u
-
- result = self.PUSH_OPERAND_RE.match(operand_text)
- operandgroup_text = result.group(1)
- # capture $rx~$ry
- if self.OPERANDGROUP_RE.match(operandgroup_text) is not None:
- # capture number & transfer string to integer
- oprandgrouphead = operandgroup_text.split(',')[0]
- rx=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[0])))
- ry=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[1])))
-
- stack_frame += ((len(operandgroup_text.split(','))+ry-rx) *
- self.GENERAL_PURPOSE_REGISTER_SIZE)
- else:
- stack_frame += (len(operandgroup_text.split(',')) *
- self.GENERAL_PURPOSE_REGISTER_SIZE)
-
- elif self.SMW_OPCODE_RE.match(opcode) is not None:
- # Example: smw.adm $r6, [$sp], $r10, #0x2 ! {$r6~$r10, $lp}
- if self.SMW_OPERAND_RE.match(operand_text) is not None:
- result = self.SMW_OPERAND_RE.match(operand_text)
- operandgroup_text = result.group(3)
- # capture $rx~$ry
- if self.OPERANDGROUP_RE.match(operandgroup_text) is not None:
- # capture number & transfer string to integer
- oprandgrouphead = operandgroup_text.split(',')[0]
- rx=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[0])))
- ry=int(''.join(filter(str.isdigit, oprandgrouphead.split('~')[1])))
-
- stack_frame += ((len(operandgroup_text.split(','))+ry-rx) *
- self.GENERAL_PURPOSE_REGISTER_SIZE)
- else:
- stack_frame += (len(operandgroup_text.split(',')) *
- self.GENERAL_PURPOSE_REGISTER_SIZE)
-
- return (stack_frame, callsites)
+ operand_text = operand_text.strip()
+
+ return (address, words, opcode, operand_text)
+
+ def AnalyzeFunction(self, function_symbol, instructions):
+
+ stack_frame = 0
+ callsites = []
+ for address, words, opcode, operand_text in instructions:
+ is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None
+ is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None
+
+ if is_jump_opcode or is_call_opcode:
+ is_tail = is_jump_opcode
+
+ result = self.CALL_OPERAND_RE.match(operand_text)
+
+ if result is None:
+ if (
+ self.INDIRECT_CALL_OPERAND_RE.match(operand_text)
+ is not None
+ ):
+ # Found an indirect call.
+ callsites.append(Callsite(address, None, is_tail))
+
+ else:
+ target_address = int(result.group(1), 16)
+ # Filter out the in-function target (branches and in-function calls,
+ # which are actually branches).
+ if not (
+ function_symbol.size > 0
+ and function_symbol.address
+ < target_address
+ < (function_symbol.address + function_symbol.size)
+ ):
+ # Maybe it is a callsite.
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
+
+ elif self.LWI_OPCODE_RE.match(opcode) is not None:
+ result = self.LWI_PC_OPERAND_RE.match(operand_text)
+ if result is not None:
+ # Ignore "lwi $pc, [$sp], xx" because it's usually a return.
+ if result.group(1) != "$sp":
+ # Found an indirect call.
+ callsites.append(Callsite(address, None, True))
+
+ elif self.PUSH_OPCODE_RE.match(opcode) is not None:
+ # Example: fc 20 push25 $r8, #0 ! {$r6~$r8, $fp, $gp, $lp}
+ if self.PUSH_OPERAND_RE.match(operand_text) is not None:
+ # capture fc 20
+ imm5u = int(words.split(" ")[1], 16)
+ # sp = sp - (imm5u << 3)
+ imm8u = (imm5u << 3) & 0xFF
+ stack_frame += imm8u
+
+ result = self.PUSH_OPERAND_RE.match(operand_text)
+ operandgroup_text = result.group(1)
+ # capture $rx~$ry
+ if (
+ self.OPERANDGROUP_RE.match(operandgroup_text)
+ is not None
+ ):
+ # capture number & transfer string to integer
+ oprandgrouphead = operandgroup_text.split(",")[0]
+ rx = int(
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[0]
+ )
+ )
+ )
+ ry = int(
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[1]
+ )
+ )
+ )
+
+ stack_frame += (
+ len(operandgroup_text.split(",")) + ry - rx
+ ) * self.GENERAL_PURPOSE_REGISTER_SIZE
+ else:
+ stack_frame += (
+ len(operandgroup_text.split(","))
+ * self.GENERAL_PURPOSE_REGISTER_SIZE
+ )
+
+ elif self.SMW_OPCODE_RE.match(opcode) is not None:
+ # Example: smw.adm $r6, [$sp], $r10, #0x2 ! {$r6~$r10, $lp}
+ if self.SMW_OPERAND_RE.match(operand_text) is not None:
+ result = self.SMW_OPERAND_RE.match(operand_text)
+ operandgroup_text = result.group(3)
+ # capture $rx~$ry
+ if (
+ self.OPERANDGROUP_RE.match(operandgroup_text)
+ is not None
+ ):
+ # capture number & transfer string to integer
+ oprandgrouphead = operandgroup_text.split(",")[0]
+ rx = int(
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[0]
+ )
+ )
+ )
+ ry = int(
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[1]
+ )
+ )
+ )
+
+ stack_frame += (
+ len(operandgroup_text.split(",")) + ry - rx
+ ) * self.GENERAL_PURPOSE_REGISTER_SIZE
+ else:
+ stack_frame += (
+ len(operandgroup_text.split(","))
+ * self.GENERAL_PURPOSE_REGISTER_SIZE
+ )
+
+ return (stack_frame, callsites)
-class ArmAnalyzer(object):
- """Disassembly analyzer for ARM architecture.
-
- Public Methods:
- AnalyzeFunction: Analyze stack frame and callsites of the function.
- """
-
- GENERAL_PURPOSE_REGISTER_SIZE = 4
-
- # Possible condition code suffixes.
- CONDITION_CODES = ['', 'eq', 'ne', 'cs', 'hs', 'cc', 'lo', 'mi', 'pl', 'vs',
- 'vc', 'hi', 'ls', 'ge', 'lt', 'gt', 'le']
- CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES))
- # Assume there is no function name containing ">".
- IMM_ADDRESS_RE = r'([0-9A-Fa-f]+)\s+<([^>]+)>'
-
- # Fuzzy regular expressions for instruction and operand parsing.
- # Branch instructions.
- JUMP_OPCODE_RE = re.compile(
- r'^(b{0}|bx{0})(\.\w)?$'.format(CONDITION_CODES_RE))
- # Call instructions.
- CALL_OPCODE_RE = re.compile(
- r'^(bl{0}|blx{0})(\.\w)?$'.format(CONDITION_CODES_RE))
- CALL_OPERAND_RE = re.compile(r'^{}$'.format(IMM_ADDRESS_RE))
- CBZ_CBNZ_OPCODE_RE = re.compile(r'^(cbz|cbnz)(\.\w)?$')
- # Example: "r0, 1009bcbe <host_cmd_motion_sense+0x1d2>"
- CBZ_CBNZ_OPERAND_RE = re.compile(r'^[^,]+,\s+{}$'.format(IMM_ADDRESS_RE))
- # Ignore lr register because it's for return.
- INDIRECT_CALL_OPERAND_RE = re.compile(r'^r\d+|sb|sl|fp|ip|sp|pc$')
- # TODO(cheyuw): Handle conditional versions of following
- # instructions.
- # TODO(cheyuw): Handle other kinds of pc modifying instructions (e.g. mov pc).
- LDR_OPCODE_RE = re.compile(r'^ldr(\.\w)?$')
- # Example: "pc, [sp], #4"
- LDR_PC_OPERAND_RE = re.compile(r'^pc, \[([^\]]+)\]')
- # TODO(cheyuw): Handle other kinds of stm instructions.
- PUSH_OPCODE_RE = re.compile(r'^push$')
- STM_OPCODE_RE = re.compile(r'^stmdb$')
- # Stack subtraction instructions.
- SUB_OPCODE_RE = re.compile(r'^sub(s|w)?(\.\w)?$')
- SUB_OPERAND_RE = re.compile(r'^sp[^#]+#(\d+)')
- # Example: "44d94: f893 0068 ldrb.w r0, [r3, #104] ; 0x68"
- # Assume there is always a "\t" after the hex data.
- DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+'
- r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?')
-
- def ParseInstruction(self, line, function_end):
- """Parse the line of instruction.
- Args:
- line: Text of disassembly.
- function_end: End address of the current function. None if unknown.
+class ArmAnalyzer(object):
+ """Disassembly analyzer for ARM architecture.
- Returns:
- (address, opcode, operand_text): The instruction address, opcode,
- and the text of operands. None if it
- isn't an instruction line.
+ Public Methods:
+ AnalyzeFunction: Analyze stack frame and callsites of the function.
"""
- result = self.DISASM_REGEX_RE.match(line)
- if result is None:
- return None
-
- address = int(result.group('address'), 16)
- # Check if it's out of bound.
- if function_end is not None and address >= function_end:
- return None
-
- opcode = result.group('opcode').strip()
- operand_text = result.group('operand')
- if operand_text is None:
- operand_text = ''
- else:
- operand_text = operand_text.strip()
-
- return (address, opcode, operand_text)
-
- def AnalyzeFunction(self, function_symbol, instructions):
- """Analyze function, resolve the size of stack frame and callsites.
-
- Args:
- function_symbol: Function symbol.
- instructions: Instruction list.
-
- Returns:
- (stack_frame, callsites): Size of stack frame, callsite list.
- """
- stack_frame = 0
- callsites = []
- for address, opcode, operand_text in instructions:
- is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None
- is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None
- is_cbz_cbnz_opcode = self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None
- if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode:
- is_tail = is_jump_opcode or is_cbz_cbnz_opcode
-
- if is_cbz_cbnz_opcode:
- result = self.CBZ_CBNZ_OPERAND_RE.match(operand_text)
- else:
- result = self.CALL_OPERAND_RE.match(operand_text)
+ GENERAL_PURPOSE_REGISTER_SIZE = 4
+
+ # Possible condition code suffixes.
+ CONDITION_CODES = [
+ "",
+ "eq",
+ "ne",
+ "cs",
+ "hs",
+ "cc",
+ "lo",
+ "mi",
+ "pl",
+ "vs",
+ "vc",
+ "hi",
+ "ls",
+ "ge",
+ "lt",
+ "gt",
+ "le",
+ ]
+ CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES))
+ # Assume there is no function name containing ">".
+ IMM_ADDRESS_RE = r"([0-9A-Fa-f]+)\s+<([^>]+)>"
+
+ # Fuzzy regular expressions for instruction and operand parsing.
+ # Branch instructions.
+ JUMP_OPCODE_RE = re.compile(
+ r"^(b{0}|bx{0})(\.\w)?$".format(CONDITION_CODES_RE)
+ )
+ # Call instructions.
+ CALL_OPCODE_RE = re.compile(
+ r"^(bl{0}|blx{0})(\.\w)?$".format(CONDITION_CODES_RE)
+ )
+ CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE))
+ CBZ_CBNZ_OPCODE_RE = re.compile(r"^(cbz|cbnz)(\.\w)?$")
+ # Example: "r0, 1009bcbe <host_cmd_motion_sense+0x1d2>"
+ CBZ_CBNZ_OPERAND_RE = re.compile(r"^[^,]+,\s+{}$".format(IMM_ADDRESS_RE))
+ # Ignore lr register because it's for return.
+ INDIRECT_CALL_OPERAND_RE = re.compile(r"^r\d+|sb|sl|fp|ip|sp|pc$")
+ # TODO(cheyuw): Handle conditional versions of following
+ # instructions.
+ # TODO(cheyuw): Handle other kinds of pc modifying instructions (e.g. mov pc).
+ LDR_OPCODE_RE = re.compile(r"^ldr(\.\w)?$")
+ # Example: "pc, [sp], #4"
+ LDR_PC_OPERAND_RE = re.compile(r"^pc, \[([^\]]+)\]")
+ # TODO(cheyuw): Handle other kinds of stm instructions.
+ PUSH_OPCODE_RE = re.compile(r"^push$")
+ STM_OPCODE_RE = re.compile(r"^stmdb$")
+ # Stack subtraction instructions.
+ SUB_OPCODE_RE = re.compile(r"^sub(s|w)?(\.\w)?$")
+ SUB_OPERAND_RE = re.compile(r"^sp[^#]+#(\d+)")
+ # Example: "44d94: f893 0068 ldrb.w r0, [r3, #104] ; 0x68"
+ # Assume there is always a "\t" after the hex data.
+ DISASM_REGEX_RE = re.compile(
+ r"^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+"
+ r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?"
+ )
+
+ def ParseInstruction(self, line, function_end):
+ """Parse the line of instruction.
+
+ Args:
+ line: Text of disassembly.
+ function_end: End address of the current function. None if unknown.
+
+ Returns:
+ (address, opcode, operand_text): The instruction address, opcode,
+ and the text of operands. None if it
+ isn't an instruction line.
+ """
+ result = self.DISASM_REGEX_RE.match(line)
if result is None:
- # Failed to match immediate address, maybe it is an indirect call.
- # CBZ and CBNZ can't be indirect calls.
- if (not is_cbz_cbnz_opcode and
- self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None):
- # Found an indirect call.
- callsites.append(Callsite(address, None, is_tail))
+ return None
- else:
- target_address = int(result.group(1), 16)
- # Filter out the in-function target (branches and in-function calls,
- # which are actually branches).
- if not (function_symbol.size > 0 and
- function_symbol.address < target_address <
- (function_symbol.address + function_symbol.size)):
- # Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
-
- elif self.LDR_OPCODE_RE.match(opcode) is not None:
- result = self.LDR_PC_OPERAND_RE.match(operand_text)
- if result is not None:
- # Ignore "ldr pc, [sp], xx" because it's usually a return.
- if result.group(1) != 'sp':
- # Found an indirect call.
- callsites.append(Callsite(address, None, True))
-
- elif self.PUSH_OPCODE_RE.match(opcode) is not None:
- # Example: "{r4, r5, r6, r7, lr}"
- stack_frame += (len(operand_text.split(',')) *
- self.GENERAL_PURPOSE_REGISTER_SIZE)
- elif self.SUB_OPCODE_RE.match(opcode) is not None:
- result = self.SUB_OPERAND_RE.match(operand_text)
- if result is not None:
- stack_frame += int(result.group(1))
- else:
- # Unhandled stack register subtraction.
- assert not operand_text.startswith('sp')
+ address = int(result.group("address"), 16)
+ # Check if it's out of bound.
+ if function_end is not None and address >= function_end:
+ return None
- elif self.STM_OPCODE_RE.match(opcode) is not None:
- if operand_text.startswith('sp!'):
- # Subtract and writeback to stack register.
- # Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}"
- # Get the text of pushed register list.
- unused_sp, unused_sep, parameter_text = operand_text.partition(',')
- stack_frame += (len(parameter_text.split(',')) *
- self.GENERAL_PURPOSE_REGISTER_SIZE)
+ opcode = result.group("opcode").strip()
+ operand_text = result.group("operand")
+ if operand_text is None:
+ operand_text = ""
+ else:
+ operand_text = operand_text.strip()
+
+ return (address, opcode, operand_text)
+
+ def AnalyzeFunction(self, function_symbol, instructions):
+ """Analyze function, resolve the size of stack frame and callsites.
+
+ Args:
+ function_symbol: Function symbol.
+ instructions: Instruction list.
+
+ Returns:
+ (stack_frame, callsites): Size of stack frame, callsite list.
+ """
+ stack_frame = 0
+ callsites = []
+ for address, opcode, operand_text in instructions:
+ is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None
+ is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None
+ is_cbz_cbnz_opcode = (
+ self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None
+ )
+ if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode:
+ is_tail = is_jump_opcode or is_cbz_cbnz_opcode
+
+ if is_cbz_cbnz_opcode:
+ result = self.CBZ_CBNZ_OPERAND_RE.match(operand_text)
+ else:
+ result = self.CALL_OPERAND_RE.match(operand_text)
+
+ if result is None:
+ # Failed to match immediate address, maybe it is an indirect call.
+ # CBZ and CBNZ can't be indirect calls.
+ if (
+ not is_cbz_cbnz_opcode
+ and self.INDIRECT_CALL_OPERAND_RE.match(operand_text)
+ is not None
+ ):
+ # Found an indirect call.
+ callsites.append(Callsite(address, None, is_tail))
+
+ else:
+ target_address = int(result.group(1), 16)
+ # Filter out the in-function target (branches and in-function calls,
+ # which are actually branches).
+ if not (
+ function_symbol.size > 0
+ and function_symbol.address
+ < target_address
+ < (function_symbol.address + function_symbol.size)
+ ):
+ # Maybe it is a callsite.
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
+
+ elif self.LDR_OPCODE_RE.match(opcode) is not None:
+ result = self.LDR_PC_OPERAND_RE.match(operand_text)
+ if result is not None:
+ # Ignore "ldr pc, [sp], xx" because it's usually a return.
+ if result.group(1) != "sp":
+ # Found an indirect call.
+ callsites.append(Callsite(address, None, True))
+
+ elif self.PUSH_OPCODE_RE.match(opcode) is not None:
+ # Example: "{r4, r5, r6, r7, lr}"
+ stack_frame += (
+ len(operand_text.split(","))
+ * self.GENERAL_PURPOSE_REGISTER_SIZE
+ )
+ elif self.SUB_OPCODE_RE.match(opcode) is not None:
+ result = self.SUB_OPERAND_RE.match(operand_text)
+ if result is not None:
+ stack_frame += int(result.group(1))
+ else:
+ # Unhandled stack register subtraction.
+ assert not operand_text.startswith("sp")
+
+ elif self.STM_OPCODE_RE.match(opcode) is not None:
+ if operand_text.startswith("sp!"):
+ # Subtract and writeback to stack register.
+ # Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}"
+ # Get the text of pushed register list.
+ (
+ unused_sp,
+ unused_sep,
+ parameter_text,
+ ) = operand_text.partition(",")
+ stack_frame += (
+ len(parameter_text.split(","))
+ * self.GENERAL_PURPOSE_REGISTER_SIZE
+ )
+
+ return (stack_frame, callsites)
- return (stack_frame, callsites)
class RiscvAnalyzer(object):
- """Disassembly analyzer for RISC-V architecture.
-
- Public Methods:
- AnalyzeFunction: Analyze stack frame and callsites of the function.
- """
-
- # Possible condition code suffixes.
- CONDITION_CODES = [ 'eqz', 'nez', 'lez', 'gez', 'ltz', 'gtz', 'gt', 'le',
- 'gtu', 'leu', 'eq', 'ne', 'ge', 'lt', 'ltu', 'geu']
- CONDITION_CODES_RE = '({})'.format('|'.join(CONDITION_CODES))
- # Branch instructions.
- JUMP_OPCODE_RE = re.compile(r'^(b{0}|j|jr)$'.format(CONDITION_CODES_RE))
- # Call instructions.
- CALL_OPCODE_RE = re.compile(r'^(jal|jalr)$')
- # Example: "j 8009b318 <set_state_prl_hr>" or
- # "jal ra,800a4394 <power_get_signals>" or
- # "bltu t0,t1,80080300 <data_loop>"
- JUMP_ADDRESS_RE = r'((\w(\w|\d\d),){0,2})([0-9A-Fa-f]+)\s+<([^>]+)>'
- CALL_OPERAND_RE = re.compile(r'^{}$'.format(JUMP_ADDRESS_RE))
- # Capture address, Example: 800a4394
- CAPTURE_ADDRESS = re.compile(r'[0-9A-Fa-f]{8}')
- # Indirect jump, Example: jalr a5
- INDIRECT_CALL_OPERAND_RE = re.compile(r'^t\d+|s\d+|a\d+$')
- # Example: addi
- ADDI_OPCODE_RE = re.compile(r'^addi$')
- # Allocate stack instructions.
- ADDI_OPERAND_RE = re.compile(r'^(sp,sp,-\d+)$')
- # Example: "800804b6: 1101 addi sp,sp,-32"
- DISASM_REGEX_RE = re.compile(r'^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+'
- r'\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?')
-
- def ParseInstruction(self, line, function_end):
- """Parse the line of instruction.
+ """Disassembly analyzer for RISC-V architecture.
- Args:
- line: Text of disassembly.
- function_end: End address of the current function. None if unknown.
-
- Returns:
- (address, opcode, operand_text): The instruction address, opcode,
- and the text of operands. None if it
- isn't an instruction line.
+ Public Methods:
+ AnalyzeFunction: Analyze stack frame and callsites of the function.
"""
- result = self.DISASM_REGEX_RE.match(line)
- if result is None:
- return None
-
- address = int(result.group('address'), 16)
- # Check if it's out of bound.
- if function_end is not None and address >= function_end:
- return None
-
- opcode = result.group('opcode').strip()
- operand_text = result.group('operand')
- if operand_text is None:
- operand_text = ''
- else:
- operand_text = operand_text.strip()
-
- return (address, opcode, operand_text)
-
- def AnalyzeFunction(self, function_symbol, instructions):
-
- stack_frame = 0
- callsites = []
- for address, opcode, operand_text in instructions:
- is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None
- is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None
- if is_jump_opcode or is_call_opcode:
- is_tail = is_jump_opcode
-
- result = self.CALL_OPERAND_RE.match(operand_text)
+ # Possible condition code suffixes.
+ CONDITION_CODES = [
+ "eqz",
+ "nez",
+ "lez",
+ "gez",
+ "ltz",
+ "gtz",
+ "gt",
+ "le",
+ "gtu",
+ "leu",
+ "eq",
+ "ne",
+ "ge",
+ "lt",
+ "ltu",
+ "geu",
+ ]
+ CONDITION_CODES_RE = "({})".format("|".join(CONDITION_CODES))
+ # Branch instructions.
+ JUMP_OPCODE_RE = re.compile(r"^(b{0}|j|jr)$".format(CONDITION_CODES_RE))
+ # Call instructions.
+ CALL_OPCODE_RE = re.compile(r"^(jal|jalr)$")
+ # Example: "j 8009b318 <set_state_prl_hr>" or
+ # "jal ra,800a4394 <power_get_signals>" or
+ # "bltu t0,t1,80080300 <data_loop>"
+ JUMP_ADDRESS_RE = r"((\w(\w|\d\d),){0,2})([0-9A-Fa-f]+)\s+<([^>]+)>"
+ CALL_OPERAND_RE = re.compile(r"^{}$".format(JUMP_ADDRESS_RE))
+ # Capture address, Example: 800a4394
+ CAPTURE_ADDRESS = re.compile(r"[0-9A-Fa-f]{8}")
+ # Indirect jump, Example: jalr a5
+ INDIRECT_CALL_OPERAND_RE = re.compile(r"^t\d+|s\d+|a\d+$")
+ # Example: addi
+ ADDI_OPCODE_RE = re.compile(r"^addi$")
+ # Allocate stack instructions.
+ ADDI_OPERAND_RE = re.compile(r"^(sp,sp,-\d+)$")
+ # Example: "800804b6: 1101 addi sp,sp,-32"
+ DISASM_REGEX_RE = re.compile(
+ r"^(?P<address>[0-9A-Fa-f]+):\s+[0-9A-Fa-f ]+"
+ r"\t\s*(?P<opcode>\S+)(\s+(?P<operand>[^;]*))?"
+ )
+
+ def ParseInstruction(self, line, function_end):
+ """Parse the line of instruction.
+
+ Args:
+ line: Text of disassembly.
+ function_end: End address of the current function. None if unknown.
+
+ Returns:
+ (address, opcode, operand_text): The instruction address, opcode,
+ and the text of operands. None if it
+ isn't an instruction line.
+ """
+ result = self.DISASM_REGEX_RE.match(line)
if result is None:
- if (self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None):
- # Found an indirect call.
- callsites.append(Callsite(address, None, is_tail))
-
- else:
- # Capture address form operand_text and then convert to string
- address_str = "".join(self.CAPTURE_ADDRESS.findall(operand_text))
- # String to integer
- target_address = int(address_str, 16)
- # Filter out the in-function target (branches and in-function calls,
- # which are actually branches).
- if not (function_symbol.size > 0 and
- function_symbol.address < target_address <
- (function_symbol.address + function_symbol.size)):
- # Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
-
- elif self.ADDI_OPCODE_RE.match(opcode) is not None:
- # Example: sp,sp,-32
- if self.ADDI_OPERAND_RE.match(operand_text) is not None:
- stack_frame += abs(int(operand_text.split(",")[2]))
-
- return (stack_frame, callsites)
-
-class StackAnalyzer(object):
- """Class to analyze stack usage.
-
- Public Methods:
- Analyze: Run the stack analysis.
- """
-
- C_FUNCTION_NAME = r'_A-Za-z0-9'
-
- # Assume there is no ":" in the path.
- # Example: "driver/accel_kionix.c:321 (discriminator 3)"
- ADDRTOLINE_RE = re.compile(
- r'^(?P<path>[^:]+):(?P<linenum>\d+)(\s+\(discriminator\s+\d+\))?$')
- # To eliminate the suffix appended by compilers, try to extract the
- # C function name from the prefix of symbol name.
- # Example: "SHA256_transform.constprop.28"
- FUNCTION_PREFIX_NAME_RE = re.compile(
- r'^(?P<name>[{0}]+)([^{0}].*)?$'.format(C_FUNCTION_NAME))
-
- # Errors of annotation resolving.
- ANNOTATION_ERROR_INVALID = 'invalid signature'
- ANNOTATION_ERROR_NOTFOUND = 'function is not found'
- ANNOTATION_ERROR_AMBIGUOUS = 'signature is ambiguous'
-
- def __init__(self, options, symbols, rodata, tasklist, annotation):
- """Constructor.
-
- Args:
- options: Namespace from argparse.parse_args().
- symbols: Symbol list.
- rodata: Content of .rodata section (offset, data)
- tasklist: Task list.
- annotation: Annotation config.
- """
- self.options = options
- self.symbols = symbols
- self.rodata_offset = rodata[0]
- self.rodata = rodata[1]
- self.tasklist = tasklist
- self.annotation = annotation
- self.address_to_line_cache = {}
-
- def AddressToLine(self, address, resolve_inline=False):
- """Convert address to line.
+ return None
- Args:
- address: Target address.
- resolve_inline: Output the stack of inlining.
-
- Returns:
- lines: List of the corresponding lines.
-
- Raises:
- StackAnalyzerError: If addr2line is failed.
- """
- cache_key = (address, resolve_inline)
- if cache_key in self.address_to_line_cache:
- return self.address_to_line_cache[cache_key]
-
- try:
- args = [self.options.addr2line,
- '-f',
- '-e',
- self.options.elf_path,
- '{:x}'.format(address)]
- if resolve_inline:
- args.append('-i')
-
- line_text = subprocess.check_output(args, encoding='utf-8')
- except subprocess.CalledProcessError:
- raise StackAnalyzerError('addr2line failed to resolve lines.')
- except OSError:
- raise StackAnalyzerError('Failed to run addr2line.')
-
- lines = [line.strip() for line in line_text.splitlines()]
- # Assume the output has at least one pair like "function\nlocation\n", and
- # they always show up in pairs.
- # Example: "handle_request\n
- # common/usb_pd_protocol.c:1191\n"
- assert len(lines) >= 2 and len(lines) % 2 == 0
-
- line_infos = []
- for index in range(0, len(lines), 2):
- (function_name, line_text) = lines[index:index + 2]
- if line_text in ['??:0', ':?']:
- line_infos.append(None)
- else:
- result = self.ADDRTOLINE_RE.match(line_text)
- # Assume the output is always well-formed.
- assert result is not None
- line_infos.append((function_name.strip(),
- os.path.realpath(result.group('path').strip()),
- int(result.group('linenum'))))
-
- self.address_to_line_cache[cache_key] = line_infos
- return line_infos
-
- def AnalyzeDisassembly(self, disasm_text):
- """Parse the disassembly text, analyze, and build a map of all functions.
-
- Args:
- disasm_text: Disassembly text.
+ address = int(result.group("address"), 16)
+ # Check if it's out of bound.
+ if function_end is not None and address >= function_end:
+ return None
- Returns:
- function_map: Dict of functions.
- """
- disasm_lines = [line.strip() for line in disasm_text.splitlines()]
-
- if 'nds' in disasm_lines[1]:
- analyzer = AndesAnalyzer()
- elif 'arm' in disasm_lines[1]:
- analyzer = ArmAnalyzer()
- elif 'riscv' in disasm_lines[1]:
- analyzer = RiscvAnalyzer()
- else:
- raise StackAnalyzerError('Unsupported architecture.')
-
- # Example: "08028c8c <motion_lid_calc>:"
- function_signature_regex = re.compile(
- r'^(?P<address>[0-9A-Fa-f]+)\s+<(?P<name>[^>]+)>:$')
-
- def DetectFunctionHead(line):
- """Check if the line is a function head.
-
- Args:
- line: Text of disassembly.
-
- Returns:
- symbol: Function symbol. None if it isn't a function head.
- """
- result = function_signature_regex.match(line)
- if result is None:
- return None
-
- address = int(result.group('address'), 16)
- symbol = symbol_map.get(address)
-
- # Check if the function exists and matches.
- if symbol is None or symbol.symtype != 'F':
- return None
-
- return symbol
-
- # Build symbol map, indexed by symbol address.
- symbol_map = {}
- for symbol in self.symbols:
- # If there are multiple symbols with same address, keeping any of them is
- # good enough.
- symbol_map[symbol.address] = symbol
-
- # Parse the disassembly text. We update the variable "line" to next line
- # when needed. There are two steps of parser:
- #
- # Step 1: Searching for the function head. Once reach the function head,
- # move to the next line, which is the first line of function body.
- #
- # Step 2: Parsing each instruction line of function body. Once reach a
- # non-instruction line, stop parsing and analyze the parsed instructions.
- #
- # Finally turn back to the step 1 without updating the line, because the
- # current non-instruction line can be another function head.
- function_map = {}
- # The following three variables are the states of the parsing processing.
- # They will be initialized properly during the state changes.
- function_symbol = None
- function_end = None
- instructions = []
-
- # Remove heading and tailing spaces for each line.
- line_index = 0
- while line_index < len(disasm_lines):
- # Get the current line.
- line = disasm_lines[line_index]
-
- if function_symbol is None:
- # Step 1: Search for the function head.
-
- function_symbol = DetectFunctionHead(line)
- if function_symbol is not None:
- # Assume there is no empty function. If the function head is followed
- # by EOF, it is an empty function.
- assert line_index + 1 < len(disasm_lines)
-
- # Found the function head, initialize and turn to the step 2.
- instructions = []
- # If symbol size exists, use it as a hint of function size.
- if function_symbol.size > 0:
- function_end = function_symbol.address + function_symbol.size
- else:
- function_end = None
-
- else:
- # Step 2: Parse the function body.
-
- instruction = analyzer.ParseInstruction(line, function_end)
- if instruction is not None:
- instructions.append(instruction)
-
- if instruction is None or line_index + 1 == len(disasm_lines):
- # Either the invalid instruction or EOF indicates the end of the
- # function, finalize the function analysis.
-
- # Assume there is no empty function.
- assert len(instructions) > 0
-
- (stack_frame, callsites) = analyzer.AnalyzeFunction(function_symbol,
- instructions)
- # Assume the function addresses are unique in the disassembly.
- assert function_symbol.address not in function_map
- function_map[function_symbol.address] = Function(
- function_symbol.address,
- function_symbol.name,
- stack_frame,
- callsites)
-
- # Initialize and turn back to the step 1.
- function_symbol = None
-
- # If the current line isn't an instruction, it can be another function
- # head, skip moving to the next line.
- if instruction is None:
- continue
-
- # Move to the next line.
- line_index += 1
-
- # Resolve callees of functions.
- for function in function_map.values():
- for callsite in function.callsites:
- if callsite.target is not None:
- # Remain the callee as None if we can't resolve it.
- callsite.callee = function_map.get(callsite.target)
-
- return function_map
+ opcode = result.group("opcode").strip()
+ operand_text = result.group("operand")
+ if operand_text is None:
+ operand_text = ""
+ else:
+ operand_text = operand_text.strip()
+
+ return (address, opcode, operand_text)
+
+ def AnalyzeFunction(self, function_symbol, instructions):
+
+ stack_frame = 0
+ callsites = []
+ for address, opcode, operand_text in instructions:
+ is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None
+ is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None
+
+ if is_jump_opcode or is_call_opcode:
+ is_tail = is_jump_opcode
+
+ result = self.CALL_OPERAND_RE.match(operand_text)
+ if result is None:
+ if (
+ self.INDIRECT_CALL_OPERAND_RE.match(operand_text)
+ is not None
+ ):
+ # Found an indirect call.
+ callsites.append(Callsite(address, None, is_tail))
+
+ else:
+ # Capture address form operand_text and then convert to string
+ address_str = "".join(
+ self.CAPTURE_ADDRESS.findall(operand_text)
+ )
+ # String to integer
+ target_address = int(address_str, 16)
+ # Filter out the in-function target (branches and in-function calls,
+ # which are actually branches).
+ if not (
+ function_symbol.size > 0
+ and function_symbol.address
+ < target_address
+ < (function_symbol.address + function_symbol.size)
+ ):
+ # Maybe it is a callsite.
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
+
+ elif self.ADDI_OPCODE_RE.match(opcode) is not None:
+ # Example: sp,sp,-32
+ if self.ADDI_OPERAND_RE.match(operand_text) is not None:
+ stack_frame += abs(int(operand_text.split(",")[2]))
+
+ return (stack_frame, callsites)
- def MapAnnotation(self, function_map, signature_set):
- """Map annotation signatures to functions.
- Args:
- function_map: Function map.
- signature_set: Set of annotation signatures.
+class StackAnalyzer(object):
+ """Class to analyze stack usage.
- Returns:
- Map of signatures to functions, map of signatures which can't be resolved.
+ Public Methods:
+ Analyze: Run the stack analysis.
"""
- # Build the symbol map indexed by symbol name. If there are multiple symbols
- # with the same name, add them into a set. (e.g. symbols of static function
- # with the same name)
- symbol_map = collections.defaultdict(set)
- for symbol in self.symbols:
- if symbol.symtype == 'F':
- # Function symbol.
- result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name)
- if result is not None:
- function = function_map.get(symbol.address)
- # Ignore the symbol not in disassembly.
- if function is not None:
- # If there are multiple symbol with the same name and point to the
- # same function, the set will deduplicate them.
- symbol_map[result.group('name').strip()].add(function)
-
- # Build the signature map indexed by annotation signature.
- signature_map = {}
- sig_error_map = {}
- symbol_path_map = {}
- for sig in signature_set:
- (name, path, _) = sig
-
- functions = symbol_map.get(name)
- if functions is None:
- sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND
- continue
-
- if name not in symbol_path_map:
- # Lazy symbol path resolving. Since the addr2line isn't fast, only
- # resolve needed symbol paths.
- group_map = collections.defaultdict(list)
- for function in functions:
- line_info = self.AddressToLine(function.address)[0]
- if line_info is None:
- continue
- (_, symbol_path, _) = line_info
+ C_FUNCTION_NAME = r"_A-Za-z0-9"
- # Group the functions with the same symbol signature (symbol name +
- # symbol path). Assume they are the same copies and do the same
- # annotation operations of them because we don't know which copy is
- # indicated by the users.
- group_map[symbol_path].append(function)
+ # Assume there is no ":" in the path.
+ # Example: "driver/accel_kionix.c:321 (discriminator 3)"
+ ADDRTOLINE_RE = re.compile(
+ r"^(?P<path>[^:]+):(?P<linenum>\d+)(\s+\(discriminator\s+\d+\))?$"
+ )
+ # To eliminate the suffix appended by compilers, try to extract the
+ # C function name from the prefix of symbol name.
+ # Example: "SHA256_transform.constprop.28"
+ FUNCTION_PREFIX_NAME_RE = re.compile(
+ r"^(?P<name>[{0}]+)([^{0}].*)?$".format(C_FUNCTION_NAME)
+ )
+
+ # Errors of annotation resolving.
+ ANNOTATION_ERROR_INVALID = "invalid signature"
+ ANNOTATION_ERROR_NOTFOUND = "function is not found"
+ ANNOTATION_ERROR_AMBIGUOUS = "signature is ambiguous"
+
+ def __init__(self, options, symbols, rodata, tasklist, annotation):
+ """Constructor.
+
+ Args:
+ options: Namespace from argparse.parse_args().
+ symbols: Symbol list.
+ rodata: Content of .rodata section (offset, data)
+ tasklist: Task list.
+ annotation: Annotation config.
+ """
+ self.options = options
+ self.symbols = symbols
+ self.rodata_offset = rodata[0]
+ self.rodata = rodata[1]
+ self.tasklist = tasklist
+ self.annotation = annotation
+ self.address_to_line_cache = {}
+
+ def AddressToLine(self, address, resolve_inline=False):
+ """Convert address to line.
+
+ Args:
+ address: Target address.
+ resolve_inline: Output the stack of inlining.
+
+ Returns:
+ lines: List of the corresponding lines.
+
+ Raises:
+ StackAnalyzerError: If addr2line is failed.
+ """
+ cache_key = (address, resolve_inline)
+ if cache_key in self.address_to_line_cache:
+ return self.address_to_line_cache[cache_key]
+
+ try:
+ args = [
+ self.options.addr2line,
+ "-f",
+ "-e",
+ self.options.elf_path,
+ "{:x}".format(address),
+ ]
+ if resolve_inline:
+ args.append("-i")
+
+ line_text = subprocess.check_output(args, encoding="utf-8")
+ except subprocess.CalledProcessError:
+ raise StackAnalyzerError("addr2line failed to resolve lines.")
+ except OSError:
+ raise StackAnalyzerError("Failed to run addr2line.")
+
+ lines = [line.strip() for line in line_text.splitlines()]
+ # Assume the output has at least one pair like "function\nlocation\n", and
+ # they always show up in pairs.
+ # Example: "handle_request\n
+ # common/usb_pd_protocol.c:1191\n"
+ assert len(lines) >= 2 and len(lines) % 2 == 0
+
+ line_infos = []
+ for index in range(0, len(lines), 2):
+ (function_name, line_text) = lines[index : index + 2]
+ if line_text in ["??:0", ":?"]:
+ line_infos.append(None)
+ else:
+ result = self.ADDRTOLINE_RE.match(line_text)
+ # Assume the output is always well-formed.
+ assert result is not None
+ line_infos.append(
+ (
+ function_name.strip(),
+ os.path.realpath(result.group("path").strip()),
+ int(result.group("linenum")),
+ )
+ )
+
+ self.address_to_line_cache[cache_key] = line_infos
+ return line_infos
+
+ def AnalyzeDisassembly(self, disasm_text):
+ """Parse the disassembly text, analyze, and build a map of all functions.
+
+ Args:
+ disasm_text: Disassembly text.
+
+ Returns:
+ function_map: Dict of functions.
+ """
+ disasm_lines = [line.strip() for line in disasm_text.splitlines()]
+
+ if "nds" in disasm_lines[1]:
+ analyzer = AndesAnalyzer()
+ elif "arm" in disasm_lines[1]:
+ analyzer = ArmAnalyzer()
+ elif "riscv" in disasm_lines[1]:
+ analyzer = RiscvAnalyzer()
+ else:
+ raise StackAnalyzerError("Unsupported architecture.")
- symbol_path_map[name] = group_map
+ # Example: "08028c8c <motion_lid_calc>:"
+ function_signature_regex = re.compile(
+ r"^(?P<address>[0-9A-Fa-f]+)\s+<(?P<name>[^>]+)>:$"
+ )
- # Symbol matching.
- function_group = None
- group_map = symbol_path_map[name]
- if len(group_map) > 0:
- if path is None:
- if len(group_map) > 1:
- # There is ambiguity but the path isn't specified.
- sig_error_map[sig] = self.ANNOTATION_ERROR_AMBIGUOUS
- continue
+ def DetectFunctionHead(line):
+ """Check if the line is a function head.
- # No path signature but all symbol signatures of functions are same.
- # Assume they are the same functions, so there is no ambiguity.
- (function_group,) = group_map.values()
- else:
- function_group = group_map.get(path)
+ Args:
+ line: Text of disassembly.
- if function_group is None:
- sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND
- continue
+ Returns:
+ symbol: Function symbol. None if it isn't a function head.
+ """
+ result = function_signature_regex.match(line)
+ if result is None:
+ return None
- # The function_group is a list of all the same functions (according to
- # our assumption) which should be annotated together.
- signature_map[sig] = function_group
+ address = int(result.group("address"), 16)
+ symbol = symbol_map.get(address)
- return (signature_map, sig_error_map)
+ # Check if the function exists and matches.
+ if symbol is None or symbol.symtype != "F":
+ return None
- def LoadAnnotation(self):
- """Load annotation rules.
+ return symbol
- Returns:
- Map of add rules, set of remove rules, set of text signatures which can't
- be parsed.
- """
- # Assume there is no ":" in the path.
- # Example: "get_range.lto.2501[driver/accel_kionix.c:327]"
- annotation_signature_regex = re.compile(
- r'^(?P<name>[^\[]+)(\[(?P<path>[^:]+)(:(?P<linenum>\d+))?\])?$')
-
- def NormalizeSignature(signature_text):
- """Parse and normalize the annotation signature.
-
- Args:
- signature_text: Text of the annotation signature.
-
- Returns:
- (function name, path, line number) of the signature. The path and line
- number can be None if not exist. None if failed to parse.
- """
- result = annotation_signature_regex.match(signature_text.strip())
- if result is None:
- return None
-
- name_result = self.FUNCTION_PREFIX_NAME_RE.match(
- result.group('name').strip())
- if name_result is None:
- return None
-
- path = result.group('path')
- if path is not None:
- path = os.path.realpath(path.strip())
-
- linenum = result.group('linenum')
- if linenum is not None:
- linenum = int(linenum.strip())
-
- return (name_result.group('name').strip(), path, linenum)
-
- def ExpandArray(dic):
- """Parse and expand a symbol array
-
- Args:
- dic: Dictionary for the array annotation
-
- Returns:
- array of (symbol name, None, None).
- """
- # TODO(drinkcat): This function is quite inefficient, as it goes through
- # the symbol table multiple times.
-
- begin_name = dic['name']
- end_name = dic['name'] + "_end"
- offset = dic['offset'] if 'offset' in dic else 0
- stride = dic['stride']
-
- begin_address = None
- end_address = None
-
- for symbol in self.symbols:
- if (symbol.name == begin_name):
- begin_address = symbol.address
- if (symbol.name == end_name):
- end_address = symbol.address
-
- if (not begin_address or not end_address):
- return None
-
- output = []
- # TODO(drinkcat): This is inefficient as we go from address to symbol
- # object then to symbol name, and later on we'll go back from symbol name
- # to symbol object.
- for addr in range(begin_address+offset, end_address, stride):
- # TODO(drinkcat): Not all architectures need to drop the first bit.
- val = self.rodata[(addr-self.rodata_offset) // 4] & 0xfffffffe
- name = None
+ # Build symbol map, indexed by symbol address.
+ symbol_map = {}
for symbol in self.symbols:
- if (symbol.address == val):
- result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name)
- name = result.group('name')
- break
-
- if not name:
- raise StackAnalyzerError('Cannot find function for address %s.',
- hex(val))
-
- output.append((name, None, None))
-
- return output
-
- add_rules = collections.defaultdict(set)
- remove_rules = list()
- invalid_sigtxts = set()
-
- if 'add' in self.annotation and self.annotation['add'] is not None:
- for src_sigtxt, dst_sigtxts in self.annotation['add'].items():
- src_sig = NormalizeSignature(src_sigtxt)
- if src_sig is None:
- invalid_sigtxts.add(src_sigtxt)
- continue
-
- for dst_sigtxt in dst_sigtxts:
- if isinstance(dst_sigtxt, dict):
- dst_sig = ExpandArray(dst_sigtxt)
- if dst_sig is None:
- invalid_sigtxts.add(str(dst_sigtxt))
+ # If there are multiple symbols with same address, keeping any of them is
+ # good enough.
+ symbol_map[symbol.address] = symbol
+
+ # Parse the disassembly text. We update the variable "line" to next line
+ # when needed. There are two steps of parser:
+ #
+ # Step 1: Searching for the function head. Once reach the function head,
+ # move to the next line, which is the first line of function body.
+ #
+ # Step 2: Parsing each instruction line of function body. Once reach a
+ # non-instruction line, stop parsing and analyze the parsed instructions.
+ #
+ # Finally turn back to the step 1 without updating the line, because the
+ # current non-instruction line can be another function head.
+ function_map = {}
+ # The following three variables are the states of the parsing processing.
+ # They will be initialized properly during the state changes.
+ function_symbol = None
+ function_end = None
+ instructions = []
+
+ # Remove heading and tailing spaces for each line.
+ line_index = 0
+ while line_index < len(disasm_lines):
+ # Get the current line.
+ line = disasm_lines[line_index]
+
+ if function_symbol is None:
+ # Step 1: Search for the function head.
+
+ function_symbol = DetectFunctionHead(line)
+ if function_symbol is not None:
+ # Assume there is no empty function. If the function head is followed
+ # by EOF, it is an empty function.
+ assert line_index + 1 < len(disasm_lines)
+
+ # Found the function head, initialize and turn to the step 2.
+ instructions = []
+ # If symbol size exists, use it as a hint of function size.
+ if function_symbol.size > 0:
+ function_end = (
+ function_symbol.address + function_symbol.size
+ )
+ else:
+ function_end = None
+
else:
- add_rules[src_sig].update(dst_sig)
- else:
- dst_sig = NormalizeSignature(dst_sigtxt)
- if dst_sig is None:
- invalid_sigtxts.add(dst_sigtxt)
+ # Step 2: Parse the function body.
+
+ instruction = analyzer.ParseInstruction(line, function_end)
+ if instruction is not None:
+ instructions.append(instruction)
+
+ if instruction is None or line_index + 1 == len(disasm_lines):
+ # Either the invalid instruction or EOF indicates the end of the
+ # function, finalize the function analysis.
+
+ # Assume there is no empty function.
+ assert len(instructions) > 0
+
+ (stack_frame, callsites) = analyzer.AnalyzeFunction(
+ function_symbol, instructions
+ )
+ # Assume the function addresses are unique in the disassembly.
+ assert function_symbol.address not in function_map
+ function_map[function_symbol.address] = Function(
+ function_symbol.address,
+ function_symbol.name,
+ stack_frame,
+ callsites,
+ )
+
+ # Initialize and turn back to the step 1.
+ function_symbol = None
+
+ # If the current line isn't an instruction, it can be another function
+ # head, skip moving to the next line.
+ if instruction is None:
+ continue
+
+ # Move to the next line.
+ line_index += 1
+
+ # Resolve callees of functions.
+ for function in function_map.values():
+ for callsite in function.callsites:
+ if callsite.target is not None:
+ # Remain the callee as None if we can't resolve it.
+ callsite.callee = function_map.get(callsite.target)
+
+ return function_map
+
+ def MapAnnotation(self, function_map, signature_set):
+ """Map annotation signatures to functions.
+
+ Args:
+ function_map: Function map.
+ signature_set: Set of annotation signatures.
+
+ Returns:
+ Map of signatures to functions, map of signatures which can't be resolved.
+ """
+ # Build the symbol map indexed by symbol name. If there are multiple symbols
+ # with the same name, add them into a set. (e.g. symbols of static function
+ # with the same name)
+ symbol_map = collections.defaultdict(set)
+ for symbol in self.symbols:
+ if symbol.symtype == "F":
+ # Function symbol.
+ result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name)
+ if result is not None:
+ function = function_map.get(symbol.address)
+ # Ignore the symbol not in disassembly.
+ if function is not None:
+ # If there are multiple symbol with the same name and point to the
+ # same function, the set will deduplicate them.
+ symbol_map[result.group("name").strip()].add(function)
+
+ # Build the signature map indexed by annotation signature.
+ signature_map = {}
+ sig_error_map = {}
+ symbol_path_map = {}
+ for sig in signature_set:
+ (name, path, _) = sig
+
+ functions = symbol_map.get(name)
+ if functions is None:
+ sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND
+ continue
+
+ if name not in symbol_path_map:
+ # Lazy symbol path resolving. Since the addr2line isn't fast, only
+ # resolve needed symbol paths.
+ group_map = collections.defaultdict(list)
+ for function in functions:
+ line_info = self.AddressToLine(function.address)[0]
+ if line_info is None:
+ continue
+
+ (_, symbol_path, _) = line_info
+
+ # Group the functions with the same symbol signature (symbol name +
+ # symbol path). Assume they are the same copies and do the same
+ # annotation operations of them because we don't know which copy is
+ # indicated by the users.
+ group_map[symbol_path].append(function)
+
+ symbol_path_map[name] = group_map
+
+ # Symbol matching.
+ function_group = None
+ group_map = symbol_path_map[name]
+ if len(group_map) > 0:
+ if path is None:
+ if len(group_map) > 1:
+ # There is ambiguity but the path isn't specified.
+ sig_error_map[sig] = self.ANNOTATION_ERROR_AMBIGUOUS
+ continue
+
+ # No path signature but all symbol signatures of functions are same.
+ # Assume they are the same functions, so there is no ambiguity.
+ (function_group,) = group_map.values()
+ else:
+ function_group = group_map.get(path)
+
+ if function_group is None:
+ sig_error_map[sig] = self.ANNOTATION_ERROR_NOTFOUND
+ continue
+
+ # The function_group is a list of all the same functions (according to
+ # our assumption) which should be annotated together.
+ signature_map[sig] = function_group
+
+ return (signature_map, sig_error_map)
+
+ def LoadAnnotation(self):
+ """Load annotation rules.
+
+ Returns:
+ Map of add rules, set of remove rules, set of text signatures which can't
+ be parsed.
+ """
+ # Assume there is no ":" in the path.
+ # Example: "get_range.lto.2501[driver/accel_kionix.c:327]"
+ annotation_signature_regex = re.compile(
+ r"^(?P<name>[^\[]+)(\[(?P<path>[^:]+)(:(?P<linenum>\d+))?\])?$"
+ )
+
+ def NormalizeSignature(signature_text):
+ """Parse and normalize the annotation signature.
+
+ Args:
+ signature_text: Text of the annotation signature.
+
+ Returns:
+ (function name, path, line number) of the signature. The path and line
+ number can be None if not exist. None if failed to parse.
+ """
+ result = annotation_signature_regex.match(signature_text.strip())
+ if result is None:
+ return None
+
+ name_result = self.FUNCTION_PREFIX_NAME_RE.match(
+ result.group("name").strip()
+ )
+ if name_result is None:
+ return None
+
+ path = result.group("path")
+ if path is not None:
+ path = os.path.realpath(path.strip())
+
+ linenum = result.group("linenum")
+ if linenum is not None:
+ linenum = int(linenum.strip())
+
+ return (name_result.group("name").strip(), path, linenum)
+
+ def ExpandArray(dic):
+ """Parse and expand a symbol array
+
+ Args:
+ dic: Dictionary for the array annotation
+
+ Returns:
+ array of (symbol name, None, None).
+ """
+ # TODO(drinkcat): This function is quite inefficient, as it goes through
+ # the symbol table multiple times.
+
+ begin_name = dic["name"]
+ end_name = dic["name"] + "_end"
+ offset = dic["offset"] if "offset" in dic else 0
+ stride = dic["stride"]
+
+ begin_address = None
+ end_address = None
+
+ for symbol in self.symbols:
+ if symbol.name == begin_name:
+ begin_address = symbol.address
+ if symbol.name == end_name:
+ end_address = symbol.address
+
+ if not begin_address or not end_address:
+ return None
+
+ output = []
+ # TODO(drinkcat): This is inefficient as we go from address to symbol
+ # object then to symbol name, and later on we'll go back from symbol name
+ # to symbol object.
+ for addr in range(begin_address + offset, end_address, stride):
+ # TODO(drinkcat): Not all architectures need to drop the first bit.
+ val = self.rodata[(addr - self.rodata_offset) // 4] & 0xFFFFFFFE
+ name = None
+ for symbol in self.symbols:
+ if symbol.address == val:
+ result = self.FUNCTION_PREFIX_NAME_RE.match(symbol.name)
+ name = result.group("name")
+ break
+
+ if not name:
+ raise StackAnalyzerError(
+ "Cannot find function for address %s." % hex(val)
+ )
+
+ output.append((name, None, None))
+
+ return output
+
+ add_rules = collections.defaultdict(set)
+ remove_rules = list()
+ invalid_sigtxts = set()
+
+ if "add" in self.annotation and self.annotation["add"] is not None:
+ for src_sigtxt, dst_sigtxts in self.annotation["add"].items():
+ src_sig = NormalizeSignature(src_sigtxt)
+ if src_sig is None:
+ invalid_sigtxts.add(src_sigtxt)
+ continue
+
+ for dst_sigtxt in dst_sigtxts:
+ if isinstance(dst_sigtxt, dict):
+ dst_sig = ExpandArray(dst_sigtxt)
+ if dst_sig is None:
+ invalid_sigtxts.add(str(dst_sigtxt))
+ else:
+ add_rules[src_sig].update(dst_sig)
+ else:
+ dst_sig = NormalizeSignature(dst_sigtxt)
+ if dst_sig is None:
+ invalid_sigtxts.add(dst_sigtxt)
+ else:
+ add_rules[src_sig].add(dst_sig)
+
+ if (
+ "remove" in self.annotation
+ and self.annotation["remove"] is not None
+ ):
+ for sigtxt_path in self.annotation["remove"]:
+ if isinstance(sigtxt_path, str):
+ # The path has only one vertex.
+ sigtxt_path = [sigtxt_path]
+
+ if len(sigtxt_path) == 0:
+ continue
+
+ # Generate multiple remove paths from all the combinations of the
+ # signatures of each vertex.
+ sig_paths = [[]]
+ broken_flag = False
+ for sigtxt_node in sigtxt_path:
+ if isinstance(sigtxt_node, str):
+ # The vertex has only one signature.
+ sigtxt_set = {sigtxt_node}
+ elif isinstance(sigtxt_node, list):
+ # The vertex has multiple signatures.
+ sigtxt_set = set(sigtxt_node)
+ else:
+ # Assume the format of annotation is verified. There should be no
+ # invalid case.
+ assert False
+
+ sig_set = set()
+ for sigtxt in sigtxt_set:
+ sig = NormalizeSignature(sigtxt)
+ if sig is None:
+ invalid_sigtxts.add(sigtxt)
+ broken_flag = True
+ elif not broken_flag:
+ sig_set.add(sig)
+
+ if broken_flag:
+ continue
+
+                # Append each signature of the current node to all of the
+                # previous remove paths.
+ sig_paths = [
+ path + [sig] for path in sig_paths for sig in sig_set
+ ]
+
+ if not broken_flag:
+ # All signatures are normalized. The remove path has no error.
+ remove_rules.extend(sig_paths)
+
+ return (add_rules, remove_rules, invalid_sigtxts)
+
+ def ResolveAnnotation(self, function_map):
+ """Resolve annotation.
+
+ Args:
+ function_map: Function map.
+
+ Returns:
+ Set of added call edges, list of remove paths, set of eliminated
+ callsite addresses, set of annotation signatures which can't be resolved.
+ """
+
+ def StringifySignature(signature):
+ """Stringify the tupled signature.
+
+ Args:
+ signature: Tupled signature.
+
+ Returns:
+ Signature string.
+ """
+ (name, path, linenum) = signature
+ bracket_text = ""
+ if path is not None:
+ path = os.path.relpath(path)
+ if linenum is None:
+ bracket_text = "[{}]".format(path)
+ else:
+ bracket_text = "[{}:{}]".format(path, linenum)
+
+ return name + bracket_text
+
+ (add_rules, remove_rules, invalid_sigtxts) = self.LoadAnnotation()
+
+ signature_set = set()
+ for src_sig, dst_sigs in add_rules.items():
+ signature_set.add(src_sig)
+ signature_set.update(dst_sigs)
+
+ for remove_sigs in remove_rules:
+ signature_set.update(remove_sigs)
+
+ # Map signatures to functions.
+ (signature_map, sig_error_map) = self.MapAnnotation(
+ function_map, signature_set
+ )
+
+ # Build the indirect callsite map indexed by callsite signature.
+ indirect_map = collections.defaultdict(set)
+ for function in function_map.values():
+ for callsite in function.callsites:
+ if callsite.target is not None:
+ continue
+
+ # Found an indirect callsite.
+ line_info = self.AddressToLine(callsite.address)[0]
+ if line_info is None:
+ continue
+
+ (name, path, linenum) = line_info
+ result = self.FUNCTION_PREFIX_NAME_RE.match(name)
+ if result is None:
+ continue
+
+ indirect_map[(result.group("name").strip(), path, linenum)].add(
+ (function, callsite.address)
+ )
+
+ # Generate the annotation sets.
+ add_set = set()
+ remove_list = list()
+ eliminated_addrs = set()
+
+ for src_sig, dst_sigs in add_rules.items():
+ src_funcs = set(signature_map.get(src_sig, []))
+ # Try to match the source signature to the indirect callsites. Even if it
+ # can't be found in disassembly.
+ indirect_calls = indirect_map.get(src_sig)
+ if indirect_calls is not None:
+ for function, callsite_address in indirect_calls:
+ # Add the caller of the indirect callsite to the source functions.
+ src_funcs.add(function)
+ # Assume each callsite can be represented by a unique address.
+ eliminated_addrs.add(callsite_address)
+
+ if src_sig in sig_error_map:
+                # Assume the error is always the not-found error. Since the
+                # signature found in the indirect callsite map must be a full
+                # signature, the ambiguous error can't happen.
+ assert (
+ sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND
+ )
+ # Found in inline stack, remove the not found error.
+ del sig_error_map[src_sig]
+
+ for dst_sig in dst_sigs:
+ dst_funcs = signature_map.get(dst_sig)
+ if dst_funcs is None:
+ continue
+
+ # Duplicate the call edge for all the same source and destination
+ # functions.
+ for src_func in src_funcs:
+ for dst_func in dst_funcs:
+ add_set.add((src_func, dst_func))
+
+ for remove_sigs in remove_rules:
+ # Since each signature can be mapped to multiple functions, generate
+ # multiple remove paths from all the combinations of these functions.
+ remove_paths = [[]]
+ skip_flag = False
+ for remove_sig in remove_sigs:
+ # Transform each signature to the corresponding functions.
+ remove_funcs = signature_map.get(remove_sig)
+ if remove_funcs is None:
+ # There is an unresolved signature in the remove path. Ignore the
+ # whole broken remove path.
+ skip_flag = True
+ break
+ else:
+                    # Append each function of the current signature to all of
+                    # the previous remove paths.
+ remove_paths = [
+ p + [f] for p in remove_paths for f in remove_funcs
+ ]
+
+ if skip_flag:
+ # Ignore the broken remove path.
+ continue
+
+ for remove_path in remove_paths:
+ # Deduplicate the remove paths.
+ if remove_path not in remove_list:
+ remove_list.append(remove_path)
+
+ # Format the error messages.
+ failed_sigtxts = set()
+ for sigtxt in invalid_sigtxts:
+ failed_sigtxts.add((sigtxt, self.ANNOTATION_ERROR_INVALID))
+
+ for sig, error in sig_error_map.items():
+ failed_sigtxts.add((StringifySignature(sig), error))
+
+ return (add_set, remove_list, eliminated_addrs, failed_sigtxts)
+
+ def PreprocessAnnotation(
+ self, function_map, add_set, remove_list, eliminated_addrs
+ ):
+ """Preprocess the annotation and callgraph.
+
+ Add the missing call edges, and delete simple remove paths (the paths have
+ one or two vertices) from the function_map.
+
+ Eliminate the annotated indirect callsites.
+
+ Return the remaining remove list.
+
+ Args:
+ function_map: Function map.
+ add_set: Set of missing call edges.
+ remove_list: List of remove paths.
+ eliminated_addrs: Set of eliminated callsite addresses.
+
+ Returns:
+ List of remaining remove paths.
+ """
+
+ def CheckEdge(path):
+ """Check if all edges of the path are on the callgraph.
+
+ Args:
+ path: Path.
+
+ Returns:
+ True or False.
+ """
+ for index in range(len(path) - 1):
+ if (path[index], path[index + 1]) not in edge_set:
+ return False
+
+ return True
+
+ for src_func, dst_func in add_set:
+            # TODO(cheyuw): Support tail call annotation.
+ src_func.callsites.append(
+ Callsite(None, dst_func.address, False, dst_func)
+ )
+
+ # Delete simple remove paths.
+ remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2)
+ edge_set = set()
+ for function in function_map.values():
+ cleaned_callsites = []
+ for callsite in function.callsites:
+ if (callsite.callee,) in remove_simple or (
+ function,
+ callsite.callee,
+ ) in remove_simple:
+ continue
+
+ if (
+ callsite.target is None
+ and callsite.address in eliminated_addrs
+ ):
+ continue
+
+ cleaned_callsites.append(callsite)
+ if callsite.callee is not None:
+ edge_set.add((function, callsite.callee))
+
+ function.callsites = cleaned_callsites
+
+ return [p for p in remove_list if len(p) >= 3 and CheckEdge(p)]
+
+ def AnalyzeCallGraph(self, function_map, remove_list):
+ """Analyze callgraph.
+
+ It will update the max stack size and path for each function.
+
+ Args:
+ function_map: Function map.
+ remove_list: List of remove paths.
+
+ Returns:
+ List of function cycles.
+ """
+
+ def Traverse(curr_state):
+ """Traverse the callgraph and calculate the max stack usages of functions.
+
+ Args:
+ curr_state: Current state.
+
+ Returns:
+ SCC lowest link.
+ """
+ scc_index = scc_index_counter[0]
+ scc_index_counter[0] += 1
+ scc_index_map[curr_state] = scc_index
+ scc_lowlink = scc_index
+ scc_stack.append(curr_state)
+ # Push the current state in the stack. We can use a set to maintain this
+ # because the stacked states are unique; otherwise we will find a cycle
+ # first.
+ stacked_states.add(curr_state)
+
+ (curr_address, curr_positions) = curr_state
+ curr_func = function_map[curr_address]
+
+ invalid_flag = False
+ new_positions = list(curr_positions)
+ for index, position in enumerate(curr_positions):
+ remove_path = remove_list[index]
+
+ # The position of each remove path in the state is the length of the
+ # longest matching path between the prefix of the remove path and the
+ # suffix of the current traversing path. We maintain this length when
+ # appending the next callee to the traversing path. And it can be used
+ # to check if the remove path appears in the traversing path.
+
+ # TODO(cheyuw): Implement KMP algorithm to match remove paths
+ # efficiently.
+ if remove_path[position] is curr_func:
+ # Matches the current function, extend the length.
+ new_positions[index] = position + 1
+ if new_positions[index] == len(remove_path):
+ # The length of the longest matching path is equal to the length of
+ # the remove path, which means the suffix of the current traversing
+ # path matches the remove path.
+ invalid_flag = True
+ break
+
+ else:
+ # We can't get the new longest matching path by extending the previous
+ # one directly. Fallback to search the new longest matching path.
+
+ # If we can't find any matching path in the following search, reset
+ # the matching length to 0.
+ new_positions[index] = 0
+
+ # We want to find the new longest matching prefix of remove path with
+ # the suffix of the current traversing path. Because the new longest
+                    # matching path won't be longer than the previous one now, and part of
+ # the suffix matches the prefix of remove path, we can get the needed
+ # suffix from the previous matching prefix of the invalid path.
+ suffix = remove_path[:position] + [curr_func]
+ for offset in range(1, len(suffix)):
+ length = position - offset
+ if remove_path[:length] == suffix[offset:]:
+ new_positions[index] = length
+ break
+
+ new_positions = tuple(new_positions)
+
+ # If the current suffix is invalid, set the max stack usage to 0.
+ max_stack_usage = 0
+ max_callee_state = None
+ self_loop = False
+
+ if not invalid_flag:
+ # Max stack usage is at least equal to the stack frame.
+ max_stack_usage = curr_func.stack_frame
+ for callsite in curr_func.callsites:
+ callee = callsite.callee
+ if callee is None:
+ continue
+
+ callee_state = (callee.address, new_positions)
+ if callee_state not in scc_index_map:
+ # Unvisited state.
+ scc_lowlink = min(scc_lowlink, Traverse(callee_state))
+ elif callee_state in stacked_states:
+ # The state is shown in the stack. There is a cycle.
+ sub_stack_usage = 0
+ scc_lowlink = min(
+ scc_lowlink, scc_index_map[callee_state]
+ )
+ if callee_state == curr_state:
+ self_loop = True
+
+ done_result = done_states.get(callee_state)
+ if done_result is not None:
+ # Already done this state and use its result. If the state reaches a
+ # cycle, reusing the result will cause inaccuracy (the stack usage
+ # of cycle depends on where the entrance is). But it's fine since we
+ # can't get accurate stack usage under this situation, and we rely
+ # on user-provided annotations to break the cycle, after which the
+ # result will be accurate again.
+ (sub_stack_usage, _) = done_result
+
+                    if callsite.is_tail:
+                        # For a tail call, since the callee reuses the stack frame of the
+ # For tailing call, since the callee reuses the stack frame of the
+ # caller, choose the larger one directly.
+ stack_usage = max(
+ curr_func.stack_frame, sub_stack_usage
+ )
+ else:
+ stack_usage = (
+ curr_func.stack_frame + sub_stack_usage
+ )
+
+ if stack_usage > max_stack_usage:
+ max_stack_usage = stack_usage
+ max_callee_state = callee_state
+
+ if scc_lowlink == scc_index:
+ group = []
+ while scc_stack[-1] != curr_state:
+ scc_state = scc_stack.pop()
+ stacked_states.remove(scc_state)
+ group.append(scc_state)
+
+ scc_stack.pop()
+ stacked_states.remove(curr_state)
+
+ # If the cycle is not empty, record it.
+ if len(group) > 0 or self_loop:
+ group.append(curr_state)
+ cycle_groups.append(group)
+
+ # Store the done result.
+ done_states[curr_state] = (max_stack_usage, max_callee_state)
+
+ if curr_positions == initial_positions:
+ # If the current state is initial state, we traversed the callgraph by
+ # using the current function as start point. Update the stack usage of
+ # the function.
+ # If the function matches a single vertex remove path, this will set its
+ # max stack usage to 0, which is not expected (we still calculate its
+ # max stack usage, but prevent any function from calling it). However,
+ # all the single vertex remove paths have been preprocessed and removed.
+ curr_func.stack_max_usage = max_stack_usage
+
+ # Reconstruct the max stack path by traversing the state transitions.
+ max_stack_path = [curr_func]
+ callee_state = max_callee_state
+ while callee_state is not None:
+ # The first element of state tuple is function address.
+ max_stack_path.append(function_map[callee_state[0]])
+ done_result = done_states.get(callee_state)
+ # All of the descendants should be done.
+ assert done_result is not None
+ (_, callee_state) = done_result
+
+ curr_func.stack_max_path = max_stack_path
+
+ return scc_lowlink
+
+ # The state is the concatenation of the current function address and the
+ # state of matching position.
+ initial_positions = (0,) * len(remove_list)
+ done_states = {}
+ stacked_states = set()
+ scc_index_counter = [0]
+ scc_index_map = {}
+ scc_stack = []
+ cycle_groups = []
+ for function in function_map.values():
+ if function.stack_max_usage is None:
+ Traverse((function.address, initial_positions))
+
+ cycle_functions = []
+ for group in cycle_groups:
+ cycle = set(function_map[state[0]] for state in group)
+ if cycle not in cycle_functions:
+ cycle_functions.append(cycle)
+
+ return cycle_functions
+
+ def Analyze(self):
+ """Run the stack analysis.
+
+ Raises:
+ StackAnalyzerError: If disassembly fails.
+ """
+
+ def OutputInlineStack(address, prefix=""):
+ """Output beautiful inline stack.
+
+ Args:
+ address: Address.
+ prefix: Prefix of each line.
+
+ Returns:
+ Key for sorting, output text
+ """
+ line_infos = self.AddressToLine(address, True)
+
+ if line_infos[0] is None:
+ order_key = (None, None)
else:
- add_rules[src_sig].add(dst_sig)
-
- if 'remove' in self.annotation and self.annotation['remove'] is not None:
- for sigtxt_path in self.annotation['remove']:
- if isinstance(sigtxt_path, str):
- # The path has only one vertex.
- sigtxt_path = [sigtxt_path]
-
- if len(sigtxt_path) == 0:
- continue
-
- # Generate multiple remove paths from all the combinations of the
- # signatures of each vertex.
- sig_paths = [[]]
- broken_flag = False
- for sigtxt_node in sigtxt_path:
- if isinstance(sigtxt_node, str):
- # The vertex has only one signature.
- sigtxt_set = {sigtxt_node}
- elif isinstance(sigtxt_node, list):
- # The vertex has multiple signatures.
- sigtxt_set = set(sigtxt_node)
- else:
- # Assume the format of annotation is verified. There should be no
- # invalid case.
- assert False
-
- sig_set = set()
- for sigtxt in sigtxt_set:
- sig = NormalizeSignature(sigtxt)
- if sig is None:
- invalid_sigtxts.add(sigtxt)
- broken_flag = True
- elif not broken_flag:
- sig_set.add(sig)
-
- if broken_flag:
- continue
-
- # Append each signature of the current node to the all previous
- # remove paths.
- sig_paths = [path + [sig] for path in sig_paths for sig in sig_set]
-
- if not broken_flag:
- # All signatures are normalized. The remove path has no error.
- remove_rules.extend(sig_paths)
-
- return (add_rules, remove_rules, invalid_sigtxts)
+ (_, path, linenum) = line_infos[0]
+ order_key = (linenum, path)
+
+ line_texts = []
+ for line_info in reversed(line_infos):
+ if line_info is None:
+ (function_name, path, linenum) = ("??", "??", 0)
+ else:
+ (function_name, path, linenum) = line_info
+
+ line_texts.append(
+ "{}[{}:{}]".format(
+ function_name, os.path.relpath(path), linenum
+ )
+ )
+
+ output = "{}-> {} {:x}\n".format(prefix, line_texts[0], address)
+ for depth, line_text in enumerate(line_texts[1:]):
+ output += "{} {}- {}\n".format(
+ prefix, " " * depth, line_text
+ )
+
+ # Remove the last newline character.
+ return (order_key, output.rstrip("\n"))
+
+ # Analyze disassembly.
+ try:
+ disasm_text = subprocess.check_output(
+ [self.options.objdump, "-d", self.options.elf_path],
+ encoding="utf-8",
+ )
+ except subprocess.CalledProcessError:
+ raise StackAnalyzerError("objdump failed to disassemble.")
+ except OSError:
+ raise StackAnalyzerError("Failed to run objdump.")
+
+ function_map = self.AnalyzeDisassembly(disasm_text)
+ result = self.ResolveAnnotation(function_map)
+ (add_set, remove_list, eliminated_addrs, failed_sigtxts) = result
+ remove_list = self.PreprocessAnnotation(
+ function_map, add_set, remove_list, eliminated_addrs
+ )
+ cycle_functions = self.AnalyzeCallGraph(function_map, remove_list)
+
+ # Print the results of task-aware stack analysis.
+ extra_stack_frame = self.annotation.get(
+ "exception_frame_size", DEFAULT_EXCEPTION_FRAME_SIZE
+ )
+ for task in self.tasklist:
+ routine_func = function_map[task.routine_address]
+ print(
+ "Task: {}, Max size: {} ({} + {}), Allocated size: {}".format(
+ task.name,
+ routine_func.stack_max_usage + extra_stack_frame,
+ routine_func.stack_max_usage,
+ extra_stack_frame,
+ task.stack_max_size,
+ )
+ )
+
+ print("Call Trace:")
+ max_stack_path = routine_func.stack_max_path
+ # Assume the routine function is resolved.
+ assert max_stack_path is not None
+ for depth, curr_func in enumerate(max_stack_path):
+ line_info = self.AddressToLine(curr_func.address)[0]
+ if line_info is None:
+ (path, linenum) = ("??", 0)
+ else:
+ (_, path, linenum) = line_info
+
+ print(
+ " {} ({}) [{}:{}] {:x}".format(
+ curr_func.name,
+ curr_func.stack_frame,
+ os.path.relpath(path),
+ linenum,
+ curr_func.address,
+ )
+ )
+
+ if depth + 1 < len(max_stack_path):
+ succ_func = max_stack_path[depth + 1]
+ text_list = []
+ for callsite in curr_func.callsites:
+ if callsite.callee is succ_func:
+ indent_prefix = " "
+ if callsite.address is None:
+ order_text = (
+ None,
+ "{}-> [annotation]".format(indent_prefix),
+ )
+ else:
+ order_text = OutputInlineStack(
+ callsite.address, indent_prefix
+ )
+
+ text_list.append(order_text)
+
+ for _, text in sorted(text_list, key=lambda item: item[0]):
+ print(text)
+
+ print("Unresolved indirect callsites:")
+ for function in function_map.values():
+ indirect_callsites = []
+ for callsite in function.callsites:
+ if callsite.target is None:
+ indirect_callsites.append(callsite.address)
+
+ if len(indirect_callsites) > 0:
+ print(" In function {}:".format(function.name))
+ text_list = []
+ for address in indirect_callsites:
+ text_list.append(OutputInlineStack(address, " "))
+
+ for _, text in sorted(text_list, key=lambda item: item[0]):
+ print(text)
+
+ print("Unresolved annotation signatures:")
+ for sigtxt, error in failed_sigtxts:
+ print(" {}: {}".format(sigtxt, error))
+
+ if len(cycle_functions) > 0:
+ print("There are cycles in the following function sets:")
+ for functions in cycle_functions:
+ print(
+ "[{}]".format(
+ ", ".join(function.name for function in functions)
+ )
+ )
- def ResolveAnnotation(self, function_map):
- """Resolve annotation.
- Args:
- function_map: Function map.
+def ParseArgs():
+ """Parse commandline arguments.
Returns:
- Set of added call edges, list of remove paths, set of eliminated
- callsite addresses, set of annotation signatures which can't be resolved.
+ options: Namespace from argparse.parse_args().
"""
- def StringifySignature(signature):
- """Stringify the tupled signature.
-
- Args:
- signature: Tupled signature.
-
- Returns:
- Signature string.
- """
- (name, path, linenum) = signature
- bracket_text = ''
- if path is not None:
- path = os.path.relpath(path)
- if linenum is None:
- bracket_text = '[{}]'.format(path)
- else:
- bracket_text = '[{}:{}]'.format(path, linenum)
-
- return name + bracket_text
-
- (add_rules, remove_rules, invalid_sigtxts) = self.LoadAnnotation()
-
- signature_set = set()
- for src_sig, dst_sigs in add_rules.items():
- signature_set.add(src_sig)
- signature_set.update(dst_sigs)
-
- for remove_sigs in remove_rules:
- signature_set.update(remove_sigs)
-
- # Map signatures to functions.
- (signature_map, sig_error_map) = self.MapAnnotation(function_map,
- signature_set)
-
- # Build the indirect callsite map indexed by callsite signature.
- indirect_map = collections.defaultdict(set)
- for function in function_map.values():
- for callsite in function.callsites:
- if callsite.target is not None:
- continue
-
- # Found an indirect callsite.
- line_info = self.AddressToLine(callsite.address)[0]
- if line_info is None:
- continue
-
- (name, path, linenum) = line_info
- result = self.FUNCTION_PREFIX_NAME_RE.match(name)
- if result is None:
- continue
-
- indirect_map[(result.group('name').strip(), path, linenum)].add(
- (function, callsite.address))
-
- # Generate the annotation sets.
- add_set = set()
- remove_list = list()
- eliminated_addrs = set()
-
- for src_sig, dst_sigs in add_rules.items():
- src_funcs = set(signature_map.get(src_sig, []))
- # Try to match the source signature to the indirect callsites. Even if it
- # can't be found in disassembly.
- indirect_calls = indirect_map.get(src_sig)
- if indirect_calls is not None:
- for function, callsite_address in indirect_calls:
- # Add the caller of the indirect callsite to the source functions.
- src_funcs.add(function)
- # Assume each callsite can be represented by a unique address.
- eliminated_addrs.add(callsite_address)
-
- if src_sig in sig_error_map:
- # Assume the error is always the not found error. Since the signature
- # found in indirect callsite map must be a full signature, it can't
- # happen the ambiguous error.
- assert sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND
- # Found in inline stack, remove the not found error.
- del sig_error_map[src_sig]
-
- for dst_sig in dst_sigs:
- dst_funcs = signature_map.get(dst_sig)
- if dst_funcs is None:
- continue
-
- # Duplicate the call edge for all the same source and destination
- # functions.
- for src_func in src_funcs:
- for dst_func in dst_funcs:
- add_set.add((src_func, dst_func))
-
- for remove_sigs in remove_rules:
- # Since each signature can be mapped to multiple functions, generate
- # multiple remove paths from all the combinations of these functions.
- remove_paths = [[]]
- skip_flag = False
- for remove_sig in remove_sigs:
- # Transform each signature to the corresponding functions.
- remove_funcs = signature_map.get(remove_sig)
- if remove_funcs is None:
- # There is an unresolved signature in the remove path. Ignore the
- # whole broken remove path.
- skip_flag = True
- break
- else:
- # Append each function of the current signature to the all previous
- # remove paths.
- remove_paths = [p + [f] for p in remove_paths for f in remove_funcs]
-
- if skip_flag:
- # Ignore the broken remove path.
- continue
-
- for remove_path in remove_paths:
- # Deduplicate the remove paths.
- if remove_path not in remove_list:
- remove_list.append(remove_path)
-
- # Format the error messages.
- failed_sigtxts = set()
- for sigtxt in invalid_sigtxts:
- failed_sigtxts.add((sigtxt, self.ANNOTATION_ERROR_INVALID))
-
- for sig, error in sig_error_map.items():
- failed_sigtxts.add((StringifySignature(sig), error))
-
- return (add_set, remove_list, eliminated_addrs, failed_sigtxts)
-
- def PreprocessAnnotation(self, function_map, add_set, remove_list,
- eliminated_addrs):
- """Preprocess the annotation and callgraph.
+ parser = argparse.ArgumentParser(description="EC firmware stack analyzer.")
+ parser.add_argument("elf_path", help="the path of EC firmware ELF")
+ parser.add_argument(
+ "--export_taskinfo",
+ required=True,
+ help="the path of export_taskinfo.so utility",
+ )
+ parser.add_argument(
+ "--section",
+ required=True,
+ help="the section.",
+ choices=[SECTION_RO, SECTION_RW],
+ )
+ parser.add_argument(
+ "--objdump", default="objdump", help="the path of objdump"
+ )
+ parser.add_argument(
+ "--addr2line", default="addr2line", help="the path of addr2line"
+ )
+ parser.add_argument(
+ "--annotation", default=None, help="the path of annotation file"
+ )
+
+ # TODO(cheyuw): Add an option for dumping stack usage of all functions.
+
+ return parser.parse_args()
- Add the missing call edges, and delete simple remove paths (the paths have
- one or two vertices) from the function_map.
- Eliminate the annotated indirect callsites.
-
- Return the remaining remove list.
+def ParseSymbolText(symbol_text):
+ """Parse the content of the symbol text.
Args:
- function_map: Function map.
- add_set: Set of missing call edges.
- remove_list: List of remove paths.
- eliminated_addrs: Set of eliminated callsite addresses.
+ symbol_text: Text of the symbols.
Returns:
- List of remaining remove paths.
+ symbols: Symbol list.
"""
- def CheckEdge(path):
- """Check if all edges of the path are on the callgraph.
-
- Args:
- path: Path.
-
- Returns:
- True or False.
- """
- for index in range(len(path) - 1):
- if (path[index], path[index + 1]) not in edge_set:
- return False
-
- return True
-
- for src_func, dst_func in add_set:
- # TODO(cheyuw): Support tailing call annotation.
- src_func.callsites.append(
- Callsite(None, dst_func.address, False, dst_func))
-
- # Delete simple remove paths.
- remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2)
- edge_set = set()
- for function in function_map.values():
- cleaned_callsites = []
- for callsite in function.callsites:
- if ((callsite.callee,) in remove_simple or
- (function, callsite.callee) in remove_simple):
- continue
-
- if callsite.target is None and callsite.address in eliminated_addrs:
- continue
-
- cleaned_callsites.append(callsite)
- if callsite.callee is not None:
- edge_set.add((function, callsite.callee))
+ # Example: "10093064 g F .text 0000015c .hidden hook_task"
+ symbol_regex = re.compile(
+ r"^(?P<address>[0-9A-Fa-f]+)\s+[lwg]\s+"
+ r"((?P<type>[OF])\s+)?\S+\s+"
+ r"(?P<size>[0-9A-Fa-f]+)\s+"
+ r"(\S+\s+)?(?P<name>\S+)$"
+ )
+
+ symbols = []
+ for line in symbol_text.splitlines():
+ line = line.strip()
+ result = symbol_regex.match(line)
+ if result is not None:
+ address = int(result.group("address"), 16)
+ symtype = result.group("type")
+ if symtype is None:
+ symtype = "O"
- function.callsites = cleaned_callsites
+ size = int(result.group("size"), 16)
+ name = result.group("name")
+ symbols.append(Symbol(address, symtype, size, name))
- return [p for p in remove_list if len(p) >= 3 and CheckEdge(p)]
+ return symbols
- def AnalyzeCallGraph(self, function_map, remove_list):
- """Analyze callgraph.
- It will update the max stack size and path for each function.
+def ParseRoDataText(rodata_text):
+ """Parse the content of rodata
Args:
- function_map: Function map.
- remove_list: List of remove paths.
+      rodata_text: Text of the rodata dump.
Returns:
- List of function cycles.
+      Parsed rodata content (offset and list of values) — TODO confirm against the return statement.
"""
- def Traverse(curr_state):
- """Traverse the callgraph and calculate the max stack usages of functions.
-
- Args:
- curr_state: Current state.
-
- Returns:
- SCC lowest link.
- """
- scc_index = scc_index_counter[0]
- scc_index_counter[0] += 1
- scc_index_map[curr_state] = scc_index
- scc_lowlink = scc_index
- scc_stack.append(curr_state)
- # Push the current state in the stack. We can use a set to maintain this
- # because the stacked states are unique; otherwise we will find a cycle
- # first.
- stacked_states.add(curr_state)
-
- (curr_address, curr_positions) = curr_state
- curr_func = function_map[curr_address]
-
- invalid_flag = False
- new_positions = list(curr_positions)
- for index, position in enumerate(curr_positions):
- remove_path = remove_list[index]
-
- # The position of each remove path in the state is the length of the
- # longest matching path between the prefix of the remove path and the
- # suffix of the current traversing path. We maintain this length when
- # appending the next callee to the traversing path. And it can be used
- # to check if the remove path appears in the traversing path.
-
- # TODO(cheyuw): Implement KMP algorithm to match remove paths
- # efficiently.
- if remove_path[position] is curr_func:
- # Matches the current function, extend the length.
- new_positions[index] = position + 1
- if new_positions[index] == len(remove_path):
- # The length of the longest matching path is equal to the length of
- # the remove path, which means the suffix of the current traversing
- # path matches the remove path.
- invalid_flag = True
- break
-
- else:
- # We can't get the new longest matching path by extending the previous
- # one directly. Fallback to search the new longest matching path.
-
- # If we can't find any matching path in the following search, reset
- # the matching length to 0.
- new_positions[index] = 0
-
- # We want to find the new longest matching prefix of remove path with
- # the suffix of the current traversing path. Because the new longest
- # matching path won't be longer than the prevous one now, and part of
- # the suffix matches the prefix of remove path, we can get the needed
- # suffix from the previous matching prefix of the invalid path.
- suffix = remove_path[:position] + [curr_func]
- for offset in range(1, len(suffix)):
- length = position - offset
- if remove_path[:length] == suffix[offset:]:
- new_positions[index] = length
- break
-
- new_positions = tuple(new_positions)
-
- # If the current suffix is invalid, set the max stack usage to 0.
- max_stack_usage = 0
- max_callee_state = None
- self_loop = False
-
- if not invalid_flag:
- # Max stack usage is at least equal to the stack frame.
- max_stack_usage = curr_func.stack_frame
- for callsite in curr_func.callsites:
- callee = callsite.callee
- if callee is None:
+ # Examples: 8018ab0 00040048 00010000 10020000 4b8e0108 ...H........K...
+ # 100a7294 00000000 00000000 01000000 ............
+
+ base_offset = None
+ offset = None
+ rodata = []
+ for line in rodata_text.splitlines():
+ line = line.strip()
+ space = line.find(" ")
+ if space < 0:
+ continue
+ try:
+ address = int(line[0:space], 16)
+ except ValueError:
continue
- callee_state = (callee.address, new_positions)
- if callee_state not in scc_index_map:
- # Unvisited state.
- scc_lowlink = min(scc_lowlink, Traverse(callee_state))
- elif callee_state in stacked_states:
- # The state is shown in the stack. There is a cycle.
- sub_stack_usage = 0
- scc_lowlink = min(scc_lowlink, scc_index_map[callee_state])
- if callee_state == curr_state:
- self_loop = True
-
- done_result = done_states.get(callee_state)
- if done_result is not None:
- # Already done this state and use its result. If the state reaches a
- # cycle, reusing the result will cause inaccuracy (the stack usage
- # of cycle depends on where the entrance is). But it's fine since we
- # can't get accurate stack usage under this situation, and we rely
- # on user-provided annotations to break the cycle, after which the
- # result will be accurate again.
- (sub_stack_usage, _) = done_result
-
- if callsite.is_tail:
- # For tailing call, since the callee reuses the stack frame of the
- # caller, choose the larger one directly.
- stack_usage = max(curr_func.stack_frame, sub_stack_usage)
- else:
- stack_usage = curr_func.stack_frame + sub_stack_usage
-
- if stack_usage > max_stack_usage:
- max_stack_usage = stack_usage
- max_callee_state = callee_state
-
- if scc_lowlink == scc_index:
- group = []
- while scc_stack[-1] != curr_state:
- scc_state = scc_stack.pop()
- stacked_states.remove(scc_state)
- group.append(scc_state)
-
- scc_stack.pop()
- stacked_states.remove(curr_state)
-
- # If the cycle is not empty, record it.
- if len(group) > 0 or self_loop:
- group.append(curr_state)
- cycle_groups.append(group)
-
- # Store the done result.
- done_states[curr_state] = (max_stack_usage, max_callee_state)
-
- if curr_positions == initial_positions:
- # If the current state is initial state, we traversed the callgraph by
- # using the current function as start point. Update the stack usage of
- # the function.
- # If the function matches a single vertex remove path, this will set its
- # max stack usage to 0, which is not expected (we still calculate its
- # max stack usage, but prevent any function from calling it). However,
- # all the single vertex remove paths have been preprocessed and removed.
- curr_func.stack_max_usage = max_stack_usage
-
- # Reconstruct the max stack path by traversing the state transitions.
- max_stack_path = [curr_func]
- callee_state = max_callee_state
- while callee_state is not None:
- # The first element of state tuple is function address.
- max_stack_path.append(function_map[callee_state[0]])
- done_result = done_states.get(callee_state)
- # All of the descendants should be done.
- assert done_result is not None
- (_, callee_state) = done_result
-
- curr_func.stack_max_path = max_stack_path
-
- return scc_lowlink
-
- # The state is the concatenation of the current function address and the
- # state of matching position.
- initial_positions = (0,) * len(remove_list)
- done_states = {}
- stacked_states = set()
- scc_index_counter = [0]
- scc_index_map = {}
- scc_stack = []
- cycle_groups = []
- for function in function_map.values():
- if function.stack_max_usage is None:
- Traverse((function.address, initial_positions))
-
- cycle_functions = []
- for group in cycle_groups:
- cycle = set(function_map[state[0]] for state in group)
- if cycle not in cycle_functions:
- cycle_functions.append(cycle)
-
- return cycle_functions
-
- def Analyze(self):
- """Run the stack analysis.
-
- Raises:
- StackAnalyzerError: If disassembly fails.
- """
- def OutputInlineStack(address, prefix=''):
- """Output beautiful inline stack.
-
- Args:
- address: Address.
- prefix: Prefix of each line.
-
- Returns:
- Key for sorting, output text
- """
- line_infos = self.AddressToLine(address, True)
-
- if line_infos[0] is None:
- order_key = (None, None)
- else:
- (_, path, linenum) = line_infos[0]
- order_key = (linenum, path)
-
- line_texts = []
- for line_info in reversed(line_infos):
- if line_info is None:
- (function_name, path, linenum) = ('??', '??', 0)
- else:
- (function_name, path, linenum) = line_info
-
- line_texts.append('{}[{}:{}]'.format(function_name,
- os.path.relpath(path),
- linenum))
-
- output = '{}-> {} {:x}\n'.format(prefix, line_texts[0], address)
- for depth, line_text in enumerate(line_texts[1:]):
- output += '{} {}- {}\n'.format(prefix, ' ' * depth, line_text)
-
- # Remove the last newline character.
- return (order_key, output.rstrip('\n'))
-
- # Analyze disassembly.
- try:
- disasm_text = subprocess.check_output([self.options.objdump,
- '-d',
- self.options.elf_path],
- encoding='utf-8')
- except subprocess.CalledProcessError:
- raise StackAnalyzerError('objdump failed to disassemble.')
- except OSError:
- raise StackAnalyzerError('Failed to run objdump.')
-
- function_map = self.AnalyzeDisassembly(disasm_text)
- result = self.ResolveAnnotation(function_map)
- (add_set, remove_list, eliminated_addrs, failed_sigtxts) = result
- remove_list = self.PreprocessAnnotation(function_map,
- add_set,
- remove_list,
- eliminated_addrs)
- cycle_functions = self.AnalyzeCallGraph(function_map, remove_list)
-
- # Print the results of task-aware stack analysis.
- extra_stack_frame = self.annotation.get('exception_frame_size',
- DEFAULT_EXCEPTION_FRAME_SIZE)
- for task in self.tasklist:
- routine_func = function_map[task.routine_address]
- print('Task: {}, Max size: {} ({} + {}), Allocated size: {}'.format(
- task.name,
- routine_func.stack_max_usage + extra_stack_frame,
- routine_func.stack_max_usage,
- extra_stack_frame,
- task.stack_max_size))
-
- print('Call Trace:')
- max_stack_path = routine_func.stack_max_path
- # Assume the routine function is resolved.
- assert max_stack_path is not None
- for depth, curr_func in enumerate(max_stack_path):
- line_info = self.AddressToLine(curr_func.address)[0]
- if line_info is None:
- (path, linenum) = ('??', 0)
- else:
- (_, path, linenum) = line_info
-
- print(' {} ({}) [{}:{}] {:x}'.format(curr_func.name,
- curr_func.stack_frame,
- os.path.relpath(path),
- linenum,
- curr_func.address))
-
- if depth + 1 < len(max_stack_path):
- succ_func = max_stack_path[depth + 1]
- text_list = []
- for callsite in curr_func.callsites:
- if callsite.callee is succ_func:
- indent_prefix = ' '
- if callsite.address is None:
- order_text = (None, '{}-> [annotation]'.format(indent_prefix))
- else:
- order_text = OutputInlineStack(callsite.address, indent_prefix)
-
- text_list.append(order_text)
-
- for _, text in sorted(text_list, key=lambda item: item[0]):
- print(text)
-
- print('Unresolved indirect callsites:')
- for function in function_map.values():
- indirect_callsites = []
- for callsite in function.callsites:
- if callsite.target is None:
- indirect_callsites.append(callsite.address)
-
- if len(indirect_callsites) > 0:
- print(' In function {}:'.format(function.name))
- text_list = []
- for address in indirect_callsites:
- text_list.append(OutputInlineStack(address, ' '))
-
- for _, text in sorted(text_list, key=lambda item: item[0]):
- print(text)
-
- print('Unresolved annotation signatures:')
- for sigtxt, error in failed_sigtxts:
- print(' {}: {}'.format(sigtxt, error))
-
- if len(cycle_functions) > 0:
- print('There are cycles in the following function sets:')
- for functions in cycle_functions:
- print('[{}]'.format(', '.join(function.name for function in functions)))
-
-
-def ParseArgs():
- """Parse commandline arguments.
-
- Returns:
- options: Namespace from argparse.parse_args().
- """
- parser = argparse.ArgumentParser(description="EC firmware stack analyzer.")
- parser.add_argument('elf_path', help="the path of EC firmware ELF")
- parser.add_argument('--export_taskinfo', required=True,
- help="the path of export_taskinfo.so utility")
- parser.add_argument('--section', required=True, help='the section.',
- choices=[SECTION_RO, SECTION_RW])
- parser.add_argument('--objdump', default='objdump',
- help='the path of objdump')
- parser.add_argument('--addr2line', default='addr2line',
- help='the path of addr2line')
- parser.add_argument('--annotation', default=None,
- help='the path of annotation file')
-
- # TODO(cheyuw): Add an option for dumping stack usage of all functions.
-
- return parser.parse_args()
-
-
-def ParseSymbolText(symbol_text):
- """Parse the content of the symbol text.
-
- Args:
- symbol_text: Text of the symbols.
-
- Returns:
- symbols: Symbol list.
- """
- # Example: "10093064 g F .text 0000015c .hidden hook_task"
- symbol_regex = re.compile(r'^(?P<address>[0-9A-Fa-f]+)\s+[lwg]\s+'
- r'((?P<type>[OF])\s+)?\S+\s+'
- r'(?P<size>[0-9A-Fa-f]+)\s+'
- r'(\S+\s+)?(?P<name>\S+)$')
-
- symbols = []
- for line in symbol_text.splitlines():
- line = line.strip()
- result = symbol_regex.match(line)
- if result is not None:
- address = int(result.group('address'), 16)
- symtype = result.group('type')
- if symtype is None:
- symtype = 'O'
-
- size = int(result.group('size'), 16)
- name = result.group('name')
- symbols.append(Symbol(address, symtype, size, name))
-
- return symbols
-
-
-def ParseRoDataText(rodata_text):
- """Parse the content of rodata
-
- Args:
- symbol_text: Text of the rodata dump.
-
- Returns:
- symbols: Symbol list.
- """
- # Examples: 8018ab0 00040048 00010000 10020000 4b8e0108 ...H........K...
- # 100a7294 00000000 00000000 01000000 ............
-
- base_offset = None
- offset = None
- rodata = []
- for line in rodata_text.splitlines():
- line = line.strip()
- space = line.find(' ')
- if space < 0:
- continue
- try:
- address = int(line[0:space], 16)
- except ValueError:
- continue
-
- if not base_offset:
- base_offset = address
- offset = address
- elif address != offset:
- raise StackAnalyzerError('objdump of rodata not contiguous.')
+ if not base_offset:
+ base_offset = address
+ offset = address
+ elif address != offset:
+ raise StackAnalyzerError("objdump of rodata not contiguous.")
- for i in range(0, 4):
- num = line[(space + 1 + i*9):(space + 9 + i*9)]
- if len(num.strip()) > 0:
- val = int(num, 16)
- else:
- val = 0
- # TODO(drinkcat): Not all platforms are necessarily big-endian
- rodata.append((val & 0x000000ff) << 24 |
- (val & 0x0000ff00) << 8 |
- (val & 0x00ff0000) >> 8 |
- (val & 0xff000000) >> 24)
+ for i in range(0, 4):
+ num = line[(space + 1 + i * 9) : (space + 9 + i * 9)]
+ if len(num.strip()) > 0:
+ val = int(num, 16)
+ else:
+ val = 0
+ # TODO(drinkcat): Not all platforms are necessarily big-endian
+ rodata.append(
+ (val & 0x000000FF) << 24
+ | (val & 0x0000FF00) << 8
+ | (val & 0x00FF0000) >> 8
+ | (val & 0xFF000000) >> 24
+ )
- offset = offset + 4*4
+ offset = offset + 4 * 4
- return (base_offset, rodata)
+ return (base_offset, rodata)
def LoadTasklist(section, export_taskinfo, symbols):
- """Load the task information.
+ """Load the task information.
- Args:
- section: Section (RO | RW).
- export_taskinfo: Handle of export_taskinfo.so.
- symbols: Symbol list.
+ Args:
+ section: Section (RO | RW).
+ export_taskinfo: Handle of export_taskinfo.so.
+ symbols: Symbol list.
- Returns:
- tasklist: Task list.
- """
+ Returns:
+ tasklist: Task list.
+ """
- TaskInfoPointer = ctypes.POINTER(TaskInfo)
- taskinfos = TaskInfoPointer()
- if section == SECTION_RO:
- get_taskinfos_func = export_taskinfo.get_ro_taskinfos
- else:
- get_taskinfos_func = export_taskinfo.get_rw_taskinfos
+ TaskInfoPointer = ctypes.POINTER(TaskInfo)
+ taskinfos = TaskInfoPointer()
+ if section == SECTION_RO:
+ get_taskinfos_func = export_taskinfo.get_ro_taskinfos
+ else:
+ get_taskinfos_func = export_taskinfo.get_rw_taskinfos
- taskinfo_num = get_taskinfos_func(ctypes.pointer(taskinfos))
+ taskinfo_num = get_taskinfos_func(ctypes.pointer(taskinfos))
- tasklist = []
- for index in range(taskinfo_num):
- taskinfo = taskinfos[index]
- tasklist.append(Task(taskinfo.name.decode('utf-8'),
- taskinfo.routine.decode('utf-8'),
- taskinfo.stack_size))
+ tasklist = []
+ for index in range(taskinfo_num):
+ taskinfo = taskinfos[index]
+ tasklist.append(
+ Task(
+ taskinfo.name.decode("utf-8"),
+ taskinfo.routine.decode("utf-8"),
+ taskinfo.stack_size,
+ )
+ )
- # Resolve routine address for each task. It's more efficient to resolve all
- # routine addresses of tasks together.
- routine_map = dict((task.routine_name, None) for task in tasklist)
+ # Resolve routine address for each task. It's more efficient to resolve all
+ # routine addresses of tasks together.
+ routine_map = dict((task.routine_name, None) for task in tasklist)
- for symbol in symbols:
- # Resolve task routine address.
- if symbol.name in routine_map:
- # Assume the symbol of routine is unique.
- assert routine_map[symbol.name] is None
- routine_map[symbol.name] = symbol.address
+ for symbol in symbols:
+ # Resolve task routine address.
+ if symbol.name in routine_map:
+ # Assume the symbol of routine is unique.
+ assert routine_map[symbol.name] is None
+ routine_map[symbol.name] = symbol.address
- for task in tasklist:
- address = routine_map[task.routine_name]
- # Assume we have resolved all routine addresses.
- assert address is not None
- task.routine_address = address
+ for task in tasklist:
+ address = routine_map[task.routine_name]
+ # Assume we have resolved all routine addresses.
+ assert address is not None
+ task.routine_address = address
- return tasklist
+ return tasklist
def main():
- """Main function."""
- try:
- options = ParseArgs()
-
- # Load annotation config.
- if options.annotation is None:
- annotation = {}
- elif not os.path.exists(options.annotation):
- print('Warning: Annotation file {} does not exist.'
- .format(options.annotation))
- annotation = {}
- else:
- try:
- with open(options.annotation, 'r') as annotation_file:
- annotation = yaml.safe_load(annotation_file)
-
- except yaml.YAMLError:
- raise StackAnalyzerError('Failed to parse annotation file {}.'
- .format(options.annotation))
- except IOError:
- raise StackAnalyzerError('Failed to open annotation file {}.'
- .format(options.annotation))
-
- # TODO(cheyuw): Do complete annotation format verification.
- if not isinstance(annotation, dict):
- raise StackAnalyzerError('Invalid annotation file {}.'
- .format(options.annotation))
-
- # Generate and parse the symbols.
+ """Main function."""
try:
- symbol_text = subprocess.check_output([options.objdump,
- '-t',
- options.elf_path],
- encoding='utf-8')
- rodata_text = subprocess.check_output([options.objdump,
- '-s',
- '-j', '.rodata',
- options.elf_path],
- encoding='utf-8')
- except subprocess.CalledProcessError:
- raise StackAnalyzerError('objdump failed to dump symbol table or rodata.')
- except OSError:
- raise StackAnalyzerError('Failed to run objdump.')
-
- symbols = ParseSymbolText(symbol_text)
- rodata = ParseRoDataText(rodata_text)
-
- # Load the tasklist.
- try:
- export_taskinfo = ctypes.CDLL(options.export_taskinfo)
- except OSError:
- raise StackAnalyzerError('Failed to load export_taskinfo.')
-
- tasklist = LoadTasklist(options.section, export_taskinfo, symbols)
-
- analyzer = StackAnalyzer(options, symbols, rodata, tasklist, annotation)
- analyzer.Analyze()
- except StackAnalyzerError as e:
- print('Error: {}'.format(e))
-
-
-if __name__ == '__main__':
- main()
+ options = ParseArgs()
+
+ # Load annotation config.
+ if options.annotation is None:
+ annotation = {}
+ elif not os.path.exists(options.annotation):
+ print(
+ "Warning: Annotation file {} does not exist.".format(
+ options.annotation
+ )
+ )
+ annotation = {}
+ else:
+ try:
+ with open(options.annotation, "r") as annotation_file:
+ annotation = yaml.safe_load(annotation_file)
+
+ except yaml.YAMLError:
+ raise StackAnalyzerError(
+ "Failed to parse annotation file {}.".format(
+ options.annotation
+ )
+ )
+ except IOError:
+ raise StackAnalyzerError(
+ "Failed to open annotation file {}.".format(
+ options.annotation
+ )
+ )
+
+ # TODO(cheyuw): Do complete annotation format verification.
+ if not isinstance(annotation, dict):
+ raise StackAnalyzerError(
+ "Invalid annotation file {}.".format(options.annotation)
+ )
+
+ # Generate and parse the symbols.
+ try:
+ symbol_text = subprocess.check_output(
+ [options.objdump, "-t", options.elf_path], encoding="utf-8"
+ )
+ rodata_text = subprocess.check_output(
+ [options.objdump, "-s", "-j", ".rodata", options.elf_path],
+ encoding="utf-8",
+ )
+ except subprocess.CalledProcessError:
+ raise StackAnalyzerError(
+ "objdump failed to dump symbol table or rodata."
+ )
+ except OSError:
+ raise StackAnalyzerError("Failed to run objdump.")
+
+ symbols = ParseSymbolText(symbol_text)
+ rodata = ParseRoDataText(rodata_text)
+
+ # Load the tasklist.
+ try:
+ export_taskinfo = ctypes.CDLL(options.export_taskinfo)
+ except OSError:
+ raise StackAnalyzerError("Failed to load export_taskinfo.")
+
+ tasklist = LoadTasklist(options.section, export_taskinfo, symbols)
+
+ analyzer = StackAnalyzer(options, symbols, rodata, tasklist, annotation)
+ analyzer.Analyze()
+ except StackAnalyzerError as e:
+ print("Error: {}".format(e))
+
+
+if __name__ == "__main__":
+ main()
diff --git a/extra/stack_analyzer/stack_analyzer_unittest.py b/extra/stack_analyzer/stack_analyzer_unittest.py
index c36fa9da45..23a8fb93ea 100755
--- a/extra/stack_analyzer/stack_analyzer_unittest.py
+++ b/extra/stack_analyzer/stack_analyzer_unittest.py
@@ -1,830 +1,993 @@
#!/usr/bin/env python3
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Copyright 2017 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""Tests for Stack Analyzer classes and functions."""
from __future__ import print_function
-import mock
import os
import subprocess
import unittest
+import mock # pylint:disable=import-error
import stack_analyzer as sa
class ObjectTest(unittest.TestCase):
- """Tests for classes of basic objects."""
-
- def testTask(self):
- task_a = sa.Task('a', 'a_task', 1234)
- task_b = sa.Task('b', 'b_task', 5678, 0x1000)
- self.assertEqual(task_a, task_a)
- self.assertNotEqual(task_a, task_b)
- self.assertNotEqual(task_a, None)
-
- def testSymbol(self):
- symbol_a = sa.Symbol(0x1234, 'F', 32, 'a')
- symbol_b = sa.Symbol(0x234, 'O', 42, 'b')
- self.assertEqual(symbol_a, symbol_a)
- self.assertNotEqual(symbol_a, symbol_b)
- self.assertNotEqual(symbol_a, None)
-
- def testCallsite(self):
- callsite_a = sa.Callsite(0x1002, 0x3000, False)
- callsite_b = sa.Callsite(0x1002, 0x3000, True)
- self.assertEqual(callsite_a, callsite_a)
- self.assertNotEqual(callsite_a, callsite_b)
- self.assertNotEqual(callsite_a, None)
-
- def testFunction(self):
- func_a = sa.Function(0x100, 'a', 0, [])
- func_b = sa.Function(0x200, 'b', 0, [])
- self.assertEqual(func_a, func_a)
- self.assertNotEqual(func_a, func_b)
- self.assertNotEqual(func_a, None)
+ """Tests for classes of basic objects."""
+
+ def testTask(self):
+ task_a = sa.Task("a", "a_task", 1234)
+ task_b = sa.Task("b", "b_task", 5678, 0x1000)
+ self.assertEqual(task_a, task_a)
+ self.assertNotEqual(task_a, task_b)
+ self.assertNotEqual(task_a, None)
+
+ def testSymbol(self):
+ symbol_a = sa.Symbol(0x1234, "F", 32, "a")
+ symbol_b = sa.Symbol(0x234, "O", 42, "b")
+ self.assertEqual(symbol_a, symbol_a)
+ self.assertNotEqual(symbol_a, symbol_b)
+ self.assertNotEqual(symbol_a, None)
+
+ def testCallsite(self):
+ callsite_a = sa.Callsite(0x1002, 0x3000, False)
+ callsite_b = sa.Callsite(0x1002, 0x3000, True)
+ self.assertEqual(callsite_a, callsite_a)
+ self.assertNotEqual(callsite_a, callsite_b)
+ self.assertNotEqual(callsite_a, None)
+
+ def testFunction(self):
+ func_a = sa.Function(0x100, "a", 0, [])
+ func_b = sa.Function(0x200, "b", 0, [])
+ self.assertEqual(func_a, func_a)
+ self.assertNotEqual(func_a, func_b)
+ self.assertNotEqual(func_a, None)
class ArmAnalyzerTest(unittest.TestCase):
- """Tests for class ArmAnalyzer."""
-
- def AppendConditionCode(self, opcodes):
- rets = []
- for opcode in opcodes:
- rets.extend(opcode + cc for cc in sa.ArmAnalyzer.CONDITION_CODES)
-
- return rets
-
- def testInstructionMatching(self):
- jump_list = self.AppendConditionCode(['b', 'bx'])
- jump_list += (list(opcode + '.n' for opcode in jump_list) +
- list(opcode + '.w' for opcode in jump_list))
- for opcode in jump_list:
- self.assertIsNotNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match(opcode))
-
- self.assertIsNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match('bl'))
- self.assertIsNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match('blx'))
-
- cbz_list = ['cbz', 'cbnz', 'cbz.n', 'cbnz.n', 'cbz.w', 'cbnz.w']
- for opcode in cbz_list:
- self.assertIsNotNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match(opcode))
-
- self.assertIsNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match('cbn'))
-
- call_list = self.AppendConditionCode(['bl', 'blx'])
- call_list += list(opcode + '.n' for opcode in call_list)
- for opcode in call_list:
- self.assertIsNotNone(sa.ArmAnalyzer.CALL_OPCODE_RE.match(opcode))
-
- self.assertIsNone(sa.ArmAnalyzer.CALL_OPCODE_RE.match('ble'))
-
- result = sa.ArmAnalyzer.CALL_OPERAND_RE.match('53f90 <get_time+0x18>')
- self.assertIsNotNone(result)
- self.assertEqual(result.group(1), '53f90')
- self.assertEqual(result.group(2), 'get_time+0x18')
-
- result = sa.ArmAnalyzer.CBZ_CBNZ_OPERAND_RE.match('r6, 53f90 <get+0x0>')
- self.assertIsNotNone(result)
- self.assertEqual(result.group(1), '53f90')
- self.assertEqual(result.group(2), 'get+0x0')
-
- self.assertIsNotNone(sa.ArmAnalyzer.PUSH_OPCODE_RE.match('push'))
- self.assertIsNone(sa.ArmAnalyzer.PUSH_OPCODE_RE.match('pushal'))
- self.assertIsNotNone(sa.ArmAnalyzer.STM_OPCODE_RE.match('stmdb'))
- self.assertIsNone(sa.ArmAnalyzer.STM_OPCODE_RE.match('lstm'))
- self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('sub'))
- self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('subs'))
- self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('subw'))
- self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('sub.w'))
- self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match('subs.w'))
-
- result = sa.ArmAnalyzer.SUB_OPERAND_RE.match('sp, sp, #1668 ; 0x684')
- self.assertIsNotNone(result)
- self.assertEqual(result.group(1), '1668')
- result = sa.ArmAnalyzer.SUB_OPERAND_RE.match('sp, #1668')
- self.assertIsNotNone(result)
- self.assertEqual(result.group(1), '1668')
- self.assertIsNone(sa.ArmAnalyzer.SUB_OPERAND_RE.match('sl, #1668'))
-
- def testAnalyzeFunction(self):
- analyzer = sa.ArmAnalyzer()
- symbol = sa.Symbol(0x10, 'F', 0x100, 'foo')
- instructions = [
- (0x10, 'push', '{r4, r5, r6, r7, lr}'),
- (0x12, 'subw', 'sp, sp, #16 ; 0x10'),
- (0x16, 'movs', 'lr, r1'),
- (0x18, 'beq.n', '26 <foo+0x26>'),
- (0x1a, 'bl', '30 <foo+0x30>'),
- (0x1e, 'bl', 'deadbeef <bar>'),
- (0x22, 'blx', '0 <woo>'),
- (0x26, 'push', '{r1}'),
- (0x28, 'stmdb', 'sp!, {r4, r5, r6, r7, r8, r9, lr}'),
- (0x2c, 'stmdb', 'sp!, {r4}'),
- (0x30, 'stmdb', 'sp, {r4}'),
- (0x34, 'bx.n', '10 <foo>'),
- (0x36, 'bx.n', 'r3'),
- (0x38, 'ldr', 'pc, [r10]'),
- ]
- (size, callsites) = analyzer.AnalyzeFunction(symbol, instructions)
- self.assertEqual(size, 72)
- expect_callsites = [sa.Callsite(0x1e, 0xdeadbeef, False),
- sa.Callsite(0x22, 0x0, False),
- sa.Callsite(0x34, 0x10, True),
- sa.Callsite(0x36, None, True),
- sa.Callsite(0x38, None, True)]
- self.assertEqual(callsites, expect_callsites)
+ """Tests for class ArmAnalyzer."""
+
+ def AppendConditionCode(self, opcodes):
+ rets = []
+ for opcode in opcodes:
+ rets.extend(opcode + cc for cc in sa.ArmAnalyzer.CONDITION_CODES)
+
+ return rets
+
+ def testInstructionMatching(self):
+ jump_list = self.AppendConditionCode(["b", "bx"])
+ jump_list += list(opcode + ".n" for opcode in jump_list) + list(
+ opcode + ".w" for opcode in jump_list
+ )
+ for opcode in jump_list:
+ self.assertIsNotNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match(opcode))
+
+ self.assertIsNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match("bl"))
+ self.assertIsNone(sa.ArmAnalyzer.JUMP_OPCODE_RE.match("blx"))
+
+ cbz_list = ["cbz", "cbnz", "cbz.n", "cbnz.n", "cbz.w", "cbnz.w"]
+ for opcode in cbz_list:
+ self.assertIsNotNone(
+ sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match(opcode)
+ )
+
+ self.assertIsNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match("cbn"))
+
+ call_list = self.AppendConditionCode(["bl", "blx"])
+ call_list += list(opcode + ".n" for opcode in call_list)
+ for opcode in call_list:
+ self.assertIsNotNone(sa.ArmAnalyzer.CALL_OPCODE_RE.match(opcode))
+
+ self.assertIsNone(sa.ArmAnalyzer.CALL_OPCODE_RE.match("ble"))
+
+ result = sa.ArmAnalyzer.CALL_OPERAND_RE.match("53f90 <get_time+0x18>")
+ self.assertIsNotNone(result)
+ self.assertEqual(result.group(1), "53f90")
+ self.assertEqual(result.group(2), "get_time+0x18")
+
+ result = sa.ArmAnalyzer.CBZ_CBNZ_OPERAND_RE.match("r6, 53f90 <get+0x0>")
+ self.assertIsNotNone(result)
+ self.assertEqual(result.group(1), "53f90")
+ self.assertEqual(result.group(2), "get+0x0")
+
+ self.assertIsNotNone(sa.ArmAnalyzer.PUSH_OPCODE_RE.match("push"))
+ self.assertIsNone(sa.ArmAnalyzer.PUSH_OPCODE_RE.match("pushal"))
+ self.assertIsNotNone(sa.ArmAnalyzer.STM_OPCODE_RE.match("stmdb"))
+ self.assertIsNone(sa.ArmAnalyzer.STM_OPCODE_RE.match("lstm"))
+ self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("sub"))
+ self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("subs"))
+ self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("subw"))
+ self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("sub.w"))
+ self.assertIsNotNone(sa.ArmAnalyzer.SUB_OPCODE_RE.match("subs.w"))
+
+ result = sa.ArmAnalyzer.SUB_OPERAND_RE.match("sp, sp, #1668 ; 0x684")
+ self.assertIsNotNone(result)
+ self.assertEqual(result.group(1), "1668")
+ result = sa.ArmAnalyzer.SUB_OPERAND_RE.match("sp, #1668")
+ self.assertIsNotNone(result)
+ self.assertEqual(result.group(1), "1668")
+ self.assertIsNone(sa.ArmAnalyzer.SUB_OPERAND_RE.match("sl, #1668"))
+
+ def testAnalyzeFunction(self):
+ analyzer = sa.ArmAnalyzer()
+ symbol = sa.Symbol(0x10, "F", 0x100, "foo")
+ instructions = [
+ (0x10, "push", "{r4, r5, r6, r7, lr}"),
+ (0x12, "subw", "sp, sp, #16 ; 0x10"),
+ (0x16, "movs", "lr, r1"),
+ (0x18, "beq.n", "26 <foo+0x26>"),
+ (0x1A, "bl", "30 <foo+0x30>"),
+ (0x1E, "bl", "deadbeef <bar>"),
+ (0x22, "blx", "0 <woo>"),
+ (0x26, "push", "{r1}"),
+ (0x28, "stmdb", "sp!, {r4, r5, r6, r7, r8, r9, lr}"),
+ (0x2C, "stmdb", "sp!, {r4}"),
+ (0x30, "stmdb", "sp, {r4}"),
+ (0x34, "bx.n", "10 <foo>"),
+ (0x36, "bx.n", "r3"),
+ (0x38, "ldr", "pc, [r10]"),
+ ]
+ (size, callsites) = analyzer.AnalyzeFunction(symbol, instructions)
+ self.assertEqual(size, 72)
+ expect_callsites = [
+ sa.Callsite(0x1E, 0xDEADBEEF, False),
+ sa.Callsite(0x22, 0x0, False),
+ sa.Callsite(0x34, 0x10, True),
+ sa.Callsite(0x36, None, True),
+ sa.Callsite(0x38, None, True),
+ ]
+ self.assertEqual(callsites, expect_callsites)
class StackAnalyzerTest(unittest.TestCase):
- """Tests for class StackAnalyzer."""
-
- def setUp(self):
- symbols = [sa.Symbol(0x1000, 'F', 0x15C, 'hook_task'),
- sa.Symbol(0x2000, 'F', 0x51C, 'console_task'),
- sa.Symbol(0x3200, 'O', 0x124, '__just_data'),
- sa.Symbol(0x4000, 'F', 0x11C, 'touchpad_calc'),
- sa.Symbol(0x5000, 'F', 0x12C, 'touchpad_calc.constprop.42'),
- sa.Symbol(0x12000, 'F', 0x13C, 'trackpad_range'),
- sa.Symbol(0x13000, 'F', 0x200, 'inlined_mul'),
- sa.Symbol(0x13100, 'F', 0x200, 'inlined_mul'),
- sa.Symbol(0x13100, 'F', 0x200, 'inlined_mul_alias'),
- sa.Symbol(0x20000, 'O', 0x0, '__array'),
- sa.Symbol(0x20010, 'O', 0x0, '__array_end'),
- ]
- tasklist = [sa.Task('HOOKS', 'hook_task', 2048, 0x1000),
- sa.Task('CONSOLE', 'console_task', 460, 0x2000)]
- # Array at 0x20000 that contains pointers to hook_task and console_task,
- # with stride=8, offset=4
- rodata = (0x20000, [ 0xDEAD1000, 0x00001000, 0xDEAD2000, 0x00002000 ])
- options = mock.MagicMock(elf_path='./ec.RW.elf',
- export_taskinfo='fake',
- section='RW',
- objdump='objdump',
- addr2line='addr2line',
- annotation=None)
- self.analyzer = sa.StackAnalyzer(options, symbols, rodata, tasklist, {})
-
- def testParseSymbolText(self):
- symbol_text = (
- '0 g F .text e8 Foo\n'
- '0000dead w F .text 000000e8 .hidden Bar\n'
- 'deadbeef l O .bss 00000004 .hidden Woooo\n'
- 'deadbee g O .rodata 00000008 __Hooo_ooo\n'
- 'deadbee g .rodata 00000000 __foo_doo_coo_end\n'
- )
- symbols = sa.ParseSymbolText(symbol_text)
- expect_symbols = [sa.Symbol(0x0, 'F', 0xe8, 'Foo'),
- sa.Symbol(0xdead, 'F', 0xe8, 'Bar'),
- sa.Symbol(0xdeadbeef, 'O', 0x4, 'Woooo'),
- sa.Symbol(0xdeadbee, 'O', 0x8, '__Hooo_ooo'),
- sa.Symbol(0xdeadbee, 'O', 0x0, '__foo_doo_coo_end')]
- self.assertEqual(symbols, expect_symbols)
-
- def testParseRoData(self):
- rodata_text = (
- '\n'
- 'Contents of section .rodata:\n'
- ' 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n'
- )
- rodata = sa.ParseRoDataText(rodata_text)
- expect_rodata = (0x20000,
- [ 0x0010adde, 0x00001000, 0x0020adde, 0x00002000 ])
- self.assertEqual(rodata, expect_rodata)
-
- def testLoadTasklist(self):
- def tasklist_to_taskinfos(pointer, tasklist):
- taskinfos = []
- for task in tasklist:
- taskinfos.append(sa.TaskInfo(name=task.name.encode('utf-8'),
- routine=task.routine_name.encode('utf-8'),
- stack_size=task.stack_max_size))
-
- TaskInfoArray = sa.TaskInfo * len(taskinfos)
- pointer.contents.contents = TaskInfoArray(*taskinfos)
- return len(taskinfos)
-
- def ro_taskinfos(pointer):
- return tasklist_to_taskinfos(pointer, expect_ro_tasklist)
-
- def rw_taskinfos(pointer):
- return tasklist_to_taskinfos(pointer, expect_rw_tasklist)
-
- expect_ro_tasklist = [
- sa.Task('HOOKS', 'hook_task', 2048, 0x1000),
- ]
-
- expect_rw_tasklist = [
- sa.Task('HOOKS', 'hook_task', 2048, 0x1000),
- sa.Task('WOOKS', 'hook_task', 4096, 0x1000),
- sa.Task('CONSOLE', 'console_task', 460, 0x2000),
- ]
-
- export_taskinfo = mock.MagicMock(
- get_ro_taskinfos=mock.MagicMock(side_effect=ro_taskinfos),
- get_rw_taskinfos=mock.MagicMock(side_effect=rw_taskinfos))
-
- tasklist = sa.LoadTasklist('RO', export_taskinfo, self.analyzer.symbols)
- self.assertEqual(tasklist, expect_ro_tasklist)
- tasklist = sa.LoadTasklist('RW', export_taskinfo, self.analyzer.symbols)
- self.assertEqual(tasklist, expect_rw_tasklist)
-
- def testResolveAnnotation(self):
- self.analyzer.annotation = {}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
- self.assertEqual(add_rules, {})
- self.assertEqual(remove_rules, [])
- self.assertEqual(invalid_sigtxts, set())
-
- self.analyzer.annotation = {'add': None, 'remove': None}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
- self.assertEqual(add_rules, {})
- self.assertEqual(remove_rules, [])
- self.assertEqual(invalid_sigtxts, set())
-
- self.analyzer.annotation = {
- 'add': None,
- 'remove': [
- [['a', 'b'], ['0', '[', '2'], 'x'],
- [['a', 'b[x:3]'], ['0', '1', '2'], 'x'],
- ],
- }
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
- self.assertEqual(add_rules, {})
- self.assertEqual(list.sort(remove_rules), list.sort([
- [('a', None, None), ('1', None, None), ('x', None, None)],
- [('a', None, None), ('0', None, None), ('x', None, None)],
- [('a', None, None), ('2', None, None), ('x', None, None)],
- [('b', os.path.abspath('x'), 3), ('1', None, None), ('x', None, None)],
- [('b', os.path.abspath('x'), 3), ('0', None, None), ('x', None, None)],
- [('b', os.path.abspath('x'), 3), ('2', None, None), ('x', None, None)],
- ]))
- self.assertEqual(invalid_sigtxts, {'['})
-
- self.analyzer.annotation = {
- 'add': {
- 'touchpad_calc': [ dict(name='__array', stride=8, offset=4) ],
+ """Tests for class StackAnalyzer."""
+
+ def setUp(self):
+ symbols = [
+ sa.Symbol(0x1000, "F", 0x15C, "hook_task"),
+ sa.Symbol(0x2000, "F", 0x51C, "console_task"),
+ sa.Symbol(0x3200, "O", 0x124, "__just_data"),
+ sa.Symbol(0x4000, "F", 0x11C, "touchpad_calc"),
+ sa.Symbol(0x5000, "F", 0x12C, "touchpad_calc.constprop.42"),
+ sa.Symbol(0x12000, "F", 0x13C, "trackpad_range"),
+ sa.Symbol(0x13000, "F", 0x200, "inlined_mul"),
+ sa.Symbol(0x13100, "F", 0x200, "inlined_mul"),
+ sa.Symbol(0x13100, "F", 0x200, "inlined_mul_alias"),
+ sa.Symbol(0x20000, "O", 0x0, "__array"),
+ sa.Symbol(0x20010, "O", 0x0, "__array_end"),
+ ]
+ tasklist = [
+ sa.Task("HOOKS", "hook_task", 2048, 0x1000),
+ sa.Task("CONSOLE", "console_task", 460, 0x2000),
+ ]
+ # Array at 0x20000 that contains pointers to hook_task and console_task,
+ # with stride=8, offset=4
+ rodata = (0x20000, [0xDEAD1000, 0x00001000, 0xDEAD2000, 0x00002000])
+ options = mock.MagicMock(
+ elf_path="./ec.RW.elf",
+ export_taskinfo="fake",
+ section="RW",
+ objdump="objdump",
+ addr2line="addr2line",
+ annotation=None,
+ )
+ self.analyzer = sa.StackAnalyzer(options, symbols, rodata, tasklist, {})
+
+ def testParseSymbolText(self):
+ symbol_text = (
+ "0 g F .text e8 Foo\n"
+ "0000dead w F .text 000000e8 .hidden Bar\n"
+ "deadbeef l O .bss 00000004 .hidden Woooo\n"
+ "deadbee g O .rodata 00000008 __Hooo_ooo\n"
+ "deadbee g .rodata 00000000 __foo_doo_coo_end\n"
+ )
+ symbols = sa.ParseSymbolText(symbol_text)
+ expect_symbols = [
+ sa.Symbol(0x0, "F", 0xE8, "Foo"),
+ sa.Symbol(0xDEAD, "F", 0xE8, "Bar"),
+ sa.Symbol(0xDEADBEEF, "O", 0x4, "Woooo"),
+ sa.Symbol(0xDEADBEE, "O", 0x8, "__Hooo_ooo"),
+ sa.Symbol(0xDEADBEE, "O", 0x0, "__foo_doo_coo_end"),
+ ]
+ self.assertEqual(symbols, expect_symbols)
+
+ def testParseRoData(self):
+ rodata_text = (
+ "\n"
+ "Contents of section .rodata:\n"
+ " 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n"
+ )
+ rodata = sa.ParseRoDataText(rodata_text)
+ expect_rodata = (
+ 0x20000,
+ [0x0010ADDE, 0x00001000, 0x0020ADDE, 0x00002000],
+ )
+ self.assertEqual(rodata, expect_rodata)
+
+ def testLoadTasklist(self):
+ def tasklist_to_taskinfos(pointer, tasklist):
+ taskinfos = []
+ for task in tasklist:
+ taskinfos.append(
+ sa.TaskInfo(
+ name=task.name.encode("utf-8"),
+ routine=task.routine_name.encode("utf-8"),
+ stack_size=task.stack_max_size,
+ )
+ )
+
+ TaskInfoArray = sa.TaskInfo * len(taskinfos)
+ pointer.contents.contents = TaskInfoArray(*taskinfos)
+ return len(taskinfos)
+
+ def ro_taskinfos(pointer):
+ return tasklist_to_taskinfos(pointer, expect_ro_tasklist)
+
+ def rw_taskinfos(pointer):
+ return tasklist_to_taskinfos(pointer, expect_rw_tasklist)
+
+ expect_ro_tasklist = [
+ sa.Task("HOOKS", "hook_task", 2048, 0x1000),
+ ]
+
+ expect_rw_tasklist = [
+ sa.Task("HOOKS", "hook_task", 2048, 0x1000),
+ sa.Task("WOOKS", "hook_task", 4096, 0x1000),
+ sa.Task("CONSOLE", "console_task", 460, 0x2000),
+ ]
+
+ export_taskinfo = mock.MagicMock(
+ get_ro_taskinfos=mock.MagicMock(side_effect=ro_taskinfos),
+ get_rw_taskinfos=mock.MagicMock(side_effect=rw_taskinfos),
+ )
+
+ tasklist = sa.LoadTasklist("RO", export_taskinfo, self.analyzer.symbols)
+ self.assertEqual(tasklist, expect_ro_tasklist)
+ tasklist = sa.LoadTasklist("RW", export_taskinfo, self.analyzer.symbols)
+ self.assertEqual(tasklist, expect_rw_tasklist)
+
+ def testResolveAnnotation(self):
+ self.analyzer.annotation = {}
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
+ self.assertEqual(add_rules, {})
+ self.assertEqual(remove_rules, [])
+ self.assertEqual(invalid_sigtxts, set())
+
+ self.analyzer.annotation = {"add": None, "remove": None}
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
+ self.assertEqual(add_rules, {})
+ self.assertEqual(remove_rules, [])
+ self.assertEqual(invalid_sigtxts, set())
+
+ self.analyzer.annotation = {
+ "add": None,
+ "remove": [
+ [["a", "b"], ["0", "[", "2"], "x"],
+ [["a", "b[x:3]"], ["0", "1", "2"], "x"],
+ ],
}
- }
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
- self.assertEqual(add_rules, {
- ('touchpad_calc', None, None):
- set([('console_task', None, None), ('hook_task', None, None)])})
-
- funcs = {
- 0x1000: sa.Function(0x1000, 'hook_task', 0, []),
- 0x2000: sa.Function(0x2000, 'console_task', 0, []),
- 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []),
- 0x5000: sa.Function(0x5000, 'touchpad_calc.constprop.42', 0, []),
- 0x13000: sa.Function(0x13000, 'inlined_mul', 0, []),
- 0x13100: sa.Function(0x13100, 'inlined_mul', 0, []),
- }
- funcs[0x1000].callsites = [
- sa.Callsite(0x1002, None, False, None)]
- # Set address_to_line_cache to fake the results of addr2line.
- self.analyzer.address_to_line_cache = {
- (0x1000, False): [('hook_task', os.path.abspath('a.c'), 10)],
- (0x1002, False): [('toot_calc', os.path.abspath('t.c'), 1234)],
- (0x2000, False): [('console_task', os.path.abspath('b.c'), 20)],
- (0x4000, False): [('toudhpad_calc', os.path.abspath('a.c'), 20)],
- (0x5000, False): [
- ('touchpad_calc.constprop.42', os.path.abspath('b.c'), 40)],
- (0x12000, False): [('trackpad_range', os.path.abspath('t.c'), 10)],
- (0x13000, False): [('inlined_mul', os.path.abspath('x.c'), 12)],
- (0x13100, False): [('inlined_mul', os.path.abspath('x.c'), 12)],
- }
- self.analyzer.annotation = {
- 'add': {
- 'hook_task.lto.573': ['touchpad_calc.lto.2501[a.c]'],
- 'console_task': ['touchpad_calc[b.c]', 'inlined_mul_alias'],
- 'hook_task[q.c]': ['hook_task'],
- 'inlined_mul[x.c]': ['inlined_mul'],
- 'toot_calc[t.c:1234]': ['hook_task'],
- },
- 'remove': [
- ['touchpad?calc['],
- 'touchpad_calc',
- ['touchpad_calc[a.c]'],
- ['task_unk[a.c]'],
- ['touchpad_calc[x/a.c]'],
- ['trackpad_range'],
- ['inlined_mul'],
- ['inlined_mul', 'console_task', 'touchpad_calc[a.c]'],
- ['inlined_mul', 'inlined_mul_alias', 'console_task'],
- ['inlined_mul', 'inlined_mul_alias', 'console_task'],
- ],
- }
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
- self.assertEqual(invalid_sigtxts, {'touchpad?calc['})
-
- signature_set = set()
- for src_sig, dst_sigs in add_rules.items():
- signature_set.add(src_sig)
- signature_set.update(dst_sigs)
-
- for remove_sigs in remove_rules:
- signature_set.update(remove_sigs)
-
- (signature_map, failed_sigs) = self.analyzer.MapAnnotation(funcs,
- signature_set)
- result = self.analyzer.ResolveAnnotation(funcs)
- (add_set, remove_list, eliminated_addrs, failed_sigs) = result
-
- expect_signature_map = {
- ('hook_task', None, None): {funcs[0x1000]},
- ('touchpad_calc', os.path.abspath('a.c'), None): {funcs[0x4000]},
- ('touchpad_calc', os.path.abspath('b.c'), None): {funcs[0x5000]},
- ('console_task', None, None): {funcs[0x2000]},
- ('inlined_mul_alias', None, None): {funcs[0x13100]},
- ('inlined_mul', os.path.abspath('x.c'), None): {funcs[0x13000],
- funcs[0x13100]},
- ('inlined_mul', None, None): {funcs[0x13000], funcs[0x13100]},
- }
- self.assertEqual(len(signature_map), len(expect_signature_map))
- for sig, funclist in signature_map.items():
- self.assertEqual(set(funclist), expect_signature_map[sig])
-
- self.assertEqual(add_set, {
- (funcs[0x1000], funcs[0x4000]),
- (funcs[0x1000], funcs[0x1000]),
- (funcs[0x2000], funcs[0x5000]),
- (funcs[0x2000], funcs[0x13100]),
- (funcs[0x13000], funcs[0x13000]),
- (funcs[0x13000], funcs[0x13100]),
- (funcs[0x13100], funcs[0x13000]),
- (funcs[0x13100], funcs[0x13100]),
- })
- expect_remove_list = [
- [funcs[0x4000]],
- [funcs[0x13000]],
- [funcs[0x13100]],
- [funcs[0x13000], funcs[0x2000], funcs[0x4000]],
- [funcs[0x13100], funcs[0x2000], funcs[0x4000]],
- [funcs[0x13000], funcs[0x13100], funcs[0x2000]],
- [funcs[0x13100], funcs[0x13100], funcs[0x2000]],
- ]
- self.assertEqual(len(remove_list), len(expect_remove_list))
- for remove_path in remove_list:
- self.assertTrue(remove_path in expect_remove_list)
-
- self.assertEqual(eliminated_addrs, {0x1002})
- self.assertEqual(failed_sigs, {
- ('touchpad?calc[', sa.StackAnalyzer.ANNOTATION_ERROR_INVALID),
- ('touchpad_calc', sa.StackAnalyzer.ANNOTATION_ERROR_AMBIGUOUS),
- ('hook_task[q.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
- ('task_unk[a.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
- ('touchpad_calc[x/a.c]', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
- ('trackpad_range', sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
- })
-
- def testPreprocessAnnotation(self):
- funcs = {
- 0x1000: sa.Function(0x1000, 'hook_task', 0, []),
- 0x2000: sa.Function(0x2000, 'console_task', 0, []),
- 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []),
- }
- funcs[0x1000].callsites = [
- sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])]
- funcs[0x2000].callsites = [
- sa.Callsite(0x2002, 0x1000, False, funcs[0x1000]),
- sa.Callsite(0x2006, None, True, None),
- ]
- add_set = {
- (funcs[0x2000], funcs[0x2000]),
- (funcs[0x2000], funcs[0x4000]),
- (funcs[0x4000], funcs[0x1000]),
- (funcs[0x4000], funcs[0x2000]),
- }
- remove_list = [
- [funcs[0x1000]],
- [funcs[0x2000], funcs[0x2000]],
- [funcs[0x4000], funcs[0x1000]],
- [funcs[0x2000], funcs[0x4000], funcs[0x2000]],
- [funcs[0x4000], funcs[0x1000], funcs[0x4000]],
- ]
- eliminated_addrs = {0x2006}
-
- remaining_remove_list = self.analyzer.PreprocessAnnotation(funcs,
- add_set,
- remove_list,
- eliminated_addrs)
-
- expect_funcs = {
- 0x1000: sa.Function(0x1000, 'hook_task', 0, []),
- 0x2000: sa.Function(0x2000, 'console_task', 0, []),
- 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []),
- }
- expect_funcs[0x2000].callsites = [
- sa.Callsite(None, 0x4000, False, expect_funcs[0x4000])]
- expect_funcs[0x4000].callsites = [
- sa.Callsite(None, 0x2000, False, expect_funcs[0x2000])]
- self.assertEqual(funcs, expect_funcs)
- self.assertEqual(remaining_remove_list, [
- [funcs[0x2000], funcs[0x4000], funcs[0x2000]],
- ])
-
- def testAndesAnalyzeDisassembly(self):
- disasm_text = (
- '\n'
- 'build/{BOARD}/RW/ec.RW.elf: file format elf32-nds32le'
- '\n'
- 'Disassembly of section .text:\n'
- '\n'
- '00000900 <wook_task>:\n'
- ' ...\n'
- '00001000 <hook_task>:\n'
- ' 1000: fc 42\tpush25 $r10, #16 ! {$r6~$r10, $fp, $gp, $lp}\n'
- ' 1004: 47 70\t\tmovi55 $r0, #1\n'
- ' 1006: b1 13\tbnezs8 100929de <flash_command_write>\n'
- ' 1008: 00 01 5c fc\tbne $r6, $r0, 2af6a\n'
- '00002000 <console_task>:\n'
- ' 2000: fc 00\t\tpush25 $r6, #0 ! {$r6, $fp, $gp, $lp} \n'
- ' 2002: f0 0e fc c5\tjal 1000 <hook_task>\n'
- ' 2006: f0 0e bd 3b\tj 53968 <get_program_memory_addr>\n'
- ' 200a: de ad be ef\tswi.gp $r0, [ + #-11036]\n'
- '00004000 <touchpad_calc>:\n'
- ' 4000: 47 70\t\tmovi55 $r0, #1\n'
- '00010000 <look_task>:'
- )
- function_map = self.analyzer.AnalyzeDisassembly(disasm_text)
- func_hook_task = sa.Function(0x1000, 'hook_task', 48, [
- sa.Callsite(0x1006, 0x100929de, True, None)])
- expect_funcmap = {
- 0x1000: func_hook_task,
- 0x2000: sa.Function(0x2000, 'console_task', 16,
- [sa.Callsite(0x2002, 0x1000, False, func_hook_task),
- sa.Callsite(0x2006, 0x53968, True, None)]),
- 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []),
- }
- self.assertEqual(function_map, expect_funcmap)
-
- def testArmAnalyzeDisassembly(self):
- disasm_text = (
- '\n'
- 'build/{BOARD}/RW/ec.RW.elf: file format elf32-littlearm'
- '\n'
- 'Disassembly of section .text:\n'
- '\n'
- '00000900 <wook_task>:\n'
- ' ...\n'
- '00001000 <hook_task>:\n'
- ' 1000: dead beef\tfake\n'
- ' 1004: 4770\t\tbx lr\n'
- ' 1006: b113\tcbz r3, 100929de <flash_command_write>\n'
- ' 1008: 00015cfc\t.word 0x00015cfc\n'
- '00002000 <console_task>:\n'
- ' 2000: b508\t\tpush {r3, lr} ; malformed comments,; r0, r1 \n'
- ' 2002: f00e fcc5\tbl 1000 <hook_task>\n'
- ' 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n'
- ' 200a: dead beef\tfake\n'
- '00004000 <touchpad_calc>:\n'
- ' 4000: 4770\t\tbx lr\n'
- '00010000 <look_task>:'
- )
- function_map = self.analyzer.AnalyzeDisassembly(disasm_text)
- func_hook_task = sa.Function(0x1000, 'hook_task', 0, [
- sa.Callsite(0x1006, 0x100929de, True, None)])
- expect_funcmap = {
- 0x1000: func_hook_task,
- 0x2000: sa.Function(0x2000, 'console_task', 8,
- [sa.Callsite(0x2002, 0x1000, False, func_hook_task),
- sa.Callsite(0x2006, 0x53968, True, None)]),
- 0x4000: sa.Function(0x4000, 'touchpad_calc', 0, []),
- }
- self.assertEqual(function_map, expect_funcmap)
-
- def testAnalyzeCallGraph(self):
- funcs = {
- 0x1000: sa.Function(0x1000, 'hook_task', 0, []),
- 0x2000: sa.Function(0x2000, 'console_task', 8, []),
- 0x3000: sa.Function(0x3000, 'task_a', 12, []),
- 0x4000: sa.Function(0x4000, 'task_b', 96, []),
- 0x5000: sa.Function(0x5000, 'task_c', 32, []),
- 0x6000: sa.Function(0x6000, 'task_d', 100, []),
- 0x7000: sa.Function(0x7000, 'task_e', 24, []),
- 0x8000: sa.Function(0x8000, 'task_f', 20, []),
- 0x9000: sa.Function(0x9000, 'task_g', 20, []),
- 0x10000: sa.Function(0x10000, 'task_x', 16, []),
- }
- funcs[0x1000].callsites = [
- sa.Callsite(0x1002, 0x3000, False, funcs[0x3000]),
- sa.Callsite(0x1006, 0x4000, False, funcs[0x4000])]
- funcs[0x2000].callsites = [
- sa.Callsite(0x2002, 0x5000, False, funcs[0x5000]),
- sa.Callsite(0x2006, 0x2000, False, funcs[0x2000]),
- sa.Callsite(0x200a, 0x10000, False, funcs[0x10000])]
- funcs[0x3000].callsites = [
- sa.Callsite(0x3002, 0x4000, False, funcs[0x4000]),
- sa.Callsite(0x3006, 0x1000, False, funcs[0x1000])]
- funcs[0x4000].callsites = [
- sa.Callsite(0x4002, 0x6000, True, funcs[0x6000]),
- sa.Callsite(0x4006, 0x7000, False, funcs[0x7000]),
- sa.Callsite(0x400a, 0x8000, False, funcs[0x8000])]
- funcs[0x5000].callsites = [
- sa.Callsite(0x5002, 0x4000, False, funcs[0x4000])]
- funcs[0x7000].callsites = [
- sa.Callsite(0x7002, 0x7000, False, funcs[0x7000])]
- funcs[0x8000].callsites = [
- sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])]
- funcs[0x9000].callsites = [
- sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])]
- funcs[0x10000].callsites = [
- sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])]
-
- cycles = self.analyzer.AnalyzeCallGraph(funcs, [
- [funcs[0x2000]] * 2,
- [funcs[0x10000], funcs[0x2000]] * 3,
- [funcs[0x1000], funcs[0x3000], funcs[0x1000]]
- ])
-
- expect_func_stack = {
- 0x1000: (268, [funcs[0x1000],
- funcs[0x3000],
- funcs[0x4000],
- funcs[0x8000],
- funcs[0x9000],
- funcs[0x4000],
- funcs[0x7000]]),
- 0x2000: (208, [funcs[0x2000],
- funcs[0x10000],
- funcs[0x2000],
- funcs[0x10000],
- funcs[0x2000],
- funcs[0x5000],
- funcs[0x4000],
- funcs[0x7000]]),
- 0x3000: (280, [funcs[0x3000],
- funcs[0x1000],
- funcs[0x3000],
- funcs[0x4000],
- funcs[0x8000],
- funcs[0x9000],
- funcs[0x4000],
- funcs[0x7000]]),
- 0x4000: (120, [funcs[0x4000], funcs[0x7000]]),
- 0x5000: (152, [funcs[0x5000], funcs[0x4000], funcs[0x7000]]),
- 0x6000: (100, [funcs[0x6000]]),
- 0x7000: (24, [funcs[0x7000]]),
- 0x8000: (160, [funcs[0x8000],
- funcs[0x9000],
- funcs[0x4000],
- funcs[0x7000]]),
- 0x9000: (140, [funcs[0x9000], funcs[0x4000], funcs[0x7000]]),
- 0x10000: (200, [funcs[0x10000],
- funcs[0x2000],
- funcs[0x10000],
- funcs[0x2000],
- funcs[0x5000],
- funcs[0x4000],
- funcs[0x7000]]),
- }
- expect_cycles = [
- {funcs[0x4000], funcs[0x8000], funcs[0x9000]},
- {funcs[0x7000]},
- ]
- for func in funcs.values():
- (stack_max_usage, stack_max_path) = expect_func_stack[func.address]
- self.assertEqual(func.stack_max_usage, stack_max_usage)
- self.assertEqual(func.stack_max_path, stack_max_path)
-
- self.assertEqual(len(cycles), len(expect_cycles))
- for cycle in cycles:
- self.assertTrue(cycle in expect_cycles)
-
- @mock.patch('subprocess.check_output')
- def testAddressToLine(self, checkoutput_mock):
- checkoutput_mock.return_value = 'fake_func\n/test.c:1'
- self.assertEqual(self.analyzer.AddressToLine(0x1234),
- [('fake_func', '/test.c', 1)])
- checkoutput_mock.assert_called_once_with(
- ['addr2line', '-f', '-e', './ec.RW.elf', '1234'], encoding='utf-8')
- checkoutput_mock.reset_mock()
-
- checkoutput_mock.return_value = 'fake_func\n/a.c:1\nbake_func\n/b.c:2\n'
- self.assertEqual(self.analyzer.AddressToLine(0x1234, True),
- [('fake_func', '/a.c', 1), ('bake_func', '/b.c', 2)])
- checkoutput_mock.assert_called_once_with(
- ['addr2line', '-f', '-e', './ec.RW.elf', '1234', '-i'],
- encoding='utf-8')
- checkoutput_mock.reset_mock()
-
- checkoutput_mock.return_value = 'fake_func\n/test.c:1 (discriminator 128)'
- self.assertEqual(self.analyzer.AddressToLine(0x12345),
- [('fake_func', '/test.c', 1)])
- checkoutput_mock.assert_called_once_with(
- ['addr2line', '-f', '-e', './ec.RW.elf', '12345'], encoding='utf-8')
- checkoutput_mock.reset_mock()
-
- checkoutput_mock.return_value = '??\n:?\nbake_func\n/b.c:2\n'
- self.assertEqual(self.analyzer.AddressToLine(0x123456),
- [None, ('bake_func', '/b.c', 2)])
- checkoutput_mock.assert_called_once_with(
- ['addr2line', '-f', '-e', './ec.RW.elf', '123456'], encoding='utf-8')
- checkoutput_mock.reset_mock()
-
- with self.assertRaisesRegexp(sa.StackAnalyzerError,
- 'addr2line failed to resolve lines.'):
- checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '')
- self.analyzer.AddressToLine(0x5678)
-
- with self.assertRaisesRegexp(sa.StackAnalyzerError,
- 'Failed to run addr2line.'):
- checkoutput_mock.side_effect = OSError()
- self.analyzer.AddressToLine(0x9012)
-
- @mock.patch('subprocess.check_output')
- @mock.patch('stack_analyzer.StackAnalyzer.AddressToLine')
- def testAndesAnalyze(self, addrtoline_mock, checkoutput_mock):
- disasm_text = (
- '\n'
- 'build/{BOARD}/RW/ec.RW.elf: file format elf32-nds32le'
- '\n'
- 'Disassembly of section .text:\n'
- '\n'
- '00000900 <wook_task>:\n'
- ' ...\n'
- '00001000 <hook_task>:\n'
- ' 1000: fc 00\t\tpush25 $r10, #16 ! {$r6~$r10, $fp, $gp, $lp}\n'
- ' 1002: 47 70\t\tmovi55 $r0, #1\n'
- ' 1006: 00 01 5c fc\tbne $r6, $r0, 2af6a\n'
- '00002000 <console_task>:\n'
- ' 2000: fc 00\t\tpush25 $r6, #0 ! {$r6, $fp, $gp, $lp} \n'
- ' 2002: f0 0e fc c5\tjal 1000 <hook_task>\n'
- ' 2006: f0 0e bd 3b\tj 53968 <get_program_memory_addr>\n'
- ' 200a: 12 34 56 78\tjral5 $r0\n'
- )
-
- addrtoline_mock.return_value = [('??', '??', 0)]
- self.analyzer.annotation = {
- 'exception_frame_size': 64,
- 'remove': [['fake_func']],
- }
-
- with mock.patch('builtins.print') as print_mock:
- checkoutput_mock.return_value = disasm_text
- self.analyzer.Analyze()
- print_mock.assert_has_calls([
- mock.call(
- 'Task: HOOKS, Max size: 96 (32 + 64), Allocated size: 2048'),
- mock.call('Call Trace:'),
- mock.call(' hook_task (32) [??:0] 1000'),
- mock.call(
- 'Task: CONSOLE, Max size: 112 (48 + 64), Allocated size: 460'),
- mock.call('Call Trace:'),
- mock.call(' console_task (16) [??:0] 2000'),
- mock.call(' -> ??[??:0] 2002'),
- mock.call(' hook_task (32) [??:0] 1000'),
- mock.call('Unresolved indirect callsites:'),
- mock.call(' In function console_task:'),
- mock.call(' -> ??[??:0] 200a'),
- mock.call('Unresolved annotation signatures:'),
- mock.call(' fake_func: function is not found'),
- ])
-
- with self.assertRaisesRegexp(sa.StackAnalyzerError,
- 'Failed to run objdump.'):
- checkoutput_mock.side_effect = OSError()
- self.analyzer.Analyze()
-
- with self.assertRaisesRegexp(sa.StackAnalyzerError,
- 'objdump failed to disassemble.'):
- checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '')
- self.analyzer.Analyze()
-
- @mock.patch('subprocess.check_output')
- @mock.patch('stack_analyzer.StackAnalyzer.AddressToLine')
- def testArmAnalyze(self, addrtoline_mock, checkoutput_mock):
- disasm_text = (
- '\n'
- 'build/{BOARD}/RW/ec.RW.elf: file format elf32-littlearm'
- '\n'
- 'Disassembly of section .text:\n'
- '\n'
- '00000900 <wook_task>:\n'
- ' ...\n'
- '00001000 <hook_task>:\n'
- ' 1000: b508\t\tpush {r3, lr}\n'
- ' 1002: 4770\t\tbx lr\n'
- ' 1006: 00015cfc\t.word 0x00015cfc\n'
- '00002000 <console_task>:\n'
- ' 2000: b508\t\tpush {r3, lr}\n'
- ' 2002: f00e fcc5\tbl 1000 <hook_task>\n'
- ' 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n'
- ' 200a: 1234 5678\tb.w sl\n'
- )
-
- addrtoline_mock.return_value = [('??', '??', 0)]
- self.analyzer.annotation = {
- 'exception_frame_size': 64,
- 'remove': [['fake_func']],
- }
-
- with mock.patch('builtins.print') as print_mock:
- checkoutput_mock.return_value = disasm_text
- self.analyzer.Analyze()
- print_mock.assert_has_calls([
- mock.call(
- 'Task: HOOKS, Max size: 72 (8 + 64), Allocated size: 2048'),
- mock.call('Call Trace:'),
- mock.call(' hook_task (8) [??:0] 1000'),
- mock.call(
- 'Task: CONSOLE, Max size: 80 (16 + 64), Allocated size: 460'),
- mock.call('Call Trace:'),
- mock.call(' console_task (8) [??:0] 2000'),
- mock.call(' -> ??[??:0] 2002'),
- mock.call(' hook_task (8) [??:0] 1000'),
- mock.call('Unresolved indirect callsites:'),
- mock.call(' In function console_task:'),
- mock.call(' -> ??[??:0] 200a'),
- mock.call('Unresolved annotation signatures:'),
- mock.call(' fake_func: function is not found'),
- ])
-
- with self.assertRaisesRegexp(sa.StackAnalyzerError,
- 'Failed to run objdump.'):
- checkoutput_mock.side_effect = OSError()
- self.analyzer.Analyze()
-
- with self.assertRaisesRegexp(sa.StackAnalyzerError,
- 'objdump failed to disassemble.'):
- checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '')
- self.analyzer.Analyze()
-
- @mock.patch('subprocess.check_output')
- @mock.patch('stack_analyzer.ParseArgs')
- def testMain(self, parseargs_mock, checkoutput_mock):
- symbol_text = ('1000 g F .text 0000015c .hidden hook_task\n'
- '2000 g F .text 0000051c .hidden console_task\n')
- rodata_text = (
- '\n'
- 'Contents of section .rodata:\n'
- ' 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n'
- )
-
- args = mock.MagicMock(elf_path='./ec.RW.elf',
- export_taskinfo='fake',
- section='RW',
- objdump='objdump',
- addr2line='addr2line',
- annotation='fake')
- parseargs_mock.return_value = args
-
- with mock.patch('os.path.exists') as path_mock:
- path_mock.return_value = False
- with mock.patch('builtins.print') as print_mock:
- with mock.patch('builtins.open', mock.mock_open()) as open_mock:
- sa.main()
- print_mock.assert_any_call(
- 'Warning: Annotation file fake does not exist.')
-
- with mock.patch('os.path.exists') as path_mock:
- path_mock.return_value = True
- with mock.patch('builtins.print') as print_mock:
- with mock.patch('builtins.open', mock.mock_open()) as open_mock:
- open_mock.side_effect = IOError()
- sa.main()
- print_mock.assert_called_once_with(
- 'Error: Failed to open annotation file fake.')
-
- with mock.patch('builtins.print') as print_mock:
- with mock.patch('builtins.open', mock.mock_open()) as open_mock:
- open_mock.return_value.read.side_effect = ['{', '']
- sa.main()
- open_mock.assert_called_once_with('fake', 'r')
- print_mock.assert_called_once_with(
- 'Error: Failed to parse annotation file fake.')
-
- with mock.patch('builtins.print') as print_mock:
- with mock.patch('builtins.open',
- mock.mock_open(read_data='')) as open_mock:
- sa.main()
- print_mock.assert_called_once_with(
- 'Error: Invalid annotation file fake.')
-
- args.annotation = None
-
- with mock.patch('builtins.print') as print_mock:
- checkoutput_mock.side_effect = [symbol_text, rodata_text]
- sa.main()
- print_mock.assert_called_once_with(
- 'Error: Failed to load export_taskinfo.')
-
- with mock.patch('builtins.print') as print_mock:
- checkoutput_mock.side_effect = subprocess.CalledProcessError(1, '')
- sa.main()
- print_mock.assert_called_once_with(
- 'Error: objdump failed to dump symbol table or rodata.')
-
- with mock.patch('builtins.print') as print_mock:
- checkoutput_mock.side_effect = OSError()
- sa.main()
- print_mock.assert_called_once_with('Error: Failed to run objdump.')
-
-
-if __name__ == '__main__':
- unittest.main()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
+ self.assertEqual(add_rules, {})
+ self.assertEqual(
+ list.sort(remove_rules),
+ list.sort(
+ [
+ [("a", None, None), ("1", None, None), ("x", None, None)],
+ [("a", None, None), ("0", None, None), ("x", None, None)],
+ [("a", None, None), ("2", None, None), ("x", None, None)],
+ [
+ ("b", os.path.abspath("x"), 3),
+ ("1", None, None),
+ ("x", None, None),
+ ],
+ [
+ ("b", os.path.abspath("x"), 3),
+ ("0", None, None),
+ ("x", None, None),
+ ],
+ [
+ ("b", os.path.abspath("x"), 3),
+ ("2", None, None),
+ ("x", None, None),
+ ],
+ ]
+ ),
+ )
+ self.assertEqual(invalid_sigtxts, {"["})
+
+ self.analyzer.annotation = {
+ "add": {
+ "touchpad_calc": [dict(name="__array", stride=8, offset=4)],
+ }
+ }
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
+ self.assertEqual(
+ add_rules,
+ {
+ ("touchpad_calc", None, None): set(
+ [("console_task", None, None), ("hook_task", None, None)]
+ )
+ },
+ )
+
+ funcs = {
+ 0x1000: sa.Function(0x1000, "hook_task", 0, []),
+ 0x2000: sa.Function(0x2000, "console_task", 0, []),
+ 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []),
+ 0x5000: sa.Function(0x5000, "touchpad_calc.constprop.42", 0, []),
+ 0x13000: sa.Function(0x13000, "inlined_mul", 0, []),
+ 0x13100: sa.Function(0x13100, "inlined_mul", 0, []),
+ }
+ funcs[0x1000].callsites = [sa.Callsite(0x1002, None, False, None)]
+ # Set address_to_line_cache to fake the results of addr2line.
+ self.analyzer.address_to_line_cache = {
+ (0x1000, False): [("hook_task", os.path.abspath("a.c"), 10)],
+ (0x1002, False): [("toot_calc", os.path.abspath("t.c"), 1234)],
+ (0x2000, False): [("console_task", os.path.abspath("b.c"), 20)],
+ (0x4000, False): [("toudhpad_calc", os.path.abspath("a.c"), 20)],
+ (0x5000, False): [
+ ("touchpad_calc.constprop.42", os.path.abspath("b.c"), 40)
+ ],
+ (0x12000, False): [("trackpad_range", os.path.abspath("t.c"), 10)],
+ (0x13000, False): [("inlined_mul", os.path.abspath("x.c"), 12)],
+ (0x13100, False): [("inlined_mul", os.path.abspath("x.c"), 12)],
+ }
+ self.analyzer.annotation = {
+ "add": {
+ "hook_task.lto.573": ["touchpad_calc.lto.2501[a.c]"],
+ "console_task": ["touchpad_calc[b.c]", "inlined_mul_alias"],
+ "hook_task[q.c]": ["hook_task"],
+ "inlined_mul[x.c]": ["inlined_mul"],
+ "toot_calc[t.c:1234]": ["hook_task"],
+ },
+ "remove": [
+ ["touchpad?calc["],
+ "touchpad_calc",
+ ["touchpad_calc[a.c]"],
+ ["task_unk[a.c]"],
+ ["touchpad_calc[x/a.c]"],
+ ["trackpad_range"],
+ ["inlined_mul"],
+ ["inlined_mul", "console_task", "touchpad_calc[a.c]"],
+ ["inlined_mul", "inlined_mul_alias", "console_task"],
+ ["inlined_mul", "inlined_mul_alias", "console_task"],
+ ],
+ }
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
+ self.assertEqual(invalid_sigtxts, {"touchpad?calc["})
+
+ signature_set = set()
+ for src_sig, dst_sigs in add_rules.items():
+ signature_set.add(src_sig)
+ signature_set.update(dst_sigs)
+
+ for remove_sigs in remove_rules:
+ signature_set.update(remove_sigs)
+
+ (signature_map, failed_sigs) = self.analyzer.MapAnnotation(
+ funcs, signature_set
+ )
+ result = self.analyzer.ResolveAnnotation(funcs)
+ (add_set, remove_list, eliminated_addrs, failed_sigs) = result
+
+ expect_signature_map = {
+ ("hook_task", None, None): {funcs[0x1000]},
+ ("touchpad_calc", os.path.abspath("a.c"), None): {funcs[0x4000]},
+ ("touchpad_calc", os.path.abspath("b.c"), None): {funcs[0x5000]},
+ ("console_task", None, None): {funcs[0x2000]},
+ ("inlined_mul_alias", None, None): {funcs[0x13100]},
+ ("inlined_mul", os.path.abspath("x.c"), None): {
+ funcs[0x13000],
+ funcs[0x13100],
+ },
+ ("inlined_mul", None, None): {funcs[0x13000], funcs[0x13100]},
+ }
+ self.assertEqual(len(signature_map), len(expect_signature_map))
+ for sig, funclist in signature_map.items():
+ self.assertEqual(set(funclist), expect_signature_map[sig])
+
+ self.assertEqual(
+ add_set,
+ {
+ (funcs[0x1000], funcs[0x4000]),
+ (funcs[0x1000], funcs[0x1000]),
+ (funcs[0x2000], funcs[0x5000]),
+ (funcs[0x2000], funcs[0x13100]),
+ (funcs[0x13000], funcs[0x13000]),
+ (funcs[0x13000], funcs[0x13100]),
+ (funcs[0x13100], funcs[0x13000]),
+ (funcs[0x13100], funcs[0x13100]),
+ },
+ )
+ expect_remove_list = [
+ [funcs[0x4000]],
+ [funcs[0x13000]],
+ [funcs[0x13100]],
+ [funcs[0x13000], funcs[0x2000], funcs[0x4000]],
+ [funcs[0x13100], funcs[0x2000], funcs[0x4000]],
+ [funcs[0x13000], funcs[0x13100], funcs[0x2000]],
+ [funcs[0x13100], funcs[0x13100], funcs[0x2000]],
+ ]
+ self.assertEqual(len(remove_list), len(expect_remove_list))
+ for remove_path in remove_list:
+ self.assertTrue(remove_path in expect_remove_list)
+
+ self.assertEqual(eliminated_addrs, {0x1002})
+ self.assertEqual(
+ failed_sigs,
+ {
+ ("touchpad?calc[", sa.StackAnalyzer.ANNOTATION_ERROR_INVALID),
+ ("touchpad_calc", sa.StackAnalyzer.ANNOTATION_ERROR_AMBIGUOUS),
+ ("hook_task[q.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
+ ("task_unk[a.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
+ (
+ "touchpad_calc[x/a.c]",
+ sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND,
+ ),
+ ("trackpad_range", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
+ },
+ )
+
+ def testPreprocessAnnotation(self):
+ funcs = {
+ 0x1000: sa.Function(0x1000, "hook_task", 0, []),
+ 0x2000: sa.Function(0x2000, "console_task", 0, []),
+ 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []),
+ }
+ funcs[0x1000].callsites = [
+ sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])
+ ]
+ funcs[0x2000].callsites = [
+ sa.Callsite(0x2002, 0x1000, False, funcs[0x1000]),
+ sa.Callsite(0x2006, None, True, None),
+ ]
+ add_set = {
+ (funcs[0x2000], funcs[0x2000]),
+ (funcs[0x2000], funcs[0x4000]),
+ (funcs[0x4000], funcs[0x1000]),
+ (funcs[0x4000], funcs[0x2000]),
+ }
+ remove_list = [
+ [funcs[0x1000]],
+ [funcs[0x2000], funcs[0x2000]],
+ [funcs[0x4000], funcs[0x1000]],
+ [funcs[0x2000], funcs[0x4000], funcs[0x2000]],
+ [funcs[0x4000], funcs[0x1000], funcs[0x4000]],
+ ]
+ eliminated_addrs = {0x2006}
+
+ remaining_remove_list = self.analyzer.PreprocessAnnotation(
+ funcs, add_set, remove_list, eliminated_addrs
+ )
+
+ expect_funcs = {
+ 0x1000: sa.Function(0x1000, "hook_task", 0, []),
+ 0x2000: sa.Function(0x2000, "console_task", 0, []),
+ 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []),
+ }
+ expect_funcs[0x2000].callsites = [
+ sa.Callsite(None, 0x4000, False, expect_funcs[0x4000])
+ ]
+ expect_funcs[0x4000].callsites = [
+ sa.Callsite(None, 0x2000, False, expect_funcs[0x2000])
+ ]
+ self.assertEqual(funcs, expect_funcs)
+ self.assertEqual(
+ remaining_remove_list,
+ [
+ [funcs[0x2000], funcs[0x4000], funcs[0x2000]],
+ ],
+ )
+
+ def testAndesAnalyzeDisassembly(self):
+ disasm_text = (
+ "\n"
+ "build/{BOARD}/RW/ec.RW.elf: file format elf32-nds32le"
+ "\n"
+ "Disassembly of section .text:\n"
+ "\n"
+ "00000900 <wook_task>:\n"
+ " ...\n"
+ "00001000 <hook_task>:\n"
+ " 1000: fc 42\tpush25 $r10, #16 ! {$r6~$r10, $fp, $gp, $lp}\n"
+ " 1004: 47 70\t\tmovi55 $r0, #1\n"
+ " 1006: b1 13\tbnezs8 100929de <flash_command_write>\n"
+ " 1008: 00 01 5c fc\tbne $r6, $r0, 2af6a\n"
+ "00002000 <console_task>:\n"
+ " 2000: fc 00\t\tpush25 $r6, #0 ! {$r6, $fp, $gp, $lp} \n"
+ " 2002: f0 0e fc c5\tjal 1000 <hook_task>\n"
+ " 2006: f0 0e bd 3b\tj 53968 <get_program_memory_addr>\n"
+ " 200a: de ad be ef\tswi.gp $r0, [ + #-11036]\n"
+ "00004000 <touchpad_calc>:\n"
+ " 4000: 47 70\t\tmovi55 $r0, #1\n"
+ "00010000 <look_task>:"
+ )
+ function_map = self.analyzer.AnalyzeDisassembly(disasm_text)
+ func_hook_task = sa.Function(
+ 0x1000,
+ "hook_task",
+ 48,
+ [sa.Callsite(0x1006, 0x100929DE, True, None)],
+ )
+ expect_funcmap = {
+ 0x1000: func_hook_task,
+ 0x2000: sa.Function(
+ 0x2000,
+ "console_task",
+ 16,
+ [
+ sa.Callsite(0x2002, 0x1000, False, func_hook_task),
+ sa.Callsite(0x2006, 0x53968, True, None),
+ ],
+ ),
+ 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []),
+ }
+ self.assertEqual(function_map, expect_funcmap)
+
+ def testArmAnalyzeDisassembly(self):
+ disasm_text = (
+ "\n"
+ "build/{BOARD}/RW/ec.RW.elf: file format elf32-littlearm"
+ "\n"
+ "Disassembly of section .text:\n"
+ "\n"
+ "00000900 <wook_task>:\n"
+ " ...\n"
+ "00001000 <hook_task>:\n"
+ " 1000: dead beef\tfake\n"
+ " 1004: 4770\t\tbx lr\n"
+ " 1006: b113\tcbz r3, 100929de <flash_command_write>\n"
+ " 1008: 00015cfc\t.word 0x00015cfc\n"
+ "00002000 <console_task>:\n"
+ " 2000: b508\t\tpush {r3, lr} ; malformed comments,; r0, r1 \n"
+ " 2002: f00e fcc5\tbl 1000 <hook_task>\n"
+ " 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n"
+ " 200a: dead beef\tfake\n"
+ "00004000 <touchpad_calc>:\n"
+ " 4000: 4770\t\tbx lr\n"
+ "00010000 <look_task>:"
+ )
+ function_map = self.analyzer.AnalyzeDisassembly(disasm_text)
+ func_hook_task = sa.Function(
+ 0x1000,
+ "hook_task",
+ 0,
+ [sa.Callsite(0x1006, 0x100929DE, True, None)],
+ )
+ expect_funcmap = {
+ 0x1000: func_hook_task,
+ 0x2000: sa.Function(
+ 0x2000,
+ "console_task",
+ 8,
+ [
+ sa.Callsite(0x2002, 0x1000, False, func_hook_task),
+ sa.Callsite(0x2006, 0x53968, True, None),
+ ],
+ ),
+ 0x4000: sa.Function(0x4000, "touchpad_calc", 0, []),
+ }
+ self.assertEqual(function_map, expect_funcmap)
+
+ def testAnalyzeCallGraph(self):
+ funcs = {
+ 0x1000: sa.Function(0x1000, "hook_task", 0, []),
+ 0x2000: sa.Function(0x2000, "console_task", 8, []),
+ 0x3000: sa.Function(0x3000, "task_a", 12, []),
+ 0x4000: sa.Function(0x4000, "task_b", 96, []),
+ 0x5000: sa.Function(0x5000, "task_c", 32, []),
+ 0x6000: sa.Function(0x6000, "task_d", 100, []),
+ 0x7000: sa.Function(0x7000, "task_e", 24, []),
+ 0x8000: sa.Function(0x8000, "task_f", 20, []),
+ 0x9000: sa.Function(0x9000, "task_g", 20, []),
+ 0x10000: sa.Function(0x10000, "task_x", 16, []),
+ }
+ funcs[0x1000].callsites = [
+ sa.Callsite(0x1002, 0x3000, False, funcs[0x3000]),
+ sa.Callsite(0x1006, 0x4000, False, funcs[0x4000]),
+ ]
+ funcs[0x2000].callsites = [
+ sa.Callsite(0x2002, 0x5000, False, funcs[0x5000]),
+ sa.Callsite(0x2006, 0x2000, False, funcs[0x2000]),
+ sa.Callsite(0x200A, 0x10000, False, funcs[0x10000]),
+ ]
+ funcs[0x3000].callsites = [
+ sa.Callsite(0x3002, 0x4000, False, funcs[0x4000]),
+ sa.Callsite(0x3006, 0x1000, False, funcs[0x1000]),
+ ]
+ funcs[0x4000].callsites = [
+ sa.Callsite(0x4002, 0x6000, True, funcs[0x6000]),
+ sa.Callsite(0x4006, 0x7000, False, funcs[0x7000]),
+ sa.Callsite(0x400A, 0x8000, False, funcs[0x8000]),
+ ]
+ funcs[0x5000].callsites = [
+ sa.Callsite(0x5002, 0x4000, False, funcs[0x4000])
+ ]
+ funcs[0x7000].callsites = [
+ sa.Callsite(0x7002, 0x7000, False, funcs[0x7000])
+ ]
+ funcs[0x8000].callsites = [
+ sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])
+ ]
+ funcs[0x9000].callsites = [
+ sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])
+ ]
+ funcs[0x10000].callsites = [
+ sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])
+ ]
+
+ cycles = self.analyzer.AnalyzeCallGraph(
+ funcs,
+ [
+ [funcs[0x2000]] * 2,
+ [funcs[0x10000], funcs[0x2000]] * 3,
+ [funcs[0x1000], funcs[0x3000], funcs[0x1000]],
+ ],
+ )
+
+ expect_func_stack = {
+ 0x1000: (
+ 268,
+ [
+ funcs[0x1000],
+ funcs[0x3000],
+ funcs[0x4000],
+ funcs[0x8000],
+ funcs[0x9000],
+ funcs[0x4000],
+ funcs[0x7000],
+ ],
+ ),
+ 0x2000: (
+ 208,
+ [
+ funcs[0x2000],
+ funcs[0x10000],
+ funcs[0x2000],
+ funcs[0x10000],
+ funcs[0x2000],
+ funcs[0x5000],
+ funcs[0x4000],
+ funcs[0x7000],
+ ],
+ ),
+ 0x3000: (
+ 280,
+ [
+ funcs[0x3000],
+ funcs[0x1000],
+ funcs[0x3000],
+ funcs[0x4000],
+ funcs[0x8000],
+ funcs[0x9000],
+ funcs[0x4000],
+ funcs[0x7000],
+ ],
+ ),
+ 0x4000: (120, [funcs[0x4000], funcs[0x7000]]),
+ 0x5000: (152, [funcs[0x5000], funcs[0x4000], funcs[0x7000]]),
+ 0x6000: (100, [funcs[0x6000]]),
+ 0x7000: (24, [funcs[0x7000]]),
+ 0x8000: (
+ 160,
+ [funcs[0x8000], funcs[0x9000], funcs[0x4000], funcs[0x7000]],
+ ),
+ 0x9000: (140, [funcs[0x9000], funcs[0x4000], funcs[0x7000]]),
+ 0x10000: (
+ 200,
+ [
+ funcs[0x10000],
+ funcs[0x2000],
+ funcs[0x10000],
+ funcs[0x2000],
+ funcs[0x5000],
+ funcs[0x4000],
+ funcs[0x7000],
+ ],
+ ),
+ }
+ expect_cycles = [
+ {funcs[0x4000], funcs[0x8000], funcs[0x9000]},
+ {funcs[0x7000]},
+ ]
+ for func in funcs.values():
+ (stack_max_usage, stack_max_path) = expect_func_stack[func.address]
+ self.assertEqual(func.stack_max_usage, stack_max_usage)
+ self.assertEqual(func.stack_max_path, stack_max_path)
+
+ self.assertEqual(len(cycles), len(expect_cycles))
+ for cycle in cycles:
+ self.assertTrue(cycle in expect_cycles)
+
+ @mock.patch("subprocess.check_output")
+ def testAddressToLine(self, checkoutput_mock):
+ checkoutput_mock.return_value = "fake_func\n/test.c:1"
+ self.assertEqual(
+ self.analyzer.AddressToLine(0x1234), [("fake_func", "/test.c", 1)]
+ )
+ checkoutput_mock.assert_called_once_with(
+ ["addr2line", "-f", "-e", "./ec.RW.elf", "1234"], encoding="utf-8"
+ )
+ checkoutput_mock.reset_mock()
+
+ checkoutput_mock.return_value = "fake_func\n/a.c:1\nbake_func\n/b.c:2\n"
+ self.assertEqual(
+ self.analyzer.AddressToLine(0x1234, True),
+ [("fake_func", "/a.c", 1), ("bake_func", "/b.c", 2)],
+ )
+ checkoutput_mock.assert_called_once_with(
+ ["addr2line", "-f", "-e", "./ec.RW.elf", "1234", "-i"],
+ encoding="utf-8",
+ )
+ checkoutput_mock.reset_mock()
+
+ checkoutput_mock.return_value = (
+ "fake_func\n/test.c:1 (discriminator 128)"
+ )
+ self.assertEqual(
+ self.analyzer.AddressToLine(0x12345), [("fake_func", "/test.c", 1)]
+ )
+ checkoutput_mock.assert_called_once_with(
+ ["addr2line", "-f", "-e", "./ec.RW.elf", "12345"], encoding="utf-8"
+ )
+ checkoutput_mock.reset_mock()
+
+ checkoutput_mock.return_value = "??\n:?\nbake_func\n/b.c:2\n"
+ self.assertEqual(
+ self.analyzer.AddressToLine(0x123456),
+ [None, ("bake_func", "/b.c", 2)],
+ )
+ checkoutput_mock.assert_called_once_with(
+ ["addr2line", "-f", "-e", "./ec.RW.elf", "123456"], encoding="utf-8"
+ )
+ checkoutput_mock.reset_mock()
+
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "addr2line failed to resolve lines."
+ ):
+ checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "")
+ self.analyzer.AddressToLine(0x5678)
+
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run addr2line."
+ ):
+ checkoutput_mock.side_effect = OSError()
+ self.analyzer.AddressToLine(0x9012)
+
+ @mock.patch("subprocess.check_output")
+ @mock.patch("stack_analyzer.StackAnalyzer.AddressToLine")
+ def testAndesAnalyze(self, addrtoline_mock, checkoutput_mock):
+ disasm_text = (
+ "\n"
+ "build/{BOARD}/RW/ec.RW.elf: file format elf32-nds32le"
+ "\n"
+ "Disassembly of section .text:\n"
+ "\n"
+ "00000900 <wook_task>:\n"
+ " ...\n"
+ "00001000 <hook_task>:\n"
+ " 1000: fc 00\t\tpush25 $r10, #16 ! {$r6~$r10, $fp, $gp, $lp}\n"
+ " 1002: 47 70\t\tmovi55 $r0, #1\n"
+ " 1006: 00 01 5c fc\tbne $r6, $r0, 2af6a\n"
+ "00002000 <console_task>:\n"
+ " 2000: fc 00\t\tpush25 $r6, #0 ! {$r6, $fp, $gp, $lp} \n"
+ " 2002: f0 0e fc c5\tjal 1000 <hook_task>\n"
+ " 2006: f0 0e bd 3b\tj 53968 <get_program_memory_addr>\n"
+ " 200a: 12 34 56 78\tjral5 $r0\n"
+ )
+
+ addrtoline_mock.return_value = [("??", "??", 0)]
+ self.analyzer.annotation = {
+ "exception_frame_size": 64,
+ "remove": [["fake_func"]],
+ }
+
+ with mock.patch("builtins.print") as print_mock:
+ checkoutput_mock.return_value = disasm_text
+ self.analyzer.Analyze()
+ print_mock.assert_has_calls(
+ [
+ mock.call(
+ "Task: HOOKS, Max size: 96 (32 + 64), Allocated size: 2048"
+ ),
+ mock.call("Call Trace:"),
+ mock.call(" hook_task (32) [??:0] 1000"),
+ mock.call(
+ "Task: CONSOLE, Max size: 112 (48 + 64), Allocated size: 460"
+ ),
+ mock.call("Call Trace:"),
+ mock.call(" console_task (16) [??:0] 2000"),
+ mock.call(" -> ??[??:0] 2002"),
+ mock.call(" hook_task (32) [??:0] 1000"),
+ mock.call("Unresolved indirect callsites:"),
+ mock.call(" In function console_task:"),
+ mock.call(" -> ??[??:0] 200a"),
+ mock.call("Unresolved annotation signatures:"),
+ mock.call(" fake_func: function is not found"),
+ ]
+ )
+
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run objdump."
+ ):
+ checkoutput_mock.side_effect = OSError()
+ self.analyzer.Analyze()
+
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "objdump failed to disassemble."
+ ):
+ checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "")
+ self.analyzer.Analyze()
+
+ @mock.patch("subprocess.check_output")
+ @mock.patch("stack_analyzer.StackAnalyzer.AddressToLine")
+ def testArmAnalyze(self, addrtoline_mock, checkoutput_mock):
+ disasm_text = (
+ "\n"
+ "build/{BOARD}/RW/ec.RW.elf: file format elf32-littlearm"
+ "\n"
+ "Disassembly of section .text:\n"
+ "\n"
+ "00000900 <wook_task>:\n"
+ " ...\n"
+ "00001000 <hook_task>:\n"
+ " 1000: b508\t\tpush {r3, lr}\n"
+ " 1002: 4770\t\tbx lr\n"
+ " 1006: 00015cfc\t.word 0x00015cfc\n"
+ "00002000 <console_task>:\n"
+ " 2000: b508\t\tpush {r3, lr}\n"
+ " 2002: f00e fcc5\tbl 1000 <hook_task>\n"
+ " 2006: f00e bd3b\tb.w 53968 <get_program_memory_addr>\n"
+ " 200a: 1234 5678\tb.w sl\n"
+ )
+
+ addrtoline_mock.return_value = [("??", "??", 0)]
+ self.analyzer.annotation = {
+ "exception_frame_size": 64,
+ "remove": [["fake_func"]],
+ }
+
+ with mock.patch("builtins.print") as print_mock:
+ checkoutput_mock.return_value = disasm_text
+ self.analyzer.Analyze()
+ print_mock.assert_has_calls(
+ [
+ mock.call(
+ "Task: HOOKS, Max size: 72 (8 + 64), Allocated size: 2048"
+ ),
+ mock.call("Call Trace:"),
+ mock.call(" hook_task (8) [??:0] 1000"),
+ mock.call(
+ "Task: CONSOLE, Max size: 80 (16 + 64), Allocated size: 460"
+ ),
+ mock.call("Call Trace:"),
+ mock.call(" console_task (8) [??:0] 2000"),
+ mock.call(" -> ??[??:0] 2002"),
+ mock.call(" hook_task (8) [??:0] 1000"),
+ mock.call("Unresolved indirect callsites:"),
+ mock.call(" In function console_task:"),
+ mock.call(" -> ??[??:0] 200a"),
+ mock.call("Unresolved annotation signatures:"),
+ mock.call(" fake_func: function is not found"),
+ ]
+ )
+
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run objdump."
+ ):
+ checkoutput_mock.side_effect = OSError()
+ self.analyzer.Analyze()
+
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "objdump failed to disassemble."
+ ):
+ checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "")
+ self.analyzer.Analyze()
+
+ @mock.patch("subprocess.check_output")
+ @mock.patch("stack_analyzer.ParseArgs")
+ def testMain(self, parseargs_mock, checkoutput_mock):
+ symbol_text = (
+ "1000 g F .text 0000015c .hidden hook_task\n"
+ "2000 g F .text 0000051c .hidden console_task\n"
+ )
+ rodata_text = (
+ "\n"
+ "Contents of section .rodata:\n"
+ " 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n"
+ )
+
+ args = mock.MagicMock(
+ elf_path="./ec.RW.elf",
+ export_taskinfo="fake",
+ section="RW",
+ objdump="objdump",
+ addr2line="addr2line",
+ annotation="fake",
+ )
+ parseargs_mock.return_value = args
+
+ with mock.patch("os.path.exists") as path_mock:
+ path_mock.return_value = False
+ with mock.patch("builtins.print") as print_mock:
+ with mock.patch("builtins.open", mock.mock_open()) as open_mock:
+ sa.main()
+ print_mock.assert_any_call(
+ "Warning: Annotation file fake does not exist."
+ )
+
+ with mock.patch("os.path.exists") as path_mock:
+ path_mock.return_value = True
+ with mock.patch("builtins.print") as print_mock:
+ with mock.patch("builtins.open", mock.mock_open()) as open_mock:
+ open_mock.side_effect = IOError()
+ sa.main()
+ print_mock.assert_called_once_with(
+ "Error: Failed to open annotation file fake."
+ )
+
+ with mock.patch("builtins.print") as print_mock:
+ with mock.patch("builtins.open", mock.mock_open()) as open_mock:
+ open_mock.return_value.read.side_effect = ["{", ""]
+ sa.main()
+ open_mock.assert_called_once_with("fake", "r")
+ print_mock.assert_called_once_with(
+ "Error: Failed to parse annotation file fake."
+ )
+
+ with mock.patch("builtins.print") as print_mock:
+ with mock.patch(
+ "builtins.open", mock.mock_open(read_data="")
+ ) as open_mock:
+ sa.main()
+ print_mock.assert_called_once_with(
+ "Error: Invalid annotation file fake."
+ )
+
+ args.annotation = None
+
+ with mock.patch("builtins.print") as print_mock:
+ checkoutput_mock.side_effect = [symbol_text, rodata_text]
+ sa.main()
+ print_mock.assert_called_once_with(
+ "Error: Failed to load export_taskinfo."
+ )
+
+ with mock.patch("builtins.print") as print_mock:
+ checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "")
+ sa.main()
+ print_mock.assert_called_once_with(
+ "Error: objdump failed to dump symbol table or rodata."
+ )
+
+ with mock.patch("builtins.print") as print_mock:
+ checkoutput_mock.side_effect = OSError()
+ sa.main()
+ print_mock.assert_called_once_with("Error: Failed to run objdump.")
+
# Run the whole suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()