summaryrefslogtreecommitdiff
path: root/extra/usb_power
diff options
context:
space:
mode:
authorTom Hughes <tomhughes@chromium.org>2022-09-21 14:10:01 -0700
committerTom Hughes <tomhughes@chromium.org>2022-09-22 12:49:33 -0700
commit2bcf863b492fe7ed8105c853814dba6ed32ba719 (patch)
treefcf6ce5810f9ff9e3c8cce434812dd75492269ed /extra/usb_power
parente5fb0b9ba488614b5684e640530f00821ab7b943 (diff)
parent28712dae9d7ed1e694f7622cc083afa71090d4d5 (diff)
downloadchrome-ec-firmware-fpmcu-bloonchipper-release.tar.gz
Merge remote-tracking branch cros/main into firmware-fpmcu-bloonchipper-releasefirmware-fpmcu-bloonchipper-release
Generated by: ./util/update_release_branch.py --board bloonchipper --relevant_paths_file ./util/fingerprint-relevant-paths.txt firmware- fpmcu-bloonchipper-release Relevant changes: git log --oneline e5fb0b9ba4..28712dae9d -- board/hatch_fp board/bloonchipper common/fpsensor docs/fingerprint driver/fingerprint util/getversion.sh ded9307b79 util/getversion.sh: Fix version when not in a git repo 956055e692 board: change Google USB vendor info 71b2ef709d Update license boilerplate text in source code files 33e11afda0 Revert "fpsensor: Build fpsensor source file with C++" c8d0360723 fpsensor: Build fpsensor source file with C++ bc113abd53 fpsensor: Fix g++ compiler error 150a58a0dc fpsensor: Fix fp_set_sensor_mode return type b33b5ce85b fpsensor: Remove nested designators for C++ compatibility 2e864b2539 tree-wide: const-ify argv for console commands 56d8b360f9 test: Add test for get ikm failure when seed not set 3a3d6c3690 test: Add test for fpsensor trivial key failure 233e6bbd08 fpsensor_crypto: Abstract calls to hmac_SHA256 0a041b285b docs/fingerprint: Typo correction c03fab67e2 docs/fingerprint: Fix the path of fputils.py 0b5d4baf5a util/getversion.sh: Fix empty file list handling 6e128fe760 FPMCU dev board environment with Satlab 3eb29b6aa5 builtin: Move ssize_t to sys/types.h 345d62ebd1 docs/fingerprint: Update power numbers for latest dartmonkey release c25ffdb316 common: Conditionally support printf %l and %i modifiers 9a3c514b45 test: Add a test to check if the debugger is connected 54e603413f Move standard library tests to their own file 43fa6b4bf8 docs/fingerprint: Update power numbers for latest bloonchipper release 25536f9a84 driver/fingerprint/fpc/bep/fpc_sensor_spi.c: Format with clang-format 4face99efd driver/fingerprint/fpc/libfp/fpc_sensor_pal.h: Format with clang-format 738de2b575 trng: Rename rand to trng_rand 14b8270edd docs/fingerprint: Update dragonclaw power numbers 0b268f93d1 driver/fingerprint/fpc/libfp/fpc_private.c: Format with clang-format f80da163f2 driver/fingerprint/fpc/libfp/fpc_private.h: Format with clang-format 5e9c85c9b1 driver/fingerprint/fpc/libfp/fpc_sensor_pal.c: Format with clang-format c1f9dd3cf8 driver/fingerprint/fpc/libfp/fpc_bio_algorithm.h: Format with clang-format eb1e1bed8d driver/fingerprint/fpc/libfp/fpc1145_private.h: Format with clang-format 6e7b611821 driver/fingerprint/fpc/bep/fpc_bio_algorithm.h: Format with clang-format e0589cd5e2 driver/fingerprint/fpc/bep/fpc1035_private.h: Format with clang-format 7905e556a0 common/fpsensor/fpsensor_crypto.c: Format with clang-format 21289d170c driver/fingerprint/fpc/bep/fpc1025_private.h: Format with clang-format 98a20f937e common/fpsensor/fpsensor_state.c: Format with clang-format a2d255d8af common/fpsensor/fpsensor.c: Format with clang-format 73055eeb3f driver/fingerprint/fpc/bep/fpc_private.c: Format with clang-format 0f7b5cb509 common/fpsensor/fpsensor_private.h: Format with clang-format 1ceade6e65 driver/fingerprint/fpc/bep/fpc_private.h: Format with clang-format dc3e9008b8 board/hatch_fp/board.h: Format with clang-format dca9d74321 Revert "trng: Rename rand to trng_rand" a6b0b3554f trng: Rename rand to trng_rand 28d0b75b70 third_party/boringssl: Remove unused header BRANCH=None BUG=b:246424843 b:234181908 b:244781166 b:234181908 b:244387210 BUG=b:242720240 chromium:1098010 b:180945056 b:236025198 b:234181908 BUG=b:234181908 b:237344361 b:131913998 b:236386294 b:234143158 BUG=b:234781655 b:215613183 b:242720910 TEST=`make -j buildall` TEST=./test/run_device_tests.py --board bloonchipper Test "aes": PASSED Test "cec": PASSED Test "cortexm_fpu": PASSED Test "crc": PASSED Test "flash_physical": PASSED Test "flash_write_protect": PASSED Test "fpsensor_hw": PASSED Test "fpsensor_spi_ro": PASSED Test "fpsensor_spi_rw": PASSED Test "fpsensor_uart_ro": PASSED Test "fpsensor_uart_rw": PASSED Test "mpu_ro": PASSED Test "mpu_rw": PASSED Test "mutex": PASSED Test "pingpong": PASSED Test "printf": PASSED Test "queue": PASSED Test "rollback_region0": PASSED Test "rollback_region1": PASSED Test "rollback_entropy": PASSED Test "rtc": PASSED Test "sha256": PASSED Test "sha256_unrolled": PASSED Test "static_if": PASSED Test "stdlib": PASSED Test "system_is_locked_wp_on": PASSED Test "system_is_locked_wp_off": PASSED Test "timer_dos": PASSED Test "utils": PASSED Test "utils_str": PASSED Test "stm32f_rtc": PASSED Test "panic_data_bloonchipper_v2.0.4277": PASSED Test "panic_data_bloonchipper_v2.0.5938": PASSED Force-Relevant-Builds: all Signed-off-by: Tom Hughes <tomhughes@chromium.org> Change-Id: I264ad0ffe7afcd507a1e483c6e934a9c4fea47c3
Diffstat (limited to 'extra/usb_power')
-rw-r--r--extra/usb_power/convert_power_log_board.py47
-rwxr-xr-xextra/usb_power/convert_servo_ina.py95
-rwxr-xr-xextra/usb_power/powerlog.py1825
-rw-r--r--extra/usb_power/powerlog_unittest.py82
-rw-r--r--extra/usb_power/stats_manager.py769
-rw-r--r--extra/usb_power/stats_manager_unittest.py609
6 files changed, 1810 insertions, 1617 deletions
diff --git a/extra/usb_power/convert_power_log_board.py b/extra/usb_power/convert_power_log_board.py
index 8aab77ee4c..f5fb7e925d 100644
--- a/extra/usb_power/convert_power_log_board.py
+++ b/extra/usb_power/convert_power_log_board.py
@@ -1,11 +1,7 @@
#!/usr/bin/env python
-# Copyright 2018 The Chromium OS Authors. All rights reserved.
+# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""
Program to convert sweetberry config to servod config template.
@@ -14,11 +10,12 @@ Program to convert sweetberry config to servod config template.
# Note: This is a py2/3 compatible file.
from __future__ import print_function
+
import json
import os
import sys
-from powerlog import Spower
+from powerlog import Spower # pylint:disable=import-error
def fetch_records(board_file):
@@ -48,21 +45,29 @@ def write_to_file(file, sweetberry, inas):
inas: list of inas read from board file.
"""
- with open(file, 'w') as pyfile:
+ with open(file, "w") as pyfile:
- pyfile.write('inas = [\n')
+ pyfile.write("inas = [\n")
for rec in inas:
- if rec['sweetberry'] != sweetberry:
+ if rec["sweetberry"] != sweetberry:
continue
# EX : ('sweetberry', 0x40, 'SB_FW_CAM_2P8', 5.0, 1.000, 3, False),
- channel, i2c_addr = Spower.CHMAP[rec['channel']]
- record = (" ('sweetberry', 0x%02x, '%s', 5.0, %f, %d, 'True')"
- ",\n" % (i2c_addr, rec['name'], rec['rs'], channel))
+ channel, i2c_addr = Spower.CHMAP[rec["channel"]]
+ record = (
+ " ('sweetberry', 0x%02x, '%s', 5.0, %f, %d, 'True')"
+ ",\n"
+ % (
+ i2c_addr,
+ rec["name"],
+ rec["rs"],
+ channel,
+ )
+ )
pyfile.write(record)
- pyfile.write(']\n')
+ pyfile.write("]\n")
def main(argv):
@@ -76,16 +81,18 @@ def main(argv):
inas = fetch_records(inputf)
- sweetberry = set(rec['sweetberry'] for rec in inas)
+ sweetberry = set(rec["sweetberry"] for rec in inas)
if len(sweetberry) == 2:
- print("Converting %s to %s and %s" % (inputf, basename + '_a.py',
- basename + '_b.py'))
- write_to_file(basename + '_a.py', 'A', inas)
- write_to_file(basename + '_b.py', 'B', inas)
+ print(
+ "Converting %s to %s and %s"
+ % (inputf, basename + "_a.py", basename + "_b.py")
+ )
+ write_to_file(basename + "_a.py", "A", inas)
+ write_to_file(basename + "_b.py", "B", inas)
else:
- print("Converting %s to %s" % (inputf, basename + '.py'))
- write_to_file(basename + '.py', sweetberry.pop(), inas)
+ print("Converting %s to %s" % (inputf, basename + ".py"))
+ write_to_file(basename + ".py", sweetberry.pop(), inas)
if __name__ == "__main__":
diff --git a/extra/usb_power/convert_servo_ina.py b/extra/usb_power/convert_servo_ina.py
index 1c70f31aeb..1deb75cda4 100755
--- a/extra/usb_power/convert_servo_ina.py
+++ b/extra/usb_power/convert_servo_ina.py
@@ -1,11 +1,7 @@
#!/usr/bin/env python
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Copyright 2017 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""Program to convert power logging config from a servo_ina device
to a sweetberry config.
@@ -14,67 +10,74 @@
# Note: This is a py2/3 compatible file.
from __future__ import print_function
+
import os
import sys
def fetch_records(basename):
- """Import records from servo_ina file.
+ """Import records from servo_ina file.
- servo_ina files are python imports, and have a list of tuples with
- the INA data.
- (inatype, i2caddr, rail name, bus voltage, shunt ohms, mux, True)
+ servo_ina files are python imports, and have a list of tuples with
+ the INA data.
+ (inatype, i2caddr, rail name, bus voltage, shunt ohms, mux, True)
- Args:
- basename: python import name (filename -.py)
+ Args:
+ basename: python import name (filename -.py)
- Returns:
- list of tuples as described above.
- """
- ina_desc = __import__(basename)
- return ina_desc.inas
+ Returns:
+ list of tuples as described above.
+ """
+ ina_desc = __import__(basename)
+ return ina_desc.inas
def main(argv):
- if len(argv) != 2:
- print("usage:")
- print(" %s input.py" % argv[0])
- return
+ if len(argv) != 2:
+ print("usage:")
+ print(" %s input.py" % argv[0])
+ return
- inputf = argv[1]
- basename = os.path.splitext(inputf)[0]
- outputf = basename + '.board'
- outputs = basename + '.scenario'
+ inputf = argv[1]
+ basename = os.path.splitext(inputf)[0]
+ outputf = basename + ".board"
+ outputs = basename + ".scenario"
- print("Converting %s to %s, %s" % (inputf, outputf, outputs))
+ print("Converting %s to %s, %s" % (inputf, outputf, outputs))
- inas = fetch_records(basename)
+ inas = fetch_records(basename)
+ boardfile = open(outputf, "w")
+ scenario = open(outputs, "w")
- boardfile = open(outputf, 'w')
- scenario = open(outputs, 'w')
+ boardfile.write("[\n")
+ scenario.write("[\n")
+ start = True
- boardfile.write('[\n')
- scenario.write('[\n')
- start = True
+ for rec in inas:
+ if start:
+ start = False
+ else:
+ boardfile.write(",\n")
+ scenario.write(",\n")
- for rec in inas:
- if start:
- start = False
- else:
- boardfile.write(',\n')
- scenario.write(',\n')
+ record = (
+ ' {"name": "%s", "rs": %f, "sweetberry": "A", "channel": %d}'
+ % (
+ rec[2],
+ rec[4],
+ rec[1] - 64,
+ )
+ )
+ boardfile.write(record)
+ scenario.write('"%s"' % rec[2])
- record = ' {"name": "%s", "rs": %f, "sweetberry": "A", "channel": %d}' % (
- rec[2], rec[4], rec[1] - 64)
- boardfile.write(record)
- scenario.write('"%s"' % rec[2])
+ boardfile.write("\n")
+ boardfile.write("]")
- boardfile.write('\n')
- boardfile.write(']')
+ scenario.write("\n")
+ scenario.write("]")
- scenario.write('\n')
- scenario.write(']')
if __name__ == "__main__":
- main(sys.argv)
+ main(sys.argv)
diff --git a/extra/usb_power/powerlog.py b/extra/usb_power/powerlog.py
index 82cce3daed..13e41bd23a 100755
--- a/extra/usb_power/powerlog.py
+++ b/extra/usb_power/powerlog.py
@@ -1,11 +1,7 @@
#!/usr/bin/env python
-# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Copyright 2016 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""Program to fetch power logging data from a sweetberry device
or other usb device that exports a USB power logging interface.
@@ -14,9 +10,9 @@
# Note: This is a py2/3 compatible file.
from __future__ import print_function
+
import argparse
import array
-from distutils import sysconfig
import json
import logging
import os
@@ -25,884 +21,1041 @@ import struct
import sys
import time
import traceback
+from distutils import sysconfig
-import usb
-
-from stats_manager import StatsManager
+import usb # pylint:disable=import-error
+from stats_manager import StatsManager # pylint:disable=import-error
# Directory where hdctools installs configuration files into.
-LIB_DIR = os.path.join(sysconfig.get_python_lib(standard_lib=False), 'servo',
- 'data')
+LIB_DIR = os.path.join(
+ sysconfig.get_python_lib(standard_lib=False), "servo", "data"
+)
# Potential config file locations: current working directory, the same directory
# as powerlog.py file or LIB_DIR.
-CONFIG_LOCATIONS = [os.getcwd(), os.path.dirname(os.path.realpath(__file__)),
- LIB_DIR]
-
-def logoutput(msg):
- print(msg)
- sys.stdout.flush()
-
-def process_filename(filename):
- """Find the file path from the filename.
-
- If filename is already the complete path, return that directly. If filename is
- just the short name, look for the file in the current working directory, in
- the directory of the current .py file, and then in the directory installed by
- hdctools. If the file is found, return the complete path of the file.
-
- Args:
- filename: complete file path or short file name.
-
- Returns:
- a complete file path.
-
- Raises:
- IOError if filename does not exist.
- """
- # Check if filename is absolute path.
- if os.path.isabs(filename) and os.path.isfile(filename):
- return filename
- # Check if filename is relative to a known config location.
- for dirname in CONFIG_LOCATIONS:
- file_at_dir = os.path.join(dirname, filename)
- if os.path.isfile(file_at_dir):
- return file_at_dir
- raise IOError('No such file or directory: \'%s\'' % filename)
-
-
-class Spower(object):
- """Power class to access devices on the bus.
-
- Usage:
- bus = Spower()
-
- Instance Variables:
- _dev: pyUSB device object
- _read_ep: pyUSB read endpoint for this interface
- _write_ep: pyUSB write endpoint for this interface
- """
-
- # INA interface type.
- INA_POWER = 1
- INA_BUSV = 2
- INA_CURRENT = 3
- INA_SHUNTV = 4
- # INA_SUFFIX is used to differentiate multiple ina types for the same power
- # rail. No suffix for when ina type is 0 (non-existent) and when ina type is 1
- # (power, no suffix for backward compatibility).
- INA_SUFFIX = ['', '', '_busv', '_cur', '_shuntv']
-
- # usb power commands
- CMD_RESET = 0x0000
- CMD_STOP = 0x0001
- CMD_ADDINA = 0x0002
- CMD_START = 0x0003
- CMD_NEXT = 0x0004
- CMD_SETTIME = 0x0005
-
- # Map between header channel number (0-47)
- # and INA I2C bus/addr on sweetberry.
- CHMAP = {
- 0: (3, 0x40),
- 1: (1, 0x40),
- 2: (2, 0x40),
- 3: (0, 0x40),
- 4: (3, 0x41),
- 5: (1, 0x41),
- 6: (2, 0x41),
- 7: (0, 0x41),
- 8: (3, 0x42),
- 9: (1, 0x42),
- 10: (2, 0x42),
- 11: (0, 0x42),
- 12: (3, 0x43),
- 13: (1, 0x43),
- 14: (2, 0x43),
- 15: (0, 0x43),
- 16: (3, 0x44),
- 17: (1, 0x44),
- 18: (2, 0x44),
- 19: (0, 0x44),
- 20: (3, 0x45),
- 21: (1, 0x45),
- 22: (2, 0x45),
- 23: (0, 0x45),
- 24: (3, 0x46),
- 25: (1, 0x46),
- 26: (2, 0x46),
- 27: (0, 0x46),
- 28: (3, 0x47),
- 29: (1, 0x47),
- 30: (2, 0x47),
- 31: (0, 0x47),
- 32: (3, 0x48),
- 33: (1, 0x48),
- 34: (2, 0x48),
- 35: (0, 0x48),
- 36: (3, 0x49),
- 37: (1, 0x49),
- 38: (2, 0x49),
- 39: (0, 0x49),
- 40: (3, 0x4a),
- 41: (1, 0x4a),
- 42: (2, 0x4a),
- 43: (0, 0x4a),
- 44: (3, 0x4b),
- 45: (1, 0x4b),
- 46: (2, 0x4b),
- 47: (0, 0x4b),
- }
-
- def __init__(self, board, vendor=0x18d1,
- product=0x5020, interface=1, serialname=None):
- self._logger = logging.getLogger(__name__)
- self._board = board
-
- # Find the stm32.
- dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True)
- dev_list = list(dev_g)
- if dev_list is None:
- raise Exception("Power", "USB device not found")
-
- # Check if we have multiple stm32s and we've specified the serial.
- dev = None
- if serialname:
- for d in dev_list:
- dev_serial = "PyUSB dioesn't have a stable interface"
- try:
- dev_serial = usb.util.get_string(d, 256, d.iSerialNumber)
- except ValueError:
- # Incompatible pyUsb version.
- dev_serial = usb.util.get_string(d, d.iSerialNumber)
- if dev_serial == serialname:
- dev = d
- break
- if dev is None:
- raise Exception("Power", "USB device(%s) not found" % serialname)
- else:
- try:
- dev = dev_list[0]
- except TypeError:
- # Incompatible pyUsb version.
- dev = dev_list.next()
-
- self._logger.debug("Found USB device: %04x:%04x", vendor, product)
- self._dev = dev
-
- # Get an endpoint instance.
- try:
- dev.set_configuration()
- except usb.USBError:
- pass
- cfg = dev.get_active_configuration()
-
- intf = usb.util.find_descriptor(cfg, custom_match=lambda i: \
- i.bInterfaceClass==255 and i.bInterfaceSubClass==0x54)
-
- self._intf = intf
- self._logger.debug("InterfaceNumber: %s", intf.bInterfaceNumber)
-
- read_ep = usb.util.find_descriptor(
- intf,
- # match the first IN endpoint
- custom_match = \
- lambda e: \
- usb.util.endpoint_direction(e.bEndpointAddress) == \
- usb.util.ENDPOINT_IN
- )
-
- self._read_ep = read_ep
- self._logger.debug("Reader endpoint: 0x%x", read_ep.bEndpointAddress)
-
- write_ep = usb.util.find_descriptor(
- intf,
- # match the first OUT endpoint
- custom_match = \
- lambda e: \
- usb.util.endpoint_direction(e.bEndpointAddress) == \
- usb.util.ENDPOINT_OUT
- )
-
- self._write_ep = write_ep
- self._logger.debug("Writer endpoint: 0x%x", write_ep.bEndpointAddress)
-
- self.clear_ina_struct()
-
- self._logger.debug("Found power logging USB endpoint.")
-
- def clear_ina_struct(self):
- """ Clear INA description struct."""
- self._inas = []
+CONFIG_LOCATIONS = [
+ os.getcwd(),
+ os.path.dirname(os.path.realpath(__file__)),
+ LIB_DIR,
+]
- def append_ina_struct(self, name, rs, port, addr,
- data=None, ina_type=INA_POWER):
- """Add an INA descriptor into the list of active INAs.
- Args:
- name: Readable name of this channel.
- rs: Sense resistor value in ohms, floating point.
- port: I2C channel this INA is connected to.
- addr: I2C addr of this INA.
- data: Misc data for special handling, board specific.
- ina_type: INA function to use, power, voltage, etc.
- """
- ina = {}
- ina['name'] = name
- ina['rs'] = rs
- ina['port'] = port
- ina['addr'] = addr
- ina['type'] = ina_type
- # Calculate INA231 Calibration register
- # (see INA231 spec p.15)
- # CurrentLSB = uA per div = 80mV / (Rsh * 2^15)
- # CurrentLSB uA = 80000000nV / (Rsh mOhm * 0x8000)
- ina['uAscale'] = 80000000. / (rs * 0x8000);
- ina['uWscale'] = 25. * ina['uAscale'];
- ina['mVscale'] = 1.25
- ina['uVscale'] = 2.5
- ina['data'] = data
- self._inas.append(ina)
-
- def wr_command(self, write_list, read_count=1, wtimeout=100, rtimeout=1000):
- """Write command to logger logic.
-
- This function writes byte command values list to stm, then reads
- byte status.
-
- Args:
- write_list: list of command byte values [0~255].
- read_count: number of status byte values to read.
-
- Interface:
- write: [command, data ... ]
- read: [status ]
-
- Returns:
- bytes read, or None on failure.
- """
- self._logger.debug("Spower.wr_command(write_list=[%s] (%d), read_count=%s)",
- list(bytearray(write_list)), len(write_list), read_count)
-
- # Clean up args from python style to correct types.
- write_length = 0
- if write_list:
- write_length = len(write_list)
- if not read_count:
- read_count = 0
-
- # Send command to stm32.
- if write_list:
- cmd = write_list
- ret = self._write_ep.write(cmd, wtimeout)
-
- self._logger.debug("RET: %s ", ret)
-
- # Read back response if necessary.
- if read_count:
- bytesread = self._read_ep.read(512, rtimeout)
- self._logger.debug("BYTES: [%s]", bytesread)
-
- if len(bytesread) != read_count:
- pass
-
- self._logger.debug("STATUS: 0x%02x", int(bytesread[0]))
- if read_count == 1:
- return bytesread[0]
- else:
- return bytesread
-
- return None
-
- def clear(self):
- """Clear pending reads on the stm32"""
- try:
- while True:
- ret = self.wr_command(b"", read_count=512, rtimeout=100, wtimeout=50)
- self._logger.debug("Try Clear: read %s",
- "success" if ret == 0 else "failure")
- except:
- pass
-
- def send_reset(self):
- """Reset the power interface on the stm32"""
- cmd = struct.pack("<H", self.CMD_RESET)
- ret = self.wr_command(cmd, rtimeout=50, wtimeout=50)
- self._logger.debug("Command RESET: %s",
- "success" if ret == 0 else "failure")
-
- def reset(self):
- """Try resetting the USB interface until success.
-
- Use linear back off strategy when encounter the error with 10ms increment.
-
- Raises:
- Exception on failure.
- """
- max_reset_retry = 100
- for count in range(1, max_reset_retry + 1):
- self.clear()
- try:
- self.send_reset()
- return
- except Exception as e:
- self.clear()
- self.clear()
- self._logger.debug("TRY %d of %d: %s", count, max_reset_retry, e)
- time.sleep(count * 0.01)
- raise Exception("Power", "Failed to reset")
-
- def stop(self):
- """Stop any active data acquisition."""
- cmd = struct.pack("<H", self.CMD_STOP)
- ret = self.wr_command(cmd)
- self._logger.debug("Command STOP: %s",
- "success" if ret == 0 else "failure")
-
- def start(self, integration_us):
- """Start data acquisition.
-
- Args:
- integration_us: int, how many us between samples, and
- how often the data block must be read.
+def logoutput(msg):
+ print(msg)
+ sys.stdout.flush()
- Returns:
- actual sampling interval in ms.
- """
- cmd = struct.pack("<HI", self.CMD_START, integration_us)
- read = self.wr_command(cmd, read_count=5)
- actual_us = 0
- if len(read) == 5:
- ret, actual_us = struct.unpack("<BI", read)
- self._logger.debug("Command START: %s %dus",
- "success" if ret == 0 else "failure", actual_us)
- else:
- self._logger.debug("Command START: FAIL")
- return actual_us
+def process_filename(filename):
+ """Find the file path from the filename.
- def add_ina_name(self, name_tuple):
- """Add INA from board config.
+ If filename is already the complete path, return that directly. If filename is
+ just the short name, look for the file in the current working directory, in
+ the directory of the current .py file, and then in the directory installed by
+ hdctools. If the file is found, return the complete path of the file.
Args:
- name_tuple: name and type of power rail in board config.
+ filename: complete file path or short file name.
Returns:
- True if INA added, False if the INA is not on this board.
+ a complete file path.
Raises:
- Exception on unexpected failure.
+ IOError if filename does not exist.
"""
- name, ina_type = name_tuple
-
- for datum in self._brdcfg:
- if datum["name"] == name:
- rs = int(float(datum["rs"]) * 1000.)
- board = datum["sweetberry"]
-
- if board == self._board:
- if 'port' in datum and 'addr' in datum:
- port = datum['port']
- addr = datum['addr']
- else:
- channel = int(datum["channel"])
- port, addr = self.CHMAP[channel]
- self.add_ina(port, ina_type, addr, 0, rs, data=datum)
- return True
- else:
- return False
- raise Exception("Power", "Failed to find INA %s" % name)
+ # Check if filename is absolute path.
+ if os.path.isabs(filename) and os.path.isfile(filename):
+ return filename
+ # Check if filename is relative to a known config location.
+ for dirname in CONFIG_LOCATIONS:
+ file_at_dir = os.path.join(dirname, filename)
+ if os.path.isfile(file_at_dir):
+ return file_at_dir
+ raise IOError("No such file or directory: '%s'" % filename)
- def set_time(self, timestamp_us):
- """Set sweetberry time to match host time.
- Args:
- timestamp_us: host timestmap in us.
- """
- # 0x0005 , 8 byte timestamp
- cmd = struct.pack("<HQ", self.CMD_SETTIME, timestamp_us)
- ret = self.wr_command(cmd)
-
- self._logger.debug("Command SETTIME: %s",
- "success" if ret == 0 else "failure")
+class Spower(object):
+ """Power class to access devices on the bus.
- def add_ina(self, bus, ina_type, addr, extra, resistance, data=None):
- """Add an INA to the data acquisition list.
+ Usage:
+ bus = Spower()
- Args:
- bus: which i2c bus the INA is on. Same ordering as Si2c.
- ina_type: Ina interface: INA_POWER/BUSV/etc.
- addr: 7 bit i2c addr of this INA
- extra: extra data for nonstandard configs.
- resistance: int, shunt resistance in mOhm
+ Instance Variables:
+ _dev: pyUSB device object
+ _read_ep: pyUSB read endpoint for this interface
+ _write_ep: pyUSB write endpoint for this interface
"""
- # 0x0002, 1B: bus, 1B:INA type, 1B: INA addr, 1B: extra, 4B: Rs
- cmd = struct.pack("<HBBBBI", self.CMD_ADDINA,
- bus, ina_type, addr, extra, resistance)
- ret = self.wr_command(cmd)
- if ret == 0:
- if data:
- name = data['name']
- else:
- name = "ina%d_%02x" % (bus, addr)
- self.append_ina_struct(name, resistance, bus, addr,
- data=data, ina_type=ina_type)
- self._logger.debug("Command ADD_INA: %s",
- "success" if ret == 0 else "failure")
-
- def report_header_size(self):
- """Helper function to calculate power record header size."""
- result = 2
- timestamp = 8
- return result + timestamp
-
- def report_size(self, ina_count):
- """Helper function to calculate full power record size."""
- record = 2
-
- datasize = self.report_header_size() + ina_count * record
- # Round to multiple of 4 bytes.
- datasize = int(((datasize + 3) // 4) * 4)
-
- return datasize
-
- def read_line(self):
- """Read a line of data from the setup INAs
- Returns:
- list of dicts of the values read by ina/type tuple, otherwise None.
- [{ts:100, (vbat, power):450}, {ts:200, (vbat, power):440}]
- """
- try:
- expected_bytes = self.report_size(len(self._inas))
- cmd = struct.pack("<H", self.CMD_NEXT)
- bytesread = self.wr_command(cmd, read_count=expected_bytes)
- except usb.core.USBError as e:
- self._logger.error("READ LINE FAILED %s", e)
- return None
-
- if len(bytesread) == 1:
- if bytesread[0] != 0x6:
- self._logger.debug("READ LINE FAILED bytes: %d ret: %02x",
- len(bytesread), bytesread[0])
- return None
-
- if len(bytesread) % expected_bytes != 0:
- self._logger.debug("READ LINE WARNING: expected %d, got %d",
- expected_bytes, len(bytesread))
-
- packet_count = len(bytesread) // expected_bytes
-
- values = []
- for i in range(0, packet_count):
- start = i * expected_bytes
- end = (i + 1) * expected_bytes
- record = self.interpret_line(bytesread[start:end])
- values.append(record)
-
- return values
-
- def interpret_line(self, data):
- """Interpret a power record from INAs
+ # INA interface type.
+ INA_POWER = 1
+ INA_BUSV = 2
+ INA_CURRENT = 3
+ INA_SHUNTV = 4
+ # INA_SUFFIX is used to differentiate multiple ina types for the same power
+ # rail. No suffix for when ina type is 0 (non-existent) and when ina type is 1
+ # (power, no suffix for backward compatibility).
+ INA_SUFFIX = ["", "", "_busv", "_cur", "_shuntv"]
+
+ # usb power commands
+ CMD_RESET = 0x0000
+ CMD_STOP = 0x0001
+ CMD_ADDINA = 0x0002
+ CMD_START = 0x0003
+ CMD_NEXT = 0x0004
+ CMD_SETTIME = 0x0005
+
+ # Map between header channel number (0-47)
+ # and INA I2C bus/addr on sweetberry.
+ CHMAP = {
+ 0: (3, 0x40),
+ 1: (1, 0x40),
+ 2: (2, 0x40),
+ 3: (0, 0x40),
+ 4: (3, 0x41),
+ 5: (1, 0x41),
+ 6: (2, 0x41),
+ 7: (0, 0x41),
+ 8: (3, 0x42),
+ 9: (1, 0x42),
+ 10: (2, 0x42),
+ 11: (0, 0x42),
+ 12: (3, 0x43),
+ 13: (1, 0x43),
+ 14: (2, 0x43),
+ 15: (0, 0x43),
+ 16: (3, 0x44),
+ 17: (1, 0x44),
+ 18: (2, 0x44),
+ 19: (0, 0x44),
+ 20: (3, 0x45),
+ 21: (1, 0x45),
+ 22: (2, 0x45),
+ 23: (0, 0x45),
+ 24: (3, 0x46),
+ 25: (1, 0x46),
+ 26: (2, 0x46),
+ 27: (0, 0x46),
+ 28: (3, 0x47),
+ 29: (1, 0x47),
+ 30: (2, 0x47),
+ 31: (0, 0x47),
+ 32: (3, 0x48),
+ 33: (1, 0x48),
+ 34: (2, 0x48),
+ 35: (0, 0x48),
+ 36: (3, 0x49),
+ 37: (1, 0x49),
+ 38: (2, 0x49),
+ 39: (0, 0x49),
+ 40: (3, 0x4A),
+ 41: (1, 0x4A),
+ 42: (2, 0x4A),
+ 43: (0, 0x4A),
+ 44: (3, 0x4B),
+ 45: (1, 0x4B),
+ 46: (2, 0x4B),
+ 47: (0, 0x4B),
+ }
+
+ def __init__(
+ self, board, vendor=0x18D1, product=0x5020, interface=1, serialname=None
+ ):
+ self._logger = logging.getLogger(__name__)
+ self._board = board
+
+ # Find the stm32.
+ dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True)
+ dev_list = list(dev_g)
+ if dev_list is None:
+ raise Exception("Power", "USB device not found")
+
+ # Check if we have multiple stm32s and we've specified the serial.
+ dev = None
+ if serialname:
+ for d in dev_list:
+ dev_serial = "PyUSB dioesn't have a stable interface"
+ try:
+ dev_serial = usb.util.get_string(d, 256, d.iSerialNumber)
+ except ValueError:
+ # Incompatible pyUsb version.
+ dev_serial = usb.util.get_string(d, d.iSerialNumber)
+ if dev_serial == serialname:
+ dev = d
+ break
+ if dev is None:
+ raise Exception(
+ "Power", "USB device(%s) not found" % serialname
+ )
+ else:
+ dev = dev_list[0]
- Args:
- data: one single record of bytes.
+ self._logger.debug("Found USB device: %04x:%04x", vendor, product)
+ self._dev = dev
- Output:
- stdout of the record in csv format.
+ # Get an endpoint instance.
+ try:
+ dev.set_configuration()
+ except usb.USBError:
+ pass
+ cfg = dev.get_active_configuration()
+
+ intf = usb.util.find_descriptor(
+ cfg,
+ custom_match=lambda i: i.bInterfaceClass == 255
+ and i.bInterfaceSubClass == 0x54,
+ )
+
+ self._intf = intf
+ self._logger.debug("InterfaceNumber: %s", intf.bInterfaceNumber)
+
+ read_ep = usb.util.find_descriptor(
+ intf,
+ # match the first IN endpoint
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
+ == usb.util.ENDPOINT_IN,
+ )
+
+ self._read_ep = read_ep
+ self._logger.debug("Reader endpoint: 0x%x", read_ep.bEndpointAddress)
+
+ write_ep = usb.util.find_descriptor(
+ intf,
+ # match the first OUT endpoint
+ custom_match=lambda e: usb.util.endpoint_direction(
+ e.bEndpointAddress
+ )
+ == usb.util.ENDPOINT_OUT,
+ )
+
+ self._write_ep = write_ep
+ self._logger.debug("Writer endpoint: 0x%x", write_ep.bEndpointAddress)
+
+ self.clear_ina_struct()
+
+ self._logger.debug("Found power logging USB endpoint.")
+
+ def clear_ina_struct(self):
+ """Clear INA description struct."""
+ self._inas = []
+
+ def append_ina_struct(
+ self, name, rs, port, addr, data=None, ina_type=INA_POWER
+ ):
+ """Add an INA descriptor into the list of active INAs.
+
+ Args:
+ name: Readable name of this channel.
+ rs: Sense resistor value in ohms, floating point.
+ port: I2C channel this INA is connected to.
+ addr: I2C addr of this INA.
+ data: Misc data for special handling, board specific.
+ ina_type: INA function to use, power, voltage, etc.
+ """
+ ina = {}
+ ina["name"] = name
+ ina["rs"] = rs
+ ina["port"] = port
+ ina["addr"] = addr
+ ina["type"] = ina_type
+ # Calculate INA231 Calibration register
+ # (see INA231 spec p.15)
+ # CurrentLSB = uA per div = 80mV / (Rsh * 2^15)
+ # CurrentLSB uA = 80000000nV / (Rsh mOhm * 0x8000)
+ ina["uAscale"] = 80000000.0 / (rs * 0x8000)
+ ina["uWscale"] = 25.0 * ina["uAscale"]
+ ina["mVscale"] = 1.25
+ ina["uVscale"] = 2.5
+ ina["data"] = data
+ self._inas.append(ina)
+
+ def wr_command(self, write_list, read_count=1, wtimeout=100, rtimeout=1000):
+ """Write command to logger logic.
+
+ This function writes byte command values list to stm, then reads
+ byte status.
+
+ Args:
+ write_list: list of command byte values [0~255].
+ read_count: number of status byte values to read.
+
+ Interface:
+ write: [command, data ... ]
+ read: [status ]
+
+ Returns:
+ bytes read, or None on failure.
+ """
+ self._logger.debug(
+ "Spower.wr_command(write_list=[%s] (%d), read_count=%s)",
+ list(bytearray(write_list)),
+ len(write_list),
+ read_count,
+ )
+
+ # Clean up args from python style to correct types.
+ write_length = 0
+ if write_list:
+ write_length = len(write_list)
+ if not read_count:
+ read_count = 0
+
+ # Send command to stm32.
+ if write_list:
+ cmd = write_list
+ ret = self._write_ep.write(cmd, wtimeout)
+
+ self._logger.debug("RET: %s ", ret)
+
+ # Read back response if necessary.
+ if read_count:
+ bytesread = self._read_ep.read(512, rtimeout)
+ self._logger.debug("BYTES: [%s]", bytesread)
+
+ if len(bytesread) != read_count:
+ pass
+
+ self._logger.debug("STATUS: 0x%02x", int(bytesread[0]))
+ if read_count == 1:
+ return bytesread[0]
+ else:
+ return bytesread
+
+ return None
+
+ def clear(self):
+ """Clear pending reads on the stm32"""
+ try:
+ while True:
+ ret = self.wr_command(
+ b"", read_count=512, rtimeout=100, wtimeout=50
+ )
+ self._logger.debug(
+ "Try Clear: read %s", "success" if ret == 0 else "failure"
+ )
+ except:
+ pass
+
+ def send_reset(self):
+ """Reset the power interface on the stm32"""
+ cmd = struct.pack("<H", self.CMD_RESET)
+ ret = self.wr_command(cmd, rtimeout=50, wtimeout=50)
+ self._logger.debug(
+ "Command RESET: %s", "success" if ret == 0 else "failure"
+ )
+
+ def reset(self):
+ """Try resetting the USB interface until success.
+
+ Use linear back off strategy when encounter the error with 10ms increment.
+
+ Raises:
+ Exception on failure.
+ """
+ max_reset_retry = 100
+ for count in range(1, max_reset_retry + 1):
+ self.clear()
+ try:
+ self.send_reset()
+ return
+ except Exception as e:
+ self.clear()
+ self.clear()
+ self._logger.debug(
+ "TRY %d of %d: %s", count, max_reset_retry, e
+ )
+ time.sleep(count * 0.01)
+ raise Exception("Power", "Failed to reset")
+
+ def stop(self):
+ """Stop any active data acquisition."""
+ cmd = struct.pack("<H", self.CMD_STOP)
+ ret = self.wr_command(cmd)
+ self._logger.debug(
+ "Command STOP: %s", "success" if ret == 0 else "failure"
+ )
+
+ def start(self, integration_us):
+ """Start data acquisition.
+
+ Args:
+ integration_us: int, how many us between samples, and
+ how often the data block must be read.
+
+ Returns:
+ actual sampling interval in ms.
+ """
+ cmd = struct.pack("<HI", self.CMD_START, integration_us)
+ read = self.wr_command(cmd, read_count=5)
+ actual_us = 0
+ if len(read) == 5:
+ ret, actual_us = struct.unpack("<BI", read)
+ self._logger.debug(
+ "Command START: %s %dus",
+ "success" if ret == 0 else "failure",
+ actual_us,
+ )
+ else:
+ self._logger.debug("Command START: FAIL")
+
+ return actual_us
+
+ def add_ina_name(self, name_tuple):
+ """Add INA from board config.
+
+ Args:
+ name_tuple: name and type of power rail in board config.
+
+ Returns:
+ True if INA added, False if the INA is not on this board.
+
+ Raises:
+ Exception on unexpected failure.
+ """
+ name, ina_type = name_tuple
+
+ for datum in self._brdcfg:
+ if datum["name"] == name:
+ rs = int(float(datum["rs"]) * 1000.0)
+ board = datum["sweetberry"]
+
+ if board == self._board:
+ if "port" in datum and "addr" in datum:
+ port = datum["port"]
+ addr = datum["addr"]
+ else:
+ channel = int(datum["channel"])
+ port, addr = self.CHMAP[channel]
+ self.add_ina(port, ina_type, addr, 0, rs, data=datum)
+ return True
+ else:
+ return False
+ raise Exception("Power", "Failed to find INA %s" % name)
+
+ def set_time(self, timestamp_us):
+ """Set sweetberry time to match host time.
+
+ Args:
+ timestamp_us: host timestmap in us.
+ """
+ # 0x0005 , 8 byte timestamp
+ cmd = struct.pack("<HQ", self.CMD_SETTIME, timestamp_us)
+ ret = self.wr_command(cmd)
+
+ self._logger.debug(
+ "Command SETTIME: %s", "success" if ret == 0 else "failure"
+ )
+
+ def add_ina(self, bus, ina_type, addr, extra, resistance, data=None):
+ """Add an INA to the data acquisition list.
+
+ Args:
+ bus: which i2c bus the INA is on. Same ordering as Si2c.
+ ina_type: Ina interface: INA_POWER/BUSV/etc.
+ addr: 7 bit i2c addr of this INA
+ extra: extra data for nonstandard configs.
+ resistance: int, shunt resistance in mOhm
+ """
+ # 0x0002, 1B: bus, 1B:INA type, 1B: INA addr, 1B: extra, 4B: Rs
+ cmd = struct.pack(
+ "<HBBBBI", self.CMD_ADDINA, bus, ina_type, addr, extra, resistance
+ )
+ ret = self.wr_command(cmd)
+ if ret == 0:
+ if data:
+ name = data["name"]
+ else:
+ name = "ina%d_%02x" % (bus, addr)
+ self.append_ina_struct(
+ name, resistance, bus, addr, data=data, ina_type=ina_type
+ )
+ self._logger.debug(
+ "Command ADD_INA: %s", "success" if ret == 0 else "failure"
+ )
+
+ def report_header_size(self):
+ """Helper function to calculate power record header size."""
+ result = 2
+ timestamp = 8
+ return result + timestamp
+
+ def report_size(self, ina_count):
+ """Helper function to calculate full power record size."""
+ record = 2
+
+ datasize = self.report_header_size() + ina_count * record
+ # Round to multiple of 4 bytes.
+ datasize = int(((datasize + 3) // 4) * 4)
+
+ return datasize
+
+ def read_line(self):
+ """Read a line of data from the setup INAs
+
+ Returns:
+ list of dicts of the values read by ina/type tuple, otherwise None.
+ [{ts:100, (vbat, power):450}, {ts:200, (vbat, power):440}]
+ """
+ try:
+ expected_bytes = self.report_size(len(self._inas))
+ cmd = struct.pack("<H", self.CMD_NEXT)
+ bytesread = self.wr_command(cmd, read_count=expected_bytes)
+ except usb.core.USBError as e:
+ self._logger.error("READ LINE FAILED %s", e)
+ return None
+
+ if len(bytesread) == 1:
+ if bytesread[0] != 0x6:
+ self._logger.debug(
+ "READ LINE FAILED bytes: %d ret: %02x",
+ len(bytesread),
+ bytesread[0],
+ )
+ return None
+
+ if len(bytesread) % expected_bytes != 0:
+ self._logger.debug(
+ "READ LINE WARNING: expected %d, got %d",
+ expected_bytes,
+ len(bytesread),
+ )
+
+ packet_count = len(bytesread) // expected_bytes
+
+ values = []
+ for i in range(0, packet_count):
+ start = i * expected_bytes
+ end = (i + 1) * expected_bytes
+ record = self.interpret_line(bytesread[start:end])
+ values.append(record)
+
+ return values
+
+ def interpret_line(self, data):
+ """Interpret a power record from INAs
+
+ Args:
+ data: one single record of bytes.
+
+ Output:
+ stdout of the record in csv format.
+
+ Returns:
+ dict containing name, value of recorded data.
+ """
+ status, size = struct.unpack("<BB", data[0:2])
+ if len(data) != self.report_size(size):
+ self._logger.error(
+ "READ LINE FAILED st:%d size:%d expected:%d len:%d",
+ status,
+ size,
+ self.report_size(size),
+ len(data),
+ )
+ else:
+ pass
- Returns:
- dict containing name, value of recorded data.
- """
- status, size = struct.unpack("<BB", data[0:2])
- if len(data) != self.report_size(size):
- self._logger.error("READ LINE FAILED st:%d size:%d expected:%d len:%d",
- status, size, self.report_size(size), len(data))
- else:
- pass
+ timestamp = struct.unpack("<Q", data[2:10])[0]
+ self._logger.debug(
+ "READ LINE: st:%d size:%d time:%dus", status, size, timestamp
+ )
+ ftimestamp = float(timestamp) / 1000000.0
- timestamp = struct.unpack("<Q", data[2:10])[0]
- self._logger.debug("READ LINE: st:%d size:%d time:%dus", status, size,
- timestamp)
- ftimestamp = float(timestamp) / 1000000.
+ record = {"ts": ftimestamp, "status": status, "berry": self._board}
- record = {"ts": ftimestamp, "status": status, "berry":self._board}
+ for i in range(0, size):
+ idx = self.report_header_size() + 2 * i
+ name = self._inas[i]["name"]
+ name_tuple = (self._inas[i]["name"], self._inas[i]["type"])
- for i in range(0, size):
- idx = self.report_header_size() + 2*i
- name = self._inas[i]['name']
- name_tuple = (self._inas[i]['name'], self._inas[i]['type'])
+ raw_val = struct.unpack("<h", data[idx : idx + 2])[0]
- raw_val = struct.unpack("<h", data[idx:idx+2])[0]
+ if self._inas[i]["type"] == Spower.INA_POWER:
+ val = raw_val * self._inas[i]["uWscale"]
+ elif self._inas[i]["type"] == Spower.INA_BUSV:
+ val = raw_val * self._inas[i]["mVscale"]
+ elif self._inas[i]["type"] == Spower.INA_CURRENT:
+ val = raw_val * self._inas[i]["uAscale"]
+ elif self._inas[i]["type"] == Spower.INA_SHUNTV:
+ val = raw_val * self._inas[i]["uVscale"]
- if self._inas[i]['type'] == Spower.INA_POWER:
- val = raw_val * self._inas[i]['uWscale']
- elif self._inas[i]['type'] == Spower.INA_BUSV:
- val = raw_val * self._inas[i]['mVscale']
- elif self._inas[i]['type'] == Spower.INA_CURRENT:
- val = raw_val * self._inas[i]['uAscale']
- elif self._inas[i]['type'] == Spower.INA_SHUNTV:
- val = raw_val * self._inas[i]['uVscale']
+ self._logger.debug(
+ "READ %d %s: %fs: 0x%04x %f", i, name, ftimestamp, raw_val, val
+ )
+ record[name_tuple] = val
- self._logger.debug("READ %d %s: %fs: 0x%04x %f", i, name, ftimestamp,
- raw_val, val)
- record[name_tuple] = val
+ return record
- return record
+ def load_board(self, brdfile):
+ """Load a board config.
- def load_board(self, brdfile):
- """Load a board config.
+ Args:
+ brdfile: Filename of a json file decribing the INA wiring of this board.
+ """
+ with open(process_filename(brdfile)) as data_file:
+ data = json.load(data_file)
- Args:
- brdfile: Filename of a json file decribing the INA wiring of this board.
- """
- with open(process_filename(brdfile)) as data_file:
- data = json.load(data_file)
-
- #TODO: validate this.
- self._brdcfg = data;
- self._logger.debug(pprint.pformat(data))
+ # TODO: validate this.
+ self._brdcfg = data
+ self._logger.debug(pprint.pformat(data))
class powerlog(object):
- """Power class to log aggregated power.
-
- Usage:
- obj = powerlog()
-
- Instance Variables:
- _data: a StatsManager object that records sweetberry readings and calculates
- statistics.
- _pwr[]: Spower objects for individual sweetberries.
- """
+ """Power class to log aggregated power.
- def __init__(self, brdfile, cfgfile, serial_a=None, serial_b=None,
- sync_date=False, use_ms=False, use_mW=False, print_stats=False,
- stats_dir=None, stats_json_dir=None, print_raw_data=True,
- raw_data_dir=None):
- """Init the powerlog class and set the variables.
-
- Args:
- brdfile: string name of json file containing board layout.
- cfgfile: string name of json containing list of rails to read.
- serial_a: serial number of sweetberry A.
- serial_b: serial number of sweetberry B.
- sync_date: report timestamps synced with host datetime.
- use_ms: report timestamps in ms rather than us.
- use_mW: report power as milliwatts, otherwise default to microwatts.
- print_stats: print statistics for sweetberry readings at the end.
- stats_dir: directory to save sweetberry readings statistics; if None then
- do not save the statistics.
- stats_json_dir: directory to save means of sweetberry readings in json
- format; if None then do not save the statistics.
- print_raw_data: print sweetberry readings raw data in real time, default
- is to print.
- raw_data_dir: directory to save sweetberry readings raw data; if None then
- do not save the raw data.
- """
- self._logger = logging.getLogger(__name__)
- self._data = StatsManager()
- self._pwr = {}
- self._use_ms = use_ms
- self._use_mW = use_mW
- self._print_stats = print_stats
- self._stats_dir = stats_dir
- self._stats_json_dir = stats_json_dir
- self._print_raw_data = print_raw_data
- self._raw_data_dir = raw_data_dir
-
- if not serial_a and not serial_b:
- self._pwr['A'] = Spower('A')
- if serial_a:
- self._pwr['A'] = Spower('A', serialname=serial_a)
- if serial_b:
- self._pwr['B'] = Spower('B', serialname=serial_b)
-
- with open(process_filename(cfgfile)) as data_file:
- names = json.load(data_file)
- self._names = self.process_scenario(names)
-
- for key in self._pwr:
- self._pwr[key].load_board(brdfile)
- self._pwr[key].reset()
-
- # Allocate the rails to the appropriate boards.
- used_boards = []
- for name in self._names:
- success = False
- for key in self._pwr.keys():
- if self._pwr[key].add_ina_name(name):
- success = True
- if key not in used_boards:
- used_boards.append(key)
- if not success:
- raise Exception("Failed to add %s (maybe missing "
- "sweetberry, or bad board file?)" % name)
-
- # Evict unused boards.
- for key in list(self._pwr.keys()):
- if key not in used_boards:
- self._pwr.pop(key)
-
- for key in self._pwr.keys():
- if sync_date:
- self._pwr[key].set_time(time.time() * 1000000)
- else:
- self._pwr[key].set_time(0)
-
- def process_scenario(self, name_list):
- """Return list of tuples indicating name and type.
+ Usage:
+ obj = powerlog()
- Args:
- json originated list of names, or [name, type]
- Returns:
- list of tuples of (name, type) defaulting to type "POWER"
- Raises: exception, invalid INA type.
+ Instance Variables:
+ _data: a StatsManager object that records sweetberry readings and calculates
+ statistics.
+ _pwr[]: Spower objects for individual sweetberries.
"""
- names = []
- for entry in name_list:
- if isinstance(entry, list):
- name = entry[0]
- if entry[1] == "POWER":
- type = Spower.INA_POWER
- elif entry[1] == "BUSV":
- type = Spower.INA_BUSV
- elif entry[1] == "CURRENT":
- type = Spower.INA_CURRENT
- elif entry[1] == "SHUNTV":
- type = Spower.INA_SHUNTV
- else:
- raise Exception("Invalid INA type", "Type of %s [%s] not recognized,"
- " try one of POWER, BUSV, CURRENT" % (entry[0], entry[1]))
- else:
- name = entry
- type = Spower.INA_POWER
- names.append((name, type))
- return names
+ def __init__(
+ self,
+ brdfile,
+ cfgfile,
+ serial_a=None,
+ serial_b=None,
+ sync_date=False,
+ use_ms=False,
+ use_mW=False,
+ print_stats=False,
+ stats_dir=None,
+ stats_json_dir=None,
+ print_raw_data=True,
+ raw_data_dir=None,
+ ):
+ """Init the powerlog class and set the variables.
+
+ Args:
+ brdfile: string name of json file containing board layout.
+ cfgfile: string name of json containing list of rails to read.
+ serial_a: serial number of sweetberry A.
+ serial_b: serial number of sweetberry B.
+ sync_date: report timestamps synced with host datetime.
+ use_ms: report timestamps in ms rather than us.
+ use_mW: report power as milliwatts, otherwise default to microwatts.
+ print_stats: print statistics for sweetberry readings at the end.
+ stats_dir: directory to save sweetberry readings statistics; if None then
+ do not save the statistics.
+ stats_json_dir: directory to save means of sweetberry readings in json
+ format; if None then do not save the statistics.
+ print_raw_data: print sweetberry readings raw data in real time, default
+ is to print.
+ raw_data_dir: directory to save sweetberry readings raw data; if None then
+ do not save the raw data.
+ """
+ self._logger = logging.getLogger(__name__)
+ self._data = StatsManager()
+ self._pwr = {}
+ self._use_ms = use_ms
+ self._use_mW = use_mW
+ self._print_stats = print_stats
+ self._stats_dir = stats_dir
+ self._stats_json_dir = stats_json_dir
+ self._print_raw_data = print_raw_data
+ self._raw_data_dir = raw_data_dir
+
+ if not serial_a and not serial_b:
+ self._pwr["A"] = Spower("A")
+ if serial_a:
+ self._pwr["A"] = Spower("A", serialname=serial_a)
+ if serial_b:
+ self._pwr["B"] = Spower("B", serialname=serial_b)
+
+ with open(process_filename(cfgfile)) as data_file:
+ names = json.load(data_file)
+ self._names = self.process_scenario(names)
- def start(self, integration_us_request, seconds, sync_speed=.8):
- """Starts sampling.
-
- Args:
- integration_us_request: requested interval between sample values.
- seconds: time until exit, or None to run until cancel.
- sync_speed: A usb request is sent every [.8] * integration_us.
- """
- # We will get back the actual integration us.
- # It should be the same for all devices.
- integration_us = None
- for key in self._pwr:
- integration_us_new = self._pwr[key].start(integration_us_request)
- if integration_us:
- if integration_us != integration_us_new:
- raise Exception("FAIL",
- "Integration on A: %dus != integration on B %dus" % (
- integration_us, integration_us_new))
- integration_us = integration_us_new
-
- # CSV header
- title = "ts:%dus" % integration_us
- for name_tuple in self._names:
- name, ina_type = name_tuple
-
- if ina_type == Spower.INA_POWER:
- unit = "mW" if self._use_mW else "uW"
- elif ina_type == Spower.INA_BUSV:
- unit = "mV"
- elif ina_type == Spower.INA_CURRENT:
- unit = "uA"
- elif ina_type == Spower.INA_SHUNTV:
- unit = "uV"
-
- title += ", %s %s" % (name, unit)
- name_type = name + Spower.INA_SUFFIX[ina_type]
- self._data.SetUnit(name_type, unit)
- title += ", status"
- if self._print_raw_data:
- logoutput(title)
-
- forever = False
- if not seconds:
- forever = True
- end_time = time.time() + seconds
- try:
- pending_records = []
- while forever or end_time > time.time():
- if (integration_us > 5000):
- time.sleep((integration_us / 1000000.) * sync_speed)
for key in self._pwr:
- records = self._pwr[key].read_line()
- if not records:
- continue
-
- for record in records:
- pending_records.append(record)
-
- pending_records.sort(key=lambda r: r['ts'])
-
- aggregate_record = {"boards": set()}
- for record in pending_records:
- if record["berry"] not in aggregate_record["boards"]:
- for rkey in record.keys():
- aggregate_record[rkey] = record[rkey]
- aggregate_record["boards"].add(record["berry"])
- else:
- self._logger.info("break %s, %s", record["berry"],
- aggregate_record["boards"])
- break
-
- if aggregate_record["boards"] == set(self._pwr.keys()):
- csv = "%f" % aggregate_record["ts"]
- for name in self._names:
- if name in aggregate_record:
- multiplier = 0.001 if (self._use_mW and
- name[1]==Spower.INA_POWER) else 1
- value = aggregate_record[name] * multiplier
- csv += ", %.2f" % value
- name_type = name[0] + Spower.INA_SUFFIX[name[1]]
- self._data.AddSample(name_type, value)
- else:
- csv += ", "
- csv += ", %d" % aggregate_record["status"]
- if self._print_raw_data:
- logoutput(csv)
-
- aggregate_record = {"boards": set()}
- for r in range(0, len(self._pwr)):
- pending_records.pop(0)
-
- except KeyboardInterrupt:
- self._logger.info('\nCTRL+C caught.')
-
- finally:
- for key in self._pwr:
- self._pwr[key].stop()
- self._data.CalculateStats()
- if self._print_stats:
- print(self._data.SummaryToString())
- save_dir = 'sweetberry%s' % time.time()
- if self._stats_dir:
- stats_dir = os.path.join(self._stats_dir, save_dir)
- self._data.SaveSummary(stats_dir)
- if self._stats_json_dir:
- stats_json_dir = os.path.join(self._stats_json_dir, save_dir)
- self._data.SaveSummaryJSON(stats_json_dir)
- if self._raw_data_dir:
- raw_data_dir = os.path.join(self._raw_data_dir, save_dir)
- self._data.SaveRawData(raw_data_dir)
+ self._pwr[key].load_board(brdfile)
+ self._pwr[key].reset()
+
+ # Allocate the rails to the appropriate boards.
+ used_boards = []
+ for name in self._names:
+ success = False
+ for key in self._pwr.keys():
+ if self._pwr[key].add_ina_name(name):
+ success = True
+ if key not in used_boards:
+ used_boards.append(key)
+ if not success:
+ raise Exception(
+ "Failed to add %s (maybe missing "
+ "sweetberry, or bad board file?)" % name
+ )
+
+ # Evict unused boards.
+ for key in list(self._pwr.keys()):
+ if key not in used_boards:
+ self._pwr.pop(key)
+
+ for key in self._pwr.keys():
+ if sync_date:
+ self._pwr[key].set_time(time.time() * 1000000)
+ else:
+ self._pwr[key].set_time(0)
+
+ def process_scenario(self, name_list):
+ """Return list of tuples indicating name and type.
+
+ Args:
+ json originated list of names, or [name, type]
+ Returns:
+ list of tuples of (name, type) defaulting to type "POWER"
+ Raises: exception, invalid INA type.
+ """
+ names = []
+ for entry in name_list:
+ if isinstance(entry, list):
+ name = entry[0]
+ if entry[1] == "POWER":
+ type = Spower.INA_POWER
+ elif entry[1] == "BUSV":
+ type = Spower.INA_BUSV
+ elif entry[1] == "CURRENT":
+ type = Spower.INA_CURRENT
+ elif entry[1] == "SHUNTV":
+ type = Spower.INA_SHUNTV
+ else:
+ raise Exception(
+ "Invalid INA type",
+ "Type of %s [%s] not recognized,"
+ " try one of POWER, BUSV, CURRENT"
+ % (entry[0], entry[1]),
+ )
+ else:
+ name = entry
+ type = Spower.INA_POWER
+
+ names.append((name, type))
+ return names
+
+ def start(self, integration_us_request, seconds, sync_speed=0.8):
+ """Starts sampling.
+
+ Args:
+ integration_us_request: requested interval between sample values.
+ seconds: time until exit, or None to run until cancel.
+ sync_speed: A usb request is sent every [.8] * integration_us.
+ """
+ # We will get back the actual integration us.
+ # It should be the same for all devices.
+ integration_us = None
+ for key in self._pwr:
+ integration_us_new = self._pwr[key].start(integration_us_request)
+ if integration_us:
+ if integration_us != integration_us_new:
+ raise Exception(
+ "FAIL",
+ # pylint:disable=bad-string-format-type
+ "Integration on A: %dus != integration on B %dus"
+ % (integration_us, integration_us_new),
+ )
+ integration_us = integration_us_new
+
+ # CSV header
+ title = "ts:%dus" % integration_us
+ for name_tuple in self._names:
+ name, ina_type = name_tuple
+
+ if ina_type == Spower.INA_POWER:
+ unit = "mW" if self._use_mW else "uW"
+ elif ina_type == Spower.INA_BUSV:
+ unit = "mV"
+ elif ina_type == Spower.INA_CURRENT:
+ unit = "uA"
+ elif ina_type == Spower.INA_SHUNTV:
+ unit = "uV"
+
+ title += ", %s %s" % (name, unit)
+ name_type = name + Spower.INA_SUFFIX[ina_type]
+ self._data.SetUnit(name_type, unit)
+ title += ", status"
+ if self._print_raw_data:
+ logoutput(title)
+
+ forever = False
+ if not seconds:
+ forever = True
+ end_time = time.time() + seconds
+ try:
+ pending_records = []
+ while forever or end_time > time.time():
+ if integration_us > 5000:
+ time.sleep((integration_us / 1000000.0) * sync_speed)
+ for key in self._pwr:
+ records = self._pwr[key].read_line()
+ if not records:
+ continue
+
+ for record in records:
+ pending_records.append(record)
+
+ pending_records.sort(key=lambda r: r["ts"])
+
+ aggregate_record = {"boards": set()}
+ for record in pending_records:
+ if record["berry"] not in aggregate_record["boards"]:
+ for rkey in record.keys():
+ aggregate_record[rkey] = record[rkey]
+ aggregate_record["boards"].add(record["berry"])
+ else:
+ self._logger.info(
+ "break %s, %s",
+ record["berry"],
+ aggregate_record["boards"],
+ )
+ break
+
+ if aggregate_record["boards"] == set(self._pwr.keys()):
+ csv = "%f" % aggregate_record["ts"]
+ for name in self._names:
+ if name in aggregate_record:
+ multiplier = (
+ 0.001
+ if (
+ self._use_mW
+ and name[1] == Spower.INA_POWER
+ )
+ else 1
+ )
+ value = aggregate_record[name] * multiplier
+ csv += ", %.2f" % value
+ name_type = name[0] + Spower.INA_SUFFIX[name[1]]
+ self._data.AddSample(name_type, value)
+ else:
+ csv += ", "
+ csv += ", %d" % aggregate_record["status"]
+ if self._print_raw_data:
+ logoutput(csv)
+
+ aggregate_record = {"boards": set()}
+ for r in range(0, len(self._pwr)):
+ pending_records.pop(0)
+
+ except KeyboardInterrupt:
+ self._logger.info("\nCTRL+C caught.")
+
+ finally:
+ for key in self._pwr:
+ self._pwr[key].stop()
+ self._data.CalculateStats()
+ if self._print_stats:
+ print(self._data.SummaryToString())
+ save_dir = "sweetberry%s" % time.time()
+ if self._stats_dir:
+ stats_dir = os.path.join(self._stats_dir, save_dir)
+ self._data.SaveSummary(stats_dir)
+ if self._stats_json_dir:
+ stats_json_dir = os.path.join(self._stats_json_dir, save_dir)
+ self._data.SaveSummaryJSON(stats_json_dir)
+ if self._raw_data_dir:
+ raw_data_dir = os.path.join(self._raw_data_dir, save_dir)
+ self._data.SaveRawData(raw_data_dir)
def main(argv=None):
- if argv is None:
- argv = sys.argv[1:]
- # Command line argument description.
- parser = argparse.ArgumentParser(
- description="Gather CSV data from sweetberry")
- parser.add_argument('-b', '--board', type=str,
- help="Board configuration file, eg. my.board", default="")
- parser.add_argument('-c', '--config', type=str,
- help="Rail config to monitor, eg my.scenario", default="")
- parser.add_argument('-A', '--serial', type=str,
- help="Serial number of sweetberry A", default="")
- parser.add_argument('-B', '--serial_b', type=str,
- help="Serial number of sweetberry B", default="")
- parser.add_argument('-t', '--integration_us', type=int,
- help="Target integration time for samples", default=100000)
- parser.add_argument('-s', '--seconds', type=float,
- help="Seconds to run capture", default=0.)
- parser.add_argument('--date', default=False,
- help="Sync logged timestamp to host date", action="store_true")
- parser.add_argument('--ms', default=False,
- help="Print timestamp as milliseconds", action="store_true")
- parser.add_argument('--mW', default=False,
- help="Print power as milliwatts, otherwise default to microwatts",
- action="store_true")
- parser.add_argument('--slow', default=False,
- help="Intentionally overflow", action="store_true")
- parser.add_argument('--print_stats', default=False, action="store_true",
- help="Print statistics for sweetberry readings at the end")
- parser.add_argument('--save_stats', type=str, nargs='?',
- dest='stats_dir', metavar='STATS_DIR',
- const=os.path.dirname(os.path.abspath(__file__)), default=None,
- help="Save statistics for sweetberry readings to %(metavar)s if "
- "%(metavar)s is specified, %(metavar)s will be created if it does "
- "not exist; if %(metavar)s is not specified but the flag is set, "
- "stats will be saved to where %(prog)s is located; if this flag is "
- "not set, then do not save stats")
- parser.add_argument('--save_stats_json', type=str, nargs='?',
- dest='stats_json_dir', metavar='STATS_JSON_DIR',
- const=os.path.dirname(os.path.abspath(__file__)), default=None,
- help="Save means for sweetberry readings in json to %(metavar)s if "
- "%(metavar)s is specified, %(metavar)s will be created if it does "
- "not exist; if %(metavar)s is not specified but the flag is set, "
- "stats will be saved to where %(prog)s is located; if this flag is "
- "not set, then do not save stats")
- parser.add_argument('--no_print_raw_data',
- dest='print_raw_data', default=True, action="store_false",
- help="Not print raw sweetberry readings at real time, default is to "
- "print")
- parser.add_argument('--save_raw_data', type=str, nargs='?',
- dest='raw_data_dir', metavar='RAW_DATA_DIR',
- const=os.path.dirname(os.path.abspath(__file__)), default=None,
- help="Save raw data for sweetberry readings to %(metavar)s if "
- "%(metavar)s is specified, %(metavar)s will be created if it does "
- "not exist; if %(metavar)s is not specified but the flag is set, "
- "raw data will be saved to where %(prog)s is located; if this flag "
- "is not set, then do not save raw data")
- parser.add_argument('-v', '--verbose', default=False,
- help="Very chatty printout", action="store_true")
-
- args = parser.parse_args(argv)
-
- root_logger = logging.getLogger(__name__)
- if args.verbose:
- root_logger.setLevel(logging.DEBUG)
- else:
- root_logger.setLevel(logging.INFO)
-
- # if powerlog is used through main, log to sys.stdout
- if __name__ == "__main__":
- stdout_handler = logging.StreamHandler(sys.stdout)
- stdout_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
- root_logger.addHandler(stdout_handler)
-
- integration_us_request = args.integration_us
- if not args.board:
- raise Exception("Power", "No board file selected, see board.README")
- if not args.config:
- raise Exception("Power", "No config file selected, see board.README")
-
- brdfile = args.board
- cfgfile = args.config
- seconds = args.seconds
- serial_a = args.serial
- serial_b = args.serial_b
- sync_date = args.date
- use_ms = args.ms
- use_mW = args.mW
- print_stats = args.print_stats
- stats_dir = args.stats_dir
- stats_json_dir = args.stats_json_dir
- print_raw_data = args.print_raw_data
- raw_data_dir = args.raw_data_dir
-
- boards = []
-
- sync_speed = .8
- if args.slow:
- sync_speed = 1.2
-
- # Set up logging interface.
- powerlogger = powerlog(brdfile, cfgfile, serial_a=serial_a, serial_b=serial_b,
- sync_date=sync_date, use_ms=use_ms, use_mW=use_mW,
- print_stats=print_stats, stats_dir=stats_dir,
- stats_json_dir=stats_json_dir,
- print_raw_data=print_raw_data,raw_data_dir=raw_data_dir)
-
- # Start logging.
- powerlogger.start(integration_us_request, seconds, sync_speed=sync_speed)
+ if argv is None:
+ argv = sys.argv[1:]
+ # Command line argument description.
+ parser = argparse.ArgumentParser(
+ description="Gather CSV data from sweetberry"
+ )
+ parser.add_argument(
+ "-b",
+ "--board",
+ type=str,
+ help="Board configuration file, eg. my.board",
+ default="",
+ )
+ parser.add_argument(
+ "-c",
+ "--config",
+ type=str,
+ help="Rail config to monitor, eg my.scenario",
+ default="",
+ )
+ parser.add_argument(
+ "-A",
+ "--serial",
+ type=str,
+ help="Serial number of sweetberry A",
+ default="",
+ )
+ parser.add_argument(
+ "-B",
+ "--serial_b",
+ type=str,
+ help="Serial number of sweetberry B",
+ default="",
+ )
+ parser.add_argument(
+ "-t",
+ "--integration_us",
+ type=int,
+ help="Target integration time for samples",
+ default=100000,
+ )
+ parser.add_argument(
+ "-s",
+ "--seconds",
+ type=float,
+ help="Seconds to run capture",
+ default=0.0,
+ )
+ parser.add_argument(
+ "--date",
+ default=False,
+ help="Sync logged timestamp to host date",
+ action="store_true",
+ )
+ parser.add_argument(
+ "--ms",
+ default=False,
+ help="Print timestamp as milliseconds",
+ action="store_true",
+ )
+ parser.add_argument(
+ "--mW",
+ default=False,
+ help="Print power as milliwatts, otherwise default to microwatts",
+ action="store_true",
+ )
+ parser.add_argument(
+ "--slow",
+ default=False,
+ help="Intentionally overflow",
+ action="store_true",
+ )
+ parser.add_argument(
+ "--print_stats",
+ default=False,
+ action="store_true",
+ help="Print statistics for sweetberry readings at the end",
+ )
+ parser.add_argument(
+ "--save_stats",
+ type=str,
+ nargs="?",
+ dest="stats_dir",
+ metavar="STATS_DIR",
+ const=os.path.dirname(os.path.abspath(__file__)),
+ default=None,
+ help="Save statistics for sweetberry readings to %(metavar)s if "
+ "%(metavar)s is specified, %(metavar)s will be created if it does "
+ "not exist; if %(metavar)s is not specified but the flag is set, "
+ "stats will be saved to where %(prog)s is located; if this flag is "
+ "not set, then do not save stats",
+ )
+ parser.add_argument(
+ "--save_stats_json",
+ type=str,
+ nargs="?",
+ dest="stats_json_dir",
+ metavar="STATS_JSON_DIR",
+ const=os.path.dirname(os.path.abspath(__file__)),
+ default=None,
+ help="Save means for sweetberry readings in json to %(metavar)s if "
+ "%(metavar)s is specified, %(metavar)s will be created if it does "
+ "not exist; if %(metavar)s is not specified but the flag is set, "
+ "stats will be saved to where %(prog)s is located; if this flag is "
+ "not set, then do not save stats",
+ )
+ parser.add_argument(
+ "--no_print_raw_data",
+ dest="print_raw_data",
+ default=True,
+ action="store_false",
+ help="Not print raw sweetberry readings at real time, default is to "
+ "print",
+ )
+ parser.add_argument(
+ "--save_raw_data",
+ type=str,
+ nargs="?",
+ dest="raw_data_dir",
+ metavar="RAW_DATA_DIR",
+ const=os.path.dirname(os.path.abspath(__file__)),
+ default=None,
+ help="Save raw data for sweetberry readings to %(metavar)s if "
+ "%(metavar)s is specified, %(metavar)s will be created if it does "
+ "not exist; if %(metavar)s is not specified but the flag is set, "
+ "raw data will be saved to where %(prog)s is located; if this flag "
+ "is not set, then do not save raw data",
+ )
+ parser.add_argument(
+ "-v",
+ "--verbose",
+ default=False,
+ help="Very chatty printout",
+ action="store_true",
+ )
+
+ args = parser.parse_args(argv)
+
+ root_logger = logging.getLogger(__name__)
+ if args.verbose:
+ root_logger.setLevel(logging.DEBUG)
+ else:
+ root_logger.setLevel(logging.INFO)
+
+ # if powerlog is used through main, log to sys.stdout
+ if __name__ == "__main__":
+ stdout_handler = logging.StreamHandler(sys.stdout)
+ stdout_handler.setFormatter(
+ logging.Formatter("%(levelname)s: %(message)s")
+ )
+ root_logger.addHandler(stdout_handler)
+
+ integration_us_request = args.integration_us
+ if not args.board:
+ raise Exception("Power", "No board file selected, see board.README")
+ if not args.config:
+ raise Exception("Power", "No config file selected, see board.README")
+
+ brdfile = args.board
+ cfgfile = args.config
+ seconds = args.seconds
+ serial_a = args.serial
+ serial_b = args.serial_b
+ sync_date = args.date
+ use_ms = args.ms
+ use_mW = args.mW
+ print_stats = args.print_stats
+ stats_dir = args.stats_dir
+ stats_json_dir = args.stats_json_dir
+ print_raw_data = args.print_raw_data
+ raw_data_dir = args.raw_data_dir
+
+ boards = []
+
+ sync_speed = 0.8
+ if args.slow:
+ sync_speed = 1.2
+
+ # Set up logging interface.
+ powerlogger = powerlog(
+ brdfile,
+ cfgfile,
+ serial_a=serial_a,
+ serial_b=serial_b,
+ sync_date=sync_date,
+ use_ms=use_ms,
+ use_mW=use_mW,
+ print_stats=print_stats,
+ stats_dir=stats_dir,
+ stats_json_dir=stats_json_dir,
+ print_raw_data=print_raw_data,
+ raw_data_dir=raw_data_dir,
+ )
+
+ # Start logging.
+ powerlogger.start(integration_us_request, seconds, sync_speed=sync_speed)
if __name__ == "__main__":
- main()
+ main()
diff --git a/extra/usb_power/powerlog_unittest.py b/extra/usb_power/powerlog_unittest.py
index 1d0718530e..62667e35b8 100644
--- a/extra/usb_power/powerlog_unittest.py
+++ b/extra/usb_power/powerlog_unittest.py
@@ -1,10 +1,6 @@
-# Copyright 2018 The Chromium OS Authors. All rights reserved.
+# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""Unit tests for powerlog."""
@@ -13,42 +9,44 @@ import shutil
import tempfile
import unittest
-import powerlog
+from usb_power import powerlog
+
class TestPowerlog(unittest.TestCase):
- """Test to verify powerlog util methods work as expected."""
-
- def setUp(self):
- """Set up data and create a temporary directory to save data and stats."""
- self.tempdir = tempfile.mkdtemp()
- self.filename = 'testfile'
- self.filepath = os.path.join(self.tempdir, self.filename)
- with open(self.filepath, 'w') as f:
- f.write('')
-
- def tearDown(self):
- """Delete the temporary directory and its content."""
- shutil.rmtree(self.tempdir)
-
- def test_ProcessFilenameAbsoluteFilePath(self):
- """Absolute file path is returned unchanged."""
- processed_fname = powerlog.process_filename(self.filepath)
- self.assertEqual(self.filepath, processed_fname)
-
- def test_ProcessFilenameRelativeFilePath(self):
- """Finds relative file path inside a known config location."""
- original = powerlog.CONFIG_LOCATIONS
- powerlog.CONFIG_LOCATIONS = [self.tempdir]
- processed_fname = powerlog.process_filename(self.filename)
- try:
- self.assertEqual(self.filepath, processed_fname)
- finally:
- powerlog.CONFIG_LOCATIONS = original
-
- def test_ProcessFilenameInvalid(self):
- """IOError is raised when file cannot be found by any of the four ways."""
- with self.assertRaises(IOError):
- powerlog.process_filename(self.filename)
-
-if __name__ == '__main__':
- unittest.main()
+ """Test to verify powerlog util methods work as expected."""
+
+ def setUp(self):
+ """Set up data and create a temporary directory to save data and stats."""
+ self.tempdir = tempfile.mkdtemp()
+ self.filename = "testfile"
+ self.filepath = os.path.join(self.tempdir, self.filename)
+ with open(self.filepath, "w") as f:
+ f.write("")
+
+ def tearDown(self):
+ """Delete the temporary directory and its content."""
+ shutil.rmtree(self.tempdir)
+
+ def test_ProcessFilenameAbsoluteFilePath(self):
+ """Absolute file path is returned unchanged."""
+ processed_fname = powerlog.process_filename(self.filepath)
+ self.assertEqual(self.filepath, processed_fname)
+
+ def test_ProcessFilenameRelativeFilePath(self):
+ """Finds relative file path inside a known config location."""
+ original = powerlog.CONFIG_LOCATIONS
+ powerlog.CONFIG_LOCATIONS = [self.tempdir]
+ processed_fname = powerlog.process_filename(self.filename)
+ try:
+ self.assertEqual(self.filepath, processed_fname)
+ finally:
+ powerlog.CONFIG_LOCATIONS = original
+
+ def test_ProcessFilenameInvalid(self):
+ """IOError is raised when file cannot be found by any of the four ways."""
+ with self.assertRaises(IOError):
+ powerlog.process_filename(self.filename)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/extra/usb_power/stats_manager.py b/extra/usb_power/stats_manager.py
index 0f8c3fcb15..2035138731 100644
--- a/extra/usb_power/stats_manager.py
+++ b/extra/usb_power/stats_manager.py
@@ -1,10 +1,6 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Copyright 2017 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""Calculates statistics for lists of data and pretty print them."""
@@ -18,384 +14,409 @@ import logging
import math
import os
-import numpy
+import numpy # pylint:disable=import-error
-STATS_PREFIX = '@@'
-NAN_TAG = '*'
-NAN_DESCRIPTION = '%s domains contain NaN samples' % NAN_TAG
+STATS_PREFIX = "@@"
+NAN_TAG = "*"
+NAN_DESCRIPTION = "%s domains contain NaN samples" % NAN_TAG
LONG_UNIT = {
- '': 'N/A',
- 'mW': 'milliwatt',
- 'uW': 'microwatt',
- 'mV': 'millivolt',
- 'uA': 'microamp',
- 'uV': 'microvolt'
+ "": "N/A",
+ "mW": "milliwatt",
+ "uW": "microwatt",
+ "mV": "millivolt",
+ "uA": "microamp",
+ "uV": "microvolt",
}
class StatsManagerError(Exception):
- """Errors in StatsManager class."""
- pass
+ """Errors in StatsManager class."""
+ pass
-class StatsManager(object):
- """Calculates statistics for several lists of data(float).
-
- Example usage:
-
- >>> stats = StatsManager(title='Title Banner')
- >>> stats.AddSample(TIME_KEY, 50.0)
- >>> stats.AddSample(TIME_KEY, 25.0)
- >>> stats.AddSample(TIME_KEY, 40.0)
- >>> stats.AddSample(TIME_KEY, 10.0)
- >>> stats.AddSample(TIME_KEY, 10.0)
- >>> stats.AddSample('frobnicate', 11.5)
- >>> stats.AddSample('frobnicate', 9.0)
- >>> stats.AddSample('foobar', 11111.0)
- >>> stats.AddSample('foobar', 22222.0)
- >>> stats.CalculateStats()
- >>> print(stats.SummaryToString())
- ` @@--------------------------------------------------------------
- ` @@ Title Banner
- @@--------------------------------------------------------------
- @@ NAME COUNT MEAN STDDEV MAX MIN
- @@ sample_msecs 4 31.25 15.16 50.00 10.00
- @@ foobar 2 16666.50 5555.50 22222.00 11111.00
- @@ frobnicate 2 10.25 1.25 11.50 9.00
- ` @@--------------------------------------------------------------
-
- Attributes:
- _data: dict of list of readings for each domain(key)
- _unit: dict of unit for each domain(key)
- _smid: id supplied to differentiate data output to other StatsManager
- instances that potentially save to the same directory
- if smid all output files will be named |smid|_|fname|
- _title: title to add as banner to formatted summary. If no title,
- no banner gets added
- _order: list of formatting order for domains. Domains not listed are
- displayed in sorted order
- _hide_domains: collection of domains to hide when formatting summary string
- _accept_nan: flag to indicate if NaN samples are acceptable
- _nan_domains: set to keep track of which domains contain NaN samples
- _summary: dict of stats per domain (key): min, max, count, mean, stddev
- _logger = StatsManager logger
-
- Note:
- _summary is empty until CalculateStats() is called, and is updated when
- CalculateStats() is called.
- """
-
- # pylint: disable=W0102
- def __init__(self, smid='', title='', order=[], hide_domains=[],
- accept_nan=True):
- """Initialize infrastructure for data and their statistics."""
- self._title = title
- self._data = collections.defaultdict(list)
- self._unit = collections.defaultdict(str)
- self._smid = smid
- self._order = order
- self._hide_domains = hide_domains
- self._accept_nan = accept_nan
- self._nan_domains = set()
- self._summary = {}
- self._logger = logging.getLogger(type(self).__name__)
-
- def AddSample(self, domain, sample):
- """Add one sample for a domain.
-
- Args:
- domain: the domain name for the sample.
- sample: one time sample for domain, expect type float.
-
- Raises:
- StatsManagerError: if trying to add NaN and |_accept_nan| is false
- """
- try:
- sample = float(sample)
- except ValueError:
- # if we don't accept nan this will be caught below
- self._logger.debug('sample %s for domain %s is not a number. Making NaN',
- sample, domain)
- sample = float('NaN')
- if not self._accept_nan and math.isnan(sample):
- raise StatsManagerError('accept_nan is false. Cannot add NaN sample.')
- self._data[domain].append(sample)
- if math.isnan(sample):
- self._nan_domains.add(domain)
-
- def SetUnit(self, domain, unit):
- """Set the unit for a domain.
-
- There can be only one unit for each domain. Setting unit twice will
- overwrite the original unit.
-
- Args:
- domain: the domain name.
- unit: unit of the domain.
- """
- if domain in self._unit:
- self._logger.warning('overwriting the unit of %s, old unit is %s, new '
- 'unit is %s.', domain, self._unit[domain], unit)
- self._unit[domain] = unit
- def CalculateStats(self):
- """Calculate stats for all domain-data pairs.
-
- First erases all previous stats, then calculate stats for all data.
- """
- self._summary = {}
- for domain, data in self._data.items():
- data_np = numpy.array(data)
- self._summary[domain] = {
- 'mean': numpy.nanmean(data_np),
- 'min': numpy.nanmin(data_np),
- 'max': numpy.nanmax(data_np),
- 'stddev': numpy.nanstd(data_np),
- 'count': data_np.size,
- }
-
- @property
- def DomainsToDisplay(self):
- """List of domains that the manager will output in summaries."""
- return set(self._summary.keys()) - set(self._hide_domains)
-
- @property
- def NanInOutput(self):
- """Return whether any of the domains to display have NaN values."""
- return bool(len(set(self._nan_domains) & self.DomainsToDisplay))
-
- def _SummaryTable(self):
- """Generate the matrix to output as a summary.
-
- Returns:
- A 2d matrix of headers and their data for each domain
- e.g.
- [[NAME, COUNT, MEAN, STDDEV, MAX, MIN],
- [pp5000_mw, 10, 50, 0, 50, 50]]
- """
- headers = ('NAME', 'COUNT', 'MEAN', 'STDDEV', 'MAX', 'MIN')
- table = [headers]
- # determine what domains to display & and the order
- domains_to_display = self.DomainsToDisplay
- display_order = [key for key in self._order if key in domains_to_display]
- domains_to_display -= set(display_order)
- display_order.extend(sorted(domains_to_display))
- for domain in display_order:
- stats = self._summary[domain]
- if not domain.endswith(self._unit[domain]):
- domain = '%s_%s' % (domain, self._unit[domain])
- if domain in self._nan_domains:
- domain = '%s%s' % (domain, NAN_TAG)
- row = [domain]
- row.append(str(stats['count']))
- for entry in headers[2:]:
- row.append('%.2f' % stats[entry.lower()])
- table.append(row)
- return table
-
- def SummaryToMarkdownString(self):
- """Format the summary into a b/ compatible markdown table string.
-
- This requires this sort of output format
-
- | header1 | header2 | header3 | ...
- | --------- | --------- | --------- | ...
- | sample1h1 | sample1h2 | sample1h3 | ...
- .
- .
- .
-
- Returns:
- formatted summary string.
- """
- # All we need to do before processing is insert a row of '-' between
- # the headers, and the data
- table = self._SummaryTable()
- columns = len(table[0])
- # Using '-:' to allow the numbers to be right aligned
- sep_row = ['-'] + ['-:'] * (columns - 1)
- table.insert(1, sep_row)
- text_rows = ['|'.join(r) for r in table]
- body = '\n'.join(['|%s|' % r for r in text_rows])
- if self._title:
- title_section = '**%s** \n\n' % self._title
- body = title_section + body
- # Make sure that the body is terminated with a newline.
- return body + '\n'
-
- def SummaryToString(self, prefix=STATS_PREFIX):
- """Format summary into a string, ready for pretty print.
-
- See class description for format example.
-
- Args:
- prefix: start every row in summary string with prefix, for easier reading.
-
- Returns:
- formatted summary string.
- """
- table = self._SummaryTable()
- max_col_width = []
- for col_idx in range(len(table[0])):
- col_item_widths = [len(row[col_idx]) for row in table]
- max_col_width.append(max(col_item_widths))
-
- formatted_lines = []
- for row in table:
- formatted_row = prefix + ' '
- for i in range(len(row)):
- formatted_row += row[i].rjust(max_col_width[i] + 2)
- formatted_lines.append(formatted_row)
- if self.NanInOutput:
- formatted_lines.append('%s %s' % (prefix, NAN_DESCRIPTION))
-
- if self._title:
- line_length = len(formatted_lines[0])
- dec_length = len(prefix)
- # trim title to be at most as long as the longest line without the prefix
- title = self._title[:(line_length - dec_length)]
- # line is a seperator line consisting of -----
- line = '%s%s' % (prefix, '-' * (line_length - dec_length))
- # prepend the prefix to the centered title
- padded_title = '%s%s' % (prefix, title.center(line_length)[dec_length:])
- formatted_lines = [line, padded_title, line] + formatted_lines + [line]
- formatted_output = '\n'.join(formatted_lines)
- return formatted_output
-
- def GetSummary(self):
- """Getter for summary."""
- return self._summary
-
- def _MakeUniqueFName(self, fname):
- """prepend |_smid| to fname & rotate fname to ensure uniqueness.
-
- Before saving a file through the StatsManager, make sure that the filename
- is unique, first by prepending the smid if any and otherwise by appending
- increasing integer suffixes until the filename is unique.
-
- If |smid| is defined /path/to/example/file.txt becomes
- /path/to/example/{smid}_file.txt.
-
- The rotation works by changing /path/to/example/somename.txt to
- /path/to/example/somename1.txt if the first one already exists on the
- system.
-
- Note: this is not thread-safe. While it makes sense to use StatsManager
- in a threaded data-collection, the data retrieval should happen in a
- single threaded environment to ensure files don't get potentially clobbered.
-
- Args:
- fname: filename to ensure uniqueness.
-
- Returns:
- {smid_}fname{tag}.[b].ext
- the smid portion gets prepended if |smid| is defined
- the tag portion gets appended if necessary to ensure unique fname
- """
- fdir = os.path.dirname(fname)
- base, ext = os.path.splitext(os.path.basename(fname))
- if self._smid:
- base = '%s_%s' % (self._smid, base)
- unique_fname = os.path.join(fdir, '%s%s' % (base, ext))
- tag = 0
- while os.path.exists(unique_fname):
- old_fname = unique_fname
- unique_fname = os.path.join(fdir, '%s%d%s' % (base, tag, ext))
- self._logger.warning('Attempted to store stats information at %s, but '
- 'file already exists. Attempting to store at %s '
- 'now.', old_fname, unique_fname)
- tag += 1
- return unique_fname
-
- def SaveSummary(self, directory, fname='summary.txt', prefix=STATS_PREFIX):
- """Save summary to file.
-
- Args:
- directory: directory to save the summary in.
- fname: filename to save summary under.
- prefix: start every row in summary string with prefix, for easier reading.
-
- Returns:
- full path of summary save location
- """
- summary_str = self.SummaryToString(prefix=prefix) + '\n'
- return self._SaveSummary(summary_str, directory, fname)
-
- def SaveSummaryJSON(self, directory, fname='summary.json'):
- """Save summary (only MEAN) into a JSON file.
-
- Args:
- directory: directory to save the JSON summary in.
- fname: filename to save summary under.
-
- Returns:
- full path of summary save location
- """
- data = {}
- for domain in self._summary:
- unit = LONG_UNIT.get(self._unit[domain], self._unit[domain])
- data_entry = {'mean': self._summary[domain]['mean'], 'unit': unit}
- data[domain] = data_entry
- summary_str = json.dumps(data, indent=2)
- return self._SaveSummary(summary_str, directory, fname)
-
- def SaveSummaryMD(self, directory, fname='summary.md'):
- """Save summary into a MD file to paste into b/.
-
- Args:
- directory: directory to save the MD summary in.
- fname: filename to save summary under.
-
- Returns:
- full path of summary save location
+class StatsManager(object):
+ """Calculates statistics for several lists of data(float).
+
+ Example usage:
+
+ >>> stats = StatsManager(title='Title Banner')
+ >>> stats.AddSample(TIME_KEY, 50.0)
+ >>> stats.AddSample(TIME_KEY, 25.0)
+ >>> stats.AddSample(TIME_KEY, 40.0)
+ >>> stats.AddSample(TIME_KEY, 10.0)
+ >>> stats.AddSample(TIME_KEY, 10.0)
+ >>> stats.AddSample('frobnicate', 11.5)
+ >>> stats.AddSample('frobnicate', 9.0)
+ >>> stats.AddSample('foobar', 11111.0)
+ >>> stats.AddSample('foobar', 22222.0)
+ >>> stats.CalculateStats()
+ >>> print(stats.SummaryToString())
+ ` @@--------------------------------------------------------------
+ ` @@ Title Banner
+ @@--------------------------------------------------------------
+ @@ NAME COUNT MEAN STDDEV MAX MIN
+ @@ sample_msecs 4 31.25 15.16 50.00 10.00
+ @@ foobar 2 16666.50 5555.50 22222.00 11111.00
+ @@ frobnicate 2 10.25 1.25 11.50 9.00
+ ` @@--------------------------------------------------------------
+
+ Attributes:
+ _data: dict of list of readings for each domain(key)
+ _unit: dict of unit for each domain(key)
+ _smid: id supplied to differentiate data output to other StatsManager
+ instances that potentially save to the same directory
+ if smid all output files will be named |smid|_|fname|
+ _title: title to add as banner to formatted summary. If no title,
+ no banner gets added
+ _order: list of formatting order for domains. Domains not listed are
+ displayed in sorted order
+ _hide_domains: collection of domains to hide when formatting summary string
+ _accept_nan: flag to indicate if NaN samples are acceptable
+ _nan_domains: set to keep track of which domains contain NaN samples
+ _summary: dict of stats per domain (key): min, max, count, mean, stddev
+ _logger = StatsManager logger
+
+ Note:
+ _summary is empty until CalculateStats() is called, and is updated when
+ CalculateStats() is called.
"""
- summary_str = self.SummaryToMarkdownString()
- return self._SaveSummary(summary_str, directory, fname)
- def _SaveSummary(self, output_str, directory, fname):
- """Wrote |output_str| to |fname|.
-
- Args:
- output_str: formatted output string
- directory: directory to save the summary in.
- fname: filename to save summary under.
-
- Returns:
- full path of summary save location
- """
- if not os.path.exists(directory):
- os.makedirs(directory)
- fname = self._MakeUniqueFName(os.path.join(directory, fname))
- with open(fname, 'w') as f:
- f.write(output_str)
- return fname
-
- def GetRawData(self):
- """Getter for all raw_data."""
- return self._data
-
- def SaveRawData(self, directory, dirname='raw_data'):
- """Save raw data to file.
-
- Args:
- directory: directory to create the raw data folder in.
- dirname: folder in which raw data live.
-
- Returns:
- list of full path of each domain's raw data save location
- """
- if not os.path.exists(directory):
- os.makedirs(directory)
- dirname = os.path.join(directory, dirname)
- if not os.path.exists(dirname):
- os.makedirs(dirname)
- fnames = []
- for domain, data in self._data.items():
- if not domain.endswith(self._unit[domain]):
- domain = '%s_%s' % (domain, self._unit[domain])
- fname = self._MakeUniqueFName(os.path.join(dirname, '%s.txt' % domain))
- with open(fname, 'w') as f:
- f.write('\n'.join('%.2f' % sample for sample in data) + '\n')
- fnames.append(fname)
- return fnames
+ # pylint: disable=W0102
+ def __init__(
+ self, smid="", title="", order=[], hide_domains=[], accept_nan=True
+ ):
+ """Initialize infrastructure for data and their statistics."""
+ self._title = title
+ self._data = collections.defaultdict(list)
+ self._unit = collections.defaultdict(str)
+ self._smid = smid
+ self._order = order
+ self._hide_domains = hide_domains
+ self._accept_nan = accept_nan
+ self._nan_domains = set()
+ self._summary = {}
+ self._logger = logging.getLogger(type(self).__name__)
+
+ def AddSample(self, domain, sample):
+ """Add one sample for a domain.
+
+ Args:
+ domain: the domain name for the sample.
+ sample: one time sample for domain, expect type float.
+
+ Raises:
+ StatsManagerError: if trying to add NaN and |_accept_nan| is false
+ """
+ try:
+ sample = float(sample)
+ except ValueError:
+ # if we don't accept nan this will be caught below
+ self._logger.debug(
+ "sample %s for domain %s is not a number. Making NaN",
+ sample,
+ domain,
+ )
+ sample = float("NaN")
+ if not self._accept_nan and math.isnan(sample):
+ raise StatsManagerError(
+ "accept_nan is false. Cannot add NaN sample."
+ )
+ self._data[domain].append(sample)
+ if math.isnan(sample):
+ self._nan_domains.add(domain)
+
+ def SetUnit(self, domain, unit):
+ """Set the unit for a domain.
+
+ There can be only one unit for each domain. Setting unit twice will
+ overwrite the original unit.
+
+ Args:
+ domain: the domain name.
+ unit: unit of the domain.
+ """
+ if domain in self._unit:
+ self._logger.warning(
+ "overwriting the unit of %s, old unit is %s, new "
+ "unit is %s.",
+ domain,
+ self._unit[domain],
+ unit,
+ )
+ self._unit[domain] = unit
+
+ def CalculateStats(self):
+ """Calculate stats for all domain-data pairs.
+
+ First erases all previous stats, then calculate stats for all data.
+ """
+ self._summary = {}
+ for domain, data in self._data.items():
+ data_np = numpy.array(data)
+ self._summary[domain] = {
+ "mean": numpy.nanmean(data_np),
+ "min": numpy.nanmin(data_np),
+ "max": numpy.nanmax(data_np),
+ "stddev": numpy.nanstd(data_np),
+ "count": data_np.size,
+ }
+
+ @property
+ def DomainsToDisplay(self):
+ """List of domains that the manager will output in summaries."""
+ return set(self._summary.keys()) - set(self._hide_domains)
+
+ @property
+ def NanInOutput(self):
+ """Return whether any of the domains to display have NaN values."""
+ return bool(len(set(self._nan_domains) & self.DomainsToDisplay))
+
+ def _SummaryTable(self):
+ """Generate the matrix to output as a summary.
+
+ Returns:
+ A 2d matrix of headers and their data for each domain
+ e.g.
+ [[NAME, COUNT, MEAN, STDDEV, MAX, MIN],
+ [pp5000_mw, 10, 50, 0, 50, 50]]
+ """
+ headers = ("NAME", "COUNT", "MEAN", "STDDEV", "MAX", "MIN")
+ table = [headers]
+ # determine what domains to display & and the order
+ domains_to_display = self.DomainsToDisplay
+ display_order = [
+ key for key in self._order if key in domains_to_display
+ ]
+ domains_to_display -= set(display_order)
+ display_order.extend(sorted(domains_to_display))
+ for domain in display_order:
+ stats = self._summary[domain]
+ if not domain.endswith(self._unit[domain]):
+ domain = "%s_%s" % (domain, self._unit[domain])
+ if domain in self._nan_domains:
+ domain = "%s%s" % (domain, NAN_TAG)
+ row = [domain]
+ row.append(str(stats["count"]))
+ for entry in headers[2:]:
+ row.append("%.2f" % stats[entry.lower()])
+ table.append(row)
+ return table
+
+ def SummaryToMarkdownString(self):
+ """Format the summary into a b/ compatible markdown table string.
+
+ This requires this sort of output format
+
+ | header1 | header2 | header3 | ...
+ | --------- | --------- | --------- | ...
+ | sample1h1 | sample1h2 | sample1h3 | ...
+ .
+ .
+ .
+
+ Returns:
+ formatted summary string.
+ """
+ # All we need to do before processing is insert a row of '-' between
+ # the headers, and the data
+ table = self._SummaryTable()
+ columns = len(table[0])
+ # Using '-:' to allow the numbers to be right aligned
+ sep_row = ["-"] + ["-:"] * (columns - 1)
+ table.insert(1, sep_row)
+ text_rows = ["|".join(r) for r in table]
+ body = "\n".join(["|%s|" % r for r in text_rows])
+ if self._title:
+ title_section = "**%s** \n\n" % self._title
+ body = title_section + body
+ # Make sure that the body is terminated with a newline.
+ return body + "\n"
+
+ def SummaryToString(self, prefix=STATS_PREFIX):
+ """Format summary into a string, ready for pretty print.
+
+ See class description for format example.
+
+ Args:
+ prefix: start every row in summary string with prefix, for easier reading.
+
+ Returns:
+ formatted summary string.
+ """
+ table = self._SummaryTable()
+ max_col_width = []
+ for col_idx in range(len(table[0])):
+ col_item_widths = [len(row[col_idx]) for row in table]
+ max_col_width.append(max(col_item_widths))
+
+ formatted_lines = []
+ for row in table:
+ formatted_row = prefix + " "
+ for i in range(len(row)):
+ formatted_row += row[i].rjust(max_col_width[i] + 2)
+ formatted_lines.append(formatted_row)
+ if self.NanInOutput:
+ formatted_lines.append("%s %s" % (prefix, NAN_DESCRIPTION))
+
+ if self._title:
+ line_length = len(formatted_lines[0])
+ dec_length = len(prefix)
+ # trim title to be at most as long as the longest line without the prefix
+ title = self._title[: (line_length - dec_length)]
+ # line is a seperator line consisting of -----
+ line = "%s%s" % (prefix, "-" * (line_length - dec_length))
+ # prepend the prefix to the centered title
+ padded_title = "%s%s" % (
+ prefix,
+ title.center(line_length)[dec_length:],
+ )
+ formatted_lines = (
+ [line, padded_title, line] + formatted_lines + [line]
+ )
+ formatted_output = "\n".join(formatted_lines)
+ return formatted_output
+
+ def GetSummary(self):
+ """Getter for summary."""
+ return self._summary
+
+ def _MakeUniqueFName(self, fname):
+ """prepend |_smid| to fname & rotate fname to ensure uniqueness.
+
+ Before saving a file through the StatsManager, make sure that the filename
+ is unique, first by prepending the smid if any and otherwise by appending
+ increasing integer suffixes until the filename is unique.
+
+ If |smid| is defined /path/to/example/file.txt becomes
+ /path/to/example/{smid}_file.txt.
+
+ The rotation works by changing /path/to/example/somename.txt to
+ /path/to/example/somename1.txt if the first one already exists on the
+ system.
+
+ Note: this is not thread-safe. While it makes sense to use StatsManager
+ in a threaded data-collection, the data retrieval should happen in a
+ single threaded environment to ensure files don't get potentially clobbered.
+
+ Args:
+ fname: filename to ensure uniqueness.
+
+ Returns:
+ {smid_}fname{tag}.[b].ext
+ the smid portion gets prepended if |smid| is defined
+ the tag portion gets appended if necessary to ensure unique fname
+ """
+ fdir = os.path.dirname(fname)
+ base, ext = os.path.splitext(os.path.basename(fname))
+ if self._smid:
+ base = "%s_%s" % (self._smid, base)
+ unique_fname = os.path.join(fdir, "%s%s" % (base, ext))
+ tag = 0
+ while os.path.exists(unique_fname):
+ old_fname = unique_fname
+ unique_fname = os.path.join(fdir, "%s%d%s" % (base, tag, ext))
+ self._logger.warning(
+ "Attempted to store stats information at %s, but "
+ "file already exists. Attempting to store at %s "
+ "now.",
+ old_fname,
+ unique_fname,
+ )
+ tag += 1
+ return unique_fname
+
+ def SaveSummary(self, directory, fname="summary.txt", prefix=STATS_PREFIX):
+ """Save summary to file.
+
+ Args:
+ directory: directory to save the summary in.
+ fname: filename to save summary under.
+ prefix: start every row in summary string with prefix, for easier reading.
+
+ Returns:
+ full path of summary save location
+ """
+ summary_str = self.SummaryToString(prefix=prefix) + "\n"
+ return self._SaveSummary(summary_str, directory, fname)
+
+ def SaveSummaryJSON(self, directory, fname="summary.json"):
+ """Save summary (only MEAN) into a JSON file.
+
+ Args:
+ directory: directory to save the JSON summary in.
+ fname: filename to save summary under.
+
+ Returns:
+ full path of summary save location
+ """
+ data = {}
+ for domain in self._summary:
+ unit = LONG_UNIT.get(self._unit[domain], self._unit[domain])
+ data_entry = {"mean": self._summary[domain]["mean"], "unit": unit}
+ data[domain] = data_entry
+ summary_str = json.dumps(data, indent=2)
+ return self._SaveSummary(summary_str, directory, fname)
+
+ def SaveSummaryMD(self, directory, fname="summary.md"):
+ """Save summary into a MD file to paste into b/.
+
+ Args:
+ directory: directory to save the MD summary in.
+ fname: filename to save summary under.
+
+ Returns:
+ full path of summary save location
+ """
+ summary_str = self.SummaryToMarkdownString()
+ return self._SaveSummary(summary_str, directory, fname)
+
+ def _SaveSummary(self, output_str, directory, fname):
+ """Wrote |output_str| to |fname|.
+
+ Args:
+ output_str: formatted output string
+ directory: directory to save the summary in.
+ fname: filename to save summary under.
+
+ Returns:
+ full path of summary save location
+ """
+ if not os.path.exists(directory):
+ os.makedirs(directory)
+ fname = self._MakeUniqueFName(os.path.join(directory, fname))
+ with open(fname, "w") as f:
+ f.write(output_str)
+ return fname
+
+ def GetRawData(self):
+ """Getter for all raw_data."""
+ return self._data
+
+ def SaveRawData(self, directory, dirname="raw_data"):
+ """Save raw data to file.
+
+ Args:
+ directory: directory to create the raw data folder in.
+ dirname: folder in which raw data live.
+
+ Returns:
+ list of full path of each domain's raw data save location
+ """
+ if not os.path.exists(directory):
+ os.makedirs(directory)
+ dirname = os.path.join(directory, dirname)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+ fnames = []
+ for domain, data in self._data.items():
+ if not domain.endswith(self._unit[domain]):
+ domain = "%s_%s" % (domain, self._unit[domain])
+ fname = self._MakeUniqueFName(
+ os.path.join(dirname, "%s.txt" % domain)
+ )
+ with open(fname, "w") as f:
+ f.write("\n".join("%.2f" % sample for sample in data) + "\n")
+ fnames.append(fname)
+ return fnames
diff --git a/extra/usb_power/stats_manager_unittest.py b/extra/usb_power/stats_manager_unittest.py
index beb9984b93..2bfaa5c83d 100644
--- a/extra/usb_power/stats_manager_unittest.py
+++ b/extra/usb_power/stats_manager_unittest.py
@@ -1,14 +1,11 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Copyright 2017 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""Unit tests for StatsManager."""
from __future__ import print_function
+
import json
import os
import re
@@ -16,300 +13,314 @@ import shutil
import tempfile
import unittest
-import stats_manager
+import stats_manager # pylint:disable=import-error
class TestStatsManager(unittest.TestCase):
- """Test to verify StatsManager methods work as expected.
-
- StatsManager should collect raw data, calculate their statistics, and save
- them in expected format.
- """
-
- def _populate_mock_stats(self):
- """Create a populated & processed StatsManager to test data retrieval."""
- self.data.AddSample('A', 99999.5)
- self.data.AddSample('A', 100000.5)
- self.data.SetUnit('A', 'uW')
- self.data.SetUnit('A', 'mW')
- self.data.AddSample('B', 1.5)
- self.data.AddSample('B', 2.5)
- self.data.AddSample('B', 3.5)
- self.data.SetUnit('B', 'mV')
- self.data.CalculateStats()
-
- def _populate_mock_stats_no_unit(self):
- self.data.AddSample('B', 1000)
- self.data.AddSample('A', 200)
- self.data.SetUnit('A', 'blue')
-
- def setUp(self):
- """Set up StatsManager and create a temporary directory for test."""
- self.tempdir = tempfile.mkdtemp()
- self.data = stats_manager.StatsManager()
-
- def tearDown(self):
- """Delete the temporary directory and its content."""
- shutil.rmtree(self.tempdir)
-
- def test_AddSample(self):
- """Adding a sample successfully adds a sample."""
- self.data.AddSample('Test', 1000)
- self.data.SetUnit('Test', 'test')
- self.data.CalculateStats()
- summary = self.data.GetSummary()
- self.assertEqual(1, summary['Test']['count'])
-
- def test_AddSampleNoFloatAcceptNaN(self):
- """Adding a non-number adds 'NaN' and doesn't raise an exception."""
- self.data.AddSample('Test', 10)
- self.data.AddSample('Test', 20)
- # adding a fake NaN: one that gets converted into NaN internally
- self.data.AddSample('Test', 'fiesta')
- # adding a real NaN
- self.data.AddSample('Test', float('NaN'))
- self.data.SetUnit('Test', 'test')
- self.data.CalculateStats()
- summary = self.data.GetSummary()
- # assert that 'NaN' as added.
- self.assertEqual(4, summary['Test']['count'])
- # assert that mean, min, and max calculatings ignore the 'NaN'
- self.assertEqual(10, summary['Test']['min'])
- self.assertEqual(20, summary['Test']['max'])
- self.assertEqual(15, summary['Test']['mean'])
-
- def test_AddSampleNoFloatNotAcceptNaN(self):
- """Adding a non-number raises a StatsManagerError if accept_nan is False."""
- self.data = stats_manager.StatsManager(accept_nan=False)
- with self.assertRaisesRegexp(stats_manager.StatsManagerError,
- 'accept_nan is false. Cannot add NaN sample.'):
- # adding a fake NaN: one that gets converted into NaN internally
- self.data.AddSample('Test', 'fiesta')
- with self.assertRaisesRegexp(stats_manager.StatsManagerError,
- 'accept_nan is false. Cannot add NaN sample.'):
- # adding a real NaN
- self.data.AddSample('Test', float('NaN'))
-
- def test_AddSampleNoUnit(self):
- """Not adding a unit does not cause an exception on CalculateStats()."""
- self.data.AddSample('Test', 17)
- self.data.CalculateStats()
- summary = self.data.GetSummary()
- self.assertEqual(1, summary['Test']['count'])
-
- def test_UnitSuffix(self):
- """Unit gets appended as a suffix in the displayed summary."""
- self.data.AddSample('test', 250)
- self.data.SetUnit('test', 'mw')
- self.data.CalculateStats()
- summary_str = self.data.SummaryToString()
- self.assertIn('test_mw', summary_str)
-
- def test_DoubleUnitSuffix(self):
- """If domain already ends in unit, verify that unit doesn't get appended."""
- self.data.AddSample('test_mw', 250)
- self.data.SetUnit('test_mw', 'mw')
- self.data.CalculateStats()
- summary_str = self.data.SummaryToString()
- self.assertIn('test_mw', summary_str)
- self.assertNotIn('test_mw_mw', summary_str)
-
- def test_GetRawData(self):
- """GetRawData returns exact same data as fed in."""
- self._populate_mock_stats()
- raw_data = self.data.GetRawData()
- self.assertListEqual([99999.5, 100000.5], raw_data['A'])
- self.assertListEqual([1.5, 2.5, 3.5], raw_data['B'])
-
- def test_GetSummary(self):
- """GetSummary returns expected stats about the data fed in."""
- self._populate_mock_stats()
- summary = self.data.GetSummary()
- self.assertEqual(2, summary['A']['count'])
- self.assertAlmostEqual(100000.5, summary['A']['max'])
- self.assertAlmostEqual(99999.5, summary['A']['min'])
- self.assertAlmostEqual(0.5, summary['A']['stddev'])
- self.assertAlmostEqual(100000.0, summary['A']['mean'])
- self.assertEqual(3, summary['B']['count'])
- self.assertAlmostEqual(3.5, summary['B']['max'])
- self.assertAlmostEqual(1.5, summary['B']['min'])
- self.assertAlmostEqual(0.81649658092773, summary['B']['stddev'])
- self.assertAlmostEqual(2.5, summary['B']['mean'])
-
- def test_SaveRawData(self):
- """SaveRawData stores same data as fed in."""
- self._populate_mock_stats()
- dirname = 'unittest_raw_data'
- expected_files = set(['A_mW.txt', 'B_mV.txt'])
- fnames = self.data.SaveRawData(self.tempdir, dirname)
- files_returned = set([os.path.basename(f) for f in fnames])
- # Assert that only the expected files got returned.
- self.assertEqual(expected_files, files_returned)
- # Assert that only the returned files are in the outdir.
- self.assertEqual(set(os.listdir(os.path.join(self.tempdir, dirname))),
- files_returned)
- for fname in fnames:
- with open(fname, 'r') as f:
- if 'A_mW' in fname:
- self.assertEqual('99999.50', f.readline().strip())
- self.assertEqual('100000.50', f.readline().strip())
- if 'B_mV' in fname:
- self.assertEqual('1.50', f.readline().strip())
- self.assertEqual('2.50', f.readline().strip())
- self.assertEqual('3.50', f.readline().strip())
-
- def test_SaveRawDataNoUnit(self):
- """SaveRawData appends no unit suffix if the unit is not specified."""
- self._populate_mock_stats_no_unit()
- self.data.CalculateStats()
- outdir = 'unittest_raw_data'
- files = self.data.SaveRawData(self.tempdir, outdir)
- files = [os.path.basename(f) for f in files]
- # Verify nothing gets appended to domain for filename if no unit exists.
- self.assertIn('B.txt', files)
-
- def test_SaveRawDataSMID(self):
- """SaveRawData uses the smid when creating output filename."""
- identifier = 'ec'
- self.data = stats_manager.StatsManager(smid=identifier)
- self._populate_mock_stats()
- files = self.data.SaveRawData(self.tempdir)
- for fname in files:
- self.assertTrue(os.path.basename(fname).startswith(identifier))
-
- def test_SummaryToStringNaNHelp(self):
- """NaN containing row gets tagged with *, help banner gets added."""
- help_banner_exp = '%s %s' % (stats_manager.STATS_PREFIX,
- stats_manager.NAN_DESCRIPTION)
- nan_domain = 'A-domain'
- nan_domain_exp = '%s%s' % (nan_domain, stats_manager.NAN_TAG)
- # NaN helper banner is added when a NaN domain is found & domain gets tagged
- data = stats_manager.StatsManager()
- data.AddSample(nan_domain, float('NaN'))
- data.AddSample(nan_domain, 17)
- data.AddSample('B-domain', 17)
- data.CalculateStats()
- summarystr = data.SummaryToString()
- self.assertIn(help_banner_exp, summarystr)
- self.assertIn(nan_domain_exp, summarystr)
- # NaN helper banner is not added when no NaN domain output, no tagging
- data = stats_manager.StatsManager()
- # nan_domain in this scenario does not contain any NaN
- data.AddSample(nan_domain, 19)
- data.AddSample('B-domain', 17)
- data.CalculateStats()
- summarystr = data.SummaryToString()
- self.assertNotIn(help_banner_exp, summarystr)
- self.assertNotIn(nan_domain_exp, summarystr)
-
- def test_SummaryToStringTitle(self):
- """Title shows up in SummaryToString if title specified."""
- title = 'titulo'
- data = stats_manager.StatsManager(title=title)
- self._populate_mock_stats()
- summary_str = data.SummaryToString()
- self.assertIn(title, summary_str)
-
- def test_SummaryToStringHideDomains(self):
- """Keys indicated in hide_domains are not printed in the summary."""
- data = stats_manager.StatsManager(hide_domains=['A-domain'])
- data.AddSample('A-domain', 17)
- data.AddSample('B-domain', 17)
- data.CalculateStats()
- summary_str = data.SummaryToString()
- self.assertIn('B-domain', summary_str)
- self.assertNotIn('A-domain', summary_str)
-
- def test_SummaryToStringOrder(self):
- """Order passed into StatsManager is honoured when formatting summary."""
- # StatsManager that should print D & B first, and the subsequent elements
- # are sorted.
- d_b_a_c_regexp = re.compile('D-domain.*B-domain.*A-domain.*C-domain',
- re.DOTALL)
- data = stats_manager.StatsManager(order=['D-domain', 'B-domain'])
- data.AddSample('A-domain', 17)
- data.AddSample('B-domain', 17)
- data.AddSample('C-domain', 17)
- data.AddSample('D-domain', 17)
- data.CalculateStats()
- summary_str = data.SummaryToString()
- self.assertRegexpMatches(summary_str, d_b_a_c_regexp)
-
- def test_MakeUniqueFName(self):
- data = stats_manager.StatsManager()
- testfile = os.path.join(self.tempdir, 'testfile.txt')
- with open(testfile, 'w') as f:
- f.write('')
- expected_fname = os.path.join(self.tempdir, 'testfile0.txt')
- self.assertEqual(expected_fname, data._MakeUniqueFName(testfile))
-
- def test_SaveSummary(self):
- """SaveSummary properly dumps the summary into a file."""
- self._populate_mock_stats()
- fname = 'unittest_summary.txt'
- expected_fname = os.path.join(self.tempdir, fname)
- fname = self.data.SaveSummary(self.tempdir, fname)
- # Assert the reported fname is the same as the expected fname
- self.assertEqual(expected_fname, fname)
- # Assert only the reported fname is output (in the tempdir)
- self.assertEqual(set([os.path.basename(fname)]),
- set(os.listdir(self.tempdir)))
- with open(fname, 'r') as f:
- self.assertEqual(
- '@@ NAME COUNT MEAN STDDEV MAX MIN\n',
- f.readline())
- self.assertEqual(
- '@@ A_mW 2 100000.00 0.50 100000.50 99999.50\n',
- f.readline())
- self.assertEqual(
- '@@ B_mV 3 2.50 0.82 3.50 1.50\n',
- f.readline())
-
- def test_SaveSummarySMID(self):
- """SaveSummary uses the smid when creating output filename."""
- identifier = 'ec'
- self.data = stats_manager.StatsManager(smid=identifier)
- self._populate_mock_stats()
- fname = os.path.basename(self.data.SaveSummary(self.tempdir))
- self.assertTrue(fname.startswith(identifier))
-
- def test_SaveSummaryJSON(self):
- """SaveSummaryJSON saves the added data properly in JSON format."""
- self._populate_mock_stats()
- fname = 'unittest_summary.json'
- expected_fname = os.path.join(self.tempdir, fname)
- fname = self.data.SaveSummaryJSON(self.tempdir, fname)
- # Assert the reported fname is the same as the expected fname
- self.assertEqual(expected_fname, fname)
- # Assert only the reported fname is output (in the tempdir)
- self.assertEqual(set([os.path.basename(fname)]),
- set(os.listdir(self.tempdir)))
- with open(fname, 'r') as f:
- summary = json.load(f)
- self.assertAlmostEqual(100000.0, summary['A']['mean'])
- self.assertEqual('milliwatt', summary['A']['unit'])
- self.assertAlmostEqual(2.5, summary['B']['mean'])
- self.assertEqual('millivolt', summary['B']['unit'])
-
- def test_SaveSummaryJSONSMID(self):
- """SaveSummaryJSON uses the smid when creating output filename."""
- identifier = 'ec'
- self.data = stats_manager.StatsManager(smid=identifier)
- self._populate_mock_stats()
- fname = os.path.basename(self.data.SaveSummaryJSON(self.tempdir))
- self.assertTrue(fname.startswith(identifier))
-
- def test_SaveSummaryJSONNoUnit(self):
- """SaveSummaryJSON marks unknown units properly as N/A."""
- self._populate_mock_stats_no_unit()
- self.data.CalculateStats()
- fname = 'unittest_summary.json'
- fname = self.data.SaveSummaryJSON(self.tempdir, fname)
- with open(fname, 'r') as f:
- summary = json.load(f)
- self.assertEqual('blue', summary['A']['unit'])
- # if no unit is specified, JSON should save 'N/A' as the unit.
- self.assertEqual('N/A', summary['B']['unit'])
-
-if __name__ == '__main__':
- unittest.main()
+ """Test to verify StatsManager methods work as expected.
+
+ StatsManager should collect raw data, calculate their statistics, and save
+ them in expected format.
+ """
+
+ def _populate_mock_stats(self):
+ """Create a populated & processed StatsManager to test data retrieval."""
+ self.data.AddSample("A", 99999.5)
+ self.data.AddSample("A", 100000.5)
+ self.data.SetUnit("A", "uW")
+ self.data.SetUnit("A", "mW")
+ self.data.AddSample("B", 1.5)
+ self.data.AddSample("B", 2.5)
+ self.data.AddSample("B", 3.5)
+ self.data.SetUnit("B", "mV")
+ self.data.CalculateStats()
+
+ def _populate_mock_stats_no_unit(self):
+ self.data.AddSample("B", 1000)
+ self.data.AddSample("A", 200)
+ self.data.SetUnit("A", "blue")
+
+ def setUp(self):
+ """Set up StatsManager and create a temporary directory for test."""
+ self.tempdir = tempfile.mkdtemp()
+ self.data = stats_manager.StatsManager()
+
+ def tearDown(self):
+ """Delete the temporary directory and its content."""
+ shutil.rmtree(self.tempdir)
+
+ def test_AddSample(self):
+ """Adding a sample successfully adds a sample."""
+ self.data.AddSample("Test", 1000)
+ self.data.SetUnit("Test", "test")
+ self.data.CalculateStats()
+ summary = self.data.GetSummary()
+ self.assertEqual(1, summary["Test"]["count"])
+
+ def test_AddSampleNoFloatAcceptNaN(self):
+ """Adding a non-number adds 'NaN' and doesn't raise an exception."""
+ self.data.AddSample("Test", 10)
+ self.data.AddSample("Test", 20)
+ # adding a fake NaN: one that gets converted into NaN internally
+ self.data.AddSample("Test", "fiesta")
+ # adding a real NaN
+ self.data.AddSample("Test", float("NaN"))
+ self.data.SetUnit("Test", "test")
+ self.data.CalculateStats()
+ summary = self.data.GetSummary()
+ # assert that 'NaN' as added.
+ self.assertEqual(4, summary["Test"]["count"])
+ # assert that mean, min, and max calculatings ignore the 'NaN'
+ self.assertEqual(10, summary["Test"]["min"])
+ self.assertEqual(20, summary["Test"]["max"])
+ self.assertEqual(15, summary["Test"]["mean"])
+
+ def test_AddSampleNoFloatNotAcceptNaN(self):
+ """Adding a non-number raises a StatsManagerError if accept_nan is False."""
+ self.data = stats_manager.StatsManager(accept_nan=False)
+ with self.assertRaisesRegexp(
+ stats_manager.StatsManagerError,
+ "accept_nan is false. Cannot add NaN sample.",
+ ):
+ # adding a fake NaN: one that gets converted into NaN internally
+ self.data.AddSample("Test", "fiesta")
+ with self.assertRaisesRegexp(
+ stats_manager.StatsManagerError,
+ "accept_nan is false. Cannot add NaN sample.",
+ ):
+ # adding a real NaN
+ self.data.AddSample("Test", float("NaN"))
+
+ def test_AddSampleNoUnit(self):
+ """Not adding a unit does not cause an exception on CalculateStats()."""
+ self.data.AddSample("Test", 17)
+ self.data.CalculateStats()
+ summary = self.data.GetSummary()
+ self.assertEqual(1, summary["Test"]["count"])
+
+ def test_UnitSuffix(self):
+ """Unit gets appended as a suffix in the displayed summary."""
+ self.data.AddSample("test", 250)
+ self.data.SetUnit("test", "mw")
+ self.data.CalculateStats()
+ summary_str = self.data.SummaryToString()
+ self.assertIn("test_mw", summary_str)
+
+ def test_DoubleUnitSuffix(self):
+ """If domain already ends in unit, verify that unit doesn't get appended."""
+ self.data.AddSample("test_mw", 250)
+ self.data.SetUnit("test_mw", "mw")
+ self.data.CalculateStats()
+ summary_str = self.data.SummaryToString()
+ self.assertIn("test_mw", summary_str)
+ self.assertNotIn("test_mw_mw", summary_str)
+
+ def test_GetRawData(self):
+ """GetRawData returns exact same data as fed in."""
+ self._populate_mock_stats()
+ raw_data = self.data.GetRawData()
+ self.assertListEqual([99999.5, 100000.5], raw_data["A"])
+ self.assertListEqual([1.5, 2.5, 3.5], raw_data["B"])
+
+ def test_GetSummary(self):
+ """GetSummary returns expected stats about the data fed in."""
+ self._populate_mock_stats()
+ summary = self.data.GetSummary()
+ self.assertEqual(2, summary["A"]["count"])
+ self.assertAlmostEqual(100000.5, summary["A"]["max"])
+ self.assertAlmostEqual(99999.5, summary["A"]["min"])
+ self.assertAlmostEqual(0.5, summary["A"]["stddev"])
+ self.assertAlmostEqual(100000.0, summary["A"]["mean"])
+ self.assertEqual(3, summary["B"]["count"])
+ self.assertAlmostEqual(3.5, summary["B"]["max"])
+ self.assertAlmostEqual(1.5, summary["B"]["min"])
+ self.assertAlmostEqual(0.81649658092773, summary["B"]["stddev"])
+ self.assertAlmostEqual(2.5, summary["B"]["mean"])
+
+ def test_SaveRawData(self):
+ """SaveRawData stores same data as fed in."""
+ self._populate_mock_stats()
+ dirname = "unittest_raw_data"
+ expected_files = set(["A_mW.txt", "B_mV.txt"])
+ fnames = self.data.SaveRawData(self.tempdir, dirname)
+ files_returned = set([os.path.basename(f) for f in fnames])
+ # Assert that only the expected files got returned.
+ self.assertEqual(expected_files, files_returned)
+ # Assert that only the returned files are in the outdir.
+ self.assertEqual(
+ set(os.listdir(os.path.join(self.tempdir, dirname))), files_returned
+ )
+ for fname in fnames:
+ with open(fname, "r") as f:
+ if "A_mW" in fname:
+ self.assertEqual("99999.50", f.readline().strip())
+ self.assertEqual("100000.50", f.readline().strip())
+ if "B_mV" in fname:
+ self.assertEqual("1.50", f.readline().strip())
+ self.assertEqual("2.50", f.readline().strip())
+ self.assertEqual("3.50", f.readline().strip())
+
+ def test_SaveRawDataNoUnit(self):
+ """SaveRawData appends no unit suffix if the unit is not specified."""
+ self._populate_mock_stats_no_unit()
+ self.data.CalculateStats()
+ outdir = "unittest_raw_data"
+ files = self.data.SaveRawData(self.tempdir, outdir)
+ files = [os.path.basename(f) for f in files]
+ # Verify nothing gets appended to domain for filename if no unit exists.
+ self.assertIn("B.txt", files)
+
+ def test_SaveRawDataSMID(self):
+ """SaveRawData uses the smid when creating output filename."""
+ identifier = "ec"
+ self.data = stats_manager.StatsManager(smid=identifier)
+ self._populate_mock_stats()
+ files = self.data.SaveRawData(self.tempdir)
+ for fname in files:
+ self.assertTrue(os.path.basename(fname).startswith(identifier))
+
+ def test_SummaryToStringNaNHelp(self):
+ """NaN containing row gets tagged with *, help banner gets added."""
+ help_banner_exp = "%s %s" % (
+ stats_manager.STATS_PREFIX,
+ stats_manager.NAN_DESCRIPTION,
+ )
+ nan_domain = "A-domain"
+ nan_domain_exp = "%s%s" % (nan_domain, stats_manager.NAN_TAG)
+ # NaN helper banner is added when a NaN domain is found & domain gets tagged
+ data = stats_manager.StatsManager()
+ data.AddSample(nan_domain, float("NaN"))
+ data.AddSample(nan_domain, 17)
+ data.AddSample("B-domain", 17)
+ data.CalculateStats()
+ summarystr = data.SummaryToString()
+ self.assertIn(help_banner_exp, summarystr)
+ self.assertIn(nan_domain_exp, summarystr)
+ # NaN helper banner is not added when no NaN domain output, no tagging
+ data = stats_manager.StatsManager()
+ # nan_domain in this scenario does not contain any NaN
+ data.AddSample(nan_domain, 19)
+ data.AddSample("B-domain", 17)
+ data.CalculateStats()
+ summarystr = data.SummaryToString()
+ self.assertNotIn(help_banner_exp, summarystr)
+ self.assertNotIn(nan_domain_exp, summarystr)
+
+ def test_SummaryToStringTitle(self):
+ """Title shows up in SummaryToString if title specified."""
+ title = "titulo"
+ data = stats_manager.StatsManager(title=title)
+ self._populate_mock_stats()
+ summary_str = data.SummaryToString()
+ self.assertIn(title, summary_str)
+
+ def test_SummaryToStringHideDomains(self):
+ """Keys indicated in hide_domains are not printed in the summary."""
+ data = stats_manager.StatsManager(hide_domains=["A-domain"])
+ data.AddSample("A-domain", 17)
+ data.AddSample("B-domain", 17)
+ data.CalculateStats()
+ summary_str = data.SummaryToString()
+ self.assertIn("B-domain", summary_str)
+ self.assertNotIn("A-domain", summary_str)
+
+ def test_SummaryToStringOrder(self):
+ """Order passed into StatsManager is honoured when formatting summary."""
+ # StatsManager that should print D & B first, and the subsequent elements
+ # are sorted.
+ d_b_a_c_regexp = re.compile(
+ "D-domain.*B-domain.*A-domain.*C-domain", re.DOTALL
+ )
+ data = stats_manager.StatsManager(order=["D-domain", "B-domain"])
+ data.AddSample("A-domain", 17)
+ data.AddSample("B-domain", 17)
+ data.AddSample("C-domain", 17)
+ data.AddSample("D-domain", 17)
+ data.CalculateStats()
+ summary_str = data.SummaryToString()
+ self.assertRegexpMatches(summary_str, d_b_a_c_regexp)
+
+ def test_MakeUniqueFName(self):
+ data = stats_manager.StatsManager()
+ testfile = os.path.join(self.tempdir, "testfile.txt")
+ with open(testfile, "w") as f:
+ f.write("")
+ expected_fname = os.path.join(self.tempdir, "testfile0.txt")
+ self.assertEqual(expected_fname, data._MakeUniqueFName(testfile))
+
+ def test_SaveSummary(self):
+ """SaveSummary properly dumps the summary into a file."""
+ self._populate_mock_stats()
+ fname = "unittest_summary.txt"
+ expected_fname = os.path.join(self.tempdir, fname)
+ fname = self.data.SaveSummary(self.tempdir, fname)
+ # Assert the reported fname is the same as the expected fname
+ self.assertEqual(expected_fname, fname)
+ # Assert only the reported fname is output (in the tempdir)
+ self.assertEqual(
+ set([os.path.basename(fname)]), set(os.listdir(self.tempdir))
+ )
+ with open(fname, "r") as f:
+ self.assertEqual(
+ "@@ NAME COUNT MEAN STDDEV MAX MIN\n",
+ f.readline(),
+ )
+ self.assertEqual(
+ "@@ A_mW 2 100000.00 0.50 100000.50 99999.50\n",
+ f.readline(),
+ )
+ self.assertEqual(
+ "@@ B_mV 3 2.50 0.82 3.50 1.50\n",
+ f.readline(),
+ )
+
+ def test_SaveSummarySMID(self):
+ """SaveSummary uses the smid when creating output filename."""
+ identifier = "ec"
+ self.data = stats_manager.StatsManager(smid=identifier)
+ self._populate_mock_stats()
+ fname = os.path.basename(self.data.SaveSummary(self.tempdir))
+ self.assertTrue(fname.startswith(identifier))
+
+ def test_SaveSummaryJSON(self):
+ """SaveSummaryJSON saves the added data properly in JSON format."""
+ self._populate_mock_stats()
+ fname = "unittest_summary.json"
+ expected_fname = os.path.join(self.tempdir, fname)
+ fname = self.data.SaveSummaryJSON(self.tempdir, fname)
+ # Assert the reported fname is the same as the expected fname
+ self.assertEqual(expected_fname, fname)
+ # Assert only the reported fname is output (in the tempdir)
+ self.assertEqual(
+ set([os.path.basename(fname)]), set(os.listdir(self.tempdir))
+ )
+ with open(fname, "r") as f:
+ summary = json.load(f)
+ self.assertAlmostEqual(100000.0, summary["A"]["mean"])
+ self.assertEqual("milliwatt", summary["A"]["unit"])
+ self.assertAlmostEqual(2.5, summary["B"]["mean"])
+ self.assertEqual("millivolt", summary["B"]["unit"])
+
+ def test_SaveSummaryJSONSMID(self):
+ """SaveSummaryJSON uses the smid when creating output filename."""
+ identifier = "ec"
+ self.data = stats_manager.StatsManager(smid=identifier)
+ self._populate_mock_stats()
+ fname = os.path.basename(self.data.SaveSummaryJSON(self.tempdir))
+ self.assertTrue(fname.startswith(identifier))
+
+ def test_SaveSummaryJSONNoUnit(self):
+ """SaveSummaryJSON marks unknown units properly as N/A."""
+ self._populate_mock_stats_no_unit()
+ self.data.CalculateStats()
+ fname = "unittest_summary.json"
+ fname = self.data.SaveSummaryJSON(self.tempdir, fname)
+ with open(fname, "r") as f:
+ summary = json.load(f)
+ self.assertEqual("blue", summary["A"]["unit"])
+ # if no unit is specified, JSON should save 'N/A' as the unit.
+ self.assertEqual("N/A", summary["B"]["unit"])
+
+
+if __name__ == "__main__":
+ unittest.main()