summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeremy Bettis <jbettis@google.com>2022-07-14 14:19:14 -0600
committerChromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-07-26 19:43:40 +0000
commitf37d95b3edc6ff7bbda96fe7a4147273db66c306 (patch)
treeaaf408cfa525efce460cddb4153101b1db492109
parent160af3cb3634f645ef0ca1a3c312c68cc3316869 (diff)
downloadchrome-ec-f37d95b3edc6ff7bbda96fe7a4147273db66c306.tar.gz
ec: Switch black to 80 cols and reformat files
Add pyproject.toml config file to set black to 80 columns. Remove column length overrides from other config files. Reformat python files to 80 cols. BRANCH=None BUG=b:238434058 TEST=presubmit/CQ Signed-off-by: Jeremy Bettis <jbettis@google.com> Change-Id: I870a68f1bb751f4bad97024045f6e3075489e80f Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3764071 Commit-Queue: Jeremy Bettis <jbettis@chromium.org> Auto-Submit: Jeremy Bettis <jbettis@chromium.org> Tested-by: Jeremy Bettis <jbettis@chromium.org> Reviewed-by: Jack Rosenthal <jrosenth@chromium.org>
-rw-r--r--.flake81
-rw-r--r--.vscode/settings.json.default5
-rwxr-xr-xchip/ish/util/pack_ec.py10
-rwxr-xr-xchip/mchp/util/pack_ec.py45
-rwxr-xr-xchip/mchp/util/pack_ec_mec152x.py55
-rwxr-xr-xchip/mchp/util/pack_ec_mec172x.py67
-rwxr-xr-xchip/mec1322/util/pack_ec.py11
-rw-r--r--cts/common/board.py15
-rwxr-xr-xcts/cts.py37
-rwxr-xr-xextra/cr50_rma_open/cr50_rma_open.py41
-rwxr-xr-xextra/stack_analyzer/stack_analyzer.py180
-rwxr-xr-xextra/stack_analyzer/stack_analyzer_unittest.py113
-rw-r--r--extra/tigertool/ecusb/pty_driver.py12
-rw-r--r--extra/tigertool/ecusb/stm32uart.py5
-rw-r--r--extra/tigertool/ecusb/stm32usb.py18
-rw-r--r--extra/tigertool/ecusb/tiny_servo_common.py11
-rwxr-xr-xextra/tigertool/tigertest.py4
-rwxr-xr-xextra/tigertool/tigertool.py18
-rw-r--r--extra/usb_power/convert_power_log_board.py14
-rwxr-xr-xextra/usb_power/convert_servo_ina.py11
-rwxr-xr-xextra/usb_power/powerlog.py104
-rw-r--r--extra/usb_power/stats_manager.py32
-rw-r--r--extra/usb_power/stats_manager_unittest.py12
-rwxr-xr-xextra/usb_serial/console.py35
-rwxr-xr-xextra/usb_updater/fw_update.py27
-rwxr-xr-xextra/usb_updater/servo_updater.py39
-rwxr-xr-xfirmware_builder.py12
-rw-r--r--pylintrc1
-rw-r--r--pyproject.toml2
-rwxr-xr-xtest/run_device_tests.py84
-rw-r--r--test/timer_calib.py16
-rwxr-xr-xutil/build_with_clang.py7
-rwxr-xr-xutil/config_option_check.py12
-rwxr-xr-xutil/ec3po/console.py98
-rwxr-xr-xutil/ec3po/console_unittest.py18
-rw-r--r--util/ec3po/interpreter.py20
-rwxr-xr-xutil/ec3po/interpreter_unittest.py44
-rwxr-xr-xutil/ec_openocd.py13
-rwxr-xr-xutil/flash_jlink.py12
-rwxr-xr-xutil/fptool.py4
-rwxr-xr-xutil/inject-keys.py7
-rwxr-xr-xutil/kconfig_check.py39
-rw-r--r--util/run_ects.py5
-rw-r--r--util/test_kconfig_check.py15
-rwxr-xr-xutil/twister_launcher.py4
-rwxr-xr-xutil/uart_stress_tester.py37
-rwxr-xr-xutil/update_release_branch.py21
-rwxr-xr-xutil/zephyr_to_resultdb.py4
-rwxr-xr-xzephyr/firmware_builder.py26
-rw-r--r--zephyr/projects/brya/BUILD.py4
-rw-r--r--zephyr/projects/herobrine/BUILD.py4
-rw-r--r--zephyr/test/math/BUILD.py3
-rw-r--r--zephyr/zmake/tests/test_build_config.py13
-rw-r--r--zephyr/zmake/tests/test_modules.py4
-rw-r--r--zephyr/zmake/tests/test_multiproc_logging.py20
-rw-r--r--zephyr/zmake/tests/test_project.py12
-rw-r--r--zephyr/zmake/tests/test_toolchains.py8
-rw-r--r--zephyr/zmake/tests/test_version.py12
-rw-r--r--zephyr/zmake/tests/test_zmake.py25
-rwxr-xr-xzephyr/zmake/zephyr_build_tools/generate_ec_version.py16
-rw-r--r--zephyr/zmake/zmake/__main__.py8
-rw-r--r--zephyr/zmake/zmake/build_config.py14
-rw-r--r--zephyr/zmake/zmake/configlib.py8
-rw-r--r--zephyr/zmake/zmake/generate_readme.py8
-rw-r--r--zephyr/zmake/zmake/multiproc.py4
-rw-r--r--zephyr/zmake/zmake/output_packers.py36
-rw-r--r--zephyr/zmake/zmake/project.py28
-rw-r--r--zephyr/zmake/zmake/zmake.py87
68 files changed, 1328 insertions, 409 deletions
diff --git a/.flake8 b/.flake8
index 0a0a9c29ab..f5d6acc820 100644
--- a/.flake8
+++ b/.flake8
@@ -1,5 +1,4 @@
[flake8]
-max-line-length = 88
extend-ignore = E203
exclude =
.hypothesis,
diff --git a/.vscode/settings.json.default b/.vscode/settings.json.default
index 100ef251d2..2c9497413a 100644
--- a/.vscode/settings.json.default
+++ b/.vscode/settings.json.default
@@ -14,10 +14,7 @@
},
"[python]": {
"editor.insertSpaces": true,
- "editor.tabSize": 2
- "editor.rulers": [
- 88
- ],
+ "editor.tabSize": 4
},
"[shellscript]": {
"editor.insertSpaces": true,
diff --git a/chip/ish/util/pack_ec.py b/chip/ish/util/pack_ec.py
index e7bb0ce74b..3f55d61a1e 100755
--- a/chip/ish/util/pack_ec.py
+++ b/chip/ish/util/pack_ec.py
@@ -79,7 +79,9 @@ def main():
print(" kernel binary size:", args.kernel_size)
kern_rdup_pg_size = roundup_page(args.kernel_size)
# Add manifest for main ISH binary
- f.write(gen_manifest(b"ISHM", b"ISH_KERN", HEADER_SIZE, kern_rdup_pg_size))
+ f.write(
+ gen_manifest(b"ISHM", b"ISH_KERN", HEADER_SIZE, kern_rdup_pg_size)
+ )
if args.aon is not None:
print(" AON binary size: ", args.aon_size)
@@ -89,7 +91,11 @@ def main():
gen_manifest(
b"ISHM",
b"AON_TASK",
- (HEADER_SIZE + kern_rdup_pg_size * PAGE_SIZE - MANIFEST_ENTRY_SIZE),
+ (
+ HEADER_SIZE
+ + kern_rdup_pg_size * PAGE_SIZE
+ - MANIFEST_ENTRY_SIZE
+ ),
aon_rdup_pg_size,
)
)
diff --git a/chip/mchp/util/pack_ec.py b/chip/mchp/util/pack_ec.py
index 85aad94bd6..b015db377a 100755
--- a/chip/mchp/util/pack_ec.py
+++ b/chip/mchp/util/pack_ec.py
@@ -244,7 +244,11 @@ def BuildTag(args):
def BuildTagFromHdrAddr(header_loc):
tag = bytearray(
- [(header_loc >> 8) & 0xFF, (header_loc >> 16) & 0xFF, (header_loc >> 24) & 0xFF]
+ [
+ (header_loc >> 8) & 0xFF,
+ (header_loc >> 16) & 0xFF,
+ (header_loc >> 24) & 0xFF,
+ ]
)
tag.append(Crc8(0, tag))
return tag
@@ -327,7 +331,11 @@ def parseargs():
"--loader_file", help="EC loader binary", default="ecloader.bin"
)
parser.add_argument(
- "-s", "--spi_size", type=int, help="Size of the SPI flash in KB", default=512
+ "-s",
+ "--spi_size",
+ type=int,
+ help="Size of the SPI flash in KB",
+ default=512,
)
parser.add_argument(
"-l",
@@ -381,7 +389,10 @@ def parseargs():
default=False,
)
parser.add_argument(
- "--verbose", action="store_true", help="Enable verbose output", default=False
+ "--verbose",
+ action="store_true",
+ help="Enable verbose output",
+ default=False,
)
return parser.parse_args()
@@ -535,7 +546,9 @@ def main():
if args.test_spi == True:
crc = zlib.crc32(bytes(payload_rw[: (payload_rw_len - 32)]))
crc_ofs = payload_rw_len - 4
- debug_print("EC_RW CRC32 = 0x{0:08x} at offset 0x{1:08x}".format(crc, crc_ofs))
+ debug_print(
+ "EC_RW CRC32 = 0x{0:08x} at offset 0x{1:08x}".format(crc, crc_ofs)
+ )
for i in range(4):
payload_rw[crc_ofs + i] = crc & 0xFF
crc = crc >> 8
@@ -551,7 +564,11 @@ def main():
spi_list.append((args.header_loc, header, "header(lwf + ro)"))
spi_list.append(
- (args.header_loc + HEADER_SIZE, header_signature, "header(lwf + ro) signature")
+ (
+ args.header_loc + HEADER_SIZE,
+ header_signature,
+ "header(lwf + ro) signature",
+ )
)
spi_list.append(
(args.header_loc + args.payload_offset, payload, "payload(lfw + ro)")
@@ -593,12 +610,20 @@ def main():
for s in spi_list:
if addr < s[0]:
debug_print(
- "Offset ", hex(addr), " Length", hex(s[0] - addr), "fill with 0xff"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(s[0] - addr),
+ "fill with 0xff",
)
f.write(b"\xff" * (s[0] - addr))
addr = s[0]
debug_print(
- "Offset ", hex(addr), " Length", hex(len(s[1])), "write data"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(len(s[1])),
+ "write data",
)
f.write(s[1])
@@ -606,7 +631,11 @@ def main():
if addr < spi_size:
debug_print(
- "Offset ", hex(addr), " Length", hex(spi_size - addr), "fill with 0xff"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(spi_size - addr),
+ "fill with 0xff",
)
f.write(b"\xff" * (spi_size - addr))
diff --git a/chip/mchp/util/pack_ec_mec152x.py b/chip/mchp/util/pack_ec_mec152x.py
index 8ef7b3992c..1d7df5e9b6 100755
--- a/chip/mchp/util/pack_ec_mec152x.py
+++ b/chip/mchp/util/pack_ec_mec152x.py
@@ -123,7 +123,9 @@ def GetPayloadFromOffset(payload_file, offset, padsize):
if rem_len:
payload += PAYLOAD_PAD_BYTE * (padsize - rem_len)
- debug_print("GetPayload: Added {0} padding bytes".format(padsize - rem_len))
+ debug_print(
+ "GetPayload: Added {0} padding bytes".format(padsize - rem_len)
+ )
return payload
@@ -312,7 +314,9 @@ def BuildHeader2(args, chip_dict, payload_len, load_addr, payload_entry):
payload_units = int(payload_len // chip_dict["PAYLOAD_GRANULARITY"])
assert payload_units < 0x10000, print(
- "Payload too large: len={0} units={1}".format(payload_len, payload_units)
+ "Payload too large: len={0} units={1}".format(
+ payload_len, payload_units
+ )
)
header[0x10:0x12] = payload_units.to_bytes(2, "little")
@@ -414,7 +418,11 @@ def BuildTag(args):
def BuildTagFromHdrAddr(header_loc):
tag = bytearray(
- [(header_loc >> 8) & 0xFF, (header_loc >> 16) & 0xFF, (header_loc >> 24) & 0xFF]
+ [
+ (header_loc >> 8) & 0xFF,
+ (header_loc >> 16) & 0xFF,
+ (header_loc >> 24) & 0xFF,
+ ]
)
tag.append(Crc8(0, tag))
return tag
@@ -516,7 +524,11 @@ def parseargs():
"--loader_file", help="EC loader binary", default="ecloader.bin"
)
parser.add_argument(
- "-s", "--spi_size", type=int, help="Size of the SPI flash in KB", default=512
+ "-s",
+ "--spi_size",
+ type=int,
+ help="Size of the SPI flash in KB",
+ default=512,
)
parser.add_argument(
"-l",
@@ -563,7 +575,10 @@ def parseargs():
default=False,
)
parser.add_argument(
- "--verbose", action="store_true", help="Enable verbose output", default=False
+ "--verbose",
+ action="store_true",
+ help="Enable verbose output",
+ default=False,
)
parser.add_argument(
"--tag0_loc", type=int, help="MEC152X TAG0 SPI offset", default=0
@@ -761,7 +776,9 @@ def main():
# is filled with the hash digest of the respective entity.
# BuildHeader2 computes the hash digest and stores it in the correct
# header location.
- header = BuildHeader2(args, chip_dict, lfw_ecro_len, LOAD_ADDR, lfw_ecro_entry)
+ header = BuildHeader2(
+ args, chip_dict, lfw_ecro_len, LOAD_ADDR, lfw_ecro_entry
+ )
printByteArrayAsHex(header, "Header(lfw_ecro)")
ec_info_block = GenEcInfoBlock(args, chip_dict)
@@ -770,7 +787,9 @@ def main():
cosignature = GenCoSignature(args, chip_dict, lfw_ecro)
printByteArrayAsHex(cosignature, "LFW + EC_RO cosignature")
- trailer = GenTrailer(args, chip_dict, lfw_ecro, None, ec_info_block, cosignature)
+ trailer = GenTrailer(
+ args, chip_dict, lfw_ecro, None, ec_info_block, cosignature
+ )
printByteArrayAsHex(trailer, "LFW + EC_RO trailer")
@@ -782,7 +801,9 @@ def main():
debug_print("args.input = ", args.input)
debug_print("args.image_size = ", hex(args.image_size))
- ecrw = GetPayloadFromOffset(args.input, args.image_size, chip_dict["PAD_SIZE"])
+ ecrw = GetPayloadFromOffset(
+ args.input, args.image_size, chip_dict["PAD_SIZE"]
+ )
debug_print("type(ecrw) is ", type(ecrw))
debug_print("len(ecrw) is ", hex(len(ecrw)))
@@ -883,12 +904,20 @@ def main():
for s in spi_list:
if addr < s[0]:
debug_print(
- "Offset ", hex(addr), " Length", hex(s[0] - addr), "fill with 0xff"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(s[0] - addr),
+ "fill with 0xff",
)
f.write(b"\xff" * (s[0] - addr))
addr = s[0]
debug_print(
- "Offset ", hex(addr), " Length", hex(len(s[1])), "write data"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(len(s[1])),
+ "write data",
)
f.write(s[1])
@@ -896,7 +925,11 @@ def main():
if addr < spi_size:
debug_print(
- "Offset ", hex(addr), " Length", hex(spi_size - addr), "fill with 0xff"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(spi_size - addr),
+ "fill with 0xff",
)
f.write(b"\xff" * (spi_size - addr))
diff --git a/chip/mchp/util/pack_ec_mec172x.py b/chip/mchp/util/pack_ec_mec172x.py
index 25a4cb7ed1..ee14bdb2dc 100755
--- a/chip/mchp/util/pack_ec_mec172x.py
+++ b/chip/mchp/util/pack_ec_mec172x.py
@@ -132,7 +132,9 @@ def GetPayloadFromOffset(payload_file, offset, chip_dict):
if rem_len:
payload += chip_dict["PAYLOAD_PAD_BYTE"] * (padsize - rem_len)
- debug_print("GetPayload: Added {0} padding bytes".format(padsize - rem_len))
+ debug_print(
+ "GetPayload: Added {0} padding bytes".format(padsize - rem_len)
+ )
return payload
@@ -335,12 +337,16 @@ def BuildHeader2(args, chip_dict, payload_len, load_addr, payload_entry):
# bytes 0x10 - 0x11 payload length in units of 128 bytes
assert payload_len % chip_dict["PAYLOAD_GRANULARITY"] == 0, print(
- "Payload size not a multiple of {0}".format(chip_dict["PAYLOAD_GRANULARITY"])
+ "Payload size not a multiple of {0}".format(
+ chip_dict["PAYLOAD_GRANULARITY"]
+ )
)
payload_units = int(payload_len // chip_dict["PAYLOAD_GRANULARITY"])
assert payload_units < 0x10000, print(
- "Payload too large: len={0} units={1}".format(payload_len, payload_units)
+ "Payload too large: len={0} units={1}".format(
+ payload_len, payload_units
+ )
)
header[0x10:0x12] = payload_units.to_bytes(2, "little")
@@ -438,7 +444,9 @@ def GenTrailer(
debug_print(" Update: payload len=0x{0:0x}".format(len(payload)))
if ec_info_block != None:
hasher.update(ec_info_block)
- debug_print(" Update: ec_info_block len=0x{0:0x}".format(len(ec_info_block)))
+ debug_print(
+ " Update: ec_info_block len=0x{0:0x}".format(len(ec_info_block))
+ )
if encryption_key_header != None:
hasher.update(encryption_key_header)
debug_print(
@@ -448,7 +456,9 @@ def GenTrailer(
)
if cosignature != None:
hasher.update(cosignature)
- debug_print(" Update: cosignature len=0x{0:0x}".format(len(cosignature)))
+ debug_print(
+ " Update: cosignature len=0x{0:0x}".format(len(cosignature))
+ )
trailer[0:48] = hasher.digest()
trailer[-16:] = 16 * b"\xff"
@@ -478,7 +488,11 @@ def BuildTag(args):
def BuildTagFromHdrAddr(header_loc):
tag = bytearray(
- [(header_loc >> 8) & 0xFF, (header_loc >> 16) & 0xFF, (header_loc >> 24) & 0xFF]
+ [
+ (header_loc >> 8) & 0xFF,
+ (header_loc >> 16) & 0xFF,
+ (header_loc >> 24) & 0xFF,
+ ]
)
tag.append(Crc8(0, tag))
return tag
@@ -585,7 +599,11 @@ def parseargs():
"--load_addr", type=int, help="EC SRAM load address", default=0xC0000
)
parser.add_argument(
- "-s", "--spi_size", type=int, help="Size of the SPI flash in KB", default=512
+ "-s",
+ "--spi_size",
+ type=int,
+ help="Size of the SPI flash in KB",
+ default=512,
)
parser.add_argument(
"-l",
@@ -641,7 +659,10 @@ def parseargs():
default=False,
)
parser.add_argument(
- "--verbose", action="store_true", help="Enable verbose output", default=False
+ "--verbose",
+ action="store_true",
+ help="Enable verbose output",
+ default=False,
)
parser.add_argument(
"--tag0_loc", type=int, help="MEC172x TAG0 SPI offset", default=0
@@ -805,14 +826,18 @@ def main():
# 32-bit word at offset 0x4 address of reset handler
# NOTE: reset address will have bit[0]=1 to ensure thumb mode.
lfw_ecro_entry = GetEntryPoint(rorofile)
- debug_print("LFW Entry point from GetEntryPoint = 0x{0:08x}".format(lfw_ecro_entry))
+ debug_print(
+ "LFW Entry point from GetEntryPoint = 0x{0:08x}".format(lfw_ecro_entry)
+ )
# Chromebooks are not using MEC BootROM SPI header/payload authentication
# or payload encryption. In this case the header authentication signature
# is filled with the hash digest of the respective entity.
# BuildHeader2 computes the hash digest and stores it in the correct
# header location.
- header = BuildHeader2(args, chip_dict, lfw_ecro_len, args.load_addr, lfw_ecro_entry)
+ header = BuildHeader2(
+ args, chip_dict, lfw_ecro_len, args.load_addr, lfw_ecro_entry
+ )
printByteArrayAsHex(header, "Header(lfw_ecro)")
ec_info_block = GenEcInfoBlock(args, chip_dict)
@@ -821,7 +846,9 @@ def main():
cosignature = GenCoSignature(args, chip_dict, lfw_ecro)
printByteArrayAsHex(cosignature, "LFW + EC_RO cosignature")
- trailer = GenTrailer(args, chip_dict, lfw_ecro, None, ec_info_block, cosignature)
+ trailer = GenTrailer(
+ args, chip_dict, lfw_ecro, None, ec_info_block, cosignature
+ )
printByteArrayAsHex(trailer, "LFW + EC_RO trailer")
@@ -937,12 +964,20 @@ def main():
for s in spi_list:
if addr < s[0]:
debug_print(
- "Offset ", hex(addr), " Length", hex(s[0] - addr), "fill with 0xff"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(s[0] - addr),
+ "fill with 0xff",
)
f.write(b"\xff" * (s[0] - addr))
addr = s[0]
debug_print(
- "Offset ", hex(addr), " Length", hex(len(s[1])), "write data"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(len(s[1])),
+ "write data",
)
f.write(s[1])
@@ -950,7 +985,11 @@ def main():
if addr < spi_size:
debug_print(
- "Offset ", hex(addr), " Length", hex(spi_size - addr), "fill with 0xff"
+ "Offset ",
+ hex(addr),
+ " Length",
+ hex(spi_size - addr),
+ "fill with 0xff",
)
f.write(b"\xff" * (spi_size - addr))
diff --git a/chip/mec1322/util/pack_ec.py b/chip/mec1322/util/pack_ec.py
index 44ba6e7854..6898548feb 100755
--- a/chip/mec1322/util/pack_ec.py
+++ b/chip/mec1322/util/pack_ec.py
@@ -231,7 +231,11 @@ def parseargs():
"--loader_file", help="EC loader binary", default="ecloader.bin"
)
parser.add_argument(
- "-s", "--spi_size", type=int, help="Size of the SPI flash in MB", default=4
+ "-s",
+ "--spi_size",
+ type=int,
+ help="Size of the SPI flash in MB",
+ default=4,
)
parser.add_argument(
"-l",
@@ -281,7 +285,10 @@ def parseargs():
default=0xB,
)
parser.add_argument(
- "--image_size", type=int, help="Size of a single image.", default=(96 * 1024)
+ "--image_size",
+ type=int,
+ help="Size of a single image.",
+ default=(96 * 1024),
)
return parser.parse_args()
diff --git a/cts/common/board.py b/cts/common/board.py
index 68071f0b28..19b39c07cb 100644
--- a/cts/common/board.py
+++ b/cts/common/board.py
@@ -271,7 +271,14 @@ class Board(six.with_metaclass(ABCMeta, object)):
for device in com_devices:
self.tty_port = os.path.join(dev_dir, device)
properties = sp.check_output(
- ["udevadm", "info", "-a", "-n", self.tty_port, "--query=property"],
+ [
+ "udevadm",
+ "info",
+ "-a",
+ "-n",
+ self.tty_port,
+ "--query=property",
+ ],
**get_subprocess_args()
)
for line in [l.strip() for l in properties.split("\n")]:
@@ -394,7 +401,11 @@ class DeviceUnderTest(Board):
# If len(dut) is 0 then your dut doesn't use an st-link device, so we
# don't have to worry about its serial number
if not dut:
- msg = "Failed to find serial for DUT.\nIs " + self.board + " connected?"
+ msg = (
+ "Failed to find serial for DUT.\nIs "
+ + self.board
+ + " connected?"
+ )
raise RuntimeError(msg)
if len(dut) > 1:
msg = (
diff --git a/cts/cts.py b/cts/cts.py
index b29c849c0c..27620dd894 100755
--- a/cts/cts.py
+++ b/cts/cts.py
@@ -81,10 +81,14 @@ class Cts(object):
"""Build images for DUT and TH."""
print("Building DUT image...")
if not self.dut.build(self.ec_dir):
- raise RuntimeError("Building module %s for DUT failed" % (self.module))
+ raise RuntimeError(
+ "Building module %s for DUT failed" % (self.module)
+ )
print("Building TH image...")
if not self.th.build(self.ec_dir):
- raise RuntimeError("Building module %s for TH failed" % (self.module))
+ raise RuntimeError(
+ "Building module %s for TH failed" % (self.module)
+ )
def flash_boards(self):
"""Flashes TH and DUT with their most recently built ec.bin."""
@@ -134,7 +138,9 @@ class Cts(object):
d["name"] = l[0].strip()
d["th_rc"] = self.get_return_code_value(l[1].strip().strip('"'))
d["th_string"] = l[2].strip().strip('"')
- d["dut_rc"] = self.get_return_code_value(l[3].strip().strip('"'))
+ d["dut_rc"] = self.get_return_code_value(
+ l[3].strip().strip('"')
+ )
d["dut_string"] = l[4].strip().strip('"')
tests.append(d)
return tests
@@ -246,7 +252,10 @@ class Cts(object):
for i, v in enumerate(self.testlist):
if v["th_string"] in th_results[i]["output"] or not v["th_string"]:
th_results[i]["string"] = True
- if v["dut_string"] in dut_results[i]["output"] or not v["dut_string"]:
+ if (
+ v["dut_string"] in dut_results[i]["output"]
+ or not v["dut_string"]
+ ):
dut_results[i]["string"] = True
return th_results, dut_results
@@ -260,7 +269,8 @@ class Cts(object):
"""
len_test_name = max(len(s["name"]) for s in self.testlist)
len_code_name = max(
- len(self.get_return_code_name(v, True)) for v in self.return_codes.values()
+ len(self.get_return_code_name(v, True))
+ for v in self.return_codes.values()
)
head = "{:^" + str(len_test_name) + "} "
@@ -283,7 +293,9 @@ class Cts(object):
th_cn = self.get_return_code_name(th_results[i]["rc"], True)
dut_cn = self.get_return_code_name(dut_results[i]["rc"], True)
th_res = self.evaluate_result(
- th_results[i], self.testlist[i]["th_rc"], self.testlist[i]["th_string"]
+ th_results[i],
+ self.testlist[i]["th_rc"],
+ self.testlist[i]["th_string"],
)
dut_res = self.evaluate_result(
dut_results[i],
@@ -398,8 +410,12 @@ def main():
module = "meta"
parser = argparse.ArgumentParser(description="Used to build/flash boards")
- parser.add_argument("-d", "--dut", help="Specify DUT you want to build/flash")
- parser.add_argument("-m", "--module", help="Specify module you want to build/flash")
+ parser.add_argument(
+ "-d", "--dut", help="Specify DUT you want to build/flash"
+ )
+ parser.add_argument(
+ "-m", "--module", help="Specify module you want to build/flash"
+ )
parser.add_argument(
"-s",
"--setup",
@@ -407,7 +423,10 @@ def main():
help="Connect only the TH to save its serial",
)
parser.add_argument(
- "-b", "--build", action="store_true", help="Build test suite (no flashing)"
+ "-b",
+ "--build",
+ action="store_true",
+ help="Build test suite (no flashing)",
)
parser.add_argument(
"-f",
diff --git a/extra/cr50_rma_open/cr50_rma_open.py b/extra/cr50_rma_open/cr50_rma_open.py
index 47cb7cd6e3..b2600a6454 100755
--- a/extra/cr50_rma_open/cr50_rma_open.py
+++ b/extra/cr50_rma_open/cr50_rma_open.py
@@ -357,7 +357,9 @@ class RMAOpen(object):
def _run_on_dut(self, command):
"""Run the command on the DUT."""
- return subprocess.check_output(["ssh", self.ip, command], encoding="utf-8")
+ return subprocess.check_output(
+ ["ssh", self.ip, command], encoding="utf-8"
+ )
def _open_in_dev_mode(self):
"""Open Cr50 when it's in dev mode"""
@@ -471,7 +473,9 @@ class RMAOpen(object):
rma_support,
)
if not self.is_prepvt and self._running_version_is_older(TESTLAB_PROD):
- raise ValueError("Update cr50. No testlab support in old prod images.")
+ raise ValueError(
+ "Update cr50. No testlab support in old prod images."
+ )
if self._running_version_is_older(rma_support):
raise ValueError(
"%s does not have RMA support. Update to at least %s"
@@ -565,7 +569,9 @@ class RMAOpen(object):
raise ValueError('Could not find usb device "%s"' % usb_serial)
return usb_serial
if len(serialnames) > 1:
- logging.info("Found Cr50 device serialnames %s", ", ".join(serialnames))
+ logging.info(
+ "Found Cr50 device serialnames %s", ", ".join(serialnames)
+ )
logging.warning(DEBUG_TOO_MANY_USB_DEVICES)
raise ValueError("Too many cr50 usb devices")
return serialnames[0]
@@ -605,7 +611,10 @@ def parse_args(argv):
help="Generate Cr50 challenge. Must be used with -i",
)
parser.add_argument(
- "-t", "--enable_testlab", action="store_true", help="enable testlab mode"
+ "-t",
+ "--enable_testlab",
+ action="store_true",
+ help="enable testlab mode",
)
parser.add_argument(
"-w", "--wp_disable", action="store_true", help="Disable write protect"
@@ -617,7 +626,11 @@ def parse_args(argv):
help="Check cr50 console connection works",
)
parser.add_argument(
- "-s", "--serialname", type=str, default="", help="The cr50 usb serialname"
+ "-s",
+ "--serialname",
+ type=str,
+ default="",
+ help="The cr50 usb serialname",
)
parser.add_argument(
"-D", "--debug", action="store_true", help="print debug messages"
@@ -647,7 +660,11 @@ def parse_args(argv):
"-P", "--servo_port", type=str, default="", help="the servo port"
)
parser.add_argument(
- "-I", "--ip", type=str, default="", help="The DUT IP. Necessary to do ccd open"
+ "-I",
+ "--ip",
+ type=str,
+ default="",
+ help="The DUT IP. Necessary to do ccd open",
)
return parser.parse_args(argv)
@@ -667,7 +684,9 @@ def main(argv):
tried_authcode = False
logging.info("Running cr50_rma_open version %s", SCRIPT_VERSION)
- cr50_rma_open = RMAOpen(opts.device, opts.serialname, opts.servo_port, opts.ip)
+ cr50_rma_open = RMAOpen(
+ opts.device, opts.serialname, opts.servo_port, opts.ip
+ )
if opts.check_connection:
sys.exit(0)
@@ -683,14 +702,18 @@ def main(argv):
cr50_rma_open.try_authcode(opts.authcode)
tried_authcode = True
- if not cr50_rma_open.check(WP_IS_DISABLED) and (tried_authcode or opts.wp_disable):
+ if not cr50_rma_open.check(WP_IS_DISABLED) and (
+ tried_authcode or opts.wp_disable
+ ):
if not cr50_rma_open.check(CCD_IS_UNRESTRICTED):
raise ValueError(
"Can't disable write protect unless ccd is "
"open. Run through the rma open process first"
)
if tried_authcode:
- logging.warning("RMA Open did not disable write protect. File a bug")
+ logging.warning(
+ "RMA Open did not disable write protect. File a bug"
+ )
logging.warning("Trying to disable it manually")
cr50_rma_open.wp_disable()
diff --git a/extra/stack_analyzer/stack_analyzer.py b/extra/stack_analyzer/stack_analyzer.py
index 9feb2423c9..cd5ca29011 100755
--- a/extra/stack_analyzer/stack_analyzer.py
+++ b/extra/stack_analyzer/stack_analyzer.py
@@ -59,7 +59,9 @@ class Task(object):
routine_address: Resolved routine address. None if it hasn't been resolved.
"""
- def __init__(self, name, routine_name, stack_max_size, routine_address=None):
+ def __init__(
+ self, name, routine_name, stack_max_size, routine_address=None
+ ):
"""Constructor.
Args:
@@ -250,7 +252,9 @@ class Function(object):
if len(self.stack_max_path) != len(other.stack_max_path):
return False
- for self_func, other_func in zip(self.stack_max_path, other.stack_max_path):
+ for self_func, other_func in zip(
+ self.stack_max_path, other.stack_max_path
+ ):
# Assume the addresses of functions are unique.
if self_func.address != other_func.address:
return False
@@ -294,10 +298,14 @@ class AndesAnalyzer(object):
r"^(b{0}|j|jr|jr.|jrnez)(\d?|\d\d)$".format(CONDITION_CODES_RE)
)
# Call instructions.
- CALL_OPCODE_RE = re.compile(r"^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$")
+ CALL_OPCODE_RE = re.compile(
+ r"^(jal|jral|jral.|jralnez|beqzal|bltzal|bgezal)(\d)?$"
+ )
CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE))
# Ignore lp register because it's for return.
- INDIRECT_CALL_OPERAND_RE = re.compile(r"^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$")
+ INDIRECT_CALL_OPERAND_RE = re.compile(
+ r"^\$r\d{1,}$|\$fp$|\$gp$|\$ta$|\$sp$|\$pc$"
+ )
# TODO: Handle other kinds of store instructions.
PUSH_OPCODE_RE = re.compile(r"^push(\d{1,})$")
PUSH_OPERAND_RE = re.compile(r"^\$r\d{1,}, \#\d{1,} \! \{([^\]]+)\}")
@@ -363,7 +371,10 @@ class AndesAnalyzer(object):
result = self.CALL_OPERAND_RE.match(operand_text)
if result is None:
- if self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None:
+ if (
+ self.INDIRECT_CALL_OPERAND_RE.match(operand_text)
+ is not None
+ ):
# Found an indirect call.
callsites.append(Callsite(address, None, is_tail))
@@ -378,7 +389,9 @@ class AndesAnalyzer(object):
< (function_symbol.address + function_symbol.size)
):
# Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
elif self.LWI_OPCODE_RE.match(opcode) is not None:
result = self.LWI_PC_OPERAND_RE.match(operand_text)
@@ -400,14 +413,25 @@ class AndesAnalyzer(object):
result = self.PUSH_OPERAND_RE.match(operand_text)
operandgroup_text = result.group(1)
# capture $rx~$ry
- if self.OPERANDGROUP_RE.match(operandgroup_text) is not None:
+ if (
+ self.OPERANDGROUP_RE.match(operandgroup_text)
+ is not None
+ ):
# capture number & transfer string to integer
oprandgrouphead = operandgroup_text.split(",")[0]
rx = int(
- "".join(filter(str.isdigit, oprandgrouphead.split("~")[0]))
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[0]
+ )
+ )
)
ry = int(
- "".join(filter(str.isdigit, oprandgrouphead.split("~")[1]))
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[1]
+ )
+ )
)
stack_frame += (
@@ -425,14 +449,25 @@ class AndesAnalyzer(object):
result = self.SMW_OPERAND_RE.match(operand_text)
operandgroup_text = result.group(3)
# capture $rx~$ry
- if self.OPERANDGROUP_RE.match(operandgroup_text) is not None:
+ if (
+ self.OPERANDGROUP_RE.match(operandgroup_text)
+ is not None
+ ):
# capture number & transfer string to integer
oprandgrouphead = operandgroup_text.split(",")[0]
rx = int(
- "".join(filter(str.isdigit, oprandgrouphead.split("~")[0]))
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[0]
+ )
+ )
)
ry = int(
- "".join(filter(str.isdigit, oprandgrouphead.split("~")[1]))
+ "".join(
+ filter(
+ str.isdigit, oprandgrouphead.split("~")[1]
+ )
+ )
)
stack_frame += (
@@ -482,9 +517,13 @@ class ArmAnalyzer(object):
# Fuzzy regular expressions for instruction and operand parsing.
# Branch instructions.
- JUMP_OPCODE_RE = re.compile(r"^(b{0}|bx{0})(\.\w)?$".format(CONDITION_CODES_RE))
+ JUMP_OPCODE_RE = re.compile(
+ r"^(b{0}|bx{0})(\.\w)?$".format(CONDITION_CODES_RE)
+ )
# Call instructions.
- CALL_OPCODE_RE = re.compile(r"^(bl{0}|blx{0})(\.\w)?$".format(CONDITION_CODES_RE))
+ CALL_OPCODE_RE = re.compile(
+ r"^(bl{0}|blx{0})(\.\w)?$".format(CONDITION_CODES_RE)
+ )
CALL_OPERAND_RE = re.compile(r"^{}$".format(IMM_ADDRESS_RE))
CBZ_CBNZ_OPCODE_RE = re.compile(r"^(cbz|cbnz)(\.\w)?$")
# Example: "r0, 1009bcbe <host_cmd_motion_sense+0x1d2>"
@@ -555,7 +594,9 @@ class ArmAnalyzer(object):
for address, opcode, operand_text in instructions:
is_jump_opcode = self.JUMP_OPCODE_RE.match(opcode) is not None
is_call_opcode = self.CALL_OPCODE_RE.match(opcode) is not None
- is_cbz_cbnz_opcode = self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None
+ is_cbz_cbnz_opcode = (
+ self.CBZ_CBNZ_OPCODE_RE.match(opcode) is not None
+ )
if is_jump_opcode or is_call_opcode or is_cbz_cbnz_opcode:
is_tail = is_jump_opcode or is_cbz_cbnz_opcode
@@ -586,7 +627,9 @@ class ArmAnalyzer(object):
< (function_symbol.address + function_symbol.size)
):
# Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
elif self.LDR_OPCODE_RE.match(opcode) is not None:
result = self.LDR_PC_OPERAND_RE.match(operand_text)
@@ -599,7 +642,8 @@ class ArmAnalyzer(object):
elif self.PUSH_OPCODE_RE.match(opcode) is not None:
# Example: "{r4, r5, r6, r7, lr}"
stack_frame += (
- len(operand_text.split(",")) * self.GENERAL_PURPOSE_REGISTER_SIZE
+ len(operand_text.split(","))
+ * self.GENERAL_PURPOSE_REGISTER_SIZE
)
elif self.SUB_OPCODE_RE.match(opcode) is not None:
result = self.SUB_OPERAND_RE.match(operand_text)
@@ -614,7 +658,11 @@ class ArmAnalyzer(object):
# Subtract and writeback to stack register.
# Example: "sp!, {r4, r5, r6, r7, r8, r9, lr}"
# Get the text of pushed register list.
- unused_sp, unused_sep, parameter_text = operand_text.partition(",")
+ (
+ unused_sp,
+ unused_sep,
+ parameter_text,
+ ) = operand_text.partition(",")
stack_frame += (
len(parameter_text.split(","))
* self.GENERAL_PURPOSE_REGISTER_SIZE
@@ -716,13 +764,18 @@ class RiscvAnalyzer(object):
result = self.CALL_OPERAND_RE.match(operand_text)
if result is None:
- if self.INDIRECT_CALL_OPERAND_RE.match(operand_text) is not None:
+ if (
+ self.INDIRECT_CALL_OPERAND_RE.match(operand_text)
+ is not None
+ ):
# Found an indirect call.
callsites.append(Callsite(address, None, is_tail))
else:
# Capture address form operand_text and then convert to string
- address_str = "".join(self.CAPTURE_ADDRESS.findall(operand_text))
+ address_str = "".join(
+ self.CAPTURE_ADDRESS.findall(operand_text)
+ )
# String to integer
target_address = int(address_str, 16)
# Filter out the in-function target (branches and in-function calls,
@@ -734,7 +787,9 @@ class RiscvAnalyzer(object):
< (function_symbol.address + function_symbol.size)
):
# Maybe it is a callsite.
- callsites.append(Callsite(address, target_address, is_tail))
+ callsites.append(
+ Callsite(address, target_address, is_tail)
+ )
elif self.ADDI_OPCODE_RE.match(opcode) is not None:
# Example: sp,sp,-32
@@ -940,7 +995,9 @@ class StackAnalyzer(object):
instructions = []
# If symbol size exists, use it as a hint of function size.
if function_symbol.size > 0:
- function_end = function_symbol.address + function_symbol.size
+ function_end = (
+ function_symbol.address + function_symbol.size
+ )
else:
function_end = None
@@ -1193,7 +1250,10 @@ class StackAnalyzer(object):
else:
add_rules[src_sig].add(dst_sig)
- if "remove" in self.annotation and self.annotation["remove"] is not None:
+ if (
+ "remove" in self.annotation
+ and self.annotation["remove"] is not None
+ ):
for sigtxt_path in self.annotation["remove"]:
if isinstance(sigtxt_path, str):
# The path has only one vertex.
@@ -1232,7 +1292,9 @@ class StackAnalyzer(object):
# Append each signature of the current node to the all previous
# remove paths.
- sig_paths = [path + [sig] for path in sig_paths for sig in sig_set]
+ sig_paths = [
+ path + [sig] for path in sig_paths for sig in sig_set
+ ]
if not broken_flag:
# All signatures are normalized. The remove path has no error.
@@ -1282,7 +1344,9 @@ class StackAnalyzer(object):
signature_set.update(remove_sigs)
# Map signatures to functions.
- (signature_map, sig_error_map) = self.MapAnnotation(function_map, signature_set)
+ (signature_map, sig_error_map) = self.MapAnnotation(
+ function_map, signature_set
+ )
# Build the indirect callsite map indexed by callsite signature.
indirect_map = collections.defaultdict(set)
@@ -1326,7 +1390,9 @@ class StackAnalyzer(object):
# Assume the error is always the not found error. Since the signature
# found in indirect callsite map must be a full signature, it can't
# happen the ambiguous error.
- assert sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND
+ assert (
+ sig_error_map[src_sig] == self.ANNOTATION_ERROR_NOTFOUND
+ )
# Found in inline stack, remove the not found error.
del sig_error_map[src_sig]
@@ -1357,7 +1423,9 @@ class StackAnalyzer(object):
else:
# Append each function of the current signature to the all previous
# remove paths.
- remove_paths = [p + [f] for p in remove_paths for f in remove_funcs]
+ remove_paths = [
+ p + [f] for p in remove_paths for f in remove_funcs
+ ]
if skip_flag:
# Ignore the broken remove path.
@@ -1417,7 +1485,9 @@ class StackAnalyzer(object):
for src_func, dst_func in add_set:
# TODO(cheyuw): Support tailing call annotation.
- src_func.callsites.append(Callsite(None, dst_func.address, False, dst_func))
+ src_func.callsites.append(
+ Callsite(None, dst_func.address, False, dst_func)
+ )
# Delete simple remove paths.
remove_simple = set(tuple(p) for p in remove_list if len(p) <= 2)
@@ -1431,7 +1501,10 @@ class StackAnalyzer(object):
) in remove_simple:
continue
- if callsite.target is None and callsite.address in eliminated_addrs:
+ if (
+ callsite.target is None
+ and callsite.address in eliminated_addrs
+ ):
continue
cleaned_callsites.append(callsite)
@@ -1542,7 +1615,9 @@ class StackAnalyzer(object):
elif callee_state in stacked_states:
# The state is shown in the stack. There is a cycle.
sub_stack_usage = 0
- scc_lowlink = min(scc_lowlink, scc_index_map[callee_state])
+ scc_lowlink = min(
+ scc_lowlink, scc_index_map[callee_state]
+ )
if callee_state == curr_state:
self_loop = True
@@ -1559,9 +1634,13 @@ class StackAnalyzer(object):
if callsite.is_tail:
# For tailing call, since the callee reuses the stack frame of the
# caller, choose the larger one directly.
- stack_usage = max(curr_func.stack_frame, sub_stack_usage)
+ stack_usage = max(
+ curr_func.stack_frame, sub_stack_usage
+ )
else:
- stack_usage = curr_func.stack_frame + sub_stack_usage
+ stack_usage = (
+ curr_func.stack_frame + sub_stack_usage
+ )
if stack_usage > max_stack_usage:
max_stack_usage = stack_usage
@@ -1664,12 +1743,16 @@ class StackAnalyzer(object):
(function_name, path, linenum) = line_info
line_texts.append(
- "{}[{}:{}]".format(function_name, os.path.relpath(path), linenum)
+ "{}[{}:{}]".format(
+ function_name, os.path.relpath(path), linenum
+ )
)
output = "{}-> {} {:x}\n".format(prefix, line_texts[0], address)
for depth, line_text in enumerate(line_texts[1:]):
- output += "{} {}- {}\n".format(prefix, " " * depth, line_text)
+ output += "{} {}- {}\n".format(
+ prefix, " " * depth, line_text
+ )
# Remove the last newline character.
return (order_key, output.rstrip("\n"))
@@ -1677,7 +1760,8 @@ class StackAnalyzer(object):
# Analyze disassembly.
try:
disasm_text = subprocess.check_output(
- [self.options.objdump, "-d", self.options.elf_path], encoding="utf-8"
+ [self.options.objdump, "-d", self.options.elf_path],
+ encoding="utf-8",
)
except subprocess.CalledProcessError:
raise StackAnalyzerError("objdump failed to disassemble.")
@@ -1773,7 +1857,11 @@ class StackAnalyzer(object):
if len(cycle_functions) > 0:
print("There are cycles in the following function sets:")
for functions in cycle_functions:
- print("[{}]".format(", ".join(function.name for function in functions)))
+ print(
+ "[{}]".format(
+ ", ".join(function.name for function in functions)
+ )
+ )
def ParseArgs():
@@ -1795,7 +1883,9 @@ def ParseArgs():
help="the section.",
choices=[SECTION_RO, SECTION_RW],
)
- parser.add_argument("--objdump", default="objdump", help="the path of objdump")
+ parser.add_argument(
+ "--objdump", default="objdump", help="the path of objdump"
+ )
parser.add_argument(
"--addr2line", default="addr2line", help="the path of addr2line"
)
@@ -1954,7 +2044,9 @@ def main():
annotation = {}
elif not os.path.exists(options.annotation):
print(
- "Warning: Annotation file {} does not exist.".format(options.annotation)
+ "Warning: Annotation file {} does not exist.".format(
+ options.annotation
+ )
)
annotation = {}
else:
@@ -1964,11 +2056,15 @@ def main():
except yaml.YAMLError:
raise StackAnalyzerError(
- "Failed to parse annotation file {}.".format(options.annotation)
+ "Failed to parse annotation file {}.".format(
+ options.annotation
+ )
)
except IOError:
raise StackAnalyzerError(
- "Failed to open annotation file {}.".format(options.annotation)
+ "Failed to open annotation file {}.".format(
+ options.annotation
+ )
)
# TODO(cheyuw): Do complete annotation format verification.
@@ -1987,7 +2083,9 @@ def main():
encoding="utf-8",
)
except subprocess.CalledProcessError:
- raise StackAnalyzerError("objdump failed to dump symbol table or rodata.")
+ raise StackAnalyzerError(
+ "objdump failed to dump symbol table or rodata."
+ )
except OSError:
raise StackAnalyzerError("Failed to run objdump.")
diff --git a/extra/stack_analyzer/stack_analyzer_unittest.py b/extra/stack_analyzer/stack_analyzer_unittest.py
index 228b7e010e..a40edaa5d8 100755
--- a/extra/stack_analyzer/stack_analyzer_unittest.py
+++ b/extra/stack_analyzer/stack_analyzer_unittest.py
@@ -70,7 +70,9 @@ class ArmAnalyzerTest(unittest.TestCase):
cbz_list = ["cbz", "cbnz", "cbz.n", "cbnz.n", "cbz.w", "cbnz.w"]
for opcode in cbz_list:
- self.assertIsNotNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match(opcode))
+ self.assertIsNotNone(
+ sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match(opcode)
+ )
self.assertIsNone(sa.ArmAnalyzer.CBZ_CBNZ_OPCODE_RE.match("cbn"))
@@ -199,7 +201,10 @@ class StackAnalyzerTest(unittest.TestCase):
" 20000 dead1000 00100000 dead2000 00200000 He..f.He..s.\n"
)
rodata = sa.ParseRoDataText(rodata_text)
- expect_rodata = (0x20000, [0x0010ADDE, 0x00001000, 0x0020ADDE, 0x00002000])
+ expect_rodata = (
+ 0x20000,
+ [0x0010ADDE, 0x00001000, 0x0020ADDE, 0x00002000],
+ )
self.assertEqual(rodata, expect_rodata)
def testLoadTasklist(self):
@@ -246,13 +251,21 @@ class StackAnalyzerTest(unittest.TestCase):
def testResolveAnnotation(self):
self.analyzer.annotation = {}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(add_rules, {})
self.assertEqual(remove_rules, [])
self.assertEqual(invalid_sigtxts, set())
self.analyzer.annotation = {"add": None, "remove": None}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(add_rules, {})
self.assertEqual(remove_rules, [])
self.assertEqual(invalid_sigtxts, set())
@@ -264,7 +277,11 @@ class StackAnalyzerTest(unittest.TestCase):
[["a", "b[x:3]"], ["0", "1", "2"], "x"],
],
}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(add_rules, {})
self.assertEqual(
list.sort(remove_rules),
@@ -298,7 +315,11 @@ class StackAnalyzerTest(unittest.TestCase):
"touchpad_calc": [dict(name="__array", stride=8, offset=4)],
}
}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(
add_rules,
{
@@ -351,7 +372,11 @@ class StackAnalyzerTest(unittest.TestCase):
["inlined_mul", "inlined_mul_alias", "console_task"],
],
}
- (add_rules, remove_rules, invalid_sigtxts) = self.analyzer.LoadAnnotation()
+ (
+ add_rules,
+ remove_rules,
+ invalid_sigtxts,
+ ) = self.analyzer.LoadAnnotation()
self.assertEqual(invalid_sigtxts, {"touchpad?calc["})
signature_set = set()
@@ -362,7 +387,9 @@ class StackAnalyzerTest(unittest.TestCase):
for remove_sigs in remove_rules:
signature_set.update(remove_sigs)
- (signature_map, failed_sigs) = self.analyzer.MapAnnotation(funcs, signature_set)
+ (signature_map, failed_sigs) = self.analyzer.MapAnnotation(
+ funcs, signature_set
+ )
result = self.analyzer.ResolveAnnotation(funcs)
(add_set, remove_list, eliminated_addrs, failed_sigs) = result
@@ -416,7 +443,10 @@ class StackAnalyzerTest(unittest.TestCase):
("touchpad_calc", sa.StackAnalyzer.ANNOTATION_ERROR_AMBIGUOUS),
("hook_task[q.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
("task_unk[a.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
- ("touchpad_calc[x/a.c]", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
+ (
+ "touchpad_calc[x/a.c]",
+ sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND,
+ ),
("trackpad_range", sa.StackAnalyzer.ANNOTATION_ERROR_NOTFOUND),
},
)
@@ -427,7 +457,9 @@ class StackAnalyzerTest(unittest.TestCase):
0x2000: sa.Function(0x2000, "console_task", 0, []),
0x4000: sa.Function(0x4000, "touchpad_calc", 0, []),
}
- funcs[0x1000].callsites = [sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])]
+ funcs[0x1000].callsites = [
+ sa.Callsite(0x1002, 0x1000, False, funcs[0x1000])
+ ]
funcs[0x2000].callsites = [
sa.Callsite(0x2002, 0x1000, False, funcs[0x1000]),
sa.Callsite(0x2006, None, True, None),
@@ -495,7 +527,10 @@ class StackAnalyzerTest(unittest.TestCase):
)
function_map = self.analyzer.AnalyzeDisassembly(disasm_text)
func_hook_task = sa.Function(
- 0x1000, "hook_task", 48, [sa.Callsite(0x1006, 0x100929DE, True, None)]
+ 0x1000,
+ "hook_task",
+ 48,
+ [sa.Callsite(0x1006, 0x100929DE, True, None)],
)
expect_funcmap = {
0x1000: func_hook_task,
@@ -537,7 +572,10 @@ class StackAnalyzerTest(unittest.TestCase):
)
function_map = self.analyzer.AnalyzeDisassembly(disasm_text)
func_hook_task = sa.Function(
- 0x1000, "hook_task", 0, [sa.Callsite(0x1006, 0x100929DE, True, None)]
+ 0x1000,
+ "hook_task",
+ 0,
+ [sa.Callsite(0x1006, 0x100929DE, True, None)],
)
expect_funcmap = {
0x1000: func_hook_task,
@@ -585,11 +623,21 @@ class StackAnalyzerTest(unittest.TestCase):
sa.Callsite(0x4006, 0x7000, False, funcs[0x7000]),
sa.Callsite(0x400A, 0x8000, False, funcs[0x8000]),
]
- funcs[0x5000].callsites = [sa.Callsite(0x5002, 0x4000, False, funcs[0x4000])]
- funcs[0x7000].callsites = [sa.Callsite(0x7002, 0x7000, False, funcs[0x7000])]
- funcs[0x8000].callsites = [sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])]
- funcs[0x9000].callsites = [sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])]
- funcs[0x10000].callsites = [sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])]
+ funcs[0x5000].callsites = [
+ sa.Callsite(0x5002, 0x4000, False, funcs[0x4000])
+ ]
+ funcs[0x7000].callsites = [
+ sa.Callsite(0x7002, 0x7000, False, funcs[0x7000])
+ ]
+ funcs[0x8000].callsites = [
+ sa.Callsite(0x8002, 0x9000, False, funcs[0x9000])
+ ]
+ funcs[0x9000].callsites = [
+ sa.Callsite(0x9002, 0x4000, False, funcs[0x4000])
+ ]
+ funcs[0x10000].callsites = [
+ sa.Callsite(0x10002, 0x2000, False, funcs[0x2000])
+ ]
cycles = self.analyzer.AnalyzeCallGraph(
funcs,
@@ -643,7 +691,10 @@ class StackAnalyzerTest(unittest.TestCase):
0x5000: (152, [funcs[0x5000], funcs[0x4000], funcs[0x7000]]),
0x6000: (100, [funcs[0x6000]]),
0x7000: (24, [funcs[0x7000]]),
- 0x8000: (160, [funcs[0x8000], funcs[0x9000], funcs[0x4000], funcs[0x7000]]),
+ 0x8000: (
+ 160,
+ [funcs[0x8000], funcs[0x9000], funcs[0x4000], funcs[0x7000]],
+ ),
0x9000: (140, [funcs[0x9000], funcs[0x4000], funcs[0x7000]]),
0x10000: (
200,
@@ -688,11 +739,14 @@ class StackAnalyzerTest(unittest.TestCase):
[("fake_func", "/a.c", 1), ("bake_func", "/b.c", 2)],
)
checkoutput_mock.assert_called_once_with(
- ["addr2line", "-f", "-e", "./ec.RW.elf", "1234", "-i"], encoding="utf-8"
+ ["addr2line", "-f", "-e", "./ec.RW.elf", "1234", "-i"],
+ encoding="utf-8",
)
checkoutput_mock.reset_mock()
- checkoutput_mock.return_value = "fake_func\n/test.c:1 (discriminator 128)"
+ checkoutput_mock.return_value = (
+ "fake_func\n/test.c:1 (discriminator 128)"
+ )
self.assertEqual(
self.analyzer.AddressToLine(0x12345), [("fake_func", "/test.c", 1)]
)
@@ -703,7 +757,8 @@ class StackAnalyzerTest(unittest.TestCase):
checkoutput_mock.return_value = "??\n:?\nbake_func\n/b.c:2\n"
self.assertEqual(
- self.analyzer.AddressToLine(0x123456), [None, ("bake_func", "/b.c", 2)]
+ self.analyzer.AddressToLine(0x123456),
+ [None, ("bake_func", "/b.c", 2)],
)
checkoutput_mock.assert_called_once_with(
["addr2line", "-f", "-e", "./ec.RW.elf", "123456"], encoding="utf-8"
@@ -716,7 +771,9 @@ class StackAnalyzerTest(unittest.TestCase):
checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "")
self.analyzer.AddressToLine(0x5678)
- with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run addr2line."):
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run addr2line."
+ ):
checkoutput_mock.side_effect = OSError()
self.analyzer.AddressToLine(0x9012)
@@ -773,7 +830,9 @@ class StackAnalyzerTest(unittest.TestCase):
]
)
- with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run objdump."):
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run objdump."
+ ):
checkoutput_mock.side_effect = OSError()
self.analyzer.Analyze()
@@ -836,7 +895,9 @@ class StackAnalyzerTest(unittest.TestCase):
]
)
- with self.assertRaisesRegexp(sa.StackAnalyzerError, "Failed to run objdump."):
+ with self.assertRaisesRegexp(
+ sa.StackAnalyzerError, "Failed to run objdump."
+ ):
checkoutput_mock.side_effect = OSError()
self.analyzer.Analyze()
@@ -911,7 +972,9 @@ class StackAnalyzerTest(unittest.TestCase):
with mock.patch("builtins.print") as print_mock:
checkoutput_mock.side_effect = [symbol_text, rodata_text]
sa.main()
- print_mock.assert_called_once_with("Error: Failed to load export_taskinfo.")
+ print_mock.assert_called_once_with(
+ "Error: Failed to load export_taskinfo."
+ )
with mock.patch("builtins.print") as print_mock:
checkoutput_mock.side_effect = subprocess.CalledProcessError(1, "")
diff --git a/extra/tigertool/ecusb/pty_driver.py b/extra/tigertool/ecusb/pty_driver.py
index c87378b94d..ec38e7dd0a 100644
--- a/extra/tigertool/ecusb/pty_driver.py
+++ b/extra/tigertool/ecusb/pty_driver.py
@@ -126,7 +126,9 @@ class ptyDriver(object):
"""
self._issue_cmd_get_results(cmds, [])
- def _issue_cmd_get_results(self, cmds, regex_list, timeout=DEFAULT_UART_TIMEOUT):
+ def _issue_cmd_get_results(
+ self, cmds, regex_list, timeout=DEFAULT_UART_TIMEOUT
+ ):
"""Send command to the device and wait for response.
This function waits for response message matching a regular
@@ -199,10 +201,14 @@ class ptyDriver(object):
try:
self._child.expect(regex, timeout=0.1)
match = self._child.match
- lastindex = match.lastindex if match and match.lastindex else 0
+ lastindex = (
+ match.lastindex if match and match.lastindex else 0
+ )
# Create a tuple which contains the entire matched string and all
# the subgroups of the match.
- result = match.group(*range(lastindex + 1)) if match else None
+ result = (
+ match.group(*range(lastindex + 1)) if match else None
+ )
if result:
result = tuple(res.decode("utf-8") for res in result)
result_list.append(result)
diff --git a/extra/tigertool/ecusb/stm32uart.py b/extra/tigertool/ecusb/stm32uart.py
index a6fa73970d..917ae8d7fd 100644
--- a/extra/tigertool/ecusb/stm32uart.py
+++ b/extra/tigertool/ecusb/stm32uart.py
@@ -66,7 +66,10 @@ class Suart(object):
self._tx_thread = None
self._debuglog = debuglog
self._susb = stm32usb.Susb(
- vendor=vendor, product=product, interface=interface, serialname=serialname
+ vendor=vendor,
+ product=product,
+ interface=interface,
+ serialname=serialname,
)
self._running = False
diff --git a/extra/tigertool/ecusb/stm32usb.py b/extra/tigertool/ecusb/stm32usb.py
index e8a0bc4f9b..b36fd5b3d7 100644
--- a/extra/tigertool/ecusb/stm32usb.py
+++ b/extra/tigertool/ecusb/stm32usb.py
@@ -35,7 +35,12 @@ class Susb(object):
TIMEOUT_MS = 100
def __init__(
- self, vendor=0x18D1, product=0x5027, interface=1, serialname=None, logger=None
+ self,
+ vendor=0x18D1,
+ product=0x5027,
+ interface=1,
+ serialname=None,
+ logger=None,
):
"""Susb constructor.
@@ -83,7 +88,8 @@ class Susb(object):
dev = dev_list[0]
except StopIteration:
raise SusbError(
- "USB device %04x:%04x not found" % (self._vendor, self._product)
+ "USB device %04x:%04x not found"
+ % (self._vendor, self._product)
)
# If we can't set configuration, it's already been set.
@@ -111,11 +117,15 @@ class Susb(object):
dev.detach_kernel_driver(intf.bInterfaceNumber)
read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
- read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
+ read_ep = usb.util.find_descriptor(
+ intf, bEndpointAddress=read_ep_number
+ )
self._read_ep = read_ep
write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
- write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
+ write_ep = usb.util.find_descriptor(
+ intf, bEndpointAddress=write_ep_number
+ )
self._write_ep = write_ep
def close(self):
diff --git a/extra/tigertool/ecusb/tiny_servo_common.py b/extra/tigertool/ecusb/tiny_servo_common.py
index 889abcc0a0..3f0a8ff46c 100644
--- a/extra/tigertool/ecusb/tiny_servo_common.py
+++ b/extra/tigertool/ecusb/tiny_servo_common.py
@@ -146,7 +146,9 @@ def wait_for_usb_remove(vidpid, serialname=None, timeout=None):
Wrapper for wait_for_usb below
"""
- wait_for_usb(vidpid, serialname=serialname, timeout=timeout, desiredpresence=False)
+ wait_for_usb(
+ vidpid, serialname=serialname, timeout=timeout, desiredpresence=False
+ )
def wait_for_usb(vidpid, serialname=None, timeout=None, desiredpresence=True):
@@ -167,7 +169,9 @@ def wait_for_usb(vidpid, serialname=None, timeout=None, desiredpresence=True):
time.sleep(0.1)
if timeout:
if datetime.datetime.now() > finish:
- raise TinyServoError("Timeout", "Timeout waiting for USB %s" % vidpid)
+ raise TinyServoError(
+ "Timeout", "Timeout waiting for USB %s" % vidpid
+ )
def do_serialno(serialno, pty):
@@ -198,7 +202,8 @@ def do_serialno(serialno, pty):
else:
log("Serial number set to %s but saved as %s." % (serialno, sn))
raise TinyServoError(
- "Serial Number", "Serial number set to %s but saved as %s." % (serialno, sn)
+ "Serial Number",
+ "Serial number set to %s but saved as %s." % (serialno, sn),
)
diff --git a/extra/tigertool/tigertest.py b/extra/tigertool/tigertest.py
index 924ec8c97c..bd748f9b34 100755
--- a/extra/tigertool/tigertest.py
+++ b/extra/tigertool/tigertest.py
@@ -76,7 +76,9 @@ def test_sequence():
def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
- parser.add_argument("-c", "--count", type=int, default=1, help="loops to run")
+ parser.add_argument(
+ "-c", "--count", type=int, default=1, help="loops to run"
+ )
opts = parser.parse_args(argv)
diff --git a/extra/tigertool/tigertool.py b/extra/tigertool/tigertool.py
index 241e7fe155..45dfec8286 100755
--- a/extra/tigertool/tigertool.py
+++ b/extra/tigertool/tigertool.py
@@ -202,7 +202,9 @@ def do_sysjump(region, pty, serialname):
"""
validregion = ["ro", "rw"]
if region not in validregion:
- c.log("Region setting %s invalid, try one of %s" % (region, validregion))
+ c.log(
+ "Region setting %s invalid, try one of %s" % (region, validregion)
+ )
return False
cmd = "sysjump %s" % region
@@ -232,7 +234,11 @@ def do_sysjump(region, pty, serialname):
def get_parser():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
- "-s", "--serialno", type=str, default=None, help="serial number of board to use"
+ "-s",
+ "--serialno",
+ type=str,
+ default=None,
+ help="serial number of board to use",
)
parser.add_argument(
"-b",
@@ -253,9 +259,13 @@ def get_parser():
action="store_true",
help="check serial number set on the board.",
)
- group.add_argument("-m", "--mux", type=str, default=None, help="mux selection")
+ group.add_argument(
+ "-m", "--mux", type=str, default=None, help="mux selection"
+ )
group.add_argument("-p", "--power", action="store_true", help="check VBUS")
- group.add_argument("-l", "--powerlog", type=int, default=None, help="log VBUS")
+ group.add_argument(
+ "-l", "--powerlog", type=int, default=None, help="log VBUS"
+ )
group.add_argument(
"-r", "--sysjump", type=str, default=None, help="region selection"
)
diff --git a/extra/usb_power/convert_power_log_board.py b/extra/usb_power/convert_power_log_board.py
index 159b4621d7..a555ee30ea 100644
--- a/extra/usb_power/convert_power_log_board.py
+++ b/extra/usb_power/convert_power_log_board.py
@@ -55,11 +55,15 @@ def write_to_file(file, sweetberry, inas):
# EX : ('sweetberry', 0x40, 'SB_FW_CAM_2P8', 5.0, 1.000, 3, False),
channel, i2c_addr = Spower.CHMAP[rec["channel"]]
- record = " ('sweetberry', 0x%02x, '%s', 5.0, %f, %d, 'True')" ",\n" % (
- i2c_addr,
- rec["name"],
- rec["rs"],
- channel,
+ record = (
+ " ('sweetberry', 0x%02x, '%s', 5.0, %f, %d, 'True')"
+ ",\n"
+ % (
+ i2c_addr,
+ rec["name"],
+ rec["rs"],
+ channel,
+ )
)
pyfile.write(record)
diff --git a/extra/usb_power/convert_servo_ina.py b/extra/usb_power/convert_servo_ina.py
index c18b7368dc..f3840ddc7c 100755
--- a/extra/usb_power/convert_servo_ina.py
+++ b/extra/usb_power/convert_servo_ina.py
@@ -61,10 +61,13 @@ def main(argv):
boardfile.write(",\n")
scenario.write(",\n")
- record = ' {"name": "%s", "rs": %f, "sweetberry": "A", "channel": %d}' % (
- rec[2],
- rec[4],
- rec[1] - 64,
+ record = (
+ ' {"name": "%s", "rs": %f, "sweetberry": "A", "channel": %d}'
+ % (
+ rec[2],
+ rec[4],
+ rec[1] - 64,
+ )
)
boardfile.write(record)
scenario.write('"%s"' % rec[2])
diff --git a/extra/usb_power/powerlog.py b/extra/usb_power/powerlog.py
index 44dd6ec12f..3649c9f411 100755
--- a/extra/usb_power/powerlog.py
+++ b/extra/usb_power/powerlog.py
@@ -27,11 +27,17 @@ import usb # pylint:disable=import-error
from stats_manager import StatsManager # pylint:disable=import-error
# Directory where hdctools installs configuration files into.
-LIB_DIR = os.path.join(sysconfig.get_python_lib(standard_lib=False), "servo", "data")
+LIB_DIR = os.path.join(
+ sysconfig.get_python_lib(standard_lib=False), "servo", "data"
+)
# Potential config file locations: current working directory, the same directory
# as powerlog.py file or LIB_DIR.
-CONFIG_LOCATIONS = [os.getcwd(), os.path.dirname(os.path.realpath(__file__)), LIB_DIR]
+CONFIG_LOCATIONS = [
+ os.getcwd(),
+ os.path.dirname(os.path.realpath(__file__)),
+ LIB_DIR,
+]
def logoutput(msg):
@@ -176,7 +182,9 @@ class Spower(object):
dev = d
break
if dev is None:
- raise Exception("Power", "USB device(%s) not found" % serialname)
+ raise Exception(
+ "Power", "USB device(%s) not found" % serialname
+ )
else:
dev = dev_list[0]
@@ -202,7 +210,9 @@ class Spower(object):
read_ep = usb.util.find_descriptor(
intf,
# match the first IN endpoint
- custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
== usb.util.ENDPOINT_IN,
)
@@ -212,7 +222,9 @@ class Spower(object):
write_ep = usb.util.find_descriptor(
intf,
# match the first OUT endpoint
- custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
== usb.util.ENDPOINT_OUT,
)
@@ -227,7 +239,9 @@ class Spower(object):
"""Clear INA description struct."""
self._inas = []
- def append_ina_struct(self, name, rs, port, addr, data=None, ina_type=INA_POWER):
+ def append_ina_struct(
+ self, name, rs, port, addr, data=None, ina_type=INA_POWER
+ ):
"""Add an INA descriptor into the list of active INAs.
Args:
@@ -313,7 +327,9 @@ class Spower(object):
"""Clear pending reads on the stm32"""
try:
while True:
- ret = self.wr_command(b"", read_count=512, rtimeout=100, wtimeout=50)
+ ret = self.wr_command(
+ b"", read_count=512, rtimeout=100, wtimeout=50
+ )
self._logger.debug(
"Try Clear: read %s", "success" if ret == 0 else "failure"
)
@@ -324,7 +340,9 @@ class Spower(object):
"""Reset the power interface on the stm32"""
cmd = struct.pack("<H", self.CMD_RESET)
ret = self.wr_command(cmd, rtimeout=50, wtimeout=50)
- self._logger.debug("Command RESET: %s", "success" if ret == 0 else "failure")
+ self._logger.debug(
+ "Command RESET: %s", "success" if ret == 0 else "failure"
+ )
def reset(self):
"""Try resetting the USB interface until success.
@@ -343,7 +361,9 @@ class Spower(object):
except Exception as e:
self.clear()
self.clear()
- self._logger.debug("TRY %d of %d: %s", count, max_reset_retry, e)
+ self._logger.debug(
+ "TRY %d of %d: %s", count, max_reset_retry, e
+ )
time.sleep(count * 0.01)
raise Exception("Power", "Failed to reset")
@@ -351,7 +371,9 @@ class Spower(object):
"""Stop any active data acquisition."""
cmd = struct.pack("<H", self.CMD_STOP)
ret = self.wr_command(cmd)
- self._logger.debug("Command STOP: %s", "success" if ret == 0 else "failure")
+ self._logger.debug(
+ "Command STOP: %s", "success" if ret == 0 else "failure"
+ )
def start(self, integration_us):
"""Start data acquisition.
@@ -420,7 +442,9 @@ class Spower(object):
cmd = struct.pack("<HQ", self.CMD_SETTIME, timestamp_us)
ret = self.wr_command(cmd)
- self._logger.debug("Command SETTIME: %s", "success" if ret == 0 else "failure")
+ self._logger.debug(
+ "Command SETTIME: %s", "success" if ret == 0 else "failure"
+ )
def add_ina(self, bus, ina_type, addr, extra, resistance, data=None):
"""Add an INA to the data acquisition list.
@@ -445,7 +469,9 @@ class Spower(object):
self.append_ina_struct(
name, resistance, bus, addr, data=data, ina_type=ina_type
)
- self._logger.debug("Command ADD_INA: %s", "success" if ret == 0 else "failure")
+ self._logger.debug(
+ "Command ADD_INA: %s", "success" if ret == 0 else "failure"
+ )
def report_header_size(self):
"""Helper function to calculate power record header size."""
@@ -481,13 +507,17 @@ class Spower(object):
if len(bytesread) == 1:
if bytesread[0] != 0x6:
self._logger.debug(
- "READ LINE FAILED bytes: %d ret: %02x", len(bytesread), bytesread[0]
+ "READ LINE FAILED bytes: %d ret: %02x",
+ len(bytesread),
+ bytesread[0],
)
return None
if len(bytesread) % expected_bytes != 0:
self._logger.debug(
- "READ LINE WARNING: expected %d, got %d", expected_bytes, len(bytesread)
+ "READ LINE WARNING: expected %d, got %d",
+ expected_bytes,
+ len(bytesread),
)
packet_count = len(bytesread) // expected_bytes
@@ -694,7 +724,8 @@ class powerlog(object):
raise Exception(
"Invalid INA type",
"Type of %s [%s] not recognized,"
- " try one of POWER, BUSV, CURRENT" % (entry[0], entry[1]),
+ " try one of POWER, BUSV, CURRENT"
+ % (entry[0], entry[1]),
)
else:
name = entry
@@ -774,7 +805,9 @@ class powerlog(object):
aggregate_record["boards"].add(record["berry"])
else:
self._logger.info(
- "break %s, %s", record["berry"], aggregate_record["boards"]
+ "break %s, %s",
+ record["berry"],
+ aggregate_record["boards"],
)
break
@@ -784,7 +817,10 @@ class powerlog(object):
if name in aggregate_record:
multiplier = (
0.001
- if (self._use_mW and name[1] == Spower.INA_POWER)
+ if (
+ self._use_mW
+ and name[1] == Spower.INA_POWER
+ )
else 1
)
value = aggregate_record[name] * multiplier
@@ -826,7 +862,9 @@ def main(argv=None):
if argv is None:
argv = sys.argv[1:]
# Command line argument description.
- parser = argparse.ArgumentParser(description="Gather CSV data from sweetberry")
+ parser = argparse.ArgumentParser(
+ description="Gather CSV data from sweetberry"
+ )
parser.add_argument(
"-b",
"--board",
@@ -842,10 +880,18 @@ def main(argv=None):
default="",
)
parser.add_argument(
- "-A", "--serial", type=str, help="Serial number of sweetberry A", default=""
+ "-A",
+ "--serial",
+ type=str,
+ help="Serial number of sweetberry A",
+ default="",
)
parser.add_argument(
- "-B", "--serial_b", type=str, help="Serial number of sweetberry B", default=""
+ "-B",
+ "--serial_b",
+ type=str,
+ help="Serial number of sweetberry B",
+ default="",
)
parser.add_argument(
"-t",
@@ -855,7 +901,11 @@ def main(argv=None):
default=100000,
)
parser.add_argument(
- "-s", "--seconds", type=float, help="Seconds to run capture", default=0.0
+ "-s",
+ "--seconds",
+ type=float,
+ help="Seconds to run capture",
+ default=0.0,
)
parser.add_argument(
"--date",
@@ -876,7 +926,10 @@ def main(argv=None):
action="store_true",
)
parser.add_argument(
- "--slow", default=False, help="Intentionally overflow", action="store_true"
+ "--slow",
+ default=False,
+ help="Intentionally overflow",
+ action="store_true",
)
parser.add_argument(
"--print_stats",
@@ -917,7 +970,8 @@ def main(argv=None):
dest="print_raw_data",
default=True,
action="store_false",
- help="Not print raw sweetberry readings at real time, default is to " "print",
+ help="Not print raw sweetberry readings at real time, default is to "
+ "print",
)
parser.add_argument(
"--save_raw_data",
@@ -952,7 +1006,9 @@ def main(argv=None):
# if powerlog is used through main, log to sys.stdout
if __name__ == "__main__":
stdout_handler = logging.StreamHandler(sys.stdout)
- stdout_handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
+ stdout_handler.setFormatter(
+ logging.Formatter("%(levelname)s: %(message)s")
+ )
root_logger.addHandler(stdout_handler)
integration_us_request = args.integration_us
diff --git a/extra/usb_power/stats_manager.py b/extra/usb_power/stats_manager.py
index 0f92916251..d200d66af7 100644
--- a/extra/usb_power/stats_manager.py
+++ b/extra/usb_power/stats_manager.py
@@ -84,7 +84,9 @@ class StatsManager(object):
"""
# pylint: disable=W0102
- def __init__(self, smid="", title="", order=[], hide_domains=[], accept_nan=True):
+ def __init__(
+ self, smid="", title="", order=[], hide_domains=[], accept_nan=True
+ ):
"""Initialize infrastructure for data and their statistics."""
self._title = title
self._data = collections.defaultdict(list)
@@ -112,11 +114,15 @@ class StatsManager(object):
except ValueError:
# if we don't accept nan this will be caught below
self._logger.debug(
- "sample %s for domain %s is not a number. Making NaN", sample, domain
+ "sample %s for domain %s is not a number. Making NaN",
+ sample,
+ domain,
)
sample = float("NaN")
if not self._accept_nan and math.isnan(sample):
- raise StatsManagerError("accept_nan is false. Cannot add NaN sample.")
+ raise StatsManagerError(
+ "accept_nan is false. Cannot add NaN sample."
+ )
self._data[domain].append(sample)
if math.isnan(sample):
self._nan_domains.add(domain)
@@ -133,7 +139,8 @@ class StatsManager(object):
"""
if domain in self._unit:
self._logger.warning(
- "overwriting the unit of %s, old unit is %s, new " "unit is %s.",
+ "overwriting the unit of %s, old unit is %s, new "
+ "unit is %s.",
domain,
self._unit[domain],
unit,
@@ -179,7 +186,9 @@ class StatsManager(object):
table = [headers]
# determine what domains to display & and the order
domains_to_display = self.DomainsToDisplay
- display_order = [key for key in self._order if key in domains_to_display]
+ display_order = [
+ key for key in self._order if key in domains_to_display
+ ]
domains_to_display -= set(display_order)
display_order.extend(sorted(domains_to_display))
for domain in display_order:
@@ -259,8 +268,13 @@ class StatsManager(object):
# line is a seperator line consisting of -----
line = "%s%s" % (prefix, "-" * (line_length - dec_length))
# prepend the prefix to the centered title
- padded_title = "%s%s" % (prefix, title.center(line_length)[dec_length:])
- formatted_lines = [line, padded_title, line] + formatted_lines + [line]
+ padded_title = "%s%s" % (
+ prefix,
+ title.center(line_length)[dec_length:],
+ )
+ formatted_lines = (
+ [line, padded_title, line] + formatted_lines + [line]
+ )
formatted_output = "\n".join(formatted_lines)
return formatted_output
@@ -399,7 +413,9 @@ class StatsManager(object):
for domain, data in self._data.items():
if not domain.endswith(self._unit[domain]):
domain = "%s_%s" % (domain, self._unit[domain])
- fname = self._MakeUniqueFName(os.path.join(dirname, "%s.txt" % domain))
+ fname = self._MakeUniqueFName(
+ os.path.join(dirname, "%s.txt" % domain)
+ )
with open(fname, "w") as f:
f.write("\n".join("%.2f" % sample for sample in data) + "\n")
fnames.append(fname)
diff --git a/extra/usb_power/stats_manager_unittest.py b/extra/usb_power/stats_manager_unittest.py
index 2a7317546c..6c2d4be771 100644
--- a/extra/usb_power/stats_manager_unittest.py
+++ b/extra/usb_power/stats_manager_unittest.py
@@ -228,7 +228,9 @@ class TestStatsManager(unittest.TestCase):
"""Order passed into StatsManager is honoured when formatting summary."""
# StatsManager that should print D & B first, and the subsequent elements
# are sorted.
- d_b_a_c_regexp = re.compile("D-domain.*B-domain.*A-domain.*C-domain", re.DOTALL)
+ d_b_a_c_regexp = re.compile(
+ "D-domain.*B-domain.*A-domain.*C-domain", re.DOTALL
+ )
data = stats_manager.StatsManager(order=["D-domain", "B-domain"])
data.AddSample("A-domain", 17)
data.AddSample("B-domain", 17)
@@ -255,7 +257,9 @@ class TestStatsManager(unittest.TestCase):
# Assert the reported fname is the same as the expected fname
self.assertEqual(expected_fname, fname)
# Assert only the reported fname is output (in the tempdir)
- self.assertEqual(set([os.path.basename(fname)]), set(os.listdir(self.tempdir)))
+ self.assertEqual(
+ set([os.path.basename(fname)]), set(os.listdir(self.tempdir))
+ )
with open(fname, "r") as f:
self.assertEqual(
"@@ NAME COUNT MEAN STDDEV MAX MIN\n",
@@ -287,7 +291,9 @@ class TestStatsManager(unittest.TestCase):
# Assert the reported fname is the same as the expected fname
self.assertEqual(expected_fname, fname)
# Assert only the reported fname is output (in the tempdir)
- self.assertEqual(set([os.path.basename(fname)]), set(os.listdir(self.tempdir)))
+ self.assertEqual(
+ set([os.path.basename(fname)]), set(os.listdir(self.tempdir))
+ )
with open(fname, "r") as f:
summary = json.load(f)
self.assertAlmostEqual(100000.0, summary["A"]["mean"])
diff --git a/extra/usb_serial/console.py b/extra/usb_serial/console.py
index 839330d86a..49b731ad42 100755
--- a/extra/usb_serial/console.py
+++ b/extra/usb_serial/console.py
@@ -71,7 +71,9 @@ class Susb:
WRITE_ENDPOINT = 0x1
TIMEOUT_MS = 100
- def __init__(self, vendor=0x18D1, product=0x500F, interface=1, serialname=None):
+ def __init__(
+ self, vendor=0x18D1, product=0x500F, interface=1, serialname=None
+ ):
"""Susb constructor.
Discovers and connects to USB endpoints.
@@ -109,7 +111,9 @@ class Susb:
try:
dev = dev_list[0]
except:
- raise SusbError("USB device %04x:%04x not found" % (vendor, product))
+ raise SusbError(
+ "USB device %04x:%04x not found" % (vendor, product)
+ )
# If we can't set configuration, it's already been set.
try:
@@ -130,11 +134,15 @@ class Susb:
dev.detach_kernel_driver(intf.bInterfaceNumber)
read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
- read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
+ read_ep = usb.util.find_descriptor(
+ intf, bEndpointAddress=read_ep_number
+ )
self._read_ep = read_ep
write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
- write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
+ write_ep = usb.util.find_descriptor(
+ intf, bEndpointAddress=write_ep_number
+ )
self._write_ep = write_ep
@@ -163,7 +171,9 @@ class SuartError(Exception):
class Suart:
"""Provide interface to serial usb endpoint."""
- def __init__(self, vendor=0x18D1, product=0x501C, interface=0, serialname=None):
+ def __init__(
+ self, vendor=0x18D1, product=0x501C, interface=0, serialname=None
+ ):
"""Suart contstructor.
Initializes USB stream interface.
@@ -179,7 +189,10 @@ class Suart:
"""
self._done = threading.Event()
self._susb = Susb(
- vendor=vendor, product=product, interface=interface, serialname=serialname
+ vendor=vendor,
+ product=product,
+ interface=interface,
+ serialname=serialname,
)
def wait_until_done(self, timeout=None):
@@ -239,7 +252,11 @@ class Suart:
parser = argparse.ArgumentParser(description="Open a console to a USB device")
parser.add_argument(
- "-d", "--device", type=str, help="vid:pid of target device", default="18d1:501c"
+ "-d",
+ "--device",
+ type=str,
+ help="vid:pid of target device",
+ default="18d1:501c",
)
parser.add_argument(
"-i", "--interface", type=int, help="interface number of console", default=0
@@ -272,7 +289,9 @@ def runconsole():
serialno = args.serialno
interface = args.interface
- sobj = Suart(vendor=vid, product=pid, interface=interface, serialname=serialno)
+ sobj = Suart(
+ vendor=vid, product=pid, interface=interface, serialname=serialno
+ )
if sys.stdin.isatty():
tty.setraw(sys.stdin.fileno())
sobj.run()
diff --git a/extra/usb_updater/fw_update.py b/extra/usb_updater/fw_update.py
index 8658d92c48..ca4e0e2e94 100755
--- a/extra/usb_updater/fw_update.py
+++ b/extra/usb_updater/fw_update.py
@@ -113,7 +113,9 @@ class Supdate(object):
read_ep = usb.util.find_descriptor(
intf,
# match the first IN endpoint
- custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
== usb.util.ENDPOINT_IN,
)
@@ -123,7 +125,9 @@ class Supdate(object):
write_ep = usb.util.find_descriptor(
intf,
# match the first OUT endpoint
- custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
== usb.util.ENDPOINT_OUT,
)
@@ -267,7 +271,9 @@ class Supdate(object):
result = struct.unpack("<I", read)
result = result[0]
if result != 0:
- raise Exception("Update", "Upload failed with rc: 0x%x" % result)
+ raise Exception(
+ "Update", "Upload failed with rc: 0x%x" % result
+ )
def start(self):
"""Start a transaction and erase currently inactive region.
@@ -297,7 +303,9 @@ class Supdate(object):
log("Update protocol v. %d" % version)
log("Available flash region base: %x" % base)
else:
- raise Exception("Update", "Start command returned %d bytes" % len(read))
+ raise Exception(
+ "Update", "Start command returned %d bytes" % len(read)
+ )
if base < 256:
raise Exception("Update", "Start returned error code 0x%x" % base)
@@ -380,7 +388,8 @@ class Supdate(object):
if self._filesize != self._flashsize:
raise Exception(
"Update",
- "Flash size 0x%x != file size 0x%x" % (self._flashsize, self._filesize),
+ "Flash size 0x%x != file size 0x%x"
+ % (self._flashsize, self._filesize),
)
@@ -396,9 +405,13 @@ parser.add_argument(
parser.add_argument(
"-f", "--file", type=str, help="Complete ec.bin file", default="ec.bin"
)
-parser.add_argument("-s", "--serial", type=str, help="Serial number", default="")
+parser.add_argument(
+ "-s", "--serial", type=str, help="Serial number", default=""
+)
parser.add_argument("-l", "--list", action="store_true", help="List regions")
-parser.add_argument("-v", "--verbose", action="store_true", help="Chatty output")
+parser.add_argument(
+ "-v", "--verbose", action="store_true", help="Chatty output"
+)
def main():
diff --git a/extra/usb_updater/servo_updater.py b/extra/usb_updater/servo_updater.py
index 7fa4608289..b5c23a9ad1 100755
--- a/extra/usb_updater/servo_updater.py
+++ b/extra/usb_updater/servo_updater.py
@@ -97,7 +97,9 @@ def do_with_retries(func, *args):
time.sleep(RETRIES_DELAY)
continue
- raise Exception("'{}' failed after {} retries".format(func.__name__, RETRIES_COUNT))
+ raise Exception(
+ "'{}' failed after {} retries".format(func.__name__, RETRIES_COUNT)
+ )
def flash(brdfile, serialno, binfile):
@@ -144,7 +146,9 @@ def flash2(vidpid, serialno, binfile):
print(cmd)
help_cmd = "%s --help" % tool
with open("/dev/null") as devnull:
- valid_check = subprocess.call(help_cmd.split(), stdout=devnull, stderr=devnull)
+ valid_check = subprocess.call(
+ help_cmd.split(), stdout=devnull, stderr=devnull
+ )
if valid_check:
raise ServoUpdaterException(
"%s exit with res = %d. Make sure the tool "
@@ -236,7 +240,9 @@ def do_updater_version(tinys):
return 2
else:
return 6
- raise ServoUpdaterException("Can't determine updater target from vers: [%s]" % vers)
+ raise ServoUpdaterException(
+ "Can't determine updater target from vers: [%s]" % vers
+ )
def _extract_version(boardname, binfile):
@@ -260,7 +266,9 @@ def _extract_version(boardname, binfile):
if m:
newvers = m.group(0).strip(" \t\r\n\0")
else:
- raise ServoUpdaterException("Can't find version from file: %s." % binfile)
+ raise ServoUpdaterException(
+ "Can't find version from file: %s." % binfile
+ )
return newvers
@@ -305,7 +313,9 @@ def get_files_and_version(cname, fname=None, channel=DEFAULT_CHANNEL):
if os.path.exists(updater_path):
break
else:
- raise ServoUpdaterException("servo_updater/ dir not found in known spots.")
+ raise ServoUpdaterException(
+ "servo_updater/ dir not found in known spots."
+ )
firmware_path = os.path.join(updater_path, FIRMWARE_DIR)
configs_path = os.path.join(updater_path, CONFIGS_DIR)
@@ -338,7 +348,9 @@ def get_files_and_version(cname, fname=None, channel=DEFAULT_CHANNEL):
if os.path.isfile(newname):
fname = newname
else:
- raise ServoUpdaterException("Can't find firmware binary: %s." % binary_file)
+ raise ServoUpdaterException(
+ "Can't find firmware binary: %s." % binary_file
+ )
elif not os.path.isfile(fname):
# If a name is specified but not found, try the default path.
newname = os.path.join(firmware_path, fname)
@@ -365,7 +377,11 @@ def main():
help="only print available firmware for board/channel",
)
parser.add_argument(
- "-s", "--serialno", type=str, help="serial number to program", default=None
+ "-s",
+ "--serialno",
+ type=str,
+ help="serial number to program",
+ default=None,
)
parser.add_argument(
"-b",
@@ -392,9 +408,14 @@ def main():
help="Update even if version match",
default=False,
)
- parser.add_argument("-v", "--verbose", action="store_true", help="Chatty output")
parser.add_argument(
- "-r", "--reboot", action="store_true", help="Always reboot, even after probe."
+ "-v", "--verbose", action="store_true", help="Chatty output"
+ )
+ parser.add_argument(
+ "-r",
+ "--reboot",
+ action="store_true",
+ help="Always reboot, even after probe.",
)
args = parser.parse_args()
diff --git a/firmware_builder.py b/firmware_builder.py
index 1a8f52df56..980486df6a 100755
--- a/firmware_builder.py
+++ b/firmware_builder.py
@@ -46,7 +46,9 @@ def build(opts):
metric_list = firmware_pb2.FwBuildMetricList()
# Run formatting checks on all python files.
- subprocess.run(["black", "--check", "."], cwd=os.path.dirname(__file__), check=True)
+ subprocess.run(
+ ["black", "--check", "."], cwd=os.path.dirname(__file__), check=True
+ )
subprocess.run(
[
"isort",
@@ -217,9 +219,13 @@ def test(opts):
["util/ec3po/run_tests.sh"], cwd=os.path.dirname(__file__), check=True
)
subprocess.run(
- ["extra/stack_analyzer/run_tests.sh"], cwd=os.path.dirname(__file__), check=True
+ ["extra/stack_analyzer/run_tests.sh"],
+ cwd=os.path.dirname(__file__),
+ check=True,
+ )
+ subprocess.run(
+ ["util/run_tests.sh"], cwd=os.path.dirname(__file__), check=True
)
- subprocess.run(["util/run_tests.sh"], cwd=os.path.dirname(__file__), check=True)
# If building for code coverage, build the 'coverage' target, which
# builds the posix-based unit tests for code coverage and assembles
diff --git a/pylintrc b/pylintrc
index 5af5f4e4dd..68f27ecf12 100644
--- a/pylintrc
+++ b/pylintrc
@@ -15,5 +15,4 @@ disable=
[format]
-max-line-length=88
string-quote=double
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000000..83c116eb5f
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,2 @@
+[tool.black]
+line-length = 80
diff --git a/test/run_device_tests.py b/test/run_device_tests.py
index 3d7e709c0e..bf77e6d02a 100755
--- a/test/run_device_tests.py
+++ b/test/run_device_tests.py
@@ -108,7 +108,9 @@ DARTMONKEY_IMAGE_PATH = os.path.join(
NOCTURNE_FP_IMAGE_PATH = os.path.join(
TEST_ASSETS_BUCKET, "nocturne_fp_v2.2.64-58cf5974e.bin"
)
-NAMI_FP_IMAGE_PATH = os.path.join(TEST_ASSETS_BUCKET, "nami_fp_v2.2.144-7a08e07eb.bin")
+NAMI_FP_IMAGE_PATH = os.path.join(
+ TEST_ASSETS_BUCKET, "nami_fp_v2.2.144-7a08e07eb.bin"
+)
BLOONCHIPPER_V4277_IMAGE_PATH = os.path.join(
TEST_ASSETS_BUCKET, "bloonchipper_v2.0.4277-9f652bb3.bin"
)
@@ -161,7 +163,10 @@ class TestConfig:
def __post_init__(self):
if self.finish_regexes is None:
- self.finish_regexes = [ALL_TESTS_PASSED_REGEX, ALL_TESTS_FAILED_REGEX]
+ self.finish_regexes = [
+ ALL_TESTS_PASSED_REGEX,
+ ALL_TESTS_FAILED_REGEX,
+ ]
if self.fail_regexes is None:
self.fail_regexes = [
SINGLE_CHECK_FAILED_REGEX,
@@ -193,7 +198,9 @@ class AllTests:
TestConfig(test_name="cortexm_fpu"),
TestConfig(test_name="crc"),
TestConfig(
- test_name="flash_physical", image_to_use=ImageType.RO, toggle_power=True
+ test_name="flash_physical",
+ image_to_use=ImageType.RO,
+ toggle_power=True,
),
TestConfig(
test_name="flash_write_protect",
@@ -209,7 +216,9 @@ class AllTests:
test_args=["spi"],
),
TestConfig(
- config_name="fpsensor_spi_rw", test_name="fpsensor", test_args=["spi"]
+ config_name="fpsensor_spi_rw",
+ test_name="fpsensor",
+ test_args=["spi"],
),
TestConfig(
config_name="fpsensor_uart_ro",
@@ -218,7 +227,9 @@ class AllTests:
test_args=["uart"],
),
TestConfig(
- config_name="fpsensor_uart_rw", test_name="fpsensor", test_args=["uart"]
+ config_name="fpsensor_uart_rw",
+ test_name="fpsensor",
+ test_args=["uart"],
),
TestConfig(
config_name="mpu_ro",
@@ -281,7 +292,10 @@ class AllTests:
TestConfig(
config_name="panic_data_" + variant_name,
test_name="panic_data",
- fail_regexes=[SINGLE_CHECK_FAILED_REGEX, ALL_TESTS_FAILED_REGEX],
+ fail_regexes=[
+ SINGLE_CHECK_FAILED_REGEX,
+ ALL_TESTS_FAILED_REGEX,
+ ],
ro_image=variant_info.get("ro_image_path"),
build_board=variant_info.get("build_board"),
)
@@ -320,8 +334,12 @@ BLOONCHIPPER_CONFIG = BoardConfig(
rollback_region1_regex=DATA_ACCESS_VIOLATION_8040000_REGEX,
mpu_regex=DATA_ACCESS_VIOLATION_20000000_REGEX,
variants={
- "bloonchipper_v2.0.4277": {"ro_image_path": BLOONCHIPPER_V4277_IMAGE_PATH},
- "bloonchipper_v2.0.5938": {"ro_image_path": BLOONCHIPPER_V5938_IMAGE_PATH},
+ "bloonchipper_v2.0.4277": {
+ "ro_image_path": BLOONCHIPPER_V4277_IMAGE_PATH
+ },
+ "bloonchipper_v2.0.5938": {
+ "ro_image_path": BLOONCHIPPER_V5938_IMAGE_PATH
+ },
},
)
@@ -452,7 +470,9 @@ def power(board_config: BoardConfig, on: bool) -> None:
board_config.servo_power_enable + ":" + state,
]
logging.debug('Running command: "%s"', " ".join(cmd))
- subprocess.run(cmd).check_returncode() # pylint: disable=subprocess-run-check
+ subprocess.run(
+ cmd
+ ).check_returncode() # pylint: disable=subprocess-run-check
def hw_write_protect(enable: bool) -> None:
@@ -467,7 +487,9 @@ def hw_write_protect(enable: bool) -> None:
"fw_wp_state:" + state,
]
logging.debug('Running command: "%s"', " ".join(cmd))
- subprocess.run(cmd).check_returncode() # pylint: disable=subprocess-run-check
+ subprocess.run(
+ cmd
+ ).check_returncode() # pylint: disable=subprocess-run-check
def build(test_name: str, board_name: str, compiler: str) -> None:
@@ -484,7 +506,9 @@ def build(test_name: str, board_name: str, compiler: str) -> None:
]
logging.debug('Running command: "%s"', " ".join(cmd))
- subprocess.run(cmd).check_returncode() # pylint: disable=subprocess-run-check
+ subprocess.run(
+ cmd
+ ).check_returncode() # pylint: disable=subprocess-run-check
def flash(
@@ -512,7 +536,9 @@ def flash(
]
)
logging.debug('Running command: "%s"', " ".join(cmd))
- completed_process = subprocess.run(cmd) # pylint: disable=subprocess-run-check
+ completed_process = subprocess.run(
+ cmd
+ ) # pylint: disable=subprocess-run-check
return completed_process.returncode == 0
@@ -538,7 +564,9 @@ def readline(
return None
-def readlines_until_timeout(executor, f: BinaryIO, timeout_secs: int) -> List[bytes]:
+def readlines_until_timeout(
+ executor, f: BinaryIO, timeout_secs: int
+) -> List[bytes]:
"""Continuously read lines for timeout_secs."""
lines: List[bytes] = []
while True:
@@ -641,7 +669,10 @@ def get_test_list(config: BoardConfig, test_args) -> List[TestConfig]:
def flash_and_run_test(
- test: TestConfig, board_config: BoardConfig, args: argparse.Namespace, executor
+ test: TestConfig,
+ board_config: BoardConfig,
+ args: argparse.Namespace,
+ executor,
) -> bool:
"""Run a single test using the test and board configuration specified"""
build_board = args.board
@@ -672,13 +703,17 @@ def flash_and_run_test(
flash_succeeded = False
for i in range(0, test.num_flash_attempts):
logging.debug("Flash attempt %d", i + 1)
- if flash(image_path, args.board, args.flasher, args.remote, args.jlink_port):
+ if flash(
+ image_path, args.board, args.flasher, args.remote, args.jlink_port
+ ):
flash_succeeded = True
break
time.sleep(1)
if not flash_succeeded:
- logging.debug("Flashing failed after max attempts: %d", test.num_flash_attempts)
+ logging.debug(
+ "Flashing failed after max attempts: %d", test.num_flash_attempts
+ )
return False
if test.toggle_power:
@@ -775,13 +810,19 @@ def main():
)
log_level_choices = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
- parser.add_argument("--log_level", "-l", choices=log_level_choices, default="DEBUG")
+ parser.add_argument(
+ "--log_level", "-l", choices=log_level_choices, default="DEBUG"
+ )
flasher_choices = [SERVO_MICRO, JTRACE]
- parser.add_argument("--flasher", "-f", choices=flasher_choices, default=JTRACE)
+ parser.add_argument(
+ "--flasher", "-f", choices=flasher_choices, default=JTRACE
+ )
compiler_options = [GCC, CLANG]
- parser.add_argument("--compiler", "-c", choices=compiler_options, default=GCC)
+ parser.add_argument(
+ "--compiler", "-c", choices=compiler_options, default=GCC
+ )
# This might be expanded to serve as a "remote" for flash_ec also, so
# we will leave it generic.
@@ -793,7 +834,10 @@ def main():
)
parser.add_argument(
- "--jlink_port", "-j", type=int, help="The port to use when connecting to JLink."
+ "--jlink_port",
+ "-j",
+ type=int,
+ help="The port to use when connecting to JLink.",
)
parser.add_argument(
"--console_port",
diff --git a/test/timer_calib.py b/test/timer_calib.py
index c7ac9fc23b..0de0d6b4e7 100644
--- a/test/timer_calib.py
+++ b/test/timer_calib.py
@@ -10,21 +10,23 @@ import time
def one_pass(helper):
helper.wait_output("=== Timer calibration ===")
- res = helper.wait_output("back-to-back get_time : (?P<lat>[0-9]+) us", use_re=True)[
- "lat"
- ]
+ res = helper.wait_output(
+ "back-to-back get_time : (?P<lat>[0-9]+) us", use_re=True
+ )["lat"]
minlat = int(res)
helper.trace("get_time latency %d us\n" % minlat)
helper.wait_output("sleep 1s")
t0 = time.time()
- second = helper.wait_output("done. delay = (?P<second>[0-9]+) us", use_re=True)[
- "second"
- ]
+ second = helper.wait_output(
+ "done. delay = (?P<second>[0-9]+) us", use_re=True
+ )["second"]
t1 = time.time()
secondreal = t1 - t0
secondlat = int(second) - 1000000
- helper.trace("1s timer latency %d us / real time %f s\n" % (secondlat, secondreal))
+ helper.trace(
+ "1s timer latency %d us / real time %f s\n" % (secondlat, secondreal)
+ )
us = {}
for pow2 in range(7):
diff --git a/util/build_with_clang.py b/util/build_with_clang.py
index 98da942152..3606e53657 100755
--- a/util/build_with_clang.py
+++ b/util/build_with_clang.py
@@ -41,7 +41,9 @@ def main() -> int:
parser = argparse.ArgumentParser()
log_level_choices = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
- parser.add_argument("--log_level", "-l", choices=log_level_choices, default="DEBUG")
+ parser.add_argument(
+ "--log_level", "-l", choices=log_level_choices, default="DEBUG"
+ )
parser.add_argument(
"--num_threads", "-j", type=int, default=multiprocessing.cpu_count()
@@ -67,7 +69,8 @@ def main() -> int:
if len(failed_boards) > 0:
logging.error(
- "The following boards failed to compile:\n%s", "\n".join(failed_boards)
+ "The following boards failed to compile:\n%s",
+ "\n".join(failed_boards),
)
return 1
diff --git a/util/config_option_check.py b/util/config_option_check.py
index 8263e61709..c1e57dfc82 100755
--- a/util/config_option_check.py
+++ b/util/config_option_check.py
@@ -172,7 +172,8 @@ def print_missing_config_options(hunks, config_options):
for l in h.lines:
# Check for the existence of a CONFIG_* in the line.
match = filter(
- lambda opt: opt in ALLOWLIST_CONFIGS, config_option_re.findall(l.string)
+ lambda opt: opt in ALLOWLIST_CONFIGS,
+ config_option_re.findall(l.string),
)
if not match:
continue
@@ -189,7 +190,10 @@ def print_missing_config_options(hunks, config_options):
# is no longer being used in the entire repo.
if l.line_type == "-":
- if option not in options_in_use and option in config_options:
+ if (
+ option not in options_in_use
+ and option in config_options
+ ):
deprecated_options.add(option)
else:
violations.add(option)
@@ -370,7 +374,9 @@ def get_hunks():
line_type = match.groups(1)[0]
# We only care about modifications.
if line_type != " ":
- hunk_lines.append(Line(line_num, match.groups(2)[1], line_type))
+ hunk_lines.append(
+ Line(line_num, match.groups(2)[1], line_type)
+ )
# Deletions don't count towards the line numbers.
if line_type != "-":
line_num += 1
diff --git a/util/ec3po/console.py b/util/ec3po/console.py
index 3b3c1c89f5..289c40f825 100755
--- a/util/ec3po/console.py
+++ b/util/ec3po/console.py
@@ -140,7 +140,13 @@ class Console(object):
"""
def __init__(
- self, controller_pty, user_pty, interface_pty, cmd_pipe, dbg_pipe, name=None
+ self,
+ controller_pty,
+ user_pty,
+ interface_pty,
+ cmd_pipe,
+ dbg_pipe,
+ name=None,
):
"""Initalises a Console object with the provided arguments.
@@ -320,7 +326,9 @@ class Console(object):
# Restore the partial cmd.
if self.history_pos == len(self.history):
- self.logger.debug("Restoring partial command of %r", self.partial_cmd)
+ self.logger.debug(
+ "Restoring partial command of %r", self.partial_cmd
+ )
# Backspace the line.
for _ in range(self.input_buffer_pos):
self.SendBackspace()
@@ -439,7 +447,9 @@ class Console(object):
else:
self.logger.error(
- r"Bad or unhandled escape sequence. got ^[%c\(%d)", chr(byte), byte
+ r"Bad or unhandled escape sequence. got ^[%c\(%d)",
+ chr(byte),
+ byte,
)
self.esc_state = 0
return
@@ -468,7 +478,9 @@ class Console(object):
# END key.
if byte == ord("~"):
self.logger.debug("End key pressed.")
- self.MoveCursor("right", len(self.input_buffer) - self.input_buffer_pos)
+ self.MoveCursor(
+ "right", len(self.input_buffer) - self.input_buffer_pos
+ )
self.esc_state = 0 # Reset the state.
self.logger.debug("ESC sequence complete.")
return
@@ -488,7 +500,11 @@ class Console(object):
return
# Don't store 2 consecutive identical commands in the history.
- if self.history and self.history[-1] != self.input_buffer or not self.history:
+ if (
+ self.history
+ and self.history[-1] != self.input_buffer
+ or not self.history
+ ):
self.history.append(self.input_buffer)
# Split the command up by spaces.
@@ -536,13 +552,15 @@ class Console(object):
# Increase the interrogation timeout for stability purposes.
self.interrogation_timeout = ENHANCED_EC_INTERROGATION_TIMEOUT
self.logger.debug(
- "Increasing interrogation timeout to %rs.", self.interrogation_timeout
+ "Increasing interrogation timeout to %rs.",
+ self.interrogation_timeout,
)
else:
# Reduce the timeout in order to reduce the perceivable delay.
self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
self.logger.debug(
- "Reducing interrogation timeout to %rs.", self.interrogation_timeout
+ "Reducing interrogation timeout to %rs.",
+ self.interrogation_timeout,
)
return is_enhanced
@@ -582,7 +600,8 @@ class Console(object):
if self.pending_oobm_cmd:
self.oobm_queue.put(self.pending_oobm_cmd)
self.logger.debug(
- "Placed %r into OOBM command queue.", self.pending_oobm_cmd
+ "Placed %r into OOBM command queue.",
+ self.pending_oobm_cmd,
)
# Reset the state.
@@ -687,7 +706,9 @@ class Console(object):
# Ctrl+E. Move cursor to end of the line.
elif byte == ControlKey.CTRL_E:
self.logger.debug("Control+E pressed.")
- self.MoveCursor("right", len(self.input_buffer) - self.input_buffer_pos)
+ self.MoveCursor(
+ "right", len(self.input_buffer) - self.input_buffer_pos
+ )
# Ctrl+F. Move cursor right 1 column.
elif byte == ControlKey.CTRL_F:
@@ -776,7 +797,9 @@ class Console(object):
self.input_buffer_pos += count
else:
- raise AssertionError(("The only valid directions are 'left' and 'right'"))
+ raise AssertionError(
+ ("The only valid directions are 'left' and 'right'")
+ )
self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos)
# Move the cursor.
@@ -792,7 +815,8 @@ class Console(object):
# correct the cursor.
if diff < 0:
self.logger.warning(
- "Resetting input buffer position to %d...", len(self.input_buffer)
+ "Resetting input buffer position to %d...",
+ len(self.input_buffer),
)
self.MoveCursor("left", -diff)
return
@@ -835,14 +859,16 @@ class Console(object):
mode = cmd[1].lower()
self.timestamp_enabled = mode == b"on"
self.logger.info(
- "%sabling uart timestamps.", "En" if self.timestamp_enabled else "Dis"
+ "%sabling uart timestamps.",
+ "En" if self.timestamp_enabled else "Dis",
)
elif cmd[0] == b"rawdebug":
mode = cmd[1].lower()
self.raw_debug = mode == b"on"
self.logger.info(
- "%sabling per interrupt debug logs.", "En" if self.raw_debug else "Dis"
+ "%sabling per interrupt debug logs.",
+ "En" if self.raw_debug else "Dis",
)
elif cmd[0] == b"interrogate" and len(cmd) >= 2:
@@ -858,10 +884,14 @@ class Console(object):
# Update the assumptions of the EC image.
self.enhanced_ec = enhanced
- self.logger.debug("Enhanced EC image is now %r", self.enhanced_ec)
+ self.logger.debug(
+ "Enhanced EC image is now %r", self.enhanced_ec
+ )
# Send command to interpreter as well.
- self.cmd_pipe.send(b"enhanced " + str(self.enhanced_ec).encode("ascii"))
+ self.cmd_pipe.send(
+ b"enhanced " + str(self.enhanced_ec).encode("ascii")
+ )
else:
self.PrintOOBMHelp()
@@ -904,7 +934,9 @@ class Console(object):
self.enhanced_ec = False
# Inform the interpreter of the result.
- self.cmd_pipe.send(b"enhanced " + str(self.enhanced_ec).encode("ascii"))
+ self.cmd_pipe.send(
+ b"enhanced " + str(self.enhanced_ec).encode("ascii")
+ )
self.logger.debug("Enhanced EC image? %r", self.enhanced_ec)
# Clear look buffer since a match was found.
@@ -952,7 +984,9 @@ def StartLoop(console, command_active, shutdown_pipe=None):
"""
try:
console.logger.debug("Console is being served on %s.", console.user_pty)
- console.logger.debug("Console controller is on %s.", console.controller_pty)
+ console.logger.debug(
+ "Console controller is on %s.", console.controller_pty
+ )
console.logger.debug(
"Command interface is being served on %s.", console.interface_pty
)
@@ -975,7 +1009,11 @@ def StartLoop(console, command_active, shutdown_pipe=None):
controller_connected = not events
# Check to see if pipes or the console are ready for reading.
- read_list = [console.interface_pty, console.cmd_pipe, console.dbg_pipe]
+ read_list = [
+ console.interface_pty,
+ console.cmd_pipe,
+ console.dbg_pipe,
+ ]
if controller_connected:
read_list.append(console.controller_pty)
if shutdown_pipe is not None:
@@ -995,7 +1033,9 @@ def StartLoop(console, command_active, shutdown_pipe=None):
# Ctrl+A, Ctrl+E, etc.
try:
line = bytearray(
- os.read(console.controller_pty, CONSOLE_MAX_READ)
+ os.read(
+ console.controller_pty, CONSOLE_MAX_READ
+ )
)
console.logger.debug(
"Input from user: %s, locked:%s",
@@ -1046,7 +1086,9 @@ def StartLoop(console, command_active, shutdown_pipe=None):
try:
data = console.cmd_pipe.recv()
except EOFError:
- console.logger.debug("ec3po console received EOF from cmd_pipe")
+ console.logger.debug(
+ "ec3po console received EOF from cmd_pipe"
+ )
continue_looping = False
else:
# Write it to the user console.
@@ -1066,7 +1108,9 @@ def StartLoop(console, command_active, shutdown_pipe=None):
try:
data = console.dbg_pipe.recv()
except EOFError:
- console.logger.debug("ec3po console received EOF from dbg_pipe")
+ console.logger.debug(
+ "ec3po console received EOF from dbg_pipe"
+ )
continue_looping = False
else:
if console.interrogation_mode == b"auto":
@@ -1151,10 +1195,14 @@ def main(argv):
)
parser.add_argument(
"ec_uart_pty",
- help=("The full PTY name that the EC UART is present on. eg: /dev/pts/12"),
+ help=(
+ "The full PTY name that the EC UART is present on. eg: /dev/pts/12"
+ ),
)
parser.add_argument(
- "--log-level", default="info", help="info, debug, warning, error, or critical"
+ "--log-level",
+ default="info",
+ help="info, debug, warning, error, or critical",
)
# Parse arguments.
@@ -1173,7 +1221,9 @@ def main(argv):
elif opts.log_level == "critical":
log_level = logging.CRITICAL
else:
- parser.error("Invalid log level. (info, debug, warning, error, critical)")
+ parser.error(
+ "Invalid log level. (info, debug, warning, error, critical)"
+ )
# Start logging with a timestamp, module, and log level shown in each log
# entry.
diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py
index 45ab2ff44c..dcf66810e0 100755
--- a/util/ec3po/console_unittest.py
+++ b/util/ec3po/console_unittest.py
@@ -625,7 +625,9 @@ class TestConsoleEditingMethods(unittest.TestCase):
# We expect the test string, followed by a jump to the beginning of the
# line, and finally a move right 1.
- exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str)))
+ exp_console_out = test_str + OutputStream.MoveCursorLeft(
+ len((test_str))
+ )
# A move right 1 column.
exp_console_out += OutputStream.MoveCursorRight(1)
@@ -653,7 +655,9 @@ class TestConsoleEditingMethods(unittest.TestCase):
# We expect the test string, followed by a jump to the beginning of the
# line, and finally a move right 1.
- exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str)))
+ exp_console_out = test_str + OutputStream.MoveCursorLeft(
+ len((test_str))
+ )
# A move right 1 column.
exp_console_out += OutputStream.MoveCursorRight(1)
@@ -704,7 +708,9 @@ class TestConsoleEditingMethods(unittest.TestCase):
test_str = b"accelinfo on"
input_stream = BytesToByteList(test_str)
# Jump to beginning of line and then kill it with Ctrl+K.
- input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K])
+ input_stream.extend(
+ [console.ControlKey.CTRL_A, console.ControlKey.CTRL_K]
+ )
# Send the sequence out.
for byte in input_stream:
@@ -1207,7 +1213,8 @@ class TestConsoleCompatibility(unittest.TestCase):
# At this point, we should have negotiated to enhanced.
self.assertTrue(
- self.console.enhanced_ec, msg=("Did not negotiate to enhanced EC image.")
+ self.console.enhanced_ec,
+ msg=("Did not negotiate to enhanced EC image."),
)
# The command would have been dropped however, so verify this...
@@ -1623,7 +1630,8 @@ class TestOOBMConsoleCommands(unittest.TestCase):
# The EC image should be assumed to be enhanced.
self.assertTrue(
- self.console.enhanced_ec, "The image should be assumed to be enhanced."
+ self.console.enhanced_ec,
+ "The image should be assumed to be enhanced.",
)
diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py
index 591603e038..bfe7d1ed8d 100644
--- a/util/ec3po/interpreter.py
+++ b/util/ec3po/interpreter.py
@@ -147,7 +147,9 @@ class Interpreter(object):
command: A string which contains the command to be sent.
"""
self.ec_cmd_queue.put(command)
- self.logger.log(1, "Commands now in queue: %d", self.ec_cmd_queue.qsize())
+ self.logger.log(
+ 1, "Commands now in queue: %d", self.ec_cmd_queue.qsize()
+ )
# Add the EC UART as an output to be serviced.
if self.connected and self.ec_uart_pty not in self.outputs:
@@ -215,12 +217,16 @@ class Interpreter(object):
self.inputs.remove(fileobj)
if fileobj in self.outputs:
self.outputs.remove(fileobj)
- self.logger.debug("Removed fileobj. Remaining inputs: %r", self.inputs)
+ self.logger.debug(
+ "Removed fileobj. Remaining inputs: %r", self.inputs
+ )
# Close the file.
fileobj.close()
# Mark the interpreter as disconnected now.
self.connected = False
- self.logger.debug("Disconnected from %s.", self.ec_uart_pty_name)
+ self.logger.debug(
+ "Disconnected from %s.", self.ec_uart_pty_name
+ )
return
elif command == b"reconnect":
@@ -248,7 +254,9 @@ class Interpreter(object):
# Ignore any other commands while in the disconnected state.
self.logger.log(1, "command: '%s'", command)
if not self.connected:
- self.logger.debug("Ignoring command because currently disconnected.")
+ self.logger.debug(
+ "Ignoring command because currently disconnected."
+ )
return
# Remove leading and trailing spaces only if this is an enhanced EC image.
@@ -354,7 +362,9 @@ class Interpreter(object):
if self.enhanced_ec:
self.logger.debug("The current EC image seems enhanced.")
else:
- self.logger.debug("The current EC image does NOT seem enhanced.")
+ self.logger.debug(
+ "The current EC image does NOT seem enhanced."
+ )
# Done interrogating.
self.interrogating = False
# For now, just forward everything the EC sends us.
diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py
index 73188fab9f..b7a29baa57 100755
--- a/util/ec3po/interpreter_unittest.py
+++ b/util/ec3po/interpreter_unittest.py
@@ -40,7 +40,9 @@ class TestEnhancedECBehaviour(unittest.TestCase):
# Create the pipes that the interpreter will use.
self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
- self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)
+ self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(
+ duplex=False
+ )
# Mock the open() function so we can inspect reads/writes to the EC.
self.ec_uart_pty = mock.mock_open()
@@ -91,7 +93,9 @@ class TestEnhancedECBehaviour(unittest.TestCase):
self.itpr.HandleUserData()
self.itpr.SendCmdToEC()
# Since the EC image is enhanced, we should have sent a packed command.
- expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
+ expected_ec_calls.append(
+ mock.call().write(self.itpr.PackCommand(test_cmd))
+ )
expected_ec_calls.append(mock.call().flush())
# Now that the first command was sent, we should send another command which
@@ -116,7 +120,9 @@ class TestEnhancedECBehaviour(unittest.TestCase):
self.itpr.HandleUserData()
self.itpr.SendCmdToEC()
# Since the EC image is enhanced, we should have sent a packed command.
- expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
+ expected_ec_calls.append(
+ mock.call().write(self.itpr.PackCommand(test_cmd))
+ )
expected_ec_calls.append(mock.call().flush())
# Finally, verify that the appropriate writes were actually sent to the EC.
@@ -156,7 +162,9 @@ class TestEnhancedECBehaviour(unittest.TestCase):
self.itpr.HandleUserData()
self.itpr.SendCmdToEC()
packed_cmd = self.itpr.PackCommand(test_cmd)
- expected_ec_calls.extend([mock.call().write(packed_cmd), mock.call().flush()])
+ expected_ec_calls.extend(
+ [mock.call().write(packed_cmd), mock.call().flush()]
+ )
# Have the EC return the error string twice.
mock_os.read.side_effect = [b"&&EE", b"&&EE"]
for i in range(2):
@@ -180,7 +188,9 @@ class TestEnhancedECBehaviour(unittest.TestCase):
self.itpr.SendCmdToEC()
# Now assume that the last one goes through with no trouble.
- expected_ec_calls.extend([mock.call().write(packed_cmd), mock.call().flush()])
+ expected_ec_calls.extend(
+ [mock.call().write(packed_cmd), mock.call().flush()]
+ )
self.itpr.SendCmdToEC()
# Verify all the calls.
@@ -247,7 +257,9 @@ class TestEnhancedECBehaviour(unittest.TestCase):
# Now, the interrogation should be complete and we should know that the
# current EC image is enhanced.
- self.assertFalse(self.itpr.interrogating, msg=("interrogating should be False"))
+ self.assertFalse(
+ self.itpr.interrogating, msg=("interrogating should be False")
+ )
self.assertTrue(self.itpr.enhanced_ec, msg="enhanced_ec sholud be True")
# Now let's perform another interrogation, but pretend that the EC ignores
@@ -259,7 +271,9 @@ class TestEnhancedECBehaviour(unittest.TestCase):
self.assertTrue(self.itpr.interrogating, "interrogating sholud be True")
# We should assume that the image is not enhanced until we get the valid
# response.
- self.assertFalse(self.itpr.enhanced_ec, "enhanced_ec should be False now.")
+ self.assertFalse(
+ self.itpr.enhanced_ec, "enhanced_ec should be False now."
+ )
# Let's pretend that we get a random debug print. This should clear the
# interrogating flag.
@@ -267,8 +281,12 @@ class TestEnhancedECBehaviour(unittest.TestCase):
self.itpr.HandleECData()
# Verify that interrogating flag is cleared and enhanced_ec is still False.
- self.assertFalse(self.itpr.interrogating, "interrogating should be False.")
- self.assertFalse(self.itpr.enhanced_ec, "enhanced_ec should still be False.")
+ self.assertFalse(
+ self.itpr.interrogating, "interrogating should be False."
+ )
+ self.assertFalse(
+ self.itpr.enhanced_ec, "enhanced_ec should still be False."
+ )
class TestUARTDisconnection(unittest.TestCase):
@@ -287,7 +305,9 @@ class TestUARTDisconnection(unittest.TestCase):
# Create the pipes that the interpreter will use.
self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
- self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)
+ self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(
+ duplex=False
+ )
# Mock the open() function so we can inspect reads/writes to the EC.
self.ec_uart_pty = mock.mock_open()
@@ -383,7 +403,9 @@ class TestUARTDisconnection(unittest.TestCase):
self.itpr.HandleUserData()
# Verify interpreter is connected.
- self.assertTrue(self.itpr.connected, ("The interpreter should be connected."))
+ self.assertTrue(
+ self.itpr.connected, ("The interpreter should be connected.")
+ )
# Verify that the EC UART is now a member of the inputs.
self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
# Since we have issued no commands during the disconnected state, no
diff --git a/util/ec_openocd.py b/util/ec_openocd.py
index 11956ffa1c..bff3e801ec 100755
--- a/util/ec_openocd.py
+++ b/util/ec_openocd.py
@@ -83,7 +83,10 @@ def debug(interface, board, port, executable):
openocd_args += ["-c", f"gdb_port {port}"]
openocd = subprocess.Popen(
- openocd_args, encoding="utf-8", stdout=subprocess.PIPE, stderr=subprocess.STDOUT
+ openocd_args,
+ encoding="utf-8",
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
)
# Wait for OpenOCD to start, it'll open a port for GDB connections
@@ -213,11 +216,15 @@ def main():
target_file = args.file.resolve()
if args.command == "flash":
- image_file = get_flash_file(args.board) if target_file == None else target_file
+ image_file = (
+ get_flash_file(args.board) if target_file == None else target_file
+ )
flash(args.interface, args.board, image_file, args.verify)
elif args.command == "debug":
executable_file = (
- get_executable_file(args.board) if target_file == None else target_file
+ get_executable_file(args.board)
+ if target_file == None
+ else target_file
)
debug(args.interface, args.board, args.port, executable_file)
else:
diff --git a/util/flash_jlink.py b/util/flash_jlink.py
index 50a0bfca20..eef29b98f4 100755
--- a/util/flash_jlink.py
+++ b/util/flash_jlink.py
@@ -142,7 +142,9 @@ def flash(jlink_exe, remote, device, interface, cmd_file):
logging.debug(f"Checking connection to {remote}.")
if not is_tcp_port_open(ip, port):
- logging.error(f"JLink server doesn't seem to be listening on {remote}.")
+ logging.error(
+ f"JLink server doesn't seem to be listening on {remote}."
+ )
logging.error("Ensure that JLinkRemoteServerCLExe is running.")
return 1
cmd.extend(["-ip", remote])
@@ -162,7 +164,9 @@ def flash(jlink_exe, remote, device, interface, cmd_file):
]
)
logging.debug('Running command: "%s"', " ".join(cmd))
- completed_process = subprocess.run(cmd) # pylint: disable=subprocess-run-check
+ completed_process = subprocess.run(
+ cmd
+ ) # pylint: disable=subprocess-run-check
logging.debug("JLink return code: %d", completed_process.returncode)
return completed_process.returncode
@@ -205,7 +209,9 @@ def main(argv: list):
)
log_level_choices = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
- parser.add_argument("--log_level", "-l", choices=log_level_choices, default="DEBUG")
+ parser.add_argument(
+ "--log_level", "-l", choices=log_level_choices, default="DEBUG"
+ )
args = parser.parse_args(argv)
logging.basicConfig(level=args.log_level)
diff --git a/util/fptool.py b/util/fptool.py
index b7f2150289..6b33a0b1f0 100755
--- a/util/fptool.py
+++ b/util/fptool.py
@@ -44,7 +44,9 @@ def main(argv: list) -> int:
# Parser for "flash" subcommand.
parser_decrypt = subparsers.add_parser("flash", help=cmd_flash.__doc__)
- parser_decrypt.add_argument("image", nargs="?", help="Path to the firmware image")
+ parser_decrypt.add_argument(
+ "image", nargs="?", help="Path to the firmware image"
+ )
parser_decrypt.set_defaults(func=cmd_flash)
opts = parser.parse_args(argv)
return opts.func(opts)
diff --git a/util/inject-keys.py b/util/inject-keys.py
index d05d4fbed7..292dd4f492 100755
--- a/util/inject-keys.py
+++ b/util/inject-keys.py
@@ -125,7 +125,9 @@ def inject_event(key, press):
print("%s: invalid key: %s" % (this_script, key))
sys.exit(1)
(row, col) = KEYMATRIX[key]
- subprocess.call(["ectool", "kbpress", str(row), str(col), "1" if press else "0"])
+ subprocess.call(
+ ["ectool", "kbpress", str(row), str(col), "1" if press else "0"]
+ )
def inject_key(key):
@@ -191,7 +193,8 @@ usage_check(arg_len % 2 == 1, "mismatched arguments")
for i in range(1, arg_len, 2):
usage_check(
- sys.argv[i] in ("-s", "-k", "-p", "-r"), "unknown flag: %s" % sys.argv[i]
+ sys.argv[i] in ("-s", "-k", "-p", "-r"),
+ "unknown flag: %s" % sys.argv[i],
)
for i in range(1, arg_len, 2):
diff --git a/util/kconfig_check.py b/util/kconfig_check.py
index 651078448d..ed832b26a1 100755
--- a/util/kconfig_check.py
+++ b/util/kconfig_check.py
@@ -212,7 +212,10 @@ class KconfigCheck:
with open(configs_file, "r") as inf:
configs = re.findall(
"%sCONFIG_([A-Za-z0-9_]*)%s"
- % ((use_defines and "#define " or ""), (use_defines and " " or "")),
+ % (
+ (use_defines and "#define " or ""),
+ (use_defines and " " or ""),
+ ),
inf.read(),
)
return configs
@@ -265,7 +268,9 @@ class KconfigCheck:
return kconfig_files
@classmethod
- def scan_kconfigs(cls, srcdir, prefix="", search_paths=None, try_kconfiglib=True):
+ def scan_kconfigs(
+ cls, srcdir, prefix="", search_paths=None, try_kconfiglib=True
+ ):
"""Scan a source tree for Kconfig options
Args:
@@ -297,7 +302,9 @@ class KconfigCheck:
else:
kconfigs = []
# Remove the prefix if present
- expr = re.compile(r"\n(config|menuconfig) (%s)?([A-Za-z0-9_]*)\n" % prefix)
+ expr = re.compile(
+ r"\n(config|menuconfig) (%s)?([A-Za-z0-9_]*)\n" % prefix
+ )
for fname in cls.find_kconfigs(srcdir):
with open(fname) as inf:
found = re.findall(expr, inf.read())
@@ -381,7 +388,12 @@ class KconfigCheck:
Exit code: 0 if OK, 1 if a problem was found
"""
new_adhoc, unneeded_adhoc, updated_adhoc = self.check_adhoc_configs(
- configs_file, srcdir, allowed_file, prefix, use_defines, search_paths
+ configs_file,
+ srcdir,
+ allowed_file,
+ prefix,
+ use_defines,
+ search_paths,
)
if new_adhoc:
file_list = "\n".join(["CONFIG_%s" % name for name in new_adhoc])
@@ -411,7 +423,9 @@ To temporarily disable this, use: ALLOW_CONFIG=1 make ...
with open(NEW_ALLOWED_FNAME, "w") as out:
for config in updated_adhoc:
print("CONFIG_%s" % config, file=out)
- now_in_kconfig = "\n".join(["CONFIG_%s" % name for name in unneeded_adhoc])
+ now_in_kconfig = "\n".join(
+ ["CONFIG_%s" % name for name in unneeded_adhoc]
+ )
print(
f"""The following options are now in Kconfig:
@@ -427,7 +441,13 @@ update in your CL:
return 0
def do_build(
- self, configs_file, srcdir, allowed_file, prefix, use_defines, search_paths
+ self,
+ configs_file,
+ srcdir,
+ allowed_file,
+ prefix,
+ use_defines,
+ search_paths,
):
"""Find new ad-hoc configs in the configs_file
@@ -445,7 +465,12 @@ update in your CL:
Exit code: 0 if OK, 1 if a problem was found
"""
new_adhoc, _, updated_adhoc = self.check_adhoc_configs(
- configs_file, srcdir, allowed_file, prefix, use_defines, search_paths
+ configs_file,
+ srcdir,
+ allowed_file,
+ prefix,
+ use_defines,
+ search_paths,
)
with open(NEW_ALLOWED_FNAME, "w") as out:
combined = sorted(new_adhoc + updated_adhoc)
diff --git a/util/run_ects.py b/util/run_ects.py
index cf32184462..6d89b39f23 100644
--- a/util/run_ects.py
+++ b/util/run_ects.py
@@ -68,7 +68,10 @@ def main():
help="Echo commands to be executed without running them.",
)
parser.add_argument(
- "-s", "--sync", action="store_true", help="Sync tree before running tests."
+ "-s",
+ "--sync",
+ action="store_true",
+ help="Sync tree before running tests.",
)
parser.add_argument(
"-u", "--upload", action="store_true", help="Upload test results."
diff --git a/util/test_kconfig_check.py b/util/test_kconfig_check.py
index d4942821ae..41c95e951c 100644
--- a/util/test_kconfig_check.py
+++ b/util/test_kconfig_check.py
@@ -134,11 +134,14 @@ rsource "subdir/Kconfig.wibble"
with tempfile.TemporaryDirectory() as srctree:
self.setup_srctree(srctree)
self.assertEqual(
- ["MENU_KCONFIG", "MY_KCONFIG"], checker.scan_kconfigs(srctree, PREFIX)
+ ["MENU_KCONFIG", "MY_KCONFIG"],
+ checker.scan_kconfigs(srctree, PREFIX),
)
@classmethod
- def setup_allowed_and_configs(cls, allowed_fname, configs_fname, add_new_one=True):
+ def setup_allowed_and_configs(
+ cls, allowed_fname, configs_fname, add_new_one=True
+ ):
"""Set up the 'allowed' and 'configs' files for tests
Args:
@@ -181,7 +184,9 @@ rsource "subdir/Kconfig.wibble"
self.setup_srctree(srctree)
with tempfile.NamedTemporaryFile() as allowed:
with tempfile.NamedTemporaryFile() as configs:
- self.setup_allowed_and_configs(allowed.name, configs.name)
+ self.setup_allowed_and_configs(
+ allowed.name, configs.name
+ )
ret_code = kconfig_check.main(
[
"-c",
@@ -204,7 +209,9 @@ rsource "subdir/Kconfig.wibble"
"""Same Kconfig should be returned for kconfiglib / adhoc"""
if not kconfig_check.USE_KCONFIGLIB:
self.fail("No kconfiglib available")
- zephyr_path = pathlib.Path("../../../src/third_party/zephyr/main").resolve()
+ zephyr_path = pathlib.Path(
+ "../../../src/third_party/zephyr/main"
+ ).resolve()
if not zephyr_path.exists():
self.fail("No zephyr tree available")
os.environ["ZEPHYR_BASE"] = str(zephyr_path)
diff --git a/util/twister_launcher.py b/util/twister_launcher.py
index e6e5999252..36549d4e50 100755
--- a/util/twister_launcher.py
+++ b/util/twister_launcher.py
@@ -62,7 +62,9 @@ def main():
ec_base = cros_checkout / "src" / "platform" / "ec"
# Module paths, including third party modules and the EC application.
- zephyr_modules = find_modules(cros_checkout / "src" / "third_party" / "zephyr")
+ zephyr_modules = find_modules(
+ cros_checkout / "src" / "third_party" / "zephyr"
+ )
zephyr_modules.append(ec_base)
# Prepare environment variables for export to Twister and inherit the
diff --git a/util/uart_stress_tester.py b/util/uart_stress_tester.py
index c44262016c..6417376a08 100755
--- a/util/uart_stress_tester.py
+++ b/util/uart_stress_tester.py
@@ -46,7 +46,10 @@ TPM_CMD = (
)
# A ChromeOS TPM command for the cr50 stress
# purpose.
-CR50_LOAD_GEN_CMD = "while [[ -f %s ]]; do %s; done &" % (FLAG_FILENAME, TPM_CMD)
+CR50_LOAD_GEN_CMD = "while [[ -f %s ]]; do %s; done &" % (
+ FLAG_FILENAME,
+ TPM_CMD,
+)
# A command line to run TPM_CMD in background
# infinitely.
@@ -231,7 +234,9 @@ class UartSerial(object):
" to this port, and try it again." % self.serial.port
)
- self.logger.info("Detected as %s UART", self.dev_prof["device_type"])
+ self.logger.info(
+ "Detected as %s UART", self.dev_prof["device_type"]
+ )
# Log displays the UART type (AP|EC) instead of device filename.
self.logger = logging.getLogger(
type(self).__name__ + "| " + self.dev_prof["device_type"]
@@ -266,7 +271,9 @@ class UartSerial(object):
)
self.num_ch_exp = int(self.serial.baudrate * self.duration / 10)
- chargen_cmd = "chargen " + str(CHARGEN_TXT_LEN) + " " + str(self.num_ch_exp)
+ chargen_cmd = (
+ "chargen " + str(CHARGEN_TXT_LEN) + " " + str(self.num_ch_exp)
+ )
if self.usb_output:
chargen_cmd += " usb"
self.test_cli = [chargen_cmd]
@@ -304,9 +311,13 @@ class UartSerial(object):
self.char_loss_occurrences = 0
data_starve_count = 0
- total_num_ch = self.num_ch_exp # Expected number of characters in total
+ total_num_ch = (
+ self.num_ch_exp
+ ) # Expected number of characters in total
ch_exp = CHARGEN_TXT[0]
- ch_cap = "z" # any character value is ok for loop initial condition.
+ ch_cap = (
+ "z" # any character value is ok for loop initial condition.
+ )
while self.num_ch_cap < total_num_ch:
captured = self.get_output()
@@ -325,7 +336,9 @@ class UartSerial(object):
# If it is not alpha-numeric, terminate the test.
if ch_cap not in CRLF:
# If it is neither a CR nor LF, then it is an error case.
- self.logger.error("Whole captured characters: %r", captured)
+ self.logger.error(
+ "Whole captured characters: %r", captured
+ )
raise ChargenTestError(
err_msg
% (
@@ -509,7 +522,9 @@ class ChargenTest(object):
# Print the result.
char_lost = self.print_result()
if char_lost:
- raise ChargenTestError("Test failed: lost %d character(s)" % char_lost)
+ raise ChargenTestError(
+ "Test failed: lost %d character(s)" % char_lost
+ )
self.logger.info("Test is done")
@@ -537,7 +552,9 @@ Examples:
parser = argparse.ArgumentParser(
description=description, formatter_class=argparse.RawTextHelpFormatter
)
- parser.add_argument("port", type=str, nargs="*", help="UART device path to test")
+ parser.add_argument(
+ "port", type=str, nargs="*", help="UART device path to test"
+ )
parser.add_argument(
"-c",
"--cr50",
@@ -580,7 +597,9 @@ def main():
loglevel = logging.INFO
log_format += " | %(message)s"
- logging.basicConfig(level=loglevel, format=log_format, datefmt=date_format)
+ logging.basicConfig(
+ level=loglevel, format=log_format, datefmt=date_format
+ )
# Create a ChargenTest object
utest = ChargenTest(
diff --git a/util/update_release_branch.py b/util/update_release_branch.py
index 4d9c89df4a..c2fc5ff116 100755
--- a/util/update_release_branch.py
+++ b/util/update_release_branch.py
@@ -136,7 +136,14 @@ def get_relevant_commits(head, merge_head, fmt, relevant_paths):
stdout.
"""
if fmt:
- cmd = ["git", "log", fmt, head + ".." + merge_head, "--", relevant_paths]
+ cmd = [
+ "git",
+ "log",
+ fmt,
+ head + ".." + merge_head,
+ "--",
+ relevant_paths,
+ ]
else:
cmd = ["git", "log", head + ".." + merge_head, "--", relevant_paths]
@@ -281,7 +288,10 @@ def main(argv):
arglist.append("-X" + opts.strategy_option)
subprocess.run(arglist, check=True)
else:
- print("We have already started merge process.", "Attempt to generate commit.")
+ print(
+ "We have already started merge process.",
+ "Attempt to generate commit.",
+ )
print("Generating commit message...")
branch = subprocess.run(
@@ -308,7 +318,12 @@ def main(argv):
commit_msg = git_commit_msg(branch, head, merge_head, relevant_paths, cmd)
subprocess.run(["git", "commit", "--signoff", "-m", commit_msg], check=True)
subprocess.run(["git", "commit", "--amend"], check=True)
- print(("Finished! **Please review the commit to see if it's to your " "liking.**"))
+ print(
+ (
+ "Finished! **Please review the commit to see if it's to your "
+ "liking.**"
+ )
+ )
if __name__ == "__main__":
diff --git a/util/zephyr_to_resultdb.py b/util/zephyr_to_resultdb.py
index 07b4ecc010..19301aa50b 100755
--- a/util/zephyr_to_resultdb.py
+++ b/util/zephyr_to_resultdb.py
@@ -50,7 +50,9 @@ def testcase_summary(testcase):
or "reason" in testcase
or translate_status(testcase["status"]) == "SKIP"
):
- html = '<p><text-artifact artifact-id="artifact-content-in-request"></p>'
+ html = (
+ '<p><text-artifact artifact-id="artifact-content-in-request"></p>'
+ )
return html
diff --git a/zephyr/firmware_builder.py b/zephyr/firmware_builder.py
index f216e3c99f..3f076d5fca 100755
--- a/zephyr/firmware_builder.py
+++ b/zephyr/firmware_builder.py
@@ -63,7 +63,9 @@ def build(opts):
for project in zmake.project.find_projects(zephyr_dir).values():
if project.config.is_test:
continue
- build_dir = platform_ec / "build" / "zephyr" / project.config.project_name
+ build_dir = (
+ platform_ec / "build" / "zephyr" / project.config.project_name
+ )
metric = metric_list.value.add()
metric.target_name = project.config.project_name
metric.platform_name = project.config.zephyr_board
@@ -179,7 +181,9 @@ def bundle_firmware(opts):
for project in zmake.project.find_projects(zephyr_dir).values():
if project.config.is_test:
continue
- build_dir = platform_ec / "build" / "zephyr" / project.config.project_name
+ build_dir = (
+ platform_ec / "build" / "zephyr" / project.config.project_name
+ )
artifacts_dir = build_dir / "output"
tarball_name = f"{project.config.project_name}.firmware.tbz2"
tarball_path = bundle_dir.joinpath(tarball_name)
@@ -256,7 +260,11 @@ def test(opts):
subprocess.run(cmd, cwd=platform_ec, check=True)
output = subprocess.run(
- ["/usr/bin/lcov", "--summary", platform_ec / "build/coverage/lcov.info"],
+ [
+ "/usr/bin/lcov",
+ "--summary",
+ platform_ec / "build/coverage/lcov.info",
+ ],
cwd=pathlib.Path(__file__).parent,
check=True,
stdout=subprocess.PIPE,
@@ -321,7 +329,9 @@ def test(opts):
return 0
-COVERAGE_RE = re.compile(r"lines\.*: *([0-9\.]+)% \(([0-9]+) of ([0-9]+) lines\)")
+COVERAGE_RE = re.compile(
+ r"lines\.*: *([0-9\.]+)% \(([0-9]+) of ([0-9]+) lines\)"
+)
def _extract_lcov_summary(name, metrics, output):
@@ -367,14 +377,18 @@ def parse_args(args):
"--metadata",
required=False,
help=(
- "Full pathname for the file in which to write build artifact " "metadata."
+ "Full pathname for the file in which to write build artifact "
+ "metadata."
),
)
parser.add_argument(
"--output-dir",
required=False,
- help=("Full pathname for the directory in which to bundle build " "artifacts."),
+ help=(
+ "Full pathname for the directory in which to bundle build "
+ "artifacts."
+ ),
)
parser.add_argument(
diff --git a/zephyr/projects/brya/BUILD.py b/zephyr/projects/brya/BUILD.py
index 1457fb31ff..054ff85fff 100644
--- a/zephyr/projects/brya/BUILD.py
+++ b/zephyr/projects/brya/BUILD.py
@@ -5,7 +5,9 @@
"""Define zmake projects for brya."""
-def register_npcx9_variant(project_name, extra_dts_overlays=(), extra_kconfig_files=()):
+def register_npcx9_variant(
+ project_name, extra_dts_overlays=(), extra_kconfig_files=()
+):
"""Register a variant of a brya, even though this is not named as such."""
return register_npcx_project(
project_name=project_name,
diff --git a/zephyr/projects/herobrine/BUILD.py b/zephyr/projects/herobrine/BUILD.py
index 90f0004d01..b6a1f70382 100644
--- a/zephyr/projects/herobrine/BUILD.py
+++ b/zephyr/projects/herobrine/BUILD.py
@@ -5,7 +5,9 @@
"""Define zmake projects for herobrine."""
-def register_variant(project_name, extra_dts_overlays=(), extra_kconfig_files=()):
+def register_variant(
+ project_name, extra_dts_overlays=(), extra_kconfig_files=()
+):
"""Register a variant of herobrine."""
register_npcx_project(
project_name=project_name,
diff --git a/zephyr/test/math/BUILD.py b/zephyr/test/math/BUILD.py
index 8f6b28ce1a..fb93f5269d 100644
--- a/zephyr/test/math/BUILD.py
+++ b/zephyr/test/math/BUILD.py
@@ -8,5 +8,6 @@ register_host_test(
"math_fixed", kconfig_files=[here / "prj.conf", here / "fixed_point.conf"]
)
register_host_test(
- "math_float", kconfig_files=[here / "prj.conf", here / "floating_point.conf"]
+ "math_float",
+ kconfig_files=[here / "prj.conf", here / "floating_point.conf"],
)
diff --git a/zephyr/zmake/tests/test_build_config.py b/zephyr/zmake/tests/test_build_config.py
index 88291cbd05..ac73432e88 100644
--- a/zephyr/zmake/tests/test_build_config.py
+++ b/zephyr/zmake/tests/test_build_config.py
@@ -162,7 +162,9 @@ def test_popen_cmake_no_kconfig(conf: BuildConfig, project_dir, build_dir):
@hypothesis.given(build_configs_with_at_least_one_kconfig, paths, paths)
@hypothesis.settings(deadline=60000)
-def test_popen_cmake_kconfig_but_no_file(conf: BuildConfig, project_dir, build_dir):
+def test_popen_cmake_kconfig_but_no_file(
+ conf: BuildConfig, project_dir, build_dir
+):
"""Test that running popen_cmake with Kconfig definitions to write
out, but no path to do so, should raise an error.
"""
@@ -183,7 +185,10 @@ def test_popen_cmake_kconfig(conf: BuildConfig, project_dir, build_dir):
try:
conf.popen_cmake(
- job_client, project_dir, build_dir, kconfig_path=pathlib.Path(temp_path)
+ job_client,
+ project_dir,
+ build_dir,
+ kconfig_path=pathlib.Path(temp_path),
)
_, cmake_defs = parse_cmake_args(job_client.captured_argv)
@@ -214,7 +219,9 @@ def fake_kconfig_files(tmp_path):
paths = [tmp_path / f"{letter}.conf" for letter in "ABCD"]
for path, cfg_name in zip(paths, ("ONE", "TWO", "THREE", "FOUR")):
- path.write_text(f"# Fake kconfig file for testing.\nCONFIG_{cfg_name}=y\n")
+ path.write_text(
+ f"# Fake kconfig file for testing.\nCONFIG_{cfg_name}=y\n"
+ )
return paths
diff --git a/zephyr/zmake/tests/test_modules.py b/zephyr/zmake/tests/test_modules.py
index 8fdbd3cb43..4128f7787a 100644
--- a/zephyr/zmake/tests/test_modules.py
+++ b/zephyr/zmake/tests/test_modules.py
@@ -36,4 +36,6 @@ def test_locate_in_directory(modules):
expected_modules[module] = module_dir
- assert zmake.modules.locate_from_directory(modules_dir) == expected_modules
+ assert (
+ zmake.modules.locate_from_directory(modules_dir) == expected_modules
+ )
diff --git a/zephyr/zmake/tests/test_multiproc_logging.py b/zephyr/zmake/tests/test_multiproc_logging.py
index 2694b6451e..fe5bb62e8f 100644
--- a/zephyr/zmake/tests/test_multiproc_logging.py
+++ b/zephyr/zmake/tests/test_multiproc_logging.py
@@ -20,7 +20,9 @@ def test_read_output_from_pipe():
file_desc = io.TextIOWrapper(os.fdopen(pipe[0], "rb"), encoding="utf-8")
logger = mock.Mock(spec=logging.Logger)
logger.log.side_effect = lambda log_lvl, line: semaphore.release()
- zmake.multiproc.LogWriter.log_output(logger, logging.DEBUG, file_desc, job_id="")
+ zmake.multiproc.LogWriter.log_output(
+ logger, logging.DEBUG, file_desc, job_id=""
+ )
os.write(pipe[1], "Hello\n".encode("utf-8"))
semaphore.acquire()
logger.log.assert_called_with(logging.DEBUG, "Hello")
@@ -77,8 +79,12 @@ def test_read_output_from_second_pipe():
logger = mock.Mock(spec=logging.Logger)
logger.log.side_effect = lambda log_lvl, fmt, id, line: semaphore.release()
- zmake.multiproc.LogWriter.log_output(logger, logging.DEBUG, fds[0], job_id="0")
- zmake.multiproc.LogWriter.log_output(logger, logging.ERROR, fds[1], job_id="1")
+ zmake.multiproc.LogWriter.log_output(
+ logger, logging.DEBUG, fds[0], job_id="0"
+ )
+ zmake.multiproc.LogWriter.log_output(
+ logger, logging.ERROR, fds[1], job_id="1"
+ )
os.write(pipes[1][1], "Hello\n".encode("utf-8"))
semaphore.acquire()
@@ -102,8 +108,12 @@ def test_read_output_after_another_pipe_closed():
logger = mock.Mock(spec=logging.Logger)
logger.log.side_effect = lambda log_lvl, fmt, id, line: semaphore.release()
- zmake.multiproc.LogWriter.log_output(logger, logging.DEBUG, fds[0], job_id="0")
- zmake.multiproc.LogWriter.log_output(logger, logging.ERROR, fds[1], job_id="1")
+ zmake.multiproc.LogWriter.log_output(
+ logger, logging.DEBUG, fds[0], job_id="0"
+ )
+ zmake.multiproc.LogWriter.log_output(
+ logger, logging.ERROR, fds[1], job_id="1"
+ )
fds[0].close()
os.write(pipes[1][1], "Hello\n".encode("utf-8"))
diff --git a/zephyr/zmake/tests/test_project.py b/zephyr/zmake/tests/test_project.py
index 2d98aaf32b..5dae09cd21 100644
--- a/zephyr/zmake/tests/test_project.py
+++ b/zephyr/zmake/tests/test_project.py
@@ -33,7 +33,9 @@ def test_find_dts_overlays(modules):
with tempfile.TemporaryDirectory() as modpath:
modpath = pathlib.Path(modpath)
for board in boards:
- dts_path = zmake.project.module_dts_overlay_name(modpath, board)
+ dts_path = zmake.project.module_dts_overlay_name(
+ modpath, board
+ )
dts_path.parent.mkdir(parents=True, exist_ok=True)
dts_path.touch()
setup_modules_and_dispatch(
@@ -48,7 +50,9 @@ def test_find_dts_overlays(modules):
board_file_mapping = {}
for modpath, board_list in zip(module_paths, modules):
for board in board_list:
- file_name = zmake.project.module_dts_overlay_name(modpath, board)
+ file_name = zmake.project.module_dts_overlay_name(
+ modpath, board
+ )
files = board_file_mapping.get(board, set())
board_file_mapping[board] = files | {file_name}
@@ -257,4 +261,6 @@ def test_kconfig_files(tmp_path, actual_files, config_files, expected_files):
assert len(builds) == 1
_, config = builds[0]
- assert sorted(f.name for f in config.kconfig_files) == sorted(expected_files)
+ assert sorted(f.name for f in config.kconfig_files) == sorted(
+ expected_files
+ )
diff --git a/zephyr/zmake/tests/test_toolchains.py b/zephyr/zmake/tests/test_toolchains.py
index 3773fac13d..f778bedf87 100644
--- a/zephyr/zmake/tests/test_toolchains.py
+++ b/zephyr/zmake/tests/test_toolchains.py
@@ -196,9 +196,13 @@ def test_no_toolchains(fake_project: project.Project, mockfs, no_environ):
fake_project.get_toolchain(module_paths)
-def test_override_without_sdk(fake_project: project.Project, mockfs, no_environ):
+def test_override_without_sdk(
+ fake_project: project.Project, mockfs, no_environ
+):
"""Check for error override is set to zephyr, but it can't be found."""
chain = fake_project.get_toolchain(module_paths, override="zephyr")
- with pytest.raises(RuntimeError, match=r"No installed Zephyr SDK was found"):
+ with pytest.raises(
+ RuntimeError, match=r"No installed Zephyr SDK was found"
+ ):
chain.get_build_config()
diff --git a/zephyr/zmake/tests/test_version.py b/zephyr/zmake/tests/test_version.py
index 37d4fd8d67..b0ed0953f4 100644
--- a/zephyr/zmake/tests/test_version.py
+++ b/zephyr/zmake/tests/test_version.py
@@ -39,7 +39,9 @@ def _git_commit(repo, message="message!"):
"GIT_COMMITTER_EMAIL": "bitdiddle@example.org",
"GIT_COMMITTER_DATE": "Tue, 30 Aug 2005 10:50:30 -0700",
}
- subprocess.run(["git", "-C", repo, "commit", "-m", message], check=True, env=env)
+ subprocess.run(
+ ["git", "-C", repo, "commit", "-m", message], check=True, env=env
+ )
def _setup_example_repos(tmp_path):
@@ -94,7 +96,9 @@ def test_version_string(tmp_path):
"""Test a that version string is as expected."""
project, zephyr_base, modules = _setup_example_repos(tmp_path)
assert (
- version.get_version_string(project.config.project_name, zephyr_base, modules)
+ version.get_version_string(
+ project.config.project_name, zephyr_base, modules
+ )
== "prj_v2.6.4-ec:b5991f,os:377d26,mod1:02fd7a"
)
@@ -181,7 +185,9 @@ def test_header_gen_exists_not_changed(fake_user_hostname, fake_date, tmp_path):
assert output_file.stat().st_mtime == expected_mtime
-def test_header_gen_exists_needs_changes(fake_user_hostname, fake_date, tmp_path):
+def test_header_gen_exists_needs_changes(
+ fake_user_hostname, fake_date, tmp_path
+):
"""Test that the version file is changed, when needed."""
# Test we overwrite when it exists already and changes are needed.
output_file = tmp_path / "ec_version.h"
diff --git a/zephyr/zmake/tests/test_zmake.py b/zephyr/zmake/tests/test_zmake.py
index db7189a2e8..c49fb9abda 100644
--- a/zephyr/zmake/tests/test_zmake.py
+++ b/zephyr/zmake/tests/test_zmake.py
@@ -87,7 +87,10 @@ class FakeJobserver(zmake.jobserver.GNUMakeJobServer):
"""Ignores the provided command and just runs 'cat' instead"""
for pattern, filename in self.fnames.items():
# Convert to a list of strings
- cmd = [isinstance(c, pathlib.PosixPath) and c.as_posix() or c for c in cmd]
+ cmd = [
+ isinstance(c, pathlib.PosixPath) and c.as_posix() or c
+ for c in cmd
+ ]
if pattern.match(" ".join(cmd)):
new_cmd = ["cat", filename]
break
@@ -167,7 +170,9 @@ class TestFilters:
expected = {
"Configuring fakeproject:rw.",
"Configuring fakeproject:ro.",
- "Building fakeproject in {}/ec/build/zephyr/fakeproject.".format(tmp_path),
+ "Building fakeproject in {}/ec/build/zephyr/fakeproject.".format(
+ tmp_path
+ ),
"Building fakeproject:ro: /usr/bin/ninja -C {}-ro".format(
tmp_path / "ec/build/zephyr/fakeproject/build"
),
@@ -178,7 +183,9 @@ class TestFilters:
for suffix in ["ro", "rw"]:
with open(get_test_filepath("%s_INFO" % suffix)) as file:
for line in file:
- expected.add("[fakeproject:{}]{}".format(suffix, line.strip()))
+ expected.add(
+ "[fakeproject:{}]{}".format(suffix, line.strip())
+ )
# This produces an easy-to-read diff if there is a difference
assert expected == set(recs)
@@ -191,7 +198,9 @@ class TestFilters:
expected = {
"Configuring fakeproject:rw.",
"Configuring fakeproject:ro.",
- "Building fakeproject in {}/ec/build/zephyr/fakeproject.".format(tmp_path),
+ "Building fakeproject in {}/ec/build/zephyr/fakeproject.".format(
+ tmp_path
+ ),
"Building fakeproject:ro: /usr/bin/ninja -C {}-ro".format(
tmp_path / "ec/build/zephyr/fakeproject/build"
),
@@ -204,7 +213,9 @@ class TestFilters:
for suffix in ["ro", "rw"]:
with open(get_test_filepath(suffix)) as file:
for line in file:
- expected.add("[fakeproject:{}]{}".format(suffix, line.strip()))
+ expected.add(
+ "[fakeproject:{}]{}".format(suffix, line.strip())
+ )
# This produces an easy-to-read diff if there is a difference
assert expected == set(recs)
@@ -218,7 +229,9 @@ class TestFilters:
)
dt_errs = [rec for rec in recs if "adc" in rec]
- assert "devicetree error: 'adc' is marked as required" in list(dt_errs)[0]
+ assert (
+ "devicetree error: 'adc' is marked as required" in list(dt_errs)[0]
+ )
@pytest.mark.parametrize(
diff --git a/zephyr/zmake/zephyr_build_tools/generate_ec_version.py b/zephyr/zmake/zephyr_build_tools/generate_ec_version.py
index 96f9013ca4..c43ee4eba3 100755
--- a/zephyr/zmake/zephyr_build_tools/generate_ec_version.py
+++ b/zephyr/zmake/zephyr_build_tools/generate_ec_version.py
@@ -59,7 +59,9 @@ def main():
help="Specify modules paths to include in version hash. Uses "
"ZEPHYR_MODULES env var if unset",
)
- parser.add_argument("-n", "--name", required=True, type=str, help="Project name")
+ parser.add_argument(
+ "-n", "--name", required=True, type=str, help="Project name"
+ )
args = parser.parse_args()
@@ -82,7 +84,9 @@ def main():
# No modules specified on command line. Default to environment variable.
env_modules = os.environ.get("ZEPHYR_MODULES")
args.module = env_modules.split(";") if env_modules else []
- logging.info("No modules passed via CLI. Getting list from ZEPHYR_MODULES")
+ logging.info(
+ "No modules passed via CLI. Getting list from ZEPHYR_MODULES"
+ )
elif len(args.module) == 1:
# In case of a single -m flag, treat value as a semicolon-delimited
@@ -90,7 +94,9 @@ def main():
args.module = args.module[0].split(";")
try:
- module_dict = convert_module_list_to_dict(map(pathlib.Path, args.module))
+ module_dict = convert_module_list_to_dict(
+ map(pathlib.Path, args.module)
+ )
except FileNotFoundError as err:
logging.error("Cannot find module: %s", str(err))
return 1
@@ -112,7 +118,9 @@ def main():
output_path.parent.mkdir(parents=True, exist_ok=True)
logging.info("Writing header to %s", args.header_path)
- zmake.version.write_version_header(ver, output_path, sys.argv[0], args.static)
+ zmake.version.write_version_header(
+ ver, output_path, sys.argv[0], args.static
+ )
return 0
diff --git a/zephyr/zmake/zmake/__main__.py b/zephyr/zmake/zmake/__main__.py
index 7054601ac1..ab0bf99164 100644
--- a/zephyr/zmake/zmake/__main__.py
+++ b/zephyr/zmake/zmake/__main__.py
@@ -225,7 +225,9 @@ def get_argparser():
dest="clobber",
help="Delete existing build directories, even if configuration is unchanged",
)
- testall.add_argument("-B", "--build-dir", type=pathlib.Path, help="Build directory")
+ testall.add_argument(
+ "-B", "--build-dir", type=pathlib.Path, help="Build directory"
+ )
generate_readme = sub.add_parser(
"generate-readme",
@@ -251,7 +253,9 @@ def get_argparser():
def add_common_configure_args(sub_parser: argparse.ArgumentParser):
"""Adds common arguments used by configure-like subcommands."""
- sub_parser.add_argument("-t", "--toolchain", help="Name of toolchain to use")
+ sub_parser.add_argument(
+ "-t", "--toolchain", help="Name of toolchain to use"
+ )
sub_parser.add_argument(
"--bringup",
action="store_true",
diff --git a/zephyr/zmake/zmake/build_config.py b/zephyr/zmake/zmake/build_config.py
index 0d03e22a45..6f1a76904e 100644
--- a/zephyr/zmake/zmake/build_config.py
+++ b/zephyr/zmake/zmake/build_config.py
@@ -19,7 +19,11 @@ class BuildConfig:
"""
def __init__(
- self, environ_defs=None, cmake_defs=None, kconfig_defs=None, kconfig_files=None
+ self,
+ environ_defs=None,
+ cmake_defs=None,
+ kconfig_defs=None,
+ kconfig_files=None,
):
self.environ_defs = dict(environ_defs or {})
self.cmake_defs = dict(cmake_defs or {})
@@ -68,7 +72,9 @@ class BuildConfig:
)
conf_file_config = BuildConfig(
cmake_defs={
- "CONF_FILE": ";".join(str(p.resolve()) for p in kconfig_files)
+ "CONF_FILE": ";".join(
+ str(p.resolve()) for p in kconfig_files
+ )
}
)
return (base_config | conf_file_config).popen_cmake(
@@ -93,7 +99,9 @@ class BuildConfig:
"""Combine two BuildConfig instances."""
if not isinstance(other, BuildConfig):
raise TypeError(
- "Unsupported operation | for {} and {}".format(type(self), type(other))
+ "Unsupported operation | for {} and {}".format(
+ type(self), type(other)
+ )
)
return BuildConfig(
diff --git a/zephyr/zmake/zmake/configlib.py b/zephyr/zmake/zmake/configlib.py
index 2baa20523b..6264f19e45 100644
--- a/zephyr/zmake/zmake/configlib.py
+++ b/zephyr/zmake/zmake/configlib.py
@@ -11,7 +11,9 @@ def _register_project(**kwargs):
kwargs.setdefault(
"project_dir", here # noqa: F821 pylint: disable=undefined-variable
)
- return register_project(**kwargs) # noqa: F821 pylint: disable=undefined-variable
+ return register_project(
+ **kwargs
+ ) # noqa: F821 pylint: disable=undefined-variable
def register_host_project(**kwargs):
@@ -25,7 +27,9 @@ def register_host_project(**kwargs):
def register_host_test(test_name, **kwargs):
"""Register a test project that runs on the host."""
kwargs.setdefault("is_test", True)
- return register_host_project(project_name="test-{}".format(test_name), **kwargs)
+ return register_host_project(
+ project_name="test-{}".format(test_name), **kwargs
+ )
def register_raw_project(**kwargs):
diff --git a/zephyr/zmake/zmake/generate_readme.py b/zephyr/zmake/zmake/generate_readme.py
index 9008f0f45d..ebac0d908b 100644
--- a/zephyr/zmake/zmake/generate_readme.py
+++ b/zephyr/zmake/zmake/generate_readme.py
@@ -53,7 +53,9 @@ class MarkdownHelpFormatter(argparse.HelpFormatter):
if action.nargs == 0:
parts.append(option_string)
else:
- parts.append(f"{option_string} {_get_metavar(action).upper()}")
+ parts.append(
+ f"{option_string} {_get_metavar(action).upper()}"
+ )
return ", ".join(f"`{part}`" for part in parts)
return f"`{_get_metavar(action)}`"
@@ -103,7 +105,9 @@ def generate_readme():
_append("# Zmake")
_append()
- _append('<!-- Auto-generated contents! Run "zmake generate-readme" to update. -->')
+ _append(
+ '<!-- Auto-generated contents! Run "zmake generate-readme" to update. -->'
+ )
_append()
_append("[TOC]")
_append()
diff --git a/zephyr/zmake/zmake/multiproc.py b/zephyr/zmake/zmake/multiproc.py
index 7d9a88de5a..5f1497c0dc 100644
--- a/zephyr/zmake/zmake/multiproc.py
+++ b/zephyr/zmake/zmake/multiproc.py
@@ -297,7 +297,9 @@ class Executor:
exception.
"""
with self.lock:
- thread = threading.Thread(target=lambda: self._run_fn(func), daemon=True)
+ thread = threading.Thread(
+ target=lambda: self._run_fn(func), daemon=True
+ )
thread.start()
self.threads.append(thread)
diff --git a/zephyr/zmake/zmake/output_packers.py b/zephyr/zmake/zmake/output_packers.py
index 78ee7649e6..bc58d92ddc 100644
--- a/zephyr/zmake/zmake/output_packers.py
+++ b/zephyr/zmake/zmake/output_packers.py
@@ -85,8 +85,10 @@ class BasePacker:
Returns:
The file if it passes the test.
"""
- max_size = self._get_max_image_bytes( # pylint: disable=assignment-from-none
- dir_map
+ max_size = (
+ self._get_max_image_bytes( # pylint: disable=assignment-from-none
+ dir_map
+ )
)
if max_size is None or file.stat().st_size <= max_size:
return file
@@ -120,11 +122,19 @@ class BinmanPacker(BasePacker):
super().__init__(project)
def configs(self):
- yield "ro", build_config.BuildConfig(kconfig_defs={"CONFIG_CROS_EC_RO": "y"})
- yield "rw", build_config.BuildConfig(kconfig_defs={"CONFIG_CROS_EC_RW": "y"})
+ yield "ro", build_config.BuildConfig(
+ kconfig_defs={"CONFIG_CROS_EC_RO": "y"}
+ )
+ yield "rw", build_config.BuildConfig(
+ kconfig_defs={"CONFIG_CROS_EC_RW": "y"}
+ )
def pack_firmware(
- self, work_dir, jobclient: zmake.jobserver.JobClient, dir_map, version_string=""
+ self,
+ work_dir,
+ jobclient: zmake.jobserver.JobClient,
+ dir_map,
+ version_string="",
):
"""Pack RO and RW sections using Binman.
@@ -148,8 +158,12 @@ class BinmanPacker(BasePacker):
# Copy the inputs into the work directory so that Binman can
# find them under a hard-coded name.
- shutil.copy2(ro_dir / "zephyr" / self.ro_file, work_dir / "zephyr_ro.bin")
- shutil.copy2(rw_dir / "zephyr" / self.rw_file, work_dir / "zephyr_rw.bin")
+ shutil.copy2(
+ ro_dir / "zephyr" / self.ro_file, work_dir / "zephyr_ro.bin"
+ )
+ shutil.copy2(
+ rw_dir / "zephyr" / self.rw_file, work_dir / "zephyr_rw.bin"
+ )
# Version in FRID/FWID can be at most 31 bytes long (32, minus
# one for null character).
@@ -176,8 +190,12 @@ class BinmanPacker(BasePacker):
encoding="utf-8",
)
- zmake.multiproc.LogWriter.log_output(self.logger, logging.DEBUG, proc.stdout)
- zmake.multiproc.LogWriter.log_output(self.logger, logging.ERROR, proc.stderr)
+ zmake.multiproc.LogWriter.log_output(
+ self.logger, logging.DEBUG, proc.stdout
+ )
+ zmake.multiproc.LogWriter.log_output(
+ self.logger, logging.ERROR, proc.stderr
+ )
if proc.wait(timeout=60):
raise OSError("Failed to run binman")
diff --git a/zephyr/zmake/zmake/project.py b/zephyr/zmake/zmake/project.py
index 42d1258cb5..ca7b30e6ba 100644
--- a/zephyr/zmake/zmake/project.py
+++ b/zephyr/zmake/zmake/project.py
@@ -25,7 +25,13 @@ def module_dts_overlay_name(modpath, board_name):
Returns:
A pathlib.Path object to the expected overlay path.
"""
- return modpath / "zephyr" / "dts" / "board-overlays" / "{}.dts".format(board_name)
+ return (
+ modpath
+ / "zephyr"
+ / "dts"
+ / "board-overlays"
+ / "{}.dts".format(board_name)
+ )
@dataclasses.dataclass
@@ -43,7 +49,9 @@ class ProjectConfig:
is_test: bool = dataclasses.field(default=False)
test_args: typing.List[str] = dataclasses.field(default_factory=list)
dts_overlays: "list[str]" = dataclasses.field(default_factory=list)
- kconfig_files: "list[pathlib.Path]" = dataclasses.field(default_factory=list)
+ kconfig_files: "list[pathlib.Path]" = dataclasses.field(
+ default_factory=list
+ )
project_dir: pathlib.Path = dataclasses.field(default_factory=pathlib.Path)
test_timeout_secs: float = dataclasses.field(default=2 * 60)
@@ -53,7 +61,9 @@ class Project:
def __init__(self, config: ProjectConfig):
self.config = config
- self.packer: zmake.output_packers.BasePacker = self.config.output_packer(self)
+ self.packer: zmake.output_packers.BasePacker = (
+ self.config.output_packer(self)
+ )
def iter_builds(self):
"""Iterate thru the build combinations provided by the project's packer.
@@ -61,7 +71,9 @@ class Project:
Yields:
2-tuples of a build configuration name and a BuildConfig.
"""
- conf = build_config.BuildConfig(cmake_defs={"BOARD": self.config.zephyr_board})
+ conf = build_config.BuildConfig(
+ cmake_defs={"BOARD": self.config.zephyr_board}
+ )
kconfig_files = []
prj_conf = self.config.project_dir / "prj.conf"
@@ -85,7 +97,9 @@ class Project:
"""
overlays = []
for module_path in modules.values():
- dts_path = module_dts_overlay_name(module_path, self.config.zephyr_board)
+ dts_path = module_dts_overlay_name(
+ module_path, self.config.zephyr_board
+ )
if dts_path.is_file():
overlays.append(dts_path.resolve())
@@ -148,7 +162,9 @@ class Project:
support_class = toolchains.support_classes[name]
toolchain = support_class(name=name, modules=module_paths)
if toolchain.probe():
- logging.info("Toolchain %r selected by probe function.", toolchain)
+ logging.info(
+ "Toolchain %r selected by probe function.", toolchain
+ )
return toolchain
raise OSError(
"No supported toolchains could be found on your system. If you see "
diff --git a/zephyr/zmake/zmake/zmake.py b/zephyr/zmake/zmake/zmake.py
index 3ccda6d7cd..6e271a408c 100644
--- a/zephyr/zmake/zmake/zmake.py
+++ b/zephyr/zmake/zmake/zmake.py
@@ -171,12 +171,16 @@ class Zmake:
if zephyr_base:
self.zephyr_base = zephyr_base
else:
- self.zephyr_base = self.checkout / "src" / "third_party" / "zephyr" / "main"
+ self.zephyr_base = (
+ self.checkout / "src" / "third_party" / "zephyr" / "main"
+ )
if modules_dir:
self.module_paths = zmake.modules.locate_from_directory(modules_dir)
else:
- self.module_paths = zmake.modules.locate_from_checkout(self.checkout)
+ self.module_paths = zmake.modules.locate_from_checkout(
+ self.checkout
+ )
if jobserver:
self.jobserver = jobserver
@@ -208,20 +212,26 @@ class Zmake:
Returns a list of projects.
"""
- found_projects = zmake.project.find_projects(self.module_paths["ec"] / "zephyr")
+ found_projects = zmake.project.find_projects(
+ self.module_paths["ec"] / "zephyr"
+ )
if all_projects:
projects = set(found_projects.values())
elif host_tests_only:
projects = {p for p in found_projects.values() if p.config.is_test}
elif boards_only:
- projects = {p for p in found_projects.values() if not p.config.is_test}
+ projects = {
+ p for p in found_projects.values() if not p.config.is_test
+ }
else:
projects = set()
for project_name in project_names:
try:
projects.add(found_projects[project_name])
except KeyError as e:
- raise KeyError("No project named {}".format(project_name)) from e
+ raise KeyError(
+ "No project named {}".format(project_name)
+ ) from e
return projects
def configure( # pylint: disable=too-many-arguments,too-many-locals
@@ -253,7 +263,9 @@ class Zmake:
boards_only=boards_only,
)
for project in projects:
- project_build_dir = pathlib.Path(build_dir) / project.config.project_name
+ project_build_dir = (
+ pathlib.Path(build_dir) / project.config.project_name
+ )
self.executor.append(
func=functools.partial(
self._configure,
@@ -377,10 +389,14 @@ class Zmake:
)
test_projects = [p for p in projects if p.config.is_test]
for project in test_projects:
- project_build_dir = pathlib.Path(build_dir) / project.config.project_name
+ project_build_dir = (
+ pathlib.Path(build_dir) / project.config.project_name
+ )
gcov = "gcov.sh-not-found"
for build_name, _ in project.iter_builds():
- target_build_dir = project_build_dir / "build-{}".format(build_name)
+ target_build_dir = project_build_dir / "build-{}".format(
+ build_name
+ )
gcov = target_build_dir / "gcov.sh"
self.executor.append(
func=functools.partial(
@@ -460,15 +476,24 @@ class Zmake:
generated_include_dir = (build_dir / "include").resolve()
base_config = zmake.build_config.BuildConfig(
- environ_defs={"ZEPHYR_BASE": str(self.zephyr_base), "PATH": "/usr/bin"},
+ environ_defs={
+ "ZEPHYR_BASE": str(self.zephyr_base),
+ "PATH": "/usr/bin",
+ },
cmake_defs={
"CMAKE_EXPORT_COMPILE_COMMANDS": "ON",
"DTS_ROOT": str(self.module_paths["ec"] / "zephyr"),
"SYSCALL_INCLUDE_DIRS": str(
- self.module_paths["ec"] / "zephyr" / "include" / "drivers"
+ self.module_paths["ec"]
+ / "zephyr"
+ / "include"
+ / "drivers"
),
"USER_CACHE_DIR": str(
- self.module_paths["ec"] / "build" / "zephyr" / "user-cache"
+ self.module_paths["ec"]
+ / "build"
+ / "zephyr"
+ / "user-cache"
),
"ZMAKE_INCLUDE_DIR": str(generated_include_dir),
"ZMAKE_PROJECT_NAME": project.config.project_name,
@@ -488,7 +513,9 @@ class Zmake:
dts_overlay_config = project.find_dts_overlays(module_paths)
- toolchain_support = project.get_toolchain(module_paths, override=toolchain)
+ toolchain_support = project.get_toolchain(
+ module_paths, override=toolchain
+ )
toolchain_config = toolchain_support.get_build_config()
if bringup:
@@ -557,7 +584,9 @@ class Zmake:
shutil.rmtree(output_dir)
self.logger.info(
- "Configuring %s:%s.", project.config.project_name, build_name
+ "Configuring %s:%s.",
+ project.config.project_name,
+ build_name,
)
kconfig_file = build_dir / "kconfig-{}.conf".format(build_name)
@@ -601,8 +630,12 @@ class Zmake:
# To reconstruct a Project object later, we need to know the
# name and project directory.
- (build_dir / "project_name.txt").write_text(project.config.project_name)
- util.update_symlink(project.config.project_dir, build_dir / "project")
+ (build_dir / "project_name.txt").write_text(
+ project.config.project_name
+ )
+ util.update_symlink(
+ project.config.project_dir, build_dir / "project"
+ )
output_files = []
if build_after_configure or test_after_configure:
@@ -778,11 +811,17 @@ class Zmake:
if coverage and not project.config.is_test:
with self.jobserver.get_job():
self._run_lcov(
- build_dir, output_dir / "zephyr.info", initial=True, gcov=gcov
+ build_dir,
+ output_dir / "zephyr.info",
+ initial=True,
+ gcov=gcov,
)
else:
for output_file, output_name in project.packer.pack_firmware(
- packer_work_dir, self.jobserver, dirs, version_string=version_string
+ packer_work_dir,
+ self.jobserver,
+ dirs,
+ version_string=version_string,
):
shutil.copy2(output_file, output_dir / output_name)
self.logger.debug("Output file '%s' created.", output_file)
@@ -850,7 +889,9 @@ class Zmake:
if proc.wait(timeout=timeout):
raise OSError(get_process_failure_msg(proc))
if coverage:
- self._run_lcov(build_dir, lcov_file, initial=False, gcov=gcov)
+ self._run_lcov(
+ build_dir, lcov_file, initial=False, gcov=gcov
+ )
except subprocess.TimeoutExpired as e:
proc.terminate()
try:
@@ -869,7 +910,11 @@ class Zmake:
raise
def _run_lcov(
- self, build_dir, lcov_file, initial=False, gcov: Union[os.PathLike, str] = ""
+ self,
+ build_dir,
+ lcov_file,
+ initial=False,
+ gcov: Union[os.PathLike, str] = "",
):
gcov = os.path.abspath(gcov)
if initial:
@@ -922,7 +967,9 @@ class Zmake:
def _merge_lcov_files(self, projects, build_dir, output_file):
all_lcov_files = []
for project in projects:
- project_build_dir = pathlib.Path(build_dir) / project.config.project_name
+ project_build_dir = (
+ pathlib.Path(build_dir) / project.config.project_name
+ )
all_lcov_files.append(project_build_dir / "output" / "zephyr.info")
with self.jobserver.get_job():
# Merge info files into a single lcov.info