summaryrefslogtreecommitdiff
path: root/extra
diff options
context:
space:
mode:
authorJeremy Bettis <jbettis@google.com>2022-07-14 14:19:14 -0600
committerChromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-07-26 19:43:40 +0000
commitf37d95b3edc6ff7bbda96fe7a4147273db66c306 (patch)
treeaaf408cfa525efce460cddb4153101b1db492109 /extra
parent160af3cb3634f645ef0ca1a3c312c68cc3316869 (diff)
downloadchrome-ec-f37d95b3edc6ff7bbda96fe7a4147273db66c306.tar.gz
ec: Switch black to 80 cols and reformat files
Add pyproject.toml config file to set black to 80 columns. Remove column length overrides from other config files. Reformat python files to 80 cols. BRANCH=None BUG=b:238434058 TEST=presubmit/CQ Signed-off-by: Jeremy Bettis <jbettis@google.com> Change-Id: I870a68f1bb751f4bad97024045f6e3075489e80f Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3764071 Commit-Queue: Jeremy Bettis <jbettis@chromium.org> Auto-Submit: Jeremy Bettis <jbettis@chromium.org> Tested-by: Jeremy Bettis <jbettis@chromium.org> Reviewed-by: Jack Rosenthal <jrosenth@chromium.org>
Diffstat (limited to 'extra')
-rwxr-xr-xextra/cr50_rma_open/cr50_rma_open.py41
-rwxr-xr-xextra/stack_analyzer/stack_analyzer.py180
-rwxr-xr-xextra/stack_analyzer/stack_analyzer_unittest.py113
-rw-r--r--extra/tigertool/ecusb/pty_driver.py12
-rw-r--r--extra/tigertool/ecusb/stm32uart.py5
-rw-r--r--extra/tigertool/ecusb/stm32usb.py18
-rw-r--r--extra/tigertool/ecusb/tiny_servo_common.py11
-rwxr-xr-xextra/tigertool/tigertest.py4
-rwxr-xr-xextra/tigertool/tigertool.py18
-rw-r--r--extra/usb_power/convert_power_log_board.py14
-rwxr-xr-xextra/usb_power/convert_servo_ina.py11
-rwxr-xr-xextra/usb_power/powerlog.py104
-rw-r--r--extra/usb_power/stats_manager.py32
-rw-r--r--extra/usb_power/stats_manager_unittest.py12
-rwxr-xr-xextra/usb_serial/console.py35
-rwxr-xr-xextra/usb_updater/fw_update.py27
-rwxr-xr-xextra/usb_updater/servo_updater.py39
17 files changed, 517 insertions, 159 deletions
diff --git a/extra/cr50_rma_open/cr50_rma_open.py b/extra/cr50_rma_open/cr50_rma_open.py
index 47cb7cd6e3..b2600a6454 100755
--- a/extra/cr50_rma_open/cr50_rma_open.py
+++ b/extra/cr50_rma_open/cr50_rma_open.py
@@ -357,7 +357,9 @@ class RMAOpen(object):
def _run_on_dut(self, command):
"""Run the command on the DUT."""
- return subprocess.check_output(["ssh", self.ip, command], encoding="utf-8")
+ return subprocess.check_output(
+ ["ssh", self.ip, command], encoding="utf-8"
+ )
def _open_in_dev_mode(self):
"""Open Cr50 when it's in dev mode"""
@@ -471,7 +473,9 @@ class RMAOpen(object):
rma_support,
)
if not self.is_prepvt and self._running_version_is_older(TESTLAB_PROD):
- raise ValueError("Update cr50. No testlab support in old prod images.")
+ raise ValueError(
+ "Update cr50. No testlab support in old prod images."
+ )
if self._running_version_is_older(rma_support):
raise ValueError(
"%s does not have RMA support. Update to at least %s"
@@ -565,7 +569,9 @@ class RMAOpen(object):
raise ValueError('Could not find usb device "%s"' % usb_serial)
return usb_serial
if len(serialnames) > 1:
- logging.info("Found Cr50 device serialnames %s", ", ".join(serialnames))
+ logging.info(
+ "Found Cr50 device serialnames %s", ", ".join(serialnames)
+ )
logging.warning(DEBUG_TOO_MANY_USB_DEVICES)
raise ValueError("Too many cr50 usb devices")
return serialnames[0]
@@ -605,7 +611,10 @@ def parse_args(argv):
help="Generate Cr50 challenge. Must be used with -i",
)
parser.add_argument(
- "-t", "--enable_testlab", action="store_true", help="enable testlab mode"
+ "-t",
+ "--enable_testlab",
+ action="store_true",
+ help="enable testlab mode",
)
parser.add_argument(
"-w", "--wp_disable", action="store_true", help="Disable write protect"
@@ -617,7 +626,11 @@ def parse_args(argv):
help="Check cr50 console connection works",
)
parser.add_argument(
- "-s", "--serialname", type=str, default="", help="The cr50 usb serialname"
+ "-s",
+ "--serialname",
+ type=str,
+ default="",
+ help="The cr50 usb serialname",
)
parser.add_argument(
"-D", "--debug", action="store_true", help="print debug messages"
@@ -647,7 +660,11 @@ def parse_args(argv):
"-P", "--servo_port", type=str, default="", help="the servo port"
)
parser.add_argument(
- "-I", "--ip", type=str, default="", help="The DUT IP. Necessary to do ccd open"
+ "-I",
+ "--ip",
+ type=str,
+ default="",
+ help="The DUT IP. Necessary to do ccd open",
)
return parser.parse_args(argv)
@@ -667,7 +684,9 @@ def main(argv):
tried_authcode = False
logging.info("Running cr50_rma_open version %s", SCRIPT_VERSION)
- cr50_rma_open = RMAOpen(opts.device, opts.serialname, opts.servo_port, opts.ip)
+ cr50_rma_open = RMAOpen(
+ opts.device, opts.serialname, opts.servo_port, opts.ip
+ )
if opts.check_connection:
sys.exit(0)
@@ -683,14 +702,18 @@ def main(argv):
cr50_rma_open.try_authcode(opts.authcode)
tried_authcode = True
- if not cr50_rma_open.check(WP_IS_DISABLED) and (tried_authcode or opts.wp_disable):
+ if not cr50_rma_open.check(WP_IS_DISABLED) and (
+ tried_authcode or opts.wp_disable
+ ):
if not cr50_rma_open.check(CCD_IS_UNRESTRICTED):
raise ValueError(
"Can't disable write protect unless ccd is "
"open. Run through the rma open process first"
)
if tried_authcode:
- logging.warning("RMA Open did not disable write protect. File a bug")
+ logging.warning(
+ "RMA Open did not disable write protect. File a bug"
+ )
logging.warning("Trying to disable it manually")
cr50_rma_open.wp_disable()
diff --git a/extra/stack_analyzer/stack_analyzer.py b/extra/stack_analyzer/stack_analyzer.py
index 9feb2423c9..cd5ca29011 100755
--- a/extra/stack_analyzer/stack_analyzer.py
+++ b/extra/stack_analyzer/stack_analyzer.py
@@ -59,7 +59,9 @@ class Task(object):
routine_address: Resolved routine address. None if it hasn't been resolved.
"""
- def __init__(self, name, routine_name, stack_max_size, routine_address=None):
+ def __init__(
+ self, name, routine_name, stack_max_size, routine_address=None
+ ):
"""Constructor.
Args:
@@ -250,7 +252,9 @@ class Function(object):
if len(self.stack_max_path) != len(other.stack_max_path):
return False
- for self_func, other_func in zip(self.stack_max_path, other.stack_max_path):
+ for self_func, other_func in zip(
+ self.stack_max_path, other.stack_max_path
+ ):
# Assume the addresses of functions are unique.
if self_func.address != other_func.address:
return False
@@ -294,10 +298,14 @@ class AndesAnalyzer(object):
r"^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$".format(CONDITION_CODES_RE)
)
# Call instructions.
- CALL_OPCODE_RE = re.compile(r"^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$")
+ CALL_OPCODE_RE = re.compile(
+ r"^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$"
+ )
CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE))
# Ignore lp register because it's for return.
- INDIRECT_CALL_OPERAND_RE = re.compile(r"^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$")
+ INDIRECT_CALL_OPERAND_RE = re.compile(
+ r"^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$"
+ )
# TODO: Handle other kinds of store instructions.
PUSH_OPCODE_RE = re.compile(r"^push(\d{1,})$")
PUSH_OPERAND_RE = re.compile(r"^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}")
@@ -363,7 +371,10 @@ class AndesAnalyzer(object):
result = self.CALL_OPERAND_RE.match(operand_text)
if result is None:
- if self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None:
+ if (
+ self.INDIRECT_CALL_OPERAND_RE.match(operand_text)
+ is not None
+ ):
# Found an indirect call.
callsites.append(Callsite(address, None, is_tail))
@@ -378,7 +389,9 @@ class AndesAnalyzer(object):
< (function_symbol.address + function_symbol.size)
):
# Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
elif self.LWI_OPCODE_RE.match(opcode) is not None:
result = self.LWI_PC_OPERAND_RE.match(operand_text)
@@ -400,14 +413,25 @@ class AndesAnalyzer(object):
result = self.PUSH_OPERAND_RE.match(operand_text)
operandgroup_text = result.group(1)
# capture $rx~$ry
- if self.OPERANDGROUP_RE.match(operandgroup_text) is not None:
+ if (
+ self.OPERANDGROUP_RE.match(operandgroup_text)
+ is not None
+ ):
# capture number & transfer string to integer
oprandgrouphead = operandgroup_text.split(",")[0]
rx = int(
- "".join(filter(str.isdigit, oprandgrouphead.split("~")[0]))
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[0]
+ )
+ )
)
ry = int(
- "".join(filter(str.isdigit, oprandgrouphead.split("~")[1]))
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[1]
+ )
+ )
)
stack_frame += (
@@ -425,14 +449,25 @@ class AndesAnalyzer(object):
result = self.SMW_OPERAND_RE.match(operand_text)
operandgroup_text = result.group(3)
# capture $rx~$ry
- if self.OPERANDGROUP_RE.match(operandgroup_text) is not None:
+ if (
+ self.OPERANDGROUP_RE.match(operandgroup_text)
+ is not None
+ ):
# capture number & transfer string to integer
oprandgrouphead = operandgroup_text.split(",")[0]
rx = int(
- "".join(filter(str.isdigit, oprandgrouphead.split("~")[0]))
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[0]
+ )
+ )
)
ry = int(
- "".join(filter(str.isdigit, oprandgrouphead.split("~")[1]))
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[1]
+ )
+ )
)
stack_frame += (
@@ -482,9 +517,13 @@ class ArmAnalyzer(object):
# Fuzzy regular expressions for instruction and operand parsing.
# Branch instructions.
- JUMP_OPCODE_RE = re.compile(r"^(b{0}|bx{0})(\.\w)?$".format(CONDITION_CODES_RE))
+ JUMP_OPCODE_RE = re.compile(
+ r"^(b{0}|bx{0})(\.\w)?$".format(CONDITION_CODES_RE)
+ )
# Call instructions.
- CALL_OPCODE_RE = re.compile(r"^(bl{0}|blx{0})(\.\w)?$".format(CONDITION_CODES_RE))
+ CALL_OPCODE_RE = re.compile(
+ r"^(bl{0}|blx{0})(\.\w)?$".format(CONDITION_CODES_RE)
+ )
CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE))
CBZ_CBNZ_OPCODE_RE = re.compile(r"^(cbz|cbnz)(\.\w)?$")
# Example: "r0, 1009bcbe <host_cmd_motion_sense+0x1d2>"
@@ -555,7 +594,9 @@ class ArmAnalyzer(object):
for address, opcode, operand_text in instructions:
is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None
is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None
- is_cbz_cbnz_opcode = self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None
+ is_cbz_cbnz_opcode = (
+ self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None
+ )
if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode:
is_tail = is_jump_opcode or is_cbz_cbnz_opcode
@@ -586,7 +627,9 @@ class ArmAnalyzer(object):
< (function_symbol.address + function_symbol.size)
):
# Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
elif self.LDR_OPCODE_RE.match(opcode) is not None:
result = self.LDR_PC_OPERAND_RE.match(operand_text)
@@ -599,7 +642,8 @@ class ArmAnalyzer(object):
elif self.PUSH_OPCODE_RE.match(opcode) is not None:
# Example: "{r4, r5, r6, r7, lr}"
stack_frame += (
- len(operand_text.split(",")) * self.GENERAL_PURPOSE_REGISTER_SIZE
+ len(operand_text.split(","))
+ * self.GENERAL_PURPOSE_REGISTER_SIZE
)
elif self.SUB_OPCODE_RE.match(opcode) is not None:
result = self.SUB_OPERAND_RE.match(operand_text)
@@ -614,7 +658,11 @@ class ArmAnalyzer(object):
# Subtract and writeback to stack register.
# Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}"
# Get the text of pushed register list.
- unused_sp, unused_sep, parameter_text = operand_text.partition(",")
+ (
+ unused_sp,
+ unused_sep,
+ parameter_text,
+ ) = operand_text.partition(",")
stack_frame += (
len(parameter_text.split(","))
* self.GENERAL_PURPOSE_REGISTER_SIZE
@@ -716,13 +764,18 @@ class RiscvAnalyzer(object):
result = self.CALL_OPERAND_RE.match(operand_text)
if result is None:
- if self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None:
+ if (
+ self.INDIRECT_CALL_OPERAND_RE.match(operand_text)
+ is not None
+ ):
# Found an indirect call.
callsites.append(Callsite(address, None, is_tail))
else:
# Capture address form operand_text and then convert to string
- address_str = "".join(self.CAPTURE_ADDRESS.findall(operand_text))
+ address_str = "".join(
+ self.CAPTURE_ADDRESS.findall(operand_text)
+ )
# String to integer
target_address = int(address_str, 16)
# Filter out the in-function target (branches and in-function calls,
@@ -734,7 +787,9 @@ class RiscvAnalyzer(object):
< (function_symbol.address + function_symbol.size)
):
# Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
elif self.ADDI_OPCODE_RE.match(opcode) is not None:
# Example: sp,sp,-32
@@ -940,7 +995,9 @@ class StackAnalyzer(object):
instructions = []
# If symbol size exists, use it as a hint of function size.
if function_symbol.size > 0:
- function_end = function_symbol.address + function_symbol.size
+ function_end = (
+ function_symbol.address + function_symbol.size
+ )
else:
function_end = None
@@ -1193,7 +1250,10 @@ class StackAnalyzer(object):
else:
add_rules[src_sig].add(dst_sig)
- if "remove" in self.annotation and self.annotation["remove"] is not None:
+ if (
+ "remove" in self.annotation
+ and self.annotation["remove"] is not None
+ ):
for sigtxt_path in self.annotation["remove"]:
if isinstance(sigtxt_path, str):
# The path has only one vertex.
@@ -1232,7 +1292,9 @@ class StackAnalyzer(object):
# Append each signature of the current node to the all previous
# remove paths.
- sig_paths = [path + [sig] for path in sig_paths for sig in sig_set]
+ sig_paths = [
+ path + [sig] for path in sig_paths for sig in sig_set
+ ]
if not broken_flag:
# All signatures are normalized. The remove path has no error.
@@ -1282,7 +1344,9 @@ class StackAnalyzer(object):
signature_set.update(remove_sigs)
# Map signatures to functions.
- (signature_map, sig_error_map) = self.MapAnnotation(function_map, signature_set)
+ (signature_map, sig_error_map) = self.MapAnnotation(
+ function_map, signature_set
+ )
# Build the indirect callsite map indexed by callsite signature.
indirect_map = collections.defaultdict(set)
@@ -1326,7 +1390,9 @@ class StackAnalyzer(object):
# Assume the error is always the not found error. Since the signature
# found in indirect callsite map must be a full signature, it can't
# happen the ambiguous error.
- assert sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND
+ assert (
+ sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND
+ )
# Found in inline stack, remove the not found error.
del sig_error_map[src_sig]
@@ -1357,7 +1423,9 @@ class StackAnalyzer(object):
else:
# Append each function of the current signature to the all previous
# remove paths.
- remove_paths = [p + [f] for p in remove_paths for f in remove_funcs]
+ remove_paths = [
+ p + [f] for p in remove_paths for f in remove_funcs
+ ]
if skip_flag:
# Ignore the broken remove path.
@@ -1417,7 +1485,9 @@ class StackAnalyzer(object):
for src_func, dst_func in add_set:
# TODO(cheyuw): Support tailing call annotation.
- src_func.callsites.append(Callsite(None, dst_func.address, False, dst_func))
+ src_func.callsites.append(
+ Callsite(None, dst_func.address, False, dst_func)
+ )
# Delete simple remove paths.
remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2)
@@ -1431,7 +1501,10 @@ class StackAnalyzer(object):
) in remove_simple:
continue
- if callsite.target is None and callsite.address in eliminated_addrs:
+ if (
+ callsite.target is None
+ and callsite.address in eliminated_addrs
+ ):
continue
cleaned_callsites.append(callsite)
@@ -1542,7 +1615,9 @@ class StackAnalyzer(object):
elif callee_state in stacked_states:
# The state is shown in the stack. There is a cycle.
sub_stack_usage = 0
- scc_lowlink = min(scc_lowlink, scc_index_map[callee_state])
+ scc_lowlink = min(
+ scc_lowlink, scc_index_map[callee_state]
+ )
if callee_state == curr_state:
self_loop = True
@@ -1559,9 +1634,13 @@ class StackAnalyzer(object):
if callsite.is_tail:
# For tailing call, since the callee reuses the stack frame of the
# caller, choose the larger one directly.
- stack_usage = max(curr_func.stack_frame, sub_stack_usage)
+ stack_usage = max(
+ curr_func.stack_frame, sub_stack_usage
+ )
else:
- stack_usage = curr_func.stack_frame + sub_stack_usage
+ stack_usage = (
+ curr_func.stack_frame + sub_stack_usage
+ )
if stack_usage > max_stack_usage:
max_stack_usage = stack_usage
@@ -1664,12 +1743,16 @@ class StackAnalyzer(object):
(function_name, path, linenum) = line_info
line_texts.append(
- "{}[{}:{}]".format(function_name, os.path.relpath(path), linenum)
+ "{}[{}:{}]".format(
+ function_name, os.path.relpath(path), linenum
+ )
)
output = "{}-> {} {:x}\n".format(prefix, line_texts[0], address)
for depth, line_text in enumerate(line_texts[1:]):
- output += "{} {}- {}\n".format(prefix, " " * depth, line_text)
+ output += "{} {}- {}\n".format(
+ prefix, " " * depth, line_text
+ )
# Remove the last newline character.
return (order_key, output.rstrip("\n"))
@@ -1677,7 +1760,8 @@ class StackAnalyzer(object):
# Analyze disassembly.
try:
disasm_text = subprocess.check_output(
- [self.options.objdump, "-d", self.options.elf_path], encoding="utf-8"
+ [self.options.objdump, "-d", self.options.elf_path],
+ encoding="utf-8",
)
except subprocess.CalledProcessError:
raise StackAnalyzerError("objdump failed to disassemble.")
@@ -1773,7 +1857,11 @@ class StackAnalyzer(object):
if len(cycle_functions) > 0:
print("There are cycles in the following function sets:")
for functions in cycle_functions:
- print("[{}]".format(", ".join(function.name for function in functions)))
+ print(
+ "[{}]".format(
+ ", ".join(function.name for function in functions)
+ )
+ )
def ParseArgs():
@@ -1795,7 +1883,9 @@ def ParseArgs():
help="the section.",
choices=[SECTION_RO, SECTION_RW],
)
- parser.add_argument("--objdump", default="objdump", help="the path of objdump")
+ parser.add_argument(
+ "--objdump", default="objdump", help="the path of objdump"
+ )
parser.add_argument(
"--addr2line", default="addr2line", help="the path of addr2line"
)
@@ -1954,7 +2044,9 @@ def main():
annotation = {}
elif not os.path.exists(options.annotation):
print(
- "Warning: Annotation file {} does not exist.".format(options.annotation)
+ "Warning: Annotation file {} does not exist.".format(
+ options.annotation
+ )
)
annotation = {}
else:
@@ -1964,11 +2056,15 @@ def main():
except yaml.YAMLError:
raise StackAnalyzerError(
- "Failed to parse annotation file {}.".format(options.annotation)
+ "Failed to parse annotation file {}.".format(
+ options.annotation
+ )
)
except IOError:
raise StackAnalyzerError(
- "Failed to open annotation file {}.".format(options.annotation)
+ "Failed to open annotation file {}.".format(
+ options.annotation
+ )
)
# TODO(cheyuw): Do complete annotation format verification.
@@ -1987,7 +2083,9 @@ def main():
encoding="utf-8",
)
except subprocess.CalledProcessError:
- raise StackAnalyzerError("objdump failed to dump symbol table or rodata.")
+ raise StackAnalyzerError(
+ "objdump failed to dump symbol table or rodata."
+ )
except OSError:
raise StackAnalyzerError("Failed to run objdump.")
diff --git a/extra/stack_analyzer/stack_analyzer_unittest.py b/extra/stack_analyzer/stack_analyzer_unittest.py
index 228b7e010e..a40edaa5d8 100755
--- a/extra/stack_analyzer/stack_analyzer_unittest.py
+++ b/extra/stack_analyzer/stack_analyzer_unittest.py
@@ -70,7 +70,9 @@ class ArmAnalyzerTest(unittest.TestCase):
cbz_list = ["cbz", "cbnz", "cbz.n", "cbnz.n", "cbz.w", "cbnz.w"]
for opcode in cbz_list:
- self.assertIsNotNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match(opcode))
+ self.assertIsNotNone(
+ sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match(opcode)
+ )
self.assertIsNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match("cbn"))
@@ -199,7 +201,10 @@ class StackAnalyzerTest(unittest.TestCase):
" 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n"
)
rodata = sa.ParseRoDataText(rodata_text)
- expect_rodata = (0x20000, [0x0010ADDE, 0x00001000, 0x0020ADDE, 0x00002000])
+ expect_rodata = (
+ 0x20000,
+ [0x0010ADDE, 0x00001000, 0x0020ADDE, 0x00002000],
+ )
self.assertEqual(rodata, expect_rodata)
def testLoadTasklist(self):
@@ -246,13 +251,21 @@ class StackAnalyzerTest(unittest.TestCase):
def testResolveAnnotation(self):
self.analyzer.annotation = {}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(add_rules, {})
self.assertEqual(remove_rules, [])
self.assertEqual(invalid_sigtxts, set())
self.analyzer.annotation = {"add": None, "remove": None}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(add_rules, {})
self.assertEqual(remove_rules, [])
self.assertEqual(invalid_sigtxts, set())
@@ -264,7 +277,11 @@ class StackAnalyzerTest(unittest.TestCase):
[["a", "b[x:3]"], ["0", "1", "2"], "x"],
],
}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(add_rules, {})
self.assertEqual(
list.sort(remove_rules),
@@ -298,7 +315,11 @@ class StackAnalyzerTest(unittest.TestCase):
"touchpad_calc": [dict(name="__array", stride=8, offset=4)],
}
}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(
add_rules,
{
@@ -351,7 +372,11 @@ class StackAnalyzerTest(unittest.TestCase):
["inlined_mul", "inlined_mul_alias", "console_task"],
],
}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(invalid_sigtxts, {"touchpad?calc["})
signature_set = set()
@@ -362,7 +387,9 @@ class StackAnalyzerTest(unittest.TestCase):
for remove_sigs in remove_rules:
signature_set.update(remove_sigs)
- (signature_map, failed_sigs) = self.analyzer.MapAnnotation(funcs, signature_set)
+ (signature_map, failed_sigs) = self.analyzer.MapAnnotation(
+ funcs, signature_set
+ )
result = self.analyzer.ResolveAnnotation(funcs)
(add_set, remove_list, eliminated_addrs, failed_sigs) = result
@@ -416,7 +443,10 @@ class StackAnalyzerTest(unittest.TestCase):
("touchpad_calc", sa.StackAnalyzer.ANNOTATION_ERROR_AMBIGUOUS),
("hook_task[q.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
("task_unk[a.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
- ("touchpad_calc[x/a.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
+ (
+ "touchpad_calc[x/a.c]",
+ sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND,
+ ),
("trackpad_range", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
},
)
@@ -427,7 +457,9 @@ class StackAnalyzerTest(unittest.TestCase):
0x2000: sa.Function(0x2000, "console_task", 0, []),
0x4000: sa.Function(0x4000, "touchpad_calc", 0, []),
}
- funcs[0x1000].callsites = [sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])]
+ funcs[0x1000].callsites = [
+ sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])
+ ]
funcs[0x2000].callsites = [
sa.Callsite(0x2002, 0x1000, False, funcs[0x1000]),
sa.Callsite(0x2006, None, True, None),
@@ -495,7 +527,10 @@ class StackAnalyzerTest(unittest.TestCase):
)
function_map = self.analyzer.AnalyzeDisassembly(disasm_text)
func_hook_task = sa.Function(
- 0x1000, "hook_task", 48, [sa.Callsite(0x1006, 0x100929DE, True, None)]
+ 0x1000,
+ "hook_task",
+ 48,
+ [sa.Callsite(0x1006, 0x100929DE, True, None)],
)
expect_funcmap = {
0x1000: func_hook_task,
@@ -537,7 +572,10 @@ class StackAnalyzerTest(unittest.TestCase):
)
function_map = self.analyzer.AnalyzeDisassembly(disasm_text)
func_hook_task = sa.Function(
- 0x1000, "hook_task", 0, [sa.Callsite(0x1006, 0x100929DE, True, None)]
+ 0x1000,
+ "hook_task",
+ 0,
+ [sa.Callsite(0x1006, 0x100929DE, True, None)],
)
expect_funcmap = {
0x1000: func_hook_task,
@@ -585,11 +623,21 @@ class StackAnalyzerTest(unittest.TestCase):
sa.Callsite(0x4006, 0x7000, False, funcs[0x7000]),
sa.Callsite(0x400A, 0x8000, False, funcs[0x8000]),
]
- funcs[0x5000].callsites = [sa.Callsite(0x5002, 0x4000, False, funcs[0x4000])]
- funcs[0x7000].callsites = [sa.Callsite(0x7002, 0x7000, False, funcs[0x7000])]
- funcs[0x8000].callsites = [sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])]
- funcs[0x9000].callsites = [sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])]
- funcs[0x10000].callsites = [sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])]
+ funcs[0x5000].callsites = [
+ sa.Callsite(0x5002, 0x4000, False, funcs[0x4000])
+ ]
+ funcs[0x7000].callsites = [
+ sa.Callsite(0x7002, 0x7000, False, funcs[0x7000])
+ ]
+ funcs[0x8000].callsites = [
+ sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])
+ ]
+ funcs[0x9000].callsites = [
+ sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])
+ ]
+ funcs[0x10000].callsites = [
+ sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])
+ ]
cycles = self.analyzer.AnalyzeCallGraph(
funcs,
@@ -643,7 +691,10 @@ class StackAnalyzerTest(unittest.TestCase):
0x5000: (152, [funcs[0x5000], funcs[0x4000], funcs[0x7000]]),
0x6000: (100, [funcs[0x6000]]),
0x7000: (24, [funcs[0x7000]]),
- 0x8000: (160, [funcs[0x8000], funcs[0x9000], funcs[0x4000], funcs[0x7000]]),
+ 0x8000: (
+ 160,
+ [funcs[0x8000], funcs[0x9000], funcs[0x4000], funcs[0x7000]],
+ ),
0x9000: (140, [funcs[0x9000], funcs[0x4000], funcs[0x7000]]),
0x10000: (
200,
@@ -688,11 +739,14 @@ class StackAnalyzerTest(unittest.TestCase):
[("fake_func", "/a.c", 1), ("bake_func", "/b.c", 2)],
)
checkoutput_mock.assert_called_once_with(
- ["addr2line", "-f", "-e", "./ec.RW.elf", "1234", "-i"], encoding="utf-8"
+ ["addr2line", "-f", "-e", "./ec.RW.elf", "1234", "-i"],
+ encoding="utf-8",
)
checkoutput_mock.reset_mock()
- checkoutput_mock.return_value = "fake_func\n/test.c:1 (discriminator 128)"
+ checkoutput_mock.return_value = (
+ "fake_func\n/test.c:1 (discriminator 128)"
+ )
self.assertEqual(
self.analyzer.AddressToLine(0x12345), [("fake_func", "/test.c", 1)]
)
@@ -703,7 +757,8 @@ class StackAnalyzerTest(unittest.TestCase):
checkoutput_mock.return_value = "??\n:?\nbake_func\n/b.c:2\n"
self.assertEqual(
- self.analyzer.AddressToLine(0x123456), [None, ("bake_func", "/b.c", 2)]
+ self.analyzer.AddressToLine(0x123456),
+ [None, ("bake_func", "/b.c", 2)],
)
checkoutput_mock.assert_called_once_with(
["addr2line", "-f", "-e", "./ec.RW.elf", "123456"], encoding="utf-8"
@@ -716,7 +771,9 @@ class StackAnalyzerTest(unittest.TestCase):
checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "")
self.analyzer.AddressToLine(0x5678)
- with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run addr2line."):
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run addr2line."
+ ):
checkoutput_mock.side_effect = OSError()
self.analyzer.AddressToLine(0x9012)
@@ -773,7 +830,9 @@ class StackAnalyzerTest(unittest.TestCase):
]
)
- with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run objdump."):
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run objdump."
+ ):
checkoutput_mock.side_effect = OSError()
self.analyzer.Analyze()
@@ -836,7 +895,9 @@ class StackAnalyzerTest(unittest.TestCase):
]
)
- with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run objdump."):
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run objdump."
+ ):
checkoutput_mock.side_effect = OSError()
self.analyzer.Analyze()
@@ -911,7 +972,9 @@ class StackAnalyzerTest(unittest.TestCase):
with mock.patch("builtins.print") as print_mock:
checkoutput_mock.side_effect = [symbol_text, rodata_text]
sa.main()
- print_mock.assert_called_once_with("Error: Failed to load export_taskinfo.")
+ print_mock.assert_called_once_with(
+ "Error: Failed to load export_taskinfo."
+ )
with mock.patch("builtins.print") as print_mock:
checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "")
diff --git a/extra/tigertool/ecusb/pty_driver.py b/extra/tigertool/ecusb/pty_driver.py
index c87378b94d..ec38e7dd0a 100644
--- a/extra/tigertool/ecusb/pty_driver.py
+++ b/extra/tigertool/ecusb/pty_driver.py
@@ -126,7 +126,9 @@ class ptyDriver(object):
"""
self._issue_cmd_get_results(cmds, [])
- def _issue_cmd_get_results(self, cmds, regex_list, timeout=DEFAULT_UART_TIMEOUT):
+ def _issue_cmd_get_results(
+ self, cmds, regex_list, timeout=DEFAULT_UART_TIMEOUT
+ ):
"""Send command to the device and wait for response.
This function waits for response message matching a regular
@@ -199,10 +201,14 @@ class ptyDriver(object):
try:
self._child.expect(regex, timeout=0.1)
match = self._child.match
- lastindex = match.lastindex if match and match.lastindex else 0
+ lastindex = (
+ match.lastindex if match and match.lastindex else 0
+ )
# Create a tuple which contains the entire matched string and all
# the subgroups of the match.
- result = match.group(*range(lastindex + 1)) if match else None
+ result = (
+ match.group(*range(lastindex + 1)) if match else None
+ )
if result:
result = tuple(res.decode("utf-8") for res in result)
result_list.append(result)
diff --git a/extra/tigertool/ecusb/stm32uart.py b/extra/tigertool/ecusb/stm32uart.py
index a6fa73970d..917ae8d7fd 100644
--- a/extra/tigertool/ecusb/stm32uart.py
+++ b/extra/tigertool/ecusb/stm32uart.py
@@ -66,7 +66,10 @@ class Suart(object):
self._tx_thread = None
self._debuglog = debuglog
self._susb = stm32usb.Susb(
- vendor=vendor, product=product, interface=interface, serialname=serialname
+ vendor=vendor,
+ product=product,
+ interface=interface,
+ serialname=serialname,
)
self._running = False
diff --git a/extra/tigertool/ecusb/stm32usb.py b/extra/tigertool/ecusb/stm32usb.py
index e8a0bc4f9b..b36fd5b3d7 100644
--- a/extra/tigertool/ecusb/stm32usb.py
+++ b/extra/tigertool/ecusb/stm32usb.py
@@ -35,7 +35,12 @@ class Susb(object):
TIMEOUT_MS = 100
def __init__(
- self, vendor=0x18D1, product=0x5027, interface=1, serialname=None, logger=None
+ self,
+ vendor=0x18D1,
+ product=0x5027,
+ interface=1,
+ serialname=None,
+ logger=None,
):
"""Susb constructor.
@@ -83,7 +88,8 @@ class Susb(object):
dev = dev_list[0]
except StopIteration:
raise SusbError(
- "USB device %04x:%04x not found" % (self._vendor, self._product)
+ "USB device %04x:%04x not found"
+ % (self._vendor, self._product)
)
# If we can't set configuration, it's already been set.
@@ -111,11 +117,15 @@ class Susb(object):
dev.detach_kernel_driver(intf.bInterfaceNumber)
read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
- read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
+ read_ep = usb.util.find_descriptor(
+ intf, bEndpointAddress=read_ep_number
+ )
self._read_ep = read_ep
write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
- write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
+ write_ep = usb.util.find_descriptor(
+ intf, bEndpointAddress=write_ep_number
+ )
self._write_ep = write_ep
def close(self):
diff --git a/extra/tigertool/ecusb/tiny_servo_common.py b/extra/tigertool/ecusb/tiny_servo_common.py
index 889abcc0a0..3f0a8ff46c 100644
--- a/extra/tigertool/ecusb/tiny_servo_common.py
+++ b/extra/tigertool/ecusb/tiny_servo_common.py
@@ -146,7 +146,9 @@ def wait_for_usb_remove(vidpid, serialname=None, timeout=None):
Wrapper for wait_for_usb below
"""
- wait_for_usb(vidpid, serialname=serialname, timeout=timeout, desiredpresence=False)
+ wait_for_usb(
+ vidpid, serialname=serialname, timeout=timeout, desiredpresence=False
+ )
def wait_for_usb(vidpid, serialname=None, timeout=None, desiredpresence=True):
@@ -167,7 +169,9 @@ def wait_for_usb(vidpid, serialname=None, timeout=None, desiredpresence=True):
time.sleep(0.1)
if timeout:
if datetime.datetime.now() > finish:
- raise TinyServoError("Timeout", "Timeout waiting for USB %s" % vidpid)
+ raise TinyServoError(
+ "Timeout", "Timeout waiting for USB %s" % vidpid
+ )
def do_serialno(serialno, pty):
@@ -198,7 +202,8 @@ def do_serialno(serialno, pty):
else:
log("Serial number set to %s but saved as %s." % (serialno, sn))
raise TinyServoError(
- "Serial Number", "Serial number set to %s but saved as %s." % (serialno, sn)
+ "Serial Number",
+ "Serial number set to %s but saved as %s." % (serialno, sn),
)
diff --git a/extra/tigertool/tigertest.py b/extra/tigertool/tigertest.py
index 924ec8c97c..bd748f9b34 100755
--- a/extra/tigertool/tigertest.py
+++ b/extra/tigertool/tigertest.py
@@ -76,7 +76,9 @@ def test_sequence():
def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
- parser.add_argument("-c", "--count", type=int, default=1, help="loops to run")
+ parser.add_argument(
+ "-c", "--count", type=int, default=1, help="loops to run"
+ )
opts = parser.parse_args(argv)
diff --git a/extra/tigertool/tigertool.py b/extra/tigertool/tigertool.py
index 241e7fe155..45dfec8286 100755
--- a/extra/tigertool/tigertool.py
+++ b/extra/tigertool/tigertool.py
@@ -202,7 +202,9 @@ def do_sysjump(region, pty, serialname):
"""
validregion = ["ro", "rw"]
if region not in validregion:
- c.log("Region setting %s invalid, try one of %s" % (region, validregion))
+ c.log(
+ "Region setting %s invalid, try one of %s" % (region, validregion)
+ )
return False
cmd = "sysjump %s" % region
@@ -232,7 +234,11 @@ def do_sysjump(region, pty, serialname):
def get_parser():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
- "-s", "--serialno", type=str, default=None, help="serial number of board to use"
+ "-s",
+ "--serialno",
+ type=str,
+ default=None,
+ help="serial number of board to use",
)
parser.add_argument(
"-b",
@@ -253,9 +259,13 @@ def get_parser():
action="store_true",
help="check serial number set on the board.",
)
- group.add_argument("-m", "--mux", type=str, default=None, help="mux selection")
+ group.add_argument(
+ "-m", "--mux", type=str, default=None, help="mux selection"
+ )
group.add_argument("-p", "--power", action="store_true", help="check VBUS")
- group.add_argument("-l", "--powerlog", type=int, default=None, help="log VBUS")
+ group.add_argument(
+ "-l", "--powerlog", type=int, default=None, help="log VBUS"
+ )
group.add_argument(
"-r", "--sysjump", type=str, default=None, help="region selection"
)
diff --git a/extra/usb_power/convert_power_log_board.py b/extra/usb_power/convert_power_log_board.py
index 159b4621d7..a555ee30ea 100644
--- a/extra/usb_power/convert_power_log_board.py
+++ b/extra/usb_power/convert_power_log_board.py
@@ -55,11 +55,15 @@ def write_to_file(file, sweetberry, inas):
# EX : ('sweetberry', 0x40, 'SB_FW_CAM_2P8', 5.0, 1.000, 3, False),
channel, i2c_addr = Spower.CHMAP[rec["channel"]]
- record = " ('sweetberry', 0x%02x, '%s', 5.0, %f, %d, 'True')" ",\n" % (
- i2c_addr,
- rec["name"],
- rec["rs"],
- channel,
+ record = (
+ " ('sweetberry', 0x%02x, '%s', 5.0, %f, %d, 'True')"
+ ",\n"
+ % (
+ i2c_addr,
+ rec["name"],
+ rec["rs"],
+ channel,
+ )
)
pyfile.write(record)
diff --git a/extra/usb_power/convert_servo_ina.py b/extra/usb_power/convert_servo_ina.py
index c18b7368dc..f3840ddc7c 100755
--- a/extra/usb_power/convert_servo_ina.py
+++ b/extra/usb_power/convert_servo_ina.py
@@ -61,10 +61,13 @@ def main(argv):
boardfile.write(",\n")
scenario.write(",\n")
- record = ' {"name": "%s", "rs": %f, "sweetberry": "A", "channel": %d}' % (
- rec[2],
- rec[4],
- rec[1] - 64,
+ record = (
+ ' {"name": "%s", "rs": %f, "sweetberry": "A", "channel": %d}'
+ % (
+ rec[2],
+ rec[4],
+ rec[1] - 64,
+ )
)
boardfile.write(record)
scenario.write('"%s"' % rec[2])
diff --git a/extra/usb_power/powerlog.py b/extra/usb_power/powerlog.py
index 44dd6ec12f..3649c9f411 100755
--- a/extra/usb_power/powerlog.py
+++ b/extra/usb_power/powerlog.py
@@ -27,11 +27,17 @@ import usb # pylint:disable=import-error
from stats_manager import StatsManager # pylint:disable=import-error
# Directory where hdctools installs configuration files into.
-LIB_DIR = os.path.join(sysconfig.get_python_lib(standard_lib=False), "servo", "data")
+LIB_DIR = os.path.join(
+ sysconfig.get_python_lib(standard_lib=False), "servo", "data"
+)
# Potential config file locations: current working directory, the same directory
# as powerlog.py file or LIB_DIR.
-CONFIG_LOCATIONS = [os.getcwd(), os.path.dirname(os.path.realpath(__file__)), LIB_DIR]
+CONFIG_LOCATIONS = [
+ os.getcwd(),
+ os.path.dirname(os.path.realpath(__file__)),
+ LIB_DIR,
+]
def logoutput(msg):
@@ -176,7 +182,9 @@ class Spower(object):
dev = d
break
if dev is None:
- raise Exception("Power", "USB device(%s) not found" % serialname)
+ raise Exception(
+ "Power", "USB device(%s) not found" % serialname
+ )
else:
dev = dev_list[0]
@@ -202,7 +210,9 @@ class Spower(object):
read_ep = usb.util.find_descriptor(
intf,
# match the first IN endpoint
- custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
== usb.util.ENDPOINT_IN,
)
@@ -212,7 +222,9 @@ class Spower(object):
write_ep = usb.util.find_descriptor(
intf,
# match the first OUT endpoint
- custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
== usb.util.ENDPOINT_OUT,
)
@@ -227,7 +239,9 @@ class Spower(object):
"""Clear INA description struct."""
self._inas = []
- def append_ina_struct(self, name, rs, port, addr, data=None, ina_type=INA_POWER):
+ def append_ina_struct(
+ self, name, rs, port, addr, data=None, ina_type=INA_POWER
+ ):
"""Add an INA descriptor into the list of active INAs.
Args:
@@ -313,7 +327,9 @@ class Spower(object):
"""Clear pending reads on the stm32"""
try:
while True:
- ret = self.wr_command(b"", read_count=512, rtimeout=100, wtimeout=50)
+ ret = self.wr_command(
+ b"", read_count=512, rtimeout=100, wtimeout=50
+ )
self._logger.debug(
"Try Clear: read %s", "success" if ret == 0 else "failure"
)
@@ -324,7 +340,9 @@ class Spower(object):
"""Reset the power interface on the stm32"""
cmd = struct.pack("<H", self.CMD_RESET)
ret = self.wr_command(cmd, rtimeout=50, wtimeout=50)
- self._logger.debug("Command RESET: %s", "success" if ret == 0 else "failure")
+ self._logger.debug(
+ "Command RESET: %s", "success" if ret == 0 else "failure"
+ )
def reset(self):
"""Try resetting the USB interface until success.
@@ -343,7 +361,9 @@ class Spower(object):
except Exception as e:
self.clear()
self.clear()
- self._logger.debug("TRY %d of %d: %s", count, max_reset_retry, e)
+ self._logger.debug(
+ "TRY %d of %d: %s", count, max_reset_retry, e
+ )
time.sleep(count * 0.01)
raise Exception("Power", "Failed to reset")
@@ -351,7 +371,9 @@ class Spower(object):
"""Stop any active data acquisition."""
cmd = struct.pack("<H", self.CMD_STOP)
ret = self.wr_command(cmd)
- self._logger.debug("Command STOP: %s", "success" if ret == 0 else "failure")
+ self._logger.debug(
+ "Command STOP: %s", "success" if ret == 0 else "failure"
+ )
def start(self, integration_us):
"""Start data acquisition.
@@ -420,7 +442,9 @@ class Spower(object):
cmd = struct.pack("<HQ", self.CMD_SETTIME, timestamp_us)
ret = self.wr_command(cmd)
- self._logger.debug("Command SETTIME: %s", "success" if ret == 0 else "failure")
+ self._logger.debug(
+ "Command SETTIME: %s", "success" if ret == 0 else "failure"
+ )
def add_ina(self, bus, ina_type, addr, extra, resistance, data=None):
"""Add an INA to the data acquisition list.
@@ -445,7 +469,9 @@ class Spower(object):
self.append_ina_struct(
name, resistance, bus, addr, data=data, ina_type=ina_type
)
- self._logger.debug("Command ADD_INA: %s", "success" if ret == 0 else "failure")
+ self._logger.debug(
+ "Command ADD_INA: %s", "success" if ret == 0 else "failure"
+ )
def report_header_size(self):
"""Helper function to calculate power record header size."""
@@ -481,13 +507,17 @@ class Spower(object):
if len(bytesread) == 1:
if bytesread[0] != 0x6:
self._logger.debug(
- "READ LINE FAILED bytes: %d ret: %02x", len(bytesread), bytesread[0]
+ "READ LINE FAILED bytes: %d ret: %02x",
+ len(bytesread),
+ bytesread[0],
)
return None
if len(bytesread) % expected_bytes != 0:
self._logger.debug(
- "READ LINE WARNING: expected %d, got %d", expected_bytes, len(bytesread)
+ "READ LINE WARNING: expected %d, got %d",
+ expected_bytes,
+ len(bytesread),
)
packet_count = len(bytesread) // expected_bytes
@@ -694,7 +724,8 @@ class powerlog(object):
raise Exception(
"Invalid INA type",
"Type of %s [%s] not recognized,"
- " try one of POWER, BUSV, CURRENT" % (entry[0], entry[1]),
+ " try one of POWER, BUSV, CURRENT"
+ % (entry[0], entry[1]),
)
else:
name = entry
@@ -774,7 +805,9 @@ class powerlog(object):
aggregate_record["boards"].add(record["berry"])
else:
self._logger.info(
- "break %s, %s", record["berry"], aggregate_record["boards"]
+ "break %s, %s",
+ record["berry"],
+ aggregate_record["boards"],
)
break
@@ -784,7 +817,10 @@ class powerlog(object):
if name in aggregate_record:
multiplier = (
0.001
- if (self._use_mW and name[1] == Spower.INA_POWER)
+ if (
+ self._use_mW
+ and name[1] == Spower.INA_POWER
+ )
else 1
)
value = aggregate_record[name] * multiplier
@@ -826,7 +862,9 @@ def main(argv=None):
if argv is None:
argv = sys.argv[1:]
# Command line argument description.
- parser = argparse.ArgumentParser(description="Gather CSV data from sweetberry")
+ parser = argparse.ArgumentParser(
+ description="Gather CSV data from sweetberry"
+ )
parser.add_argument(
"-b",
"--board",
@@ -842,10 +880,18 @@ def main(argv=None):
default="",
)
parser.add_argument(
- "-A", "--serial", type=str, help="Serial number of sweetberry A", default=""
+ "-A",
+ "--serial",
+ type=str,
+ help="Serial number of sweetberry A",
+ default="",
)
parser.add_argument(
- "-B", "--serial_b", type=str, help="Serial number of sweetberry B", default=""
+ "-B",
+ "--serial_b",
+ type=str,
+ help="Serial number of sweetberry B",
+ default="",
)
parser.add_argument(
"-t",
@@ -855,7 +901,11 @@ def main(argv=None):
default=100000,
)
parser.add_argument(
- "-s", "--seconds", type=float, help="Seconds to run capture", default=0.0
+ "-s",
+ "--seconds",
+ type=float,
+ help="Seconds to run capture",
+ default=0.0,
)
parser.add_argument(
"--date",
@@ -876,7 +926,10 @@ def main(argv=None):
action="store_true",
)
parser.add_argument(
- "--slow", default=False, help="Intentionally overflow", action="store_true"
+ "--slow",
+ default=False,
+ help="Intentionally overflow",
+ action="store_true",
)
parser.add_argument(
"--print_stats",
@@ -917,7 +970,8 @@ def main(argv=None):
dest="print_raw_data",
default=True,
action="store_false",
- help="Not print raw sweetberry readings at real time, default is to " "print",
+ help="Not print raw sweetberry readings at real time, default is to "
+ "print",
)
parser.add_argument(
"--save_raw_data",
@@ -952,7 +1006,9 @@ def main(argv=None):
# if powerlog is used through main, log to sys.stdout
if __name__ == "__main__":
stdout_handler = logging.StreamHandler(sys.stdout)
- stdout_handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
+ stdout_handler.setFormatter(
+ logging.Formatter("%(levelname)s: %(message)s")
+ )
root_logger.addHandler(stdout_handler)
integration_us_request = args.integration_us
diff --git a/extra/usb_power/stats_manager.py b/extra/usb_power/stats_manager.py
index 0f92916251..d200d66af7 100644
--- a/extra/usb_power/stats_manager.py
+++ b/extra/usb_power/stats_manager.py
@@ -84,7 +84,9 @@ class StatsManager(object):
"""
# pylint: disable=W0102
- def __init__(self, smid="", title="", order=[], hide_domains=[], accept_nan=True):
+ def __init__(
+ self, smid="", title="", order=[], hide_domains=[], accept_nan=True
+ ):
"""Initialize infrastructure for data and their statistics."""
self._title = title
self._data = collections.defaultdict(list)
@@ -112,11 +114,15 @@ class StatsManager(object):
except ValueError:
# if we don't accept nan this will be caught below
self._logger.debug(
- "sample %s for domain %s is not a number. Making NaN", sample, domain
+ "sample %s for domain %s is not a number. Making NaN",
+ sample,
+ domain,
)
sample = float("NaN")
if not self._accept_nan and math.isnan(sample):
- raise StatsManagerError("accept_nan is false. Cannot add NaN sample.")
+ raise StatsManagerError(
+ "accept_nan is false. Cannot add NaN sample."
+ )
self._data[domain].append(sample)
if math.isnan(sample):
self._nan_domains.add(domain)
@@ -133,7 +139,8 @@ class StatsManager(object):
"""
if domain in self._unit:
self._logger.warning(
- "overwriting the unit of %s, old unit is %s, new " "unit is %s.",
+ "overwriting the unit of %s, old unit is %s, new "
+ "unit is %s.",
domain,
self._unit[domain],
unit,
@@ -179,7 +186,9 @@ class StatsManager(object):
table = [headers]
# determine what domains to display & and the order
domains_to_display = self.DomainsToDisplay
- display_order = [key for key in self._order if key in domains_to_display]
+ display_order = [
+ key for key in self._order if key in domains_to_display
+ ]
domains_to_display -= set(display_order)
display_order.extend(sorted(domains_to_display))
for domain in display_order:
@@ -259,8 +268,13 @@ class StatsManager(object):
# line is a seperator line consisting of -----
line = "%s%s" % (prefix, "-" * (line_length - dec_length))
# prepend the prefix to the centered title
- padded_title = "%s%s" % (prefix, title.center(line_length)[dec_length:])
- formatted_lines = [line, padded_title, line] + formatted_lines + [line]
+ padded_title = "%s%s" % (
+ prefix,
+ title.center(line_length)[dec_length:],
+ )
+ formatted_lines = (
+ [line, padded_title, line] + formatted_lines + [line]
+ )
formatted_output = "\n".join(formatted_lines)
return formatted_output
@@ -399,7 +413,9 @@ class StatsManager(object):
for domain, data in self._data.items():
if not domain.endswith(self._unit[domain]):
domain = "%s_%s" % (domain, self._unit[domain])
- fname = self._MakeUniqueFName(os.path.join(dirname, "%s.txt" % domain))
+ fname = self._MakeUniqueFName(
+ os.path.join(dirname, "%s.txt" % domain)
+ )
with open(fname, "w") as f:
f.write("\n".join("%.2f" % sample for sample in data) + "\n")
fnames.append(fname)
diff --git a/extra/usb_power/stats_manager_unittest.py b/extra/usb_power/stats_manager_unittest.py
index 2a7317546c..6c2d4be771 100644
--- a/extra/usb_power/stats_manager_unittest.py
+++ b/extra/usb_power/stats_manager_unittest.py
@@ -228,7 +228,9 @@ class TestStatsManager(unittest.TestCase):
"""Order passed into StatsManager is honoured when formatting summary."""
# StatsManager that should print D & B first, and the subsequent elements
# are sorted.
- d_b_a_c_regexp = re.compile("D-domain.*B-domain.*A-domain.*C-domain", re.DOTALL)
+ d_b_a_c_regexp = re.compile(
+ "D-domain.*B-domain.*A-domain.*C-domain", re.DOTALL
+ )
data = stats_manager.StatsManager(order=["D-domain", "B-domain"])
data.AddSample("A-domain", 17)
data.AddSample("B-domain", 17)
@@ -255,7 +257,9 @@ class TestStatsManager(unittest.TestCase):
# Assert the reported fname is the same as the expected fname
self.assertEqual(expected_fname, fname)
# Assert only the reported fname is output (in the tempdir)
- self.assertEqual(set([os.path.basename(fname)]), set(os.listdir(self.tempdir)))
+ self.assertEqual(
+ set([os.path.basename(fname)]), set(os.listdir(self.tempdir))
+ )
with open(fname, "r") as f:
self.assertEqual(
"@@ NAME COUNT MEAN STDDEV MAX MIN\n",
@@ -287,7 +291,9 @@ class TestStatsManager(unittest.TestCase):
# Assert the reported fname is the same as the expected fname
self.assertEqual(expected_fname, fname)
# Assert only the reported fname is output (in the tempdir)
- self.assertEqual(set([os.path.basename(fname)]), set(os.listdir(self.tempdir)))
+ self.assertEqual(
+ set([os.path.basename(fname)]), set(os.listdir(self.tempdir))
+ )
with open(fname, "r") as f:
summary = json.load(f)
self.assertAlmostEqual(100000.0, summary["A"]["mean"])
diff --git a/extra/usb_serial/console.py b/extra/usb_serial/console.py
index 839330d86a..49b731ad42 100755
--- a/extra/usb_serial/console.py
+++ b/extra/usb_serial/console.py
@@ -71,7 +71,9 @@ class Susb:
WRITE_ENDPOINT = 0x1
TIMEOUT_MS = 100
- def __init__(self, vendor=0x18D1, product=0x500F, interface=1, serialname=None):
+ def __init__(
+ self, vendor=0x18D1, product=0x500F, interface=1, serialname=None
+ ):
"""Susb constructor.
Discovers and connects to USB endpoints.
@@ -109,7 +111,9 @@ class Susb:
try:
dev = dev_list[0]
except:
- raise SusbError("USB device %04x:%04x not found" % (vendor, product))
+ raise SusbError(
+ "USB device %04x:%04x not found" % (vendor, product)
+ )
# If we can't set configuration, it's already been set.
try:
@@ -130,11 +134,15 @@ class Susb:
dev.detach_kernel_driver(intf.bInterfaceNumber)
read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
- read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
+ read_ep = usb.util.find_descriptor(
+ intf, bEndpointAddress=read_ep_number
+ )
self._read_ep = read_ep
write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
- write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
+ write_ep = usb.util.find_descriptor(
+ intf, bEndpointAddress=write_ep_number
+ )
self._write_ep = write_ep
@@ -163,7 +171,9 @@ class SuartError(Exception):
class Suart:
"""Provide interface to serial usb endpoint."""
- def __init__(self, vendor=0x18D1, product=0x501C, interface=0, serialname=None):
+ def __init__(
+ self, vendor=0x18D1, product=0x501C, interface=0, serialname=None
+ ):
"""Suart contstructor.
Initializes USB stream interface.
@@ -179,7 +189,10 @@ class Suart:
"""
self._done = threading.Event()
self._susb = Susb(
- vendor=vendor, product=product, interface=interface, serialname=serialname
+ vendor=vendor,
+ product=product,
+ interface=interface,
+ serialname=serialname,
)
def wait_until_done(self, timeout=None):
@@ -239,7 +252,11 @@ class Suart:
parser = argparse.ArgumentParser(description="Open a console to a USB device")
parser.add_argument(
- "-d", "--device", type=str, help="vid:pid of target device", default="18d1:501c"
+ "-d",
+ "--device",
+ type=str,
+ help="vid:pid of target device",
+ default="18d1:501c",
)
parser.add_argument(
"-i", "--interface", type=int, help="interface number of console", default=0
@@ -272,7 +289,9 @@ def runconsole():
serialno = args.serialno
interface = args.interface
- sobj = Suart(vendor=vid, product=pid, interface=interface, serialname=serialno)
+ sobj = Suart(
+ vendor=vid, product=pid, interface=interface, serialname=serialno
+ )
if sys.stdin.isatty():
tty.setraw(sys.stdin.fileno())
sobj.run()
diff --git a/extra/usb_updater/fw_update.py b/extra/usb_updater/fw_update.py
index 8658d92c48..ca4e0e2e94 100755
--- a/extra/usb_updater/fw_update.py
+++ b/extra/usb_updater/fw_update.py
@@ -113,7 +113,9 @@ class Supdate(object):
read_ep = usb.util.find_descriptor(
intf,
# match the first IN endpoint
- custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
== usb.util.ENDPOINT_IN,
)
@@ -123,7 +125,9 @@ class Supdate(object):
write_ep = usb.util.find_descriptor(
intf,
# match the first OUT endpoint
- custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
== usb.util.ENDPOINT_OUT,
)
@@ -267,7 +271,9 @@ class Supdate(object):
result = struct.unpack("<I", read)
result = result[0]
if result != 0:
- raise Exception("Update", "Upload failed with rc: 0x%x" % result)
+ raise Exception(
+ "Update", "Upload failed with rc: 0x%x" % result
+ )
def start(self):
"""Start a transaction and erase currently inactive region.
@@ -297,7 +303,9 @@ class Supdate(object):
log("Update protocol v. %d" % version)
log("Available flash region base: %x" % base)
else:
- raise Exception("Update", "Start command returned %d bytes" % len(read))
+ raise Exception(
+ "Update", "Start command returned %d bytes" % len(read)
+ )
if base < 256:
raise Exception("Update", "Start returned error code 0x%x" % base)
@@ -380,7 +388,8 @@ class Supdate(object):
if self._filesize != self._flashsize:
raise Exception(
"Update",
- "Flash size 0x%x != file size 0x%x" % (self._flashsize, self._filesize),
+ "Flash size 0x%x != file size 0x%x"
+ % (self._flashsize, self._filesize),
)
@@ -396,9 +405,13 @@ parser.add_argument(
parser.add_argument(
"-f", "--file", type=str, help="Complete ec.bin file", default="ec.bin"
)
-parser.add_argument("-s", "--serial", type=str, help="Serial number", default="")
+parser.add_argument(
+ "-s", "--serial", type=str, help="Serial number", default=""
+)
parser.add_argument("-l", "--list", action="store_true", help="List regions")
-parser.add_argument("-v", "--verbose", action="store_true", help="Chatty output")
+parser.add_argument(
+ "-v", "--verbose", action="store_true", help="Chatty output"
+)
def main():
diff --git a/extra/usb_updater/servo_updater.py b/extra/usb_updater/servo_updater.py
index 7fa4608289..b5c23a9ad1 100755
--- a/extra/usb_updater/servo_updater.py
+++ b/extra/usb_updater/servo_updater.py
@@ -97,7 +97,9 @@ def do_with_retries(func, *args):
time.sleep(RETRIES_DELAY)
continue
- raise Exception("'{}' failed after {} retries".format(func.__name__, RETRIES_COUNT))
+ raise Exception(
+ "'{}' failed after {} retries".format(func.__name__, RETRIES_COUNT)
+ )
def flash(brdfile, serialno, binfile):
@@ -144,7 +146,9 @@ def flash2(vidpid, serialno, binfile):
print(cmd)
help_cmd = "%s --help" % tool
with open("/dev/null") as devnull:
- valid_check = subprocess.call(help_cmd.split(), stdout=devnull, stderr=devnull)
+ valid_check = subprocess.call(
+ help_cmd.split(), stdout=devnull, stderr=devnull
+ )
if valid_check:
raise ServoUpdaterException(
"%s exit with res = %d. Make sure the tool "
@@ -236,7 +240,9 @@ def do_updater_version(tinys):
return 2
else:
return 6
- raise ServoUpdaterException("Can't determine updater target from vers: [%s]" % vers)
+ raise ServoUpdaterException(
+ "Can't determine updater target from vers: [%s]" % vers
+ )
def _extract_version(boardname, binfile):
@@ -260,7 +266,9 @@ def _extract_version(boardname, binfile):
if m:
newvers = m.group(0).strip(" \t\r\n\0")
else:
- raise ServoUpdaterException("Can't find version from file: %s." % binfile)
+ raise ServoUpdaterException(
+ "Can't find version from file: %s." % binfile
+ )
return newvers
@@ -305,7 +313,9 @@ def get_files_and_version(cname, fname=None, channel=DEFAULT_CHANNEL):
if os.path.exists(updater_path):
break
else:
- raise ServoUpdaterException("servo_updater/ dir not found in known spots.")
+ raise ServoUpdaterException(
+ "servo_updater/ dir not found in known spots."
+ )
firmware_path = os.path.join(updater_path, FIRMWARE_DIR)
configs_path = os.path.join(updater_path, CONFIGS_DIR)
@@ -338,7 +348,9 @@ def get_files_and_version(cname, fname=None, channel=DEFAULT_CHANNEL):
if os.path.isfile(newname):
fname = newname
else:
- raise ServoUpdaterException("Can't find firmware binary: %s." % binary_file)
+ raise ServoUpdaterException(
+ "Can't find firmware binary: %s." % binary_file
+ )
elif not os.path.isfile(fname):
# If a name is specified but not found, try the default path.
newname = os.path.join(firmware_path, fname)
@@ -365,7 +377,11 @@ def main():
help="only print available firmware for board/channel",
)
parser.add_argument(
- "-s", "--serialno", type=str, help="serial number to program", default=None
+ "-s",
+ "--serialno",
+ type=str,
+ help="serial number to program",
+ default=None,
)
parser.add_argument(
"-b",
@@ -392,9 +408,14 @@ def main():
help="Update even if version match",
default=False,
)
- parser.add_argument("-v", "--verbose", action="store_true", help="Chatty output")
parser.add_argument(
- "-r", "--reboot", action="store_true", help="Always reboot, even after probe."
+ "-v", "--verbose", action="store_true", help="Chatty output"
+ )
+ parser.add_argument(
+ "-r",
+ "--reboot",
+ action="store_true",
+ help="Always reboot, even after probe.",
)
args = parser.parse_args()