diff options
Diffstat (limited to 'extra/usb_power')
-rw-r--r-- | extra/usb_power/convert_power_log_board.py | 47 | ||||
-rwxr-xr-x | extra/usb_power/convert_servo_ina.py | 95 | ||||
-rwxr-xr-x | extra/usb_power/powerlog.py | 1825 | ||||
-rw-r--r-- | extra/usb_power/powerlog_unittest.py | 82 | ||||
-rw-r--r-- | extra/usb_power/stats_manager.py | 769 | ||||
-rw-r--r-- | extra/usb_power/stats_manager_unittest.py | 609 |
6 files changed, 1810 insertions, 1617 deletions
diff --git a/extra/usb_power/convert_power_log_board.py b/extra/usb_power/convert_power_log_board.py index 8aab77ee4c..f5fb7e925d 100644 --- a/extra/usb_power/convert_power_log_board.py +++ b/extra/usb_power/convert_power_log_board.py @@ -1,11 +1,7 @@ #!/usr/bin/env python -# Copyright 2018 The Chromium OS Authors. All rights reserved. +# Copyright 2018 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """ Program to convert sweetberry config to servod config template. @@ -14,11 +10,12 @@ Program to convert sweetberry config to servod config template. # Note: This is a py2/3 compatible file. from __future__ import print_function + import json import os import sys -from powerlog import Spower +from powerlog import Spower # pylint:disable=import-error def fetch_records(board_file): @@ -48,21 +45,29 @@ def write_to_file(file, sweetberry, inas): inas: list of inas read from board file. """ - with open(file, 'w') as pyfile: + with open(file, "w") as pyfile: - pyfile.write('inas = [\n') + pyfile.write("inas = [\n") for rec in inas: - if rec['sweetberry'] != sweetberry: + if rec["sweetberry"] != sweetberry: continue # EX : ('sweetberry', 0x40, 'SB_FW_CAM_2P8', 5.0, 1.000, 3, False), - channel, i2c_addr = Spower.CHMAP[rec['channel']] - record = (" ('sweetberry', 0x%02x, '%s', 5.0, %f, %d, 'True')" - ",\n" % (i2c_addr, rec['name'], rec['rs'], channel)) + channel, i2c_addr = Spower.CHMAP[rec["channel"]] + record = ( + " ('sweetberry', 0x%02x, '%s', 5.0, %f, %d, 'True')" + ",\n" + % ( + i2c_addr, + rec["name"], + rec["rs"], + channel, + ) + ) pyfile.write(record) - pyfile.write(']\n') + pyfile.write("]\n") def main(argv): @@ -76,16 +81,18 @@ def main(argv): inas = fetch_records(inputf) - sweetberry = set(rec['sweetberry'] for rec in inas) + sweetberry = set(rec["sweetberry"] for rec in inas) if len(sweetberry) == 2: - print("Converting %s to %s and %s" % (inputf, basename + '_a.py', - basename + '_b.py')) - write_to_file(basename + '_a.py', 'A', inas) - write_to_file(basename + '_b.py', 'B', inas) + print( + "Converting %s to %s and %s" + % (inputf, basename + "_a.py", basename + "_b.py") + ) + write_to_file(basename + "_a.py", "A", inas) + write_to_file(basename + "_b.py", "B", inas) else: - print("Converting %s to %s" % (inputf, basename + '.py')) - write_to_file(basename + '.py', sweetberry.pop(), inas) + print("Converting %s to %s" % (inputf, basename + ".py")) + write_to_file(basename + ".py", sweetberry.pop(), inas) if __name__ == "__main__": diff --git a/extra/usb_power/convert_servo_ina.py b/extra/usb_power/convert_servo_ina.py index 1c70f31aeb..1deb75cda4 100755 --- a/extra/usb_power/convert_servo_ina.py +++ b/extra/usb_power/convert_servo_ina.py @@ -1,11 +1,7 @@ #!/usr/bin/env python -# Copyright 2017 The Chromium OS Authors. All rights reserved. +# Copyright 2017 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """Program to convert power logging config from a servo_ina device to a sweetberry config. @@ -14,67 +10,74 @@ # Note: This is a py2/3 compatible file. from __future__ import print_function + import os import sys def fetch_records(basename): - """Import records from servo_ina file. + """Import records from servo_ina file. - servo_ina files are python imports, and have a list of tuples with - the INA data. - (inatype, i2caddr, rail name, bus voltage, shunt ohms, mux, True) + servo_ina files are python imports, and have a list of tuples with + the INA data. + (inatype, i2caddr, rail name, bus voltage, shunt ohms, mux, True) - Args: - basename: python import name (filename -.py) + Args: + basename: python import name (filename -.py) - Returns: - list of tuples as described above. - """ - ina_desc = __import__(basename) - return ina_desc.inas + Returns: + list of tuples as described above. + """ + ina_desc = __import__(basename) + return ina_desc.inas def main(argv): - if len(argv) != 2: - print("usage:") - print(" %s input.py" % argv[0]) - return + if len(argv) != 2: + print("usage:") + print(" %s input.py" % argv[0]) + return - inputf = argv[1] - basename = os.path.splitext(inputf)[0] - outputf = basename + '.board' - outputs = basename + '.scenario' + inputf = argv[1] + basename = os.path.splitext(inputf)[0] + outputf = basename + ".board" + outputs = basename + ".scenario" - print("Converting %s to %s, %s" % (inputf, outputf, outputs)) + print("Converting %s to %s, %s" % (inputf, outputf, outputs)) - inas = fetch_records(basename) + inas = fetch_records(basename) + boardfile = open(outputf, "w") + scenario = open(outputs, "w") - boardfile = open(outputf, 'w') - scenario = open(outputs, 'w') + boardfile.write("[\n") + scenario.write("[\n") + start = True - boardfile.write('[\n') - scenario.write('[\n') - start = True + for rec in inas: + if start: + start = False + else: + boardfile.write(",\n") + scenario.write(",\n") - for rec in inas: - if start: - start = False - else: - boardfile.write(',\n') - scenario.write(',\n') + record = ( + ' {"name": "%s", "rs": %f, "sweetberry": "A", "channel": %d}' + % ( + rec[2], + rec[4], + rec[1] - 64, + ) + ) + boardfile.write(record) + scenario.write('"%s"' % rec[2]) - record = ' {"name": "%s", "rs": %f, "sweetberry": "A", "channel": %d}' % ( - rec[2], rec[4], rec[1] - 64) - boardfile.write(record) - scenario.write('"%s"' % rec[2]) + boardfile.write("\n") + boardfile.write("]") - boardfile.write('\n') - boardfile.write(']') + scenario.write("\n") + scenario.write("]") - scenario.write('\n') - scenario.write(']') if __name__ == "__main__": - main(sys.argv) + main(sys.argv) diff --git a/extra/usb_power/powerlog.py b/extra/usb_power/powerlog.py index 82cce3daed..13e41bd23a 100755 --- a/extra/usb_power/powerlog.py +++ b/extra/usb_power/powerlog.py @@ -1,11 +1,7 @@ #!/usr/bin/env python -# Copyright 2016 The Chromium OS Authors. All rights reserved. +# Copyright 2016 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """Program to fetch power logging data from a sweetberry device or other usb device that exports a USB power logging interface. @@ -14,9 +10,9 @@ # Note: This is a py2/3 compatible file. from __future__ import print_function + import argparse import array -from distutils import sysconfig import json import logging import os @@ -25,884 +21,1041 @@ import struct import sys import time import traceback +from distutils import sysconfig -import usb - -from stats_manager import StatsManager +import usb # pylint:disable=import-error +from stats_manager import StatsManager # pylint:disable=import-error # Directory where hdctools installs configuration files into. -LIB_DIR = os.path.join(sysconfig.get_python_lib(standard_lib=False), 'servo', - 'data') +LIB_DIR = os.path.join( + sysconfig.get_python_lib(standard_lib=False), "servo", "data" +) # Potential config file locations: current working directory, the same directory # as powerlog.py file or LIB_DIR. -CONFIG_LOCATIONS = [os.getcwd(), os.path.dirname(os.path.realpath(__file__)), - LIB_DIR] - -def logoutput(msg): - print(msg) - sys.stdout.flush() - -def process_filename(filename): - """Find the file path from the filename. - - If filename is already the complete path, return that directly. If filename is - just the short name, look for the file in the current working directory, in - the directory of the current .py file, and then in the directory installed by - hdctools. If the file is found, return the complete path of the file. - - Args: - filename: complete file path or short file name. - - Returns: - a complete file path. - - Raises: - IOError if filename does not exist. - """ - # Check if filename is absolute path. - if os.path.isabs(filename) and os.path.isfile(filename): - return filename - # Check if filename is relative to a known config location. - for dirname in CONFIG_LOCATIONS: - file_at_dir = os.path.join(dirname, filename) - if os.path.isfile(file_at_dir): - return file_at_dir - raise IOError('No such file or directory: \'%s\'' % filename) - - -class Spower(object): - """Power class to access devices on the bus. - - Usage: - bus = Spower() - - Instance Variables: - _dev: pyUSB device object - _read_ep: pyUSB read endpoint for this interface - _write_ep: pyUSB write endpoint for this interface - """ - - # INA interface type. - INA_POWER = 1 - INA_BUSV = 2 - INA_CURRENT = 3 - INA_SHUNTV = 4 - # INA_SUFFIX is used to differentiate multiple ina types for the same power - # rail. No suffix for when ina type is 0 (non-existent) and when ina type is 1 - # (power, no suffix for backward compatibility). - INA_SUFFIX = ['', '', '_busv', '_cur', '_shuntv'] - - # usb power commands - CMD_RESET = 0x0000 - CMD_STOP = 0x0001 - CMD_ADDINA = 0x0002 - CMD_START = 0x0003 - CMD_NEXT = 0x0004 - CMD_SETTIME = 0x0005 - - # Map between header channel number (0-47) - # and INA I2C bus/addr on sweetberry. - CHMAP = { - 0: (3, 0x40), - 1: (1, 0x40), - 2: (2, 0x40), - 3: (0, 0x40), - 4: (3, 0x41), - 5: (1, 0x41), - 6: (2, 0x41), - 7: (0, 0x41), - 8: (3, 0x42), - 9: (1, 0x42), - 10: (2, 0x42), - 11: (0, 0x42), - 12: (3, 0x43), - 13: (1, 0x43), - 14: (2, 0x43), - 15: (0, 0x43), - 16: (3, 0x44), - 17: (1, 0x44), - 18: (2, 0x44), - 19: (0, 0x44), - 20: (3, 0x45), - 21: (1, 0x45), - 22: (2, 0x45), - 23: (0, 0x45), - 24: (3, 0x46), - 25: (1, 0x46), - 26: (2, 0x46), - 27: (0, 0x46), - 28: (3, 0x47), - 29: (1, 0x47), - 30: (2, 0x47), - 31: (0, 0x47), - 32: (3, 0x48), - 33: (1, 0x48), - 34: (2, 0x48), - 35: (0, 0x48), - 36: (3, 0x49), - 37: (1, 0x49), - 38: (2, 0x49), - 39: (0, 0x49), - 40: (3, 0x4a), - 41: (1, 0x4a), - 42: (2, 0x4a), - 43: (0, 0x4a), - 44: (3, 0x4b), - 45: (1, 0x4b), - 46: (2, 0x4b), - 47: (0, 0x4b), - } - - def __init__(self, board, vendor=0x18d1, - product=0x5020, interface=1, serialname=None): - self._logger = logging.getLogger(__name__) - self._board = board - - # Find the stm32. - dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True) - dev_list = list(dev_g) - if dev_list is None: - raise Exception("Power", "USB device not found") - - # Check if we have multiple stm32s and we've specified the serial. - dev = None - if serialname: - for d in dev_list: - dev_serial = "PyUSB dioesn't have a stable interface" - try: - dev_serial = usb.util.get_string(d, 256, d.iSerialNumber) - except ValueError: - # Incompatible pyUsb version. - dev_serial = usb.util.get_string(d, d.iSerialNumber) - if dev_serial == serialname: - dev = d - break - if dev is None: - raise Exception("Power", "USB device(%s) not found" % serialname) - else: - try: - dev = dev_list[0] - except TypeError: - # Incompatible pyUsb version. - dev = dev_list.next() - - self._logger.debug("Found USB device: %04x:%04x", vendor, product) - self._dev = dev - - # Get an endpoint instance. - try: - dev.set_configuration() - except usb.USBError: - pass - cfg = dev.get_active_configuration() - - intf = usb.util.find_descriptor(cfg, custom_match=lambda i: \ - i.bInterfaceClass==255 and i.bInterfaceSubClass==0x54) - - self._intf = intf - self._logger.debug("InterfaceNumber: %s", intf.bInterfaceNumber) - - read_ep = usb.util.find_descriptor( - intf, - # match the first IN endpoint - custom_match = \ - lambda e: \ - usb.util.endpoint_direction(e.bEndpointAddress) == \ - usb.util.ENDPOINT_IN - ) - - self._read_ep = read_ep - self._logger.debug("Reader endpoint: 0x%x", read_ep.bEndpointAddress) - - write_ep = usb.util.find_descriptor( - intf, - # match the first OUT endpoint - custom_match = \ - lambda e: \ - usb.util.endpoint_direction(e.bEndpointAddress) == \ - usb.util.ENDPOINT_OUT - ) - - self._write_ep = write_ep - self._logger.debug("Writer endpoint: 0x%x", write_ep.bEndpointAddress) - - self.clear_ina_struct() - - self._logger.debug("Found power logging USB endpoint.") - - def clear_ina_struct(self): - """ Clear INA description struct.""" - self._inas = [] +CONFIG_LOCATIONS = [ + os.getcwd(), + os.path.dirname(os.path.realpath(__file__)), + LIB_DIR, +] - def append_ina_struct(self, name, rs, port, addr, - data=None, ina_type=INA_POWER): - """Add an INA descriptor into the list of active INAs. - Args: - name: Readable name of this channel. - rs: Sense resistor value in ohms, floating point. - port: I2C channel this INA is connected to. - addr: I2C addr of this INA. - data: Misc data for special handling, board specific. - ina_type: INA function to use, power, voltage, etc. - """ - ina = {} - ina['name'] = name - ina['rs'] = rs - ina['port'] = port - ina['addr'] = addr - ina['type'] = ina_type - # Calculate INA231 Calibration register - # (see INA231 spec p.15) - # CurrentLSB = uA per div = 80mV / (Rsh * 2^15) - # CurrentLSB uA = 80000000nV / (Rsh mOhm * 0x8000) - ina['uAscale'] = 80000000. / (rs * 0x8000); - ina['uWscale'] = 25. * ina['uAscale']; - ina['mVscale'] = 1.25 - ina['uVscale'] = 2.5 - ina['data'] = data - self._inas.append(ina) - - def wr_command(self, write_list, read_count=1, wtimeout=100, rtimeout=1000): - """Write command to logger logic. - - This function writes byte command values list to stm, then reads - byte status. - - Args: - write_list: list of command byte values [0~255]. - read_count: number of status byte values to read. - - Interface: - write: [command, data ... ] - read: [status ] - - Returns: - bytes read, or None on failure. - """ - self._logger.debug("Spower.wr_command(write_list=[%s] (%d), read_count=%s)", - list(bytearray(write_list)), len(write_list), read_count) - - # Clean up args from python style to correct types. - write_length = 0 - if write_list: - write_length = len(write_list) - if not read_count: - read_count = 0 - - # Send command to stm32. - if write_list: - cmd = write_list - ret = self._write_ep.write(cmd, wtimeout) - - self._logger.debug("RET: %s ", ret) - - # Read back response if necessary. - if read_count: - bytesread = self._read_ep.read(512, rtimeout) - self._logger.debug("BYTES: [%s]", bytesread) - - if len(bytesread) != read_count: - pass - - self._logger.debug("STATUS: 0x%02x", int(bytesread[0])) - if read_count == 1: - return bytesread[0] - else: - return bytesread - - return None - - def clear(self): - """Clear pending reads on the stm32""" - try: - while True: - ret = self.wr_command(b"", read_count=512, rtimeout=100, wtimeout=50) - self._logger.debug("Try Clear: read %s", - "success" if ret == 0 else "failure") - except: - pass - - def send_reset(self): - """Reset the power interface on the stm32""" - cmd = struct.pack("<H", self.CMD_RESET) - ret = self.wr_command(cmd, rtimeout=50, wtimeout=50) - self._logger.debug("Command RESET: %s", - "success" if ret == 0 else "failure") - - def reset(self): - """Try resetting the USB interface until success. - - Use linear back off strategy when encounter the error with 10ms increment. - - Raises: - Exception on failure. - """ - max_reset_retry = 100 - for count in range(1, max_reset_retry + 1): - self.clear() - try: - self.send_reset() - return - except Exception as e: - self.clear() - self.clear() - self._logger.debug("TRY %d of %d: %s", count, max_reset_retry, e) - time.sleep(count * 0.01) - raise Exception("Power", "Failed to reset") - - def stop(self): - """Stop any active data acquisition.""" - cmd = struct.pack("<H", self.CMD_STOP) - ret = self.wr_command(cmd) - self._logger.debug("Command STOP: %s", - "success" if ret == 0 else "failure") - - def start(self, integration_us): - """Start data acquisition. - - Args: - integration_us: int, how many us between samples, and - how often the data block must be read. +def logoutput(msg): + print(msg) + sys.stdout.flush() - Returns: - actual sampling interval in ms. - """ - cmd = struct.pack("<HI", self.CMD_START, integration_us) - read = self.wr_command(cmd, read_count=5) - actual_us = 0 - if len(read) == 5: - ret, actual_us = struct.unpack("<BI", read) - self._logger.debug("Command START: %s %dus", - "success" if ret == 0 else "failure", actual_us) - else: - self._logger.debug("Command START: FAIL") - return actual_us +def process_filename(filename): + """Find the file path from the filename. - def add_ina_name(self, name_tuple): - """Add INA from board config. + If filename is already the complete path, return that directly. If filename is + just the short name, look for the file in the current working directory, in + the directory of the current .py file, and then in the directory installed by + hdctools. If the file is found, return the complete path of the file. Args: - name_tuple: name and type of power rail in board config. + filename: complete file path or short file name. Returns: - True if INA added, False if the INA is not on this board. + a complete file path. Raises: - Exception on unexpected failure. + IOError if filename does not exist. """ - name, ina_type = name_tuple - - for datum in self._brdcfg: - if datum["name"] == name: - rs = int(float(datum["rs"]) * 1000.) - board = datum["sweetberry"] - - if board == self._board: - if 'port' in datum and 'addr' in datum: - port = datum['port'] - addr = datum['addr'] - else: - channel = int(datum["channel"]) - port, addr = self.CHMAP[channel] - self.add_ina(port, ina_type, addr, 0, rs, data=datum) - return True - else: - return False - raise Exception("Power", "Failed to find INA %s" % name) + # Check if filename is absolute path. + if os.path.isabs(filename) and os.path.isfile(filename): + return filename + # Check if filename is relative to a known config location. + for dirname in CONFIG_LOCATIONS: + file_at_dir = os.path.join(dirname, filename) + if os.path.isfile(file_at_dir): + return file_at_dir + raise IOError("No such file or directory: '%s'" % filename) - def set_time(self, timestamp_us): - """Set sweetberry time to match host time. - Args: - timestamp_us: host timestmap in us. - """ - # 0x0005 , 8 byte timestamp - cmd = struct.pack("<HQ", self.CMD_SETTIME, timestamp_us) - ret = self.wr_command(cmd) - - self._logger.debug("Command SETTIME: %s", - "success" if ret == 0 else "failure") +class Spower(object): + """Power class to access devices on the bus. - def add_ina(self, bus, ina_type, addr, extra, resistance, data=None): - """Add an INA to the data acquisition list. + Usage: + bus = Spower() - Args: - bus: which i2c bus the INA is on. Same ordering as Si2c. - ina_type: Ina interface: INA_POWER/BUSV/etc. - addr: 7 bit i2c addr of this INA - extra: extra data for nonstandard configs. - resistance: int, shunt resistance in mOhm + Instance Variables: + _dev: pyUSB device object + _read_ep: pyUSB read endpoint for this interface + _write_ep: pyUSB write endpoint for this interface """ - # 0x0002, 1B: bus, 1B:INA type, 1B: INA addr, 1B: extra, 4B: Rs - cmd = struct.pack("<HBBBBI", self.CMD_ADDINA, - bus, ina_type, addr, extra, resistance) - ret = self.wr_command(cmd) - if ret == 0: - if data: - name = data['name'] - else: - name = "ina%d_%02x" % (bus, addr) - self.append_ina_struct(name, resistance, bus, addr, - data=data, ina_type=ina_type) - self._logger.debug("Command ADD_INA: %s", - "success" if ret == 0 else "failure") - - def report_header_size(self): - """Helper function to calculate power record header size.""" - result = 2 - timestamp = 8 - return result + timestamp - - def report_size(self, ina_count): - """Helper function to calculate full power record size.""" - record = 2 - - datasize = self.report_header_size() + ina_count * record - # Round to multiple of 4 bytes. - datasize = int(((datasize + 3) // 4) * 4) - - return datasize - - def read_line(self): - """Read a line of data from the setup INAs - Returns: - list of dicts of the values read by ina/type tuple, otherwise None. - [{ts:100, (vbat, power):450}, {ts:200, (vbat, power):440}] - """ - try: - expected_bytes = self.report_size(len(self._inas)) - cmd = struct.pack("<H", self.CMD_NEXT) - bytesread = self.wr_command(cmd, read_count=expected_bytes) - except usb.core.USBError as e: - self._logger.error("READ LINE FAILED %s", e) - return None - - if len(bytesread) == 1: - if bytesread[0] != 0x6: - self._logger.debug("READ LINE FAILED bytes: %d ret: %02x", - len(bytesread), bytesread[0]) - return None - - if len(bytesread) % expected_bytes != 0: - self._logger.debug("READ LINE WARNING: expected %d, got %d", - expected_bytes, len(bytesread)) - - packet_count = len(bytesread) // expected_bytes - - values = [] - for i in range(0, packet_count): - start = i * expected_bytes - end = (i + 1) * expected_bytes - record = self.interpret_line(bytesread[start:end]) - values.append(record) - - return values - - def interpret_line(self, data): - """Interpret a power record from INAs + # INA interface type. + INA_POWER = 1 + INA_BUSV = 2 + INA_CURRENT = 3 + INA_SHUNTV = 4 + # INA_SUFFIX is used to differentiate multiple ina types for the same power + # rail. No suffix for when ina type is 0 (non-existent) and when ina type is 1 + # (power, no suffix for backward compatibility). + INA_SUFFIX = ["", "", "_busv", "_cur", "_shuntv"] + + # usb power commands + CMD_RESET = 0x0000 + CMD_STOP = 0x0001 + CMD_ADDINA = 0x0002 + CMD_START = 0x0003 + CMD_NEXT = 0x0004 + CMD_SETTIME = 0x0005 + + # Map between header channel number (0-47) + # and INA I2C bus/addr on sweetberry. + CHMAP = { + 0: (3, 0x40), + 1: (1, 0x40), + 2: (2, 0x40), + 3: (0, 0x40), + 4: (3, 0x41), + 5: (1, 0x41), + 6: (2, 0x41), + 7: (0, 0x41), + 8: (3, 0x42), + 9: (1, 0x42), + 10: (2, 0x42), + 11: (0, 0x42), + 12: (3, 0x43), + 13: (1, 0x43), + 14: (2, 0x43), + 15: (0, 0x43), + 16: (3, 0x44), + 17: (1, 0x44), + 18: (2, 0x44), + 19: (0, 0x44), + 20: (3, 0x45), + 21: (1, 0x45), + 22: (2, 0x45), + 23: (0, 0x45), + 24: (3, 0x46), + 25: (1, 0x46), + 26: (2, 0x46), + 27: (0, 0x46), + 28: (3, 0x47), + 29: (1, 0x47), + 30: (2, 0x47), + 31: (0, 0x47), + 32: (3, 0x48), + 33: (1, 0x48), + 34: (2, 0x48), + 35: (0, 0x48), + 36: (3, 0x49), + 37: (1, 0x49), + 38: (2, 0x49), + 39: (0, 0x49), + 40: (3, 0x4A), + 41: (1, 0x4A), + 42: (2, 0x4A), + 43: (0, 0x4A), + 44: (3, 0x4B), + 45: (1, 0x4B), + 46: (2, 0x4B), + 47: (0, 0x4B), + } + + def __init__( + self, board, vendor=0x18D1, product=0x5020, interface=1, serialname=None + ): + self._logger = logging.getLogger(__name__) + self._board = board + + # Find the stm32. + dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True) + dev_list = list(dev_g) + if dev_list is None: + raise Exception("Power", "USB device not found") + + # Check if we have multiple stm32s and we've specified the serial. + dev = None + if serialname: + for d in dev_list: + dev_serial = "PyUSB dioesn't have a stable interface" + try: + dev_serial = usb.util.get_string(d, 256, d.iSerialNumber) + except ValueError: + # Incompatible pyUsb version. + dev_serial = usb.util.get_string(d, d.iSerialNumber) + if dev_serial == serialname: + dev = d + break + if dev is None: + raise Exception( + "Power", "USB device(%s) not found" % serialname + ) + else: + dev = dev_list[0] - Args: - data: one single record of bytes. + self._logger.debug("Found USB device: %04x:%04x", vendor, product) + self._dev = dev - Output: - stdout of the record in csv format. + # Get an endpoint instance. + try: + dev.set_configuration() + except usb.USBError: + pass + cfg = dev.get_active_configuration() + + intf = usb.util.find_descriptor( + cfg, + custom_match=lambda i: i.bInterfaceClass == 255 + and i.bInterfaceSubClass == 0x54, + ) + + self._intf = intf + self._logger.debug("InterfaceNumber: %s", intf.bInterfaceNumber) + + read_ep = usb.util.find_descriptor( + intf, + # match the first IN endpoint + custom_match=lambda e: usb.util.endpoint_direction( + e.bEndpointAddress + ) + == usb.util.ENDPOINT_IN, + ) + + self._read_ep = read_ep + self._logger.debug("Reader endpoint: 0x%x", read_ep.bEndpointAddress) + + write_ep = usb.util.find_descriptor( + intf, + # match the first OUT endpoint + custom_match=lambda e: usb.util.endpoint_direction( + e.bEndpointAddress + ) + == usb.util.ENDPOINT_OUT, + ) + + self._write_ep = write_ep + self._logger.debug("Writer endpoint: 0x%x", write_ep.bEndpointAddress) + + self.clear_ina_struct() + + self._logger.debug("Found power logging USB endpoint.") + + def clear_ina_struct(self): + """Clear INA description struct.""" + self._inas = [] + + def append_ina_struct( + self, name, rs, port, addr, data=None, ina_type=INA_POWER + ): + """Add an INA descriptor into the list of active INAs. + + Args: + name: Readable name of this channel. + rs: Sense resistor value in ohms, floating point. + port: I2C channel this INA is connected to. + addr: I2C addr of this INA. + data: Misc data for special handling, board specific. + ina_type: INA function to use, power, voltage, etc. + """ + ina = {} + ina["name"] = name + ina["rs"] = rs + ina["port"] = port + ina["addr"] = addr + ina["type"] = ina_type + # Calculate INA231 Calibration register + # (see INA231 spec p.15) + # CurrentLSB = uA per div = 80mV / (Rsh * 2^15) + # CurrentLSB uA = 80000000nV / (Rsh mOhm * 0x8000) + ina["uAscale"] = 80000000.0 / (rs * 0x8000) + ina["uWscale"] = 25.0 * ina["uAscale"] + ina["mVscale"] = 1.25 + ina["uVscale"] = 2.5 + ina["data"] = data + self._inas.append(ina) + + def wr_command(self, write_list, read_count=1, wtimeout=100, rtimeout=1000): + """Write command to logger logic. + + This function writes byte command values list to stm, then reads + byte status. + + Args: + write_list: list of command byte values [0~255]. + read_count: number of status byte values to read. + + Interface: + write: [command, data ... ] + read: [status ] + + Returns: + bytes read, or None on failure. + """ + self._logger.debug( + "Spower.wr_command(write_list=[%s] (%d), read_count=%s)", + list(bytearray(write_list)), + len(write_list), + read_count, + ) + + # Clean up args from python style to correct types. + write_length = 0 + if write_list: + write_length = len(write_list) + if not read_count: + read_count = 0 + + # Send command to stm32. + if write_list: + cmd = write_list + ret = self._write_ep.write(cmd, wtimeout) + + self._logger.debug("RET: %s ", ret) + + # Read back response if necessary. + if read_count: + bytesread = self._read_ep.read(512, rtimeout) + self._logger.debug("BYTES: [%s]", bytesread) + + if len(bytesread) != read_count: + pass + + self._logger.debug("STATUS: 0x%02x", int(bytesread[0])) + if read_count == 1: + return bytesread[0] + else: + return bytesread + + return None + + def clear(self): + """Clear pending reads on the stm32""" + try: + while True: + ret = self.wr_command( + b"", read_count=512, rtimeout=100, wtimeout=50 + ) + self._logger.debug( + "Try Clear: read %s", "success" if ret == 0 else "failure" + ) + except: + pass + + def send_reset(self): + """Reset the power interface on the stm32""" + cmd = struct.pack("<H", self.CMD_RESET) + ret = self.wr_command(cmd, rtimeout=50, wtimeout=50) + self._logger.debug( + "Command RESET: %s", "success" if ret == 0 else "failure" + ) + + def reset(self): + """Try resetting the USB interface until success. + + Use linear back off strategy when encounter the error with 10ms increment. + + Raises: + Exception on failure. + """ + max_reset_retry = 100 + for count in range(1, max_reset_retry + 1): + self.clear() + try: + self.send_reset() + return + except Exception as e: + self.clear() + self.clear() + self._logger.debug( + "TRY %d of %d: %s", count, max_reset_retry, e + ) + time.sleep(count * 0.01) + raise Exception("Power", "Failed to reset") + + def stop(self): + """Stop any active data acquisition.""" + cmd = struct.pack("<H", self.CMD_STOP) + ret = self.wr_command(cmd) + self._logger.debug( + "Command STOP: %s", "success" if ret == 0 else "failure" + ) + + def start(self, integration_us): + """Start data acquisition. + + Args: + integration_us: int, how many us between samples, and + how often the data block must be read. + + Returns: + actual sampling interval in ms. + """ + cmd = struct.pack("<HI", self.CMD_START, integration_us) + read = self.wr_command(cmd, read_count=5) + actual_us = 0 + if len(read) == 5: + ret, actual_us = struct.unpack("<BI", read) + self._logger.debug( + "Command START: %s %dus", + "success" if ret == 0 else "failure", + actual_us, + ) + else: + self._logger.debug("Command START: FAIL") + + return actual_us + + def add_ina_name(self, name_tuple): + """Add INA from board config. + + Args: + name_tuple: name and type of power rail in board config. + + Returns: + True if INA added, False if the INA is not on this board. + + Raises: + Exception on unexpected failure. + """ + name, ina_type = name_tuple + + for datum in self._brdcfg: + if datum["name"] == name: + rs = int(float(datum["rs"]) * 1000.0) + board = datum["sweetberry"] + + if board == self._board: + if "port" in datum and "addr" in datum: + port = datum["port"] + addr = datum["addr"] + else: + channel = int(datum["channel"]) + port, addr = self.CHMAP[channel] + self.add_ina(port, ina_type, addr, 0, rs, data=datum) + return True + else: + return False + raise Exception("Power", "Failed to find INA %s" % name) + + def set_time(self, timestamp_us): + """Set sweetberry time to match host time. + + Args: + timestamp_us: host timestmap in us. + """ + # 0x0005 , 8 byte timestamp + cmd = struct.pack("<HQ", self.CMD_SETTIME, timestamp_us) + ret = self.wr_command(cmd) + + self._logger.debug( + "Command SETTIME: %s", "success" if ret == 0 else "failure" + ) + + def add_ina(self, bus, ina_type, addr, extra, resistance, data=None): + """Add an INA to the data acquisition list. + + Args: + bus: which i2c bus the INA is on. Same ordering as Si2c. + ina_type: Ina interface: INA_POWER/BUSV/etc. + addr: 7 bit i2c addr of this INA + extra: extra data for nonstandard configs. + resistance: int, shunt resistance in mOhm + """ + # 0x0002, 1B: bus, 1B:INA type, 1B: INA addr, 1B: extra, 4B: Rs + cmd = struct.pack( + "<HBBBBI", self.CMD_ADDINA, bus, ina_type, addr, extra, resistance + ) + ret = self.wr_command(cmd) + if ret == 0: + if data: + name = data["name"] + else: + name = "ina%d_%02x" % (bus, addr) + self.append_ina_struct( + name, resistance, bus, addr, data=data, ina_type=ina_type + ) + self._logger.debug( + "Command ADD_INA: %s", "success" if ret == 0 else "failure" + ) + + def report_header_size(self): + """Helper function to calculate power record header size.""" + result = 2 + timestamp = 8 + return result + timestamp + + def report_size(self, ina_count): + """Helper function to calculate full power record size.""" + record = 2 + + datasize = self.report_header_size() + ina_count * record + # Round to multiple of 4 bytes. + datasize = int(((datasize + 3) // 4) * 4) + + return datasize + + def read_line(self): + """Read a line of data from the setup INAs + + Returns: + list of dicts of the values read by ina/type tuple, otherwise None. + [{ts:100, (vbat, power):450}, {ts:200, (vbat, power):440}] + """ + try: + expected_bytes = self.report_size(len(self._inas)) + cmd = struct.pack("<H", self.CMD_NEXT) + bytesread = self.wr_command(cmd, read_count=expected_bytes) + except usb.core.USBError as e: + self._logger.error("READ LINE FAILED %s", e) + return None + + if len(bytesread) == 1: + if bytesread[0] != 0x6: + self._logger.debug( + "READ LINE FAILED bytes: %d ret: %02x", + len(bytesread), + bytesread[0], + ) + return None + + if len(bytesread) % expected_bytes != 0: + self._logger.debug( + "READ LINE WARNING: expected %d, got %d", + expected_bytes, + len(bytesread), + ) + + packet_count = len(bytesread) // expected_bytes + + values = [] + for i in range(0, packet_count): + start = i * expected_bytes + end = (i + 1) * expected_bytes + record = self.interpret_line(bytesread[start:end]) + values.append(record) + + return values + + def interpret_line(self, data): + """Interpret a power record from INAs + + Args: + data: one single record of bytes. + + Output: + stdout of the record in csv format. + + Returns: + dict containing name, value of recorded data. + """ + status, size = struct.unpack("<BB", data[0:2]) + if len(data) != self.report_size(size): + self._logger.error( + "READ LINE FAILED st:%d size:%d expected:%d len:%d", + status, + size, + self.report_size(size), + len(data), + ) + else: + pass - Returns: - dict containing name, value of recorded data. - """ - status, size = struct.unpack("<BB", data[0:2]) - if len(data) != self.report_size(size): - self._logger.error("READ LINE FAILED st:%d size:%d expected:%d len:%d", - status, size, self.report_size(size), len(data)) - else: - pass + timestamp = struct.unpack("<Q", data[2:10])[0] + self._logger.debug( + "READ LINE: st:%d size:%d time:%dus", status, size, timestamp + ) + ftimestamp = float(timestamp) / 1000000.0 - timestamp = struct.unpack("<Q", data[2:10])[0] - self._logger.debug("READ LINE: st:%d size:%d time:%dus", status, size, - timestamp) - ftimestamp = float(timestamp) / 1000000. + record = {"ts": ftimestamp, "status": status, "berry": self._board} - record = {"ts": ftimestamp, "status": status, "berry":self._board} + for i in range(0, size): + idx = self.report_header_size() + 2 * i + name = self._inas[i]["name"] + name_tuple = (self._inas[i]["name"], self._inas[i]["type"]) - for i in range(0, size): - idx = self.report_header_size() + 2*i - name = self._inas[i]['name'] - name_tuple = (self._inas[i]['name'], self._inas[i]['type']) + raw_val = struct.unpack("<h", data[idx : idx + 2])[0] - raw_val = struct.unpack("<h", data[idx:idx+2])[0] + if self._inas[i]["type"] == Spower.INA_POWER: + val = raw_val * self._inas[i]["uWscale"] + elif self._inas[i]["type"] == Spower.INA_BUSV: + val = raw_val * self._inas[i]["mVscale"] + elif self._inas[i]["type"] == Spower.INA_CURRENT: + val = raw_val * self._inas[i]["uAscale"] + elif self._inas[i]["type"] == Spower.INA_SHUNTV: + val = raw_val * self._inas[i]["uVscale"] - if self._inas[i]['type'] == Spower.INA_POWER: - val = raw_val * self._inas[i]['uWscale'] - elif self._inas[i]['type'] == Spower.INA_BUSV: - val = raw_val * self._inas[i]['mVscale'] - elif self._inas[i]['type'] == Spower.INA_CURRENT: - val = raw_val * self._inas[i]['uAscale'] - elif self._inas[i]['type'] == Spower.INA_SHUNTV: - val = raw_val * self._inas[i]['uVscale'] + self._logger.debug( + "READ %d %s: %fs: 0x%04x %f", i, name, ftimestamp, raw_val, val + ) + record[name_tuple] = val - self._logger.debug("READ %d %s: %fs: 0x%04x %f", i, name, ftimestamp, - raw_val, val) - record[name_tuple] = val + return record - return record + def load_board(self, brdfile): + """Load a board config. - def load_board(self, brdfile): - """Load a board config. + Args: + brdfile: Filename of a json file decribing the INA wiring of this board. + """ + with open(process_filename(brdfile)) as data_file: + data = json.load(data_file) - Args: - brdfile: Filename of a json file decribing the INA wiring of this board. - """ - with open(process_filename(brdfile)) as data_file: - data = json.load(data_file) - - #TODO: validate this. - self._brdcfg = data; - self._logger.debug(pprint.pformat(data)) + # TODO: validate this. + self._brdcfg = data + self._logger.debug(pprint.pformat(data)) class powerlog(object): - """Power class to log aggregated power. - - Usage: - obj = powerlog() - - Instance Variables: - _data: a StatsManager object that records sweetberry readings and calculates - statistics. - _pwr[]: Spower objects for individual sweetberries. - """ + """Power class to log aggregated power. - def __init__(self, brdfile, cfgfile, serial_a=None, serial_b=None, - sync_date=False, use_ms=False, use_mW=False, print_stats=False, - stats_dir=None, stats_json_dir=None, print_raw_data=True, - raw_data_dir=None): - """Init the powerlog class and set the variables. - - Args: - brdfile: string name of json file containing board layout. - cfgfile: string name of json containing list of rails to read. - serial_a: serial number of sweetberry A. - serial_b: serial number of sweetberry B. - sync_date: report timestamps synced with host datetime. - use_ms: report timestamps in ms rather than us. - use_mW: report power as milliwatts, otherwise default to microwatts. - print_stats: print statistics for sweetberry readings at the end. - stats_dir: directory to save sweetberry readings statistics; if None then - do not save the statistics. - stats_json_dir: directory to save means of sweetberry readings in json - format; if None then do not save the statistics. - print_raw_data: print sweetberry readings raw data in real time, default - is to print. - raw_data_dir: directory to save sweetberry readings raw data; if None then - do not save the raw data. - """ - self._logger = logging.getLogger(__name__) - self._data = StatsManager() - self._pwr = {} - self._use_ms = use_ms - self._use_mW = use_mW - self._print_stats = print_stats - self._stats_dir = stats_dir - self._stats_json_dir = stats_json_dir - self._print_raw_data = print_raw_data - self._raw_data_dir = raw_data_dir - - if not serial_a and not serial_b: - self._pwr['A'] = Spower('A') - if serial_a: - self._pwr['A'] = Spower('A', serialname=serial_a) - if serial_b: - self._pwr['B'] = Spower('B', serialname=serial_b) - - with open(process_filename(cfgfile)) as data_file: - names = json.load(data_file) - self._names = self.process_scenario(names) - - for key in self._pwr: - self._pwr[key].load_board(brdfile) - self._pwr[key].reset() - - # Allocate the rails to the appropriate boards. - used_boards = [] - for name in self._names: - success = False - for key in self._pwr.keys(): - if self._pwr[key].add_ina_name(name): - success = True - if key not in used_boards: - used_boards.append(key) - if not success: - raise Exception("Failed to add %s (maybe missing " - "sweetberry, or bad board file?)" % name) - - # Evict unused boards. - for key in list(self._pwr.keys()): - if key not in used_boards: - self._pwr.pop(key) - - for key in self._pwr.keys(): - if sync_date: - self._pwr[key].set_time(time.time() * 1000000) - else: - self._pwr[key].set_time(0) - - def process_scenario(self, name_list): - """Return list of tuples indicating name and type. + Usage: + obj = powerlog() - Args: - json originated list of names, or [name, type] - Returns: - list of tuples of (name, type) defaulting to type "POWER" - Raises: exception, invalid INA type. + Instance Variables: + _data: a StatsManager object that records sweetberry readings and calculates + statistics. + _pwr[]: Spower objects for individual sweetberries. """ - names = [] - for entry in name_list: - if isinstance(entry, list): - name = entry[0] - if entry[1] == "POWER": - type = Spower.INA_POWER - elif entry[1] == "BUSV": - type = Spower.INA_BUSV - elif entry[1] == "CURRENT": - type = Spower.INA_CURRENT - elif entry[1] == "SHUNTV": - type = Spower.INA_SHUNTV - else: - raise Exception("Invalid INA type", "Type of %s [%s] not recognized," - " try one of POWER, BUSV, CURRENT" % (entry[0], entry[1])) - else: - name = entry - type = Spower.INA_POWER - names.append((name, type)) - return names + def __init__( + self, + brdfile, + cfgfile, + serial_a=None, + serial_b=None, + sync_date=False, + use_ms=False, + use_mW=False, + print_stats=False, + stats_dir=None, + stats_json_dir=None, + print_raw_data=True, + raw_data_dir=None, + ): + """Init the powerlog class and set the variables. + + Args: + brdfile: string name of json file containing board layout. + cfgfile: string name of json containing list of rails to read. + serial_a: serial number of sweetberry A. + serial_b: serial number of sweetberry B. + sync_date: report timestamps synced with host datetime. + use_ms: report timestamps in ms rather than us. + use_mW: report power as milliwatts, otherwise default to microwatts. + print_stats: print statistics for sweetberry readings at the end. + stats_dir: directory to save sweetberry readings statistics; if None then + do not save the statistics. + stats_json_dir: directory to save means of sweetberry readings in json + format; if None then do not save the statistics. + print_raw_data: print sweetberry readings raw data in real time, default + is to print. + raw_data_dir: directory to save sweetberry readings raw data; if None then + do not save the raw data. + """ + self._logger = logging.getLogger(__name__) + self._data = StatsManager() + self._pwr = {} + self._use_ms = use_ms + self._use_mW = use_mW + self._print_stats = print_stats + self._stats_dir = stats_dir + self._stats_json_dir = stats_json_dir + self._print_raw_data = print_raw_data + self._raw_data_dir = raw_data_dir + + if not serial_a and not serial_b: + self._pwr["A"] = Spower("A") + if serial_a: + self._pwr["A"] = Spower("A", serialname=serial_a) + if serial_b: + self._pwr["B"] = Spower("B", serialname=serial_b) + + with open(process_filename(cfgfile)) as data_file: + names = json.load(data_file) + self._names = self.process_scenario(names) - def start(self, integration_us_request, seconds, sync_speed=.8): - """Starts sampling. - - Args: - integration_us_request: requested interval between sample values. - seconds: time until exit, or None to run until cancel. - sync_speed: A usb request is sent every [.8] * integration_us. - """ - # We will get back the actual integration us. - # It should be the same for all devices. - integration_us = None - for key in self._pwr: - integration_us_new = self._pwr[key].start(integration_us_request) - if integration_us: - if integration_us != integration_us_new: - raise Exception("FAIL", - "Integration on A: %dus != integration on B %dus" % ( - integration_us, integration_us_new)) - integration_us = integration_us_new - - # CSV header - title = "ts:%dus" % integration_us - for name_tuple in self._names: - name, ina_type = name_tuple - - if ina_type == Spower.INA_POWER: - unit = "mW" if self._use_mW else "uW" - elif ina_type == Spower.INA_BUSV: - unit = "mV" - elif ina_type == Spower.INA_CURRENT: - unit = "uA" - elif ina_type == Spower.INA_SHUNTV: - unit = "uV" - - title += ", %s %s" % (name, unit) - name_type = name + Spower.INA_SUFFIX[ina_type] - self._data.SetUnit(name_type, unit) - title += ", status" - if self._print_raw_data: - logoutput(title) - - forever = False - if not seconds: - forever = True - end_time = time.time() + seconds - try: - pending_records = [] - while forever or end_time > time.time(): - if (integration_us > 5000): - time.sleep((integration_us / 1000000.) * sync_speed) for key in self._pwr: - records = self._pwr[key].read_line() - if not records: - continue - - for record in records: - pending_records.append(record) - - pending_records.sort(key=lambda r: r['ts']) - - aggregate_record = {"boards": set()} - for record in pending_records: - if record["berry"] not in aggregate_record["boards"]: - for rkey in record.keys(): - aggregate_record[rkey] = record[rkey] - aggregate_record["boards"].add(record["berry"]) - else: - self._logger.info("break %s, %s", record["berry"], - aggregate_record["boards"]) - break - - if aggregate_record["boards"] == set(self._pwr.keys()): - csv = "%f" % aggregate_record["ts"] - for name in self._names: - if name in aggregate_record: - multiplier = 0.001 if (self._use_mW and - name[1]==Spower.INA_POWER) else 1 - value = aggregate_record[name] * multiplier - csv += ", %.2f" % value - name_type = name[0] + Spower.INA_SUFFIX[name[1]] - self._data.AddSample(name_type, value) - else: - csv += ", " - csv += ", %d" % aggregate_record["status"] - if self._print_raw_data: - logoutput(csv) - - aggregate_record = {"boards": set()} - for r in range(0, len(self._pwr)): - pending_records.pop(0) - - except KeyboardInterrupt: - self._logger.info('\nCTRL+C caught.') - - finally: - for key in self._pwr: - self._pwr[key].stop() - self._data.CalculateStats() - if self._print_stats: - print(self._data.SummaryToString()) - save_dir = 'sweetberry%s' % time.time() - if self._stats_dir: - stats_dir = os.path.join(self._stats_dir, save_dir) - self._data.SaveSummary(stats_dir) - if self._stats_json_dir: - stats_json_dir = os.path.join(self._stats_json_dir, save_dir) - self._data.SaveSummaryJSON(stats_json_dir) - if self._raw_data_dir: - raw_data_dir = os.path.join(self._raw_data_dir, save_dir) - self._data.SaveRawData(raw_data_dir) + self._pwr[key].load_board(brdfile) + self._pwr[key].reset() + + # Allocate the rails to the appropriate boards. + used_boards = [] + for name in self._names: + success = False + for key in self._pwr.keys(): + if self._pwr[key].add_ina_name(name): + success = True + if key not in used_boards: + used_boards.append(key) + if not success: + raise Exception( + "Failed to add %s (maybe missing " + "sweetberry, or bad board file?)" % name + ) + + # Evict unused boards. + for key in list(self._pwr.keys()): + if key not in used_boards: + self._pwr.pop(key) + + for key in self._pwr.keys(): + if sync_date: + self._pwr[key].set_time(time.time() * 1000000) + else: + self._pwr[key].set_time(0) + + def process_scenario(self, name_list): + """Return list of tuples indicating name and type. + + Args: + json originated list of names, or [name, type] + Returns: + list of tuples of (name, type) defaulting to type "POWER" + Raises: exception, invalid INA type. + """ + names = [] + for entry in name_list: + if isinstance(entry, list): + name = entry[0] + if entry[1] == "POWER": + type = Spower.INA_POWER + elif entry[1] == "BUSV": + type = Spower.INA_BUSV + elif entry[1] == "CURRENT": + type = Spower.INA_CURRENT + elif entry[1] == "SHUNTV": + type = Spower.INA_SHUNTV + else: + raise Exception( + "Invalid INA type", + "Type of %s [%s] not recognized," + " try one of POWER, BUSV, CURRENT" + % (entry[0], entry[1]), + ) + else: + name = entry + type = Spower.INA_POWER + + names.append((name, type)) + return names + + def start(self, integration_us_request, seconds, sync_speed=0.8): + """Starts sampling. + + Args: + integration_us_request: requested interval between sample values. + seconds: time until exit, or None to run until cancel. + sync_speed: A usb request is sent every [.8] * integration_us. + """ + # We will get back the actual integration us. + # It should be the same for all devices. + integration_us = None + for key in self._pwr: + integration_us_new = self._pwr[key].start(integration_us_request) + if integration_us: + if integration_us != integration_us_new: + raise Exception( + "FAIL", + # pylint:disable=bad-string-format-type + "Integration on A: %dus != integration on B %dus" + % (integration_us, integration_us_new), + ) + integration_us = integration_us_new + + # CSV header + title = "ts:%dus" % integration_us + for name_tuple in self._names: + name, ina_type = name_tuple + + if ina_type == Spower.INA_POWER: + unit = "mW" if self._use_mW else "uW" + elif ina_type == Spower.INA_BUSV: + unit = "mV" + elif ina_type == Spower.INA_CURRENT: + unit = "uA" + elif ina_type == Spower.INA_SHUNTV: + unit = "uV" + + title += ", %s %s" % (name, unit) + name_type = name + Spower.INA_SUFFIX[ina_type] + self._data.SetUnit(name_type, unit) + title += ", status" + if self._print_raw_data: + logoutput(title) + + forever = False + if not seconds: + forever = True + end_time = time.time() + seconds + try: + pending_records = [] + while forever or end_time > time.time(): + if integration_us > 5000: + time.sleep((integration_us / 1000000.0) * sync_speed) + for key in self._pwr: + records = self._pwr[key].read_line() + if not records: + continue + + for record in records: + pending_records.append(record) + + pending_records.sort(key=lambda r: r["ts"]) + + aggregate_record = {"boards": set()} + for record in pending_records: + if record["berry"] not in aggregate_record["boards"]: + for rkey in record.keys(): + aggregate_record[rkey] = record[rkey] + aggregate_record["boards"].add(record["berry"]) + else: + self._logger.info( + "break %s, %s", + record["berry"], + aggregate_record["boards"], + ) + break + + if aggregate_record["boards"] == set(self._pwr.keys()): + csv = "%f" % aggregate_record["ts"] + for name in self._names: + if name in aggregate_record: + multiplier = ( + 0.001 + if ( + self._use_mW + and name[1] == Spower.INA_POWER + ) + else 1 + ) + value = aggregate_record[name] * multiplier + csv += ", %.2f" % value + name_type = name[0] + Spower.INA_SUFFIX[name[1]] + self._data.AddSample(name_type, value) + else: + csv += ", " + csv += ", %d" % aggregate_record["status"] + if self._print_raw_data: + logoutput(csv) + + aggregate_record = {"boards": set()} + for r in range(0, len(self._pwr)): + pending_records.pop(0) + + except KeyboardInterrupt: + self._logger.info("\nCTRL+C caught.") + + finally: + for key in self._pwr: + self._pwr[key].stop() + self._data.CalculateStats() + if self._print_stats: + print(self._data.SummaryToString()) + save_dir = "sweetberry%s" % time.time() + if self._stats_dir: + stats_dir = os.path.join(self._stats_dir, save_dir) + self._data.SaveSummary(stats_dir) + if self._stats_json_dir: + stats_json_dir = os.path.join(self._stats_json_dir, save_dir) + self._data.SaveSummaryJSON(stats_json_dir) + if self._raw_data_dir: + raw_data_dir = os.path.join(self._raw_data_dir, save_dir) + self._data.SaveRawData(raw_data_dir) def main(argv=None): - if argv is None: - argv = sys.argv[1:] - # Command line argument description. - parser = argparse.ArgumentParser( - description="Gather CSV data from sweetberry") - parser.add_argument('-b', '--board', type=str, - help="Board configuration file, eg. my.board", default="") - parser.add_argument('-c', '--config', type=str, - help="Rail config to monitor, eg my.scenario", default="") - parser.add_argument('-A', '--serial', type=str, - help="Serial number of sweetberry A", default="") - parser.add_argument('-B', '--serial_b', type=str, - help="Serial number of sweetberry B", default="") - parser.add_argument('-t', '--integration_us', type=int, - help="Target integration time for samples", default=100000) - parser.add_argument('-s', '--seconds', type=float, - help="Seconds to run capture", default=0.) - parser.add_argument('--date', default=False, - help="Sync logged timestamp to host date", action="store_true") - parser.add_argument('--ms', default=False, - help="Print timestamp as milliseconds", action="store_true") - parser.add_argument('--mW', default=False, - help="Print power as milliwatts, otherwise default to microwatts", - action="store_true") - parser.add_argument('--slow', default=False, - help="Intentionally overflow", action="store_true") - parser.add_argument('--print_stats', default=False, action="store_true", - help="Print statistics for sweetberry readings at the end") - parser.add_argument('--save_stats', type=str, nargs='?', - dest='stats_dir', metavar='STATS_DIR', - const=os.path.dirname(os.path.abspath(__file__)), default=None, - help="Save statistics for sweetberry readings to %(metavar)s if " - "%(metavar)s is specified, %(metavar)s will be created if it does " - "not exist; if %(metavar)s is not specified but the flag is set, " - "stats will be saved to where %(prog)s is located; if this flag is " - "not set, then do not save stats") - parser.add_argument('--save_stats_json', type=str, nargs='?', - dest='stats_json_dir', metavar='STATS_JSON_DIR', - const=os.path.dirname(os.path.abspath(__file__)), default=None, - help="Save means for sweetberry readings in json to %(metavar)s if " - "%(metavar)s is specified, %(metavar)s will be created if it does " - "not exist; if %(metavar)s is not specified but the flag is set, " - "stats will be saved to where %(prog)s is located; if this flag is " - "not set, then do not save stats") - parser.add_argument('--no_print_raw_data', - dest='print_raw_data', default=True, action="store_false", - help="Not print raw sweetberry readings at real time, default is to " - "print") - parser.add_argument('--save_raw_data', type=str, nargs='?', - dest='raw_data_dir', metavar='RAW_DATA_DIR', - const=os.path.dirname(os.path.abspath(__file__)), default=None, - help="Save raw data for sweetberry readings to %(metavar)s if " - "%(metavar)s is specified, %(metavar)s will be created if it does " - "not exist; if %(metavar)s is not specified but the flag is set, " - "raw data will be saved to where %(prog)s is located; if this flag " - "is not set, then do not save raw data") - parser.add_argument('-v', '--verbose', default=False, - help="Very chatty printout", action="store_true") - - args = parser.parse_args(argv) - - root_logger = logging.getLogger(__name__) - if args.verbose: - root_logger.setLevel(logging.DEBUG) - else: - root_logger.setLevel(logging.INFO) - - # if powerlog is used through main, log to sys.stdout - if __name__ == "__main__": - stdout_handler = logging.StreamHandler(sys.stdout) - stdout_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) - root_logger.addHandler(stdout_handler) - - integration_us_request = args.integration_us - if not args.board: - raise Exception("Power", "No board file selected, see board.README") - if not args.config: - raise Exception("Power", "No config file selected, see board.README") - - brdfile = args.board - cfgfile = args.config - seconds = args.seconds - serial_a = args.serial - serial_b = args.serial_b - sync_date = args.date - use_ms = args.ms - use_mW = args.mW - print_stats = args.print_stats - stats_dir = args.stats_dir - stats_json_dir = args.stats_json_dir - print_raw_data = args.print_raw_data - raw_data_dir = args.raw_data_dir - - boards = [] - - sync_speed = .8 - if args.slow: - sync_speed = 1.2 - - # Set up logging interface. - powerlogger = powerlog(brdfile, cfgfile, serial_a=serial_a, serial_b=serial_b, - sync_date=sync_date, use_ms=use_ms, use_mW=use_mW, - print_stats=print_stats, stats_dir=stats_dir, - stats_json_dir=stats_json_dir, - print_raw_data=print_raw_data,raw_data_dir=raw_data_dir) - - # Start logging. - powerlogger.start(integration_us_request, seconds, sync_speed=sync_speed) + if argv is None: + argv = sys.argv[1:] + # Command line argument description. + parser = argparse.ArgumentParser( + description="Gather CSV data from sweetberry" + ) + parser.add_argument( + "-b", + "--board", + type=str, + help="Board configuration file, eg. my.board", + default="", + ) + parser.add_argument( + "-c", + "--config", + type=str, + help="Rail config to monitor, eg my.scenario", + default="", + ) + parser.add_argument( + "-A", + "--serial", + type=str, + help="Serial number of sweetberry A", + default="", + ) + parser.add_argument( + "-B", + "--serial_b", + type=str, + help="Serial number of sweetberry B", + default="", + ) + parser.add_argument( + "-t", + "--integration_us", + type=int, + help="Target integration time for samples", + default=100000, + ) + parser.add_argument( + "-s", + "--seconds", + type=float, + help="Seconds to run capture", + default=0.0, + ) + parser.add_argument( + "--date", + default=False, + help="Sync logged timestamp to host date", + action="store_true", + ) + parser.add_argument( + "--ms", + default=False, + help="Print timestamp as milliseconds", + action="store_true", + ) + parser.add_argument( + "--mW", + default=False, + help="Print power as milliwatts, otherwise default to microwatts", + action="store_true", + ) + parser.add_argument( + "--slow", + default=False, + help="Intentionally overflow", + action="store_true", + ) + parser.add_argument( + "--print_stats", + default=False, + action="store_true", + help="Print statistics for sweetberry readings at the end", + ) + parser.add_argument( + "--save_stats", + type=str, + nargs="?", + dest="stats_dir", + metavar="STATS_DIR", + const=os.path.dirname(os.path.abspath(__file__)), + default=None, + help="Save statistics for sweetberry readings to %(metavar)s if " + "%(metavar)s is specified, %(metavar)s will be created if it does " + "not exist; if %(metavar)s is not specified but the flag is set, " + "stats will be saved to where %(prog)s is located; if this flag is " + "not set, then do not save stats", + ) + parser.add_argument( + "--save_stats_json", + type=str, + nargs="?", + dest="stats_json_dir", + metavar="STATS_JSON_DIR", + const=os.path.dirname(os.path.abspath(__file__)), + default=None, + help="Save means for sweetberry readings in json to %(metavar)s if " + "%(metavar)s is specified, %(metavar)s will be created if it does " + "not exist; if %(metavar)s is not specified but the flag is set, " + "stats will be saved to where %(prog)s is located; if this flag is " + "not set, then do not save stats", + ) + parser.add_argument( + "--no_print_raw_data", + dest="print_raw_data", + default=True, + action="store_false", + help="Not print raw sweetberry readings at real time, default is to " + "print", + ) + parser.add_argument( + "--save_raw_data", + type=str, + nargs="?", + dest="raw_data_dir", + metavar="RAW_DATA_DIR", + const=os.path.dirname(os.path.abspath(__file__)), + default=None, + help="Save raw data for sweetberry readings to %(metavar)s if " + "%(metavar)s is specified, %(metavar)s will be created if it does " + "not exist; if %(metavar)s is not specified but the flag is set, " + "raw data will be saved to where %(prog)s is located; if this flag " + "is not set, then do not save raw data", + ) + parser.add_argument( + "-v", + "--verbose", + default=False, + help="Very chatty printout", + action="store_true", + ) + + args = parser.parse_args(argv) + + root_logger = logging.getLogger(__name__) + if args.verbose: + root_logger.setLevel(logging.DEBUG) + else: + root_logger.setLevel(logging.INFO) + + # if powerlog is used through main, log to sys.stdout + if __name__ == "__main__": + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setFormatter( + logging.Formatter("%(levelname)s: %(message)s") + ) + root_logger.addHandler(stdout_handler) + + integration_us_request = args.integration_us + if not args.board: + raise Exception("Power", "No board file selected, see board.README") + if not args.config: + raise Exception("Power", "No config file selected, see board.README") + + brdfile = args.board + cfgfile = args.config + seconds = args.seconds + serial_a = args.serial + serial_b = args.serial_b + sync_date = args.date + use_ms = args.ms + use_mW = args.mW + print_stats = args.print_stats + stats_dir = args.stats_dir + stats_json_dir = args.stats_json_dir + print_raw_data = args.print_raw_data + raw_data_dir = args.raw_data_dir + + boards = [] + + sync_speed = 0.8 + if args.slow: + sync_speed = 1.2 + + # Set up logging interface. + powerlogger = powerlog( + brdfile, + cfgfile, + serial_a=serial_a, + serial_b=serial_b, + sync_date=sync_date, + use_ms=use_ms, + use_mW=use_mW, + print_stats=print_stats, + stats_dir=stats_dir, + stats_json_dir=stats_json_dir, + print_raw_data=print_raw_data, + raw_data_dir=raw_data_dir, + ) + + # Start logging. + powerlogger.start(integration_us_request, seconds, sync_speed=sync_speed) if __name__ == "__main__": - main() + main() diff --git a/extra/usb_power/powerlog_unittest.py b/extra/usb_power/powerlog_unittest.py index 1d0718530e..62667e35b8 100644 --- a/extra/usb_power/powerlog_unittest.py +++ b/extra/usb_power/powerlog_unittest.py @@ -1,10 +1,6 @@ -# Copyright 2018 The Chromium OS Authors. All rights reserved. +# Copyright 2018 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """Unit tests for powerlog.""" @@ -13,42 +9,44 @@ import shutil import tempfile import unittest -import powerlog +from usb_power import powerlog + class TestPowerlog(unittest.TestCase): - """Test to verify powerlog util methods work as expected.""" - - def setUp(self): - """Set up data and create a temporary directory to save data and stats.""" - self.tempdir = tempfile.mkdtemp() - self.filename = 'testfile' - self.filepath = os.path.join(self.tempdir, self.filename) - with open(self.filepath, 'w') as f: - f.write('') - - def tearDown(self): - """Delete the temporary directory and its content.""" - shutil.rmtree(self.tempdir) - - def test_ProcessFilenameAbsoluteFilePath(self): - """Absolute file path is returned unchanged.""" - processed_fname = powerlog.process_filename(self.filepath) - self.assertEqual(self.filepath, processed_fname) - - def test_ProcessFilenameRelativeFilePath(self): - """Finds relative file path inside a known config location.""" - original = powerlog.CONFIG_LOCATIONS - powerlog.CONFIG_LOCATIONS = [self.tempdir] - processed_fname = powerlog.process_filename(self.filename) - try: - self.assertEqual(self.filepath, processed_fname) - finally: - powerlog.CONFIG_LOCATIONS = original - - def test_ProcessFilenameInvalid(self): - """IOError is raised when file cannot be found by any of the four ways.""" - with self.assertRaises(IOError): - powerlog.process_filename(self.filename) - -if __name__ == '__main__': - unittest.main() + """Test to verify powerlog util methods work as expected.""" + + def setUp(self): + """Set up data and create a temporary directory to save data and stats.""" + self.tempdir = tempfile.mkdtemp() + self.filename = "testfile" + self.filepath = os.path.join(self.tempdir, self.filename) + with open(self.filepath, "w") as f: + f.write("") + + def tearDown(self): + """Delete the temporary directory and its content.""" + shutil.rmtree(self.tempdir) + + def test_ProcessFilenameAbsoluteFilePath(self): + """Absolute file path is returned unchanged.""" + processed_fname = powerlog.process_filename(self.filepath) + self.assertEqual(self.filepath, processed_fname) + + def test_ProcessFilenameRelativeFilePath(self): + """Finds relative file path inside a known config location.""" + original = powerlog.CONFIG_LOCATIONS + powerlog.CONFIG_LOCATIONS = [self.tempdir] + processed_fname = powerlog.process_filename(self.filename) + try: + self.assertEqual(self.filepath, processed_fname) + finally: + powerlog.CONFIG_LOCATIONS = original + + def test_ProcessFilenameInvalid(self): + """IOError is raised when file cannot be found by any of the four ways.""" + with self.assertRaises(IOError): + powerlog.process_filename(self.filename) + + +if __name__ == "__main__": + unittest.main() diff --git a/extra/usb_power/stats_manager.py b/extra/usb_power/stats_manager.py index 0f8c3fcb15..2035138731 100644 --- a/extra/usb_power/stats_manager.py +++ b/extra/usb_power/stats_manager.py @@ -1,10 +1,6 @@ -# Copyright 2017 The Chromium OS Authors. All rights reserved. +# Copyright 2017 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """Calculates statistics for lists of data and pretty print them.""" @@ -18,384 +14,409 @@ import logging import math import os -import numpy +import numpy # pylint:disable=import-error -STATS_PREFIX = '@@' -NAN_TAG = '*' -NAN_DESCRIPTION = '%s domains contain NaN samples' % NAN_TAG +STATS_PREFIX = "@@" +NAN_TAG = "*" +NAN_DESCRIPTION = "%s domains contain NaN samples" % NAN_TAG LONG_UNIT = { - '': 'N/A', - 'mW': 'milliwatt', - 'uW': 'microwatt', - 'mV': 'millivolt', - 'uA': 'microamp', - 'uV': 'microvolt' + "": "N/A", + "mW": "milliwatt", + "uW": "microwatt", + "mV": "millivolt", + "uA": "microamp", + "uV": "microvolt", } class StatsManagerError(Exception): - """Errors in StatsManager class.""" - pass + """Errors in StatsManager class.""" + pass -class StatsManager(object): - """Calculates statistics for several lists of data(float). - - Example usage: - - >>> stats = StatsManager(title='Title Banner') - >>> stats.AddSample(TIME_KEY, 50.0) - >>> stats.AddSample(TIME_KEY, 25.0) - >>> stats.AddSample(TIME_KEY, 40.0) - >>> stats.AddSample(TIME_KEY, 10.0) - >>> stats.AddSample(TIME_KEY, 10.0) - >>> stats.AddSample('frobnicate', 11.5) - >>> stats.AddSample('frobnicate', 9.0) - >>> stats.AddSample('foobar', 11111.0) - >>> stats.AddSample('foobar', 22222.0) - >>> stats.CalculateStats() - >>> print(stats.SummaryToString()) - ` @@-------------------------------------------------------------- - ` @@ Title Banner - @@-------------------------------------------------------------- - @@ NAME COUNT MEAN STDDEV MAX MIN - @@ sample_msecs 4 31.25 15.16 50.00 10.00 - @@ foobar 2 16666.50 5555.50 22222.00 11111.00 - @@ frobnicate 2 10.25 1.25 11.50 9.00 - ` @@-------------------------------------------------------------- - - Attributes: - _data: dict of list of readings for each domain(key) - _unit: dict of unit for each domain(key) - _smid: id supplied to differentiate data output to other StatsManager - instances that potentially save to the same directory - if smid all output files will be named |smid|_|fname| - _title: title to add as banner to formatted summary. If no title, - no banner gets added - _order: list of formatting order for domains. Domains not listed are - displayed in sorted order - _hide_domains: collection of domains to hide when formatting summary string - _accept_nan: flag to indicate if NaN samples are acceptable - _nan_domains: set to keep track of which domains contain NaN samples - _summary: dict of stats per domain (key): min, max, count, mean, stddev - _logger = StatsManager logger - - Note: - _summary is empty until CalculateStats() is called, and is updated when - CalculateStats() is called. - """ - - # pylint: disable=W0102 - def __init__(self, smid='', title='', order=[], hide_domains=[], - accept_nan=True): - """Initialize infrastructure for data and their statistics.""" - self._title = title - self._data = collections.defaultdict(list) - self._unit = collections.defaultdict(str) - self._smid = smid - self._order = order - self._hide_domains = hide_domains - self._accept_nan = accept_nan - self._nan_domains = set() - self._summary = {} - self._logger = logging.getLogger(type(self).__name__) - - def AddSample(self, domain, sample): - """Add one sample for a domain. - - Args: - domain: the domain name for the sample. - sample: one time sample for domain, expect type float. - - Raises: - StatsManagerError: if trying to add NaN and |_accept_nan| is false - """ - try: - sample = float(sample) - except ValueError: - # if we don't accept nan this will be caught below - self._logger.debug('sample %s for domain %s is not a number. Making NaN', - sample, domain) - sample = float('NaN') - if not self._accept_nan and math.isnan(sample): - raise StatsManagerError('accept_nan is false. Cannot add NaN sample.') - self._data[domain].append(sample) - if math.isnan(sample): - self._nan_domains.add(domain) - - def SetUnit(self, domain, unit): - """Set the unit for a domain. - - There can be only one unit for each domain. Setting unit twice will - overwrite the original unit. - - Args: - domain: the domain name. - unit: unit of the domain. - """ - if domain in self._unit: - self._logger.warning('overwriting the unit of %s, old unit is %s, new ' - 'unit is %s.', domain, self._unit[domain], unit) - self._unit[domain] = unit - def CalculateStats(self): - """Calculate stats for all domain-data pairs. - - First erases all previous stats, then calculate stats for all data. - """ - self._summary = {} - for domain, data in self._data.items(): - data_np = numpy.array(data) - self._summary[domain] = { - 'mean': numpy.nanmean(data_np), - 'min': numpy.nanmin(data_np), - 'max': numpy.nanmax(data_np), - 'stddev': numpy.nanstd(data_np), - 'count': data_np.size, - } - - @property - def DomainsToDisplay(self): - """List of domains that the manager will output in summaries.""" - return set(self._summary.keys()) - set(self._hide_domains) - - @property - def NanInOutput(self): - """Return whether any of the domains to display have NaN values.""" - return bool(len(set(self._nan_domains) & self.DomainsToDisplay)) - - def _SummaryTable(self): - """Generate the matrix to output as a summary. - - Returns: - A 2d matrix of headers and their data for each domain - e.g. - [[NAME, COUNT, MEAN, STDDEV, MAX, MIN], - [pp5000_mw, 10, 50, 0, 50, 50]] - """ - headers = ('NAME', 'COUNT', 'MEAN', 'STDDEV', 'MAX', 'MIN') - table = [headers] - # determine what domains to display & and the order - domains_to_display = self.DomainsToDisplay - display_order = [key for key in self._order if key in domains_to_display] - domains_to_display -= set(display_order) - display_order.extend(sorted(domains_to_display)) - for domain in display_order: - stats = self._summary[domain] - if not domain.endswith(self._unit[domain]): - domain = '%s_%s' % (domain, self._unit[domain]) - if domain in self._nan_domains: - domain = '%s%s' % (domain, NAN_TAG) - row = [domain] - row.append(str(stats['count'])) - for entry in headers[2:]: - row.append('%.2f' % stats[entry.lower()]) - table.append(row) - return table - - def SummaryToMarkdownString(self): - """Format the summary into a b/ compatible markdown table string. - - This requires this sort of output format - - | header1 | header2 | header3 | ... - | --------- | --------- | --------- | ... - | sample1h1 | sample1h2 | sample1h3 | ... - . - . - . - - Returns: - formatted summary string. - """ - # All we need to do before processing is insert a row of '-' between - # the headers, and the data - table = self._SummaryTable() - columns = len(table[0]) - # Using '-:' to allow the numbers to be right aligned - sep_row = ['-'] + ['-:'] * (columns - 1) - table.insert(1, sep_row) - text_rows = ['|'.join(r) for r in table] - body = '\n'.join(['|%s|' % r for r in text_rows]) - if self._title: - title_section = '**%s** \n\n' % self._title - body = title_section + body - # Make sure that the body is terminated with a newline. - return body + '\n' - - def SummaryToString(self, prefix=STATS_PREFIX): - """Format summary into a string, ready for pretty print. - - See class description for format example. - - Args: - prefix: start every row in summary string with prefix, for easier reading. - - Returns: - formatted summary string. - """ - table = self._SummaryTable() - max_col_width = [] - for col_idx in range(len(table[0])): - col_item_widths = [len(row[col_idx]) for row in table] - max_col_width.append(max(col_item_widths)) - - formatted_lines = [] - for row in table: - formatted_row = prefix + ' ' - for i in range(len(row)): - formatted_row += row[i].rjust(max_col_width[i] + 2) - formatted_lines.append(formatted_row) - if self.NanInOutput: - formatted_lines.append('%s %s' % (prefix, NAN_DESCRIPTION)) - - if self._title: - line_length = len(formatted_lines[0]) - dec_length = len(prefix) - # trim title to be at most as long as the longest line without the prefix - title = self._title[:(line_length - dec_length)] - # line is a seperator line consisting of ----- - line = '%s%s' % (prefix, '-' * (line_length - dec_length)) - # prepend the prefix to the centered title - padded_title = '%s%s' % (prefix, title.center(line_length)[dec_length:]) - formatted_lines = [line, padded_title, line] + formatted_lines + [line] - formatted_output = '\n'.join(formatted_lines) - return formatted_output - - def GetSummary(self): - """Getter for summary.""" - return self._summary - - def _MakeUniqueFName(self, fname): - """prepend |_smid| to fname & rotate fname to ensure uniqueness. - - Before saving a file through the StatsManager, make sure that the filename - is unique, first by prepending the smid if any and otherwise by appending - increasing integer suffixes until the filename is unique. - - If |smid| is defined /path/to/example/file.txt becomes - /path/to/example/{smid}_file.txt. - - The rotation works by changing /path/to/example/somename.txt to - /path/to/example/somename1.txt if the first one already exists on the - system. - - Note: this is not thread-safe. While it makes sense to use StatsManager - in a threaded data-collection, the data retrieval should happen in a - single threaded environment to ensure files don't get potentially clobbered. - - Args: - fname: filename to ensure uniqueness. - - Returns: - {smid_}fname{tag}.[b].ext - the smid portion gets prepended if |smid| is defined - the tag portion gets appended if necessary to ensure unique fname - """ - fdir = os.path.dirname(fname) - base, ext = os.path.splitext(os.path.basename(fname)) - if self._smid: - base = '%s_%s' % (self._smid, base) - unique_fname = os.path.join(fdir, '%s%s' % (base, ext)) - tag = 0 - while os.path.exists(unique_fname): - old_fname = unique_fname - unique_fname = os.path.join(fdir, '%s%d%s' % (base, tag, ext)) - self._logger.warning('Attempted to store stats information at %s, but ' - 'file already exists. Attempting to store at %s ' - 'now.', old_fname, unique_fname) - tag += 1 - return unique_fname - - def SaveSummary(self, directory, fname='summary.txt', prefix=STATS_PREFIX): - """Save summary to file. - - Args: - directory: directory to save the summary in. - fname: filename to save summary under. - prefix: start every row in summary string with prefix, for easier reading. - - Returns: - full path of summary save location - """ - summary_str = self.SummaryToString(prefix=prefix) + '\n' - return self._SaveSummary(summary_str, directory, fname) - - def SaveSummaryJSON(self, directory, fname='summary.json'): - """Save summary (only MEAN) into a JSON file. - - Args: - directory: directory to save the JSON summary in. - fname: filename to save summary under. - - Returns: - full path of summary save location - """ - data = {} - for domain in self._summary: - unit = LONG_UNIT.get(self._unit[domain], self._unit[domain]) - data_entry = {'mean': self._summary[domain]['mean'], 'unit': unit} - data[domain] = data_entry - summary_str = json.dumps(data, indent=2) - return self._SaveSummary(summary_str, directory, fname) - - def SaveSummaryMD(self, directory, fname='summary.md'): - """Save summary into a MD file to paste into b/. - - Args: - directory: directory to save the MD summary in. - fname: filename to save summary under. - - Returns: - full path of summary save location +class StatsManager(object): + """Calculates statistics for several lists of data(float). + + Example usage: + + >>> stats = StatsManager(title='Title Banner') + >>> stats.AddSample(TIME_KEY, 50.0) + >>> stats.AddSample(TIME_KEY, 25.0) + >>> stats.AddSample(TIME_KEY, 40.0) + >>> stats.AddSample(TIME_KEY, 10.0) + >>> stats.AddSample(TIME_KEY, 10.0) + >>> stats.AddSample('frobnicate', 11.5) + >>> stats.AddSample('frobnicate', 9.0) + >>> stats.AddSample('foobar', 11111.0) + >>> stats.AddSample('foobar', 22222.0) + >>> stats.CalculateStats() + >>> print(stats.SummaryToString()) + ` @@-------------------------------------------------------------- + ` @@ Title Banner + @@-------------------------------------------------------------- + @@ NAME COUNT MEAN STDDEV MAX MIN + @@ sample_msecs 4 31.25 15.16 50.00 10.00 + @@ foobar 2 16666.50 5555.50 22222.00 11111.00 + @@ frobnicate 2 10.25 1.25 11.50 9.00 + ` @@-------------------------------------------------------------- + + Attributes: + _data: dict of list of readings for each domain(key) + _unit: dict of unit for each domain(key) + _smid: id supplied to differentiate data output to other StatsManager + instances that potentially save to the same directory + if smid all output files will be named |smid|_|fname| + _title: title to add as banner to formatted summary. If no title, + no banner gets added + _order: list of formatting order for domains. Domains not listed are + displayed in sorted order + _hide_domains: collection of domains to hide when formatting summary string + _accept_nan: flag to indicate if NaN samples are acceptable + _nan_domains: set to keep track of which domains contain NaN samples + _summary: dict of stats per domain (key): min, max, count, mean, stddev + _logger = StatsManager logger + + Note: + _summary is empty until CalculateStats() is called, and is updated when + CalculateStats() is called. """ - summary_str = self.SummaryToMarkdownString() - return self._SaveSummary(summary_str, directory, fname) - def _SaveSummary(self, output_str, directory, fname): - """Wrote |output_str| to |fname|. - - Args: - output_str: formatted output string - directory: directory to save the summary in. - fname: filename to save summary under. - - Returns: - full path of summary save location - """ - if not os.path.exists(directory): - os.makedirs(directory) - fname = self._MakeUniqueFName(os.path.join(directory, fname)) - with open(fname, 'w') as f: - f.write(output_str) - return fname - - def GetRawData(self): - """Getter for all raw_data.""" - return self._data - - def SaveRawData(self, directory, dirname='raw_data'): - """Save raw data to file. - - Args: - directory: directory to create the raw data folder in. - dirname: folder in which raw data live. - - Returns: - list of full path of each domain's raw data save location - """ - if not os.path.exists(directory): - os.makedirs(directory) - dirname = os.path.join(directory, dirname) - if not os.path.exists(dirname): - os.makedirs(dirname) - fnames = [] - for domain, data in self._data.items(): - if not domain.endswith(self._unit[domain]): - domain = '%s_%s' % (domain, self._unit[domain]) - fname = self._MakeUniqueFName(os.path.join(dirname, '%s.txt' % domain)) - with open(fname, 'w') as f: - f.write('\n'.join('%.2f' % sample for sample in data) + '\n') - fnames.append(fname) - return fnames + # pylint: disable=W0102 + def __init__( + self, smid="", title="", order=[], hide_domains=[], accept_nan=True + ): + """Initialize infrastructure for data and their statistics.""" + self._title = title + self._data = collections.defaultdict(list) + self._unit = collections.defaultdict(str) + self._smid = smid + self._order = order + self._hide_domains = hide_domains + self._accept_nan = accept_nan + self._nan_domains = set() + self._summary = {} + self._logger = logging.getLogger(type(self).__name__) + + def AddSample(self, domain, sample): + """Add one sample for a domain. + + Args: + domain: the domain name for the sample. + sample: one time sample for domain, expect type float. + + Raises: + StatsManagerError: if trying to add NaN and |_accept_nan| is false + """ + try: + sample = float(sample) + except ValueError: + # if we don't accept nan this will be caught below + self._logger.debug( + "sample %s for domain %s is not a number. Making NaN", + sample, + domain, + ) + sample = float("NaN") + if not self._accept_nan and math.isnan(sample): + raise StatsManagerError( + "accept_nan is false. Cannot add NaN sample." + ) + self._data[domain].append(sample) + if math.isnan(sample): + self._nan_domains.add(domain) + + def SetUnit(self, domain, unit): + """Set the unit for a domain. + + There can be only one unit for each domain. Setting unit twice will + overwrite the original unit. + + Args: + domain: the domain name. + unit: unit of the domain. + """ + if domain in self._unit: + self._logger.warning( + "overwriting the unit of %s, old unit is %s, new " + "unit is %s.", + domain, + self._unit[domain], + unit, + ) + self._unit[domain] = unit + + def CalculateStats(self): + """Calculate stats for all domain-data pairs. + + First erases all previous stats, then calculate stats for all data. + """ + self._summary = {} + for domain, data in self._data.items(): + data_np = numpy.array(data) + self._summary[domain] = { + "mean": numpy.nanmean(data_np), + "min": numpy.nanmin(data_np), + "max": numpy.nanmax(data_np), + "stddev": numpy.nanstd(data_np), + "count": data_np.size, + } + + @property + def DomainsToDisplay(self): + """List of domains that the manager will output in summaries.""" + return set(self._summary.keys()) - set(self._hide_domains) + + @property + def NanInOutput(self): + """Return whether any of the domains to display have NaN values.""" + return bool(len(set(self._nan_domains) & self.DomainsToDisplay)) + + def _SummaryTable(self): + """Generate the matrix to output as a summary. + + Returns: + A 2d matrix of headers and their data for each domain + e.g. + [[NAME, COUNT, MEAN, STDDEV, MAX, MIN], + [pp5000_mw, 10, 50, 0, 50, 50]] + """ + headers = ("NAME", "COUNT", "MEAN", "STDDEV", "MAX", "MIN") + table = [headers] + # determine what domains to display & and the order + domains_to_display = self.DomainsToDisplay + display_order = [ + key for key in self._order if key in domains_to_display + ] + domains_to_display -= set(display_order) + display_order.extend(sorted(domains_to_display)) + for domain in display_order: + stats = self._summary[domain] + if not domain.endswith(self._unit[domain]): + domain = "%s_%s" % (domain, self._unit[domain]) + if domain in self._nan_domains: + domain = "%s%s" % (domain, NAN_TAG) + row = [domain] + row.append(str(stats["count"])) + for entry in headers[2:]: + row.append("%.2f" % stats[entry.lower()]) + table.append(row) + return table + + def SummaryToMarkdownString(self): + """Format the summary into a b/ compatible markdown table string. + + This requires this sort of output format + + | header1 | header2 | header3 | ... + | --------- | --------- | --------- | ... + | sample1h1 | sample1h2 | sample1h3 | ... + . + . + . + + Returns: + formatted summary string. + """ + # All we need to do before processing is insert a row of '-' between + # the headers, and the data + table = self._SummaryTable() + columns = len(table[0]) + # Using '-:' to allow the numbers to be right aligned + sep_row = ["-"] + ["-:"] * (columns - 1) + table.insert(1, sep_row) + text_rows = ["|".join(r) for r in table] + body = "\n".join(["|%s|" % r for r in text_rows]) + if self._title: + title_section = "**%s** \n\n" % self._title + body = title_section + body + # Make sure that the body is terminated with a newline. + return body + "\n" + + def SummaryToString(self, prefix=STATS_PREFIX): + """Format summary into a string, ready for pretty print. + + See class description for format example. + + Args: + prefix: start every row in summary string with prefix, for easier reading. + + Returns: + formatted summary string. + """ + table = self._SummaryTable() + max_col_width = [] + for col_idx in range(len(table[0])): + col_item_widths = [len(row[col_idx]) for row in table] + max_col_width.append(max(col_item_widths)) + + formatted_lines = [] + for row in table: + formatted_row = prefix + " " + for i in range(len(row)): + formatted_row += row[i].rjust(max_col_width[i] + 2) + formatted_lines.append(formatted_row) + if self.NanInOutput: + formatted_lines.append("%s %s" % (prefix, NAN_DESCRIPTION)) + + if self._title: + line_length = len(formatted_lines[0]) + dec_length = len(prefix) + # trim title to be at most as long as the longest line without the prefix + title = self._title[: (line_length - dec_length)] + # line is a seperator line consisting of ----- + line = "%s%s" % (prefix, "-" * (line_length - dec_length)) + # prepend the prefix to the centered title + padded_title = "%s%s" % ( + prefix, + title.center(line_length)[dec_length:], + ) + formatted_lines = ( + [line, padded_title, line] + formatted_lines + [line] + ) + formatted_output = "\n".join(formatted_lines) + return formatted_output + + def GetSummary(self): + """Getter for summary.""" + return self._summary + + def _MakeUniqueFName(self, fname): + """prepend |_smid| to fname & rotate fname to ensure uniqueness. + + Before saving a file through the StatsManager, make sure that the filename + is unique, first by prepending the smid if any and otherwise by appending + increasing integer suffixes until the filename is unique. + + If |smid| is defined /path/to/example/file.txt becomes + /path/to/example/{smid}_file.txt. + + The rotation works by changing /path/to/example/somename.txt to + /path/to/example/somename1.txt if the first one already exists on the + system. + + Note: this is not thread-safe. While it makes sense to use StatsManager + in a threaded data-collection, the data retrieval should happen in a + single threaded environment to ensure files don't get potentially clobbered. + + Args: + fname: filename to ensure uniqueness. + + Returns: + {smid_}fname{tag}.[b].ext + the smid portion gets prepended if |smid| is defined + the tag portion gets appended if necessary to ensure unique fname + """ + fdir = os.path.dirname(fname) + base, ext = os.path.splitext(os.path.basename(fname)) + if self._smid: + base = "%s_%s" % (self._smid, base) + unique_fname = os.path.join(fdir, "%s%s" % (base, ext)) + tag = 0 + while os.path.exists(unique_fname): + old_fname = unique_fname + unique_fname = os.path.join(fdir, "%s%d%s" % (base, tag, ext)) + self._logger.warning( + "Attempted to store stats information at %s, but " + "file already exists. Attempting to store at %s " + "now.", + old_fname, + unique_fname, + ) + tag += 1 + return unique_fname + + def SaveSummary(self, directory, fname="summary.txt", prefix=STATS_PREFIX): + """Save summary to file. + + Args: + directory: directory to save the summary in. + fname: filename to save summary under. + prefix: start every row in summary string with prefix, for easier reading. + + Returns: + full path of summary save location + """ + summary_str = self.SummaryToString(prefix=prefix) + "\n" + return self._SaveSummary(summary_str, directory, fname) + + def SaveSummaryJSON(self, directory, fname="summary.json"): + """Save summary (only MEAN) into a JSON file. + + Args: + directory: directory to save the JSON summary in. + fname: filename to save summary under. + + Returns: + full path of summary save location + """ + data = {} + for domain in self._summary: + unit = LONG_UNIT.get(self._unit[domain], self._unit[domain]) + data_entry = {"mean": self._summary[domain]["mean"], "unit": unit} + data[domain] = data_entry + summary_str = json.dumps(data, indent=2) + return self._SaveSummary(summary_str, directory, fname) + + def SaveSummaryMD(self, directory, fname="summary.md"): + """Save summary into a MD file to paste into b/. + + Args: + directory: directory to save the MD summary in. + fname: filename to save summary under. + + Returns: + full path of summary save location + """ + summary_str = self.SummaryToMarkdownString() + return self._SaveSummary(summary_str, directory, fname) + + def _SaveSummary(self, output_str, directory, fname): + """Wrote |output_str| to |fname|. + + Args: + output_str: formatted output string + directory: directory to save the summary in. + fname: filename to save summary under. + + Returns: + full path of summary save location + """ + if not os.path.exists(directory): + os.makedirs(directory) + fname = self._MakeUniqueFName(os.path.join(directory, fname)) + with open(fname, "w") as f: + f.write(output_str) + return fname + + def GetRawData(self): + """Getter for all raw_data.""" + return self._data + + def SaveRawData(self, directory, dirname="raw_data"): + """Save raw data to file. + + Args: + directory: directory to create the raw data folder in. + dirname: folder in which raw data live. + + Returns: + list of full path of each domain's raw data save location + """ + if not os.path.exists(directory): + os.makedirs(directory) + dirname = os.path.join(directory, dirname) + if not os.path.exists(dirname): + os.makedirs(dirname) + fnames = [] + for domain, data in self._data.items(): + if not domain.endswith(self._unit[domain]): + domain = "%s_%s" % (domain, self._unit[domain]) + fname = self._MakeUniqueFName( + os.path.join(dirname, "%s.txt" % domain) + ) + with open(fname, "w") as f: + f.write("\n".join("%.2f" % sample for sample in data) + "\n") + fnames.append(fname) + return fnames diff --git a/extra/usb_power/stats_manager_unittest.py b/extra/usb_power/stats_manager_unittest.py index beb9984b93..2bfaa5c83d 100644 --- a/extra/usb_power/stats_manager_unittest.py +++ b/extra/usb_power/stats_manager_unittest.py @@ -1,14 +1,11 @@ -# Copyright 2017 The Chromium OS Authors. All rights reserved. +# Copyright 2017 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """Unit tests for StatsManager.""" from __future__ import print_function + import json import os import re @@ -16,300 +13,314 @@ import shutil import tempfile import unittest -import stats_manager +import stats_manager # pylint:disable=import-error class TestStatsManager(unittest.TestCase): - """Test to verify StatsManager methods work as expected. - - StatsManager should collect raw data, calculate their statistics, and save - them in expected format. - """ - - def _populate_mock_stats(self): - """Create a populated & processed StatsManager to test data retrieval.""" - self.data.AddSample('A', 99999.5) - self.data.AddSample('A', 100000.5) - self.data.SetUnit('A', 'uW') - self.data.SetUnit('A', 'mW') - self.data.AddSample('B', 1.5) - self.data.AddSample('B', 2.5) - self.data.AddSample('B', 3.5) - self.data.SetUnit('B', 'mV') - self.data.CalculateStats() - - def _populate_mock_stats_no_unit(self): - self.data.AddSample('B', 1000) - self.data.AddSample('A', 200) - self.data.SetUnit('A', 'blue') - - def setUp(self): - """Set up StatsManager and create a temporary directory for test.""" - self.tempdir = tempfile.mkdtemp() - self.data = stats_manager.StatsManager() - - def tearDown(self): - """Delete the temporary directory and its content.""" - shutil.rmtree(self.tempdir) - - def test_AddSample(self): - """Adding a sample successfully adds a sample.""" - self.data.AddSample('Test', 1000) - self.data.SetUnit('Test', 'test') - self.data.CalculateStats() - summary = self.data.GetSummary() - self.assertEqual(1, summary['Test']['count']) - - def test_AddSampleNoFloatAcceptNaN(self): - """Adding a non-number adds 'NaN' and doesn't raise an exception.""" - self.data.AddSample('Test', 10) - self.data.AddSample('Test', 20) - # adding a fake NaN: one that gets converted into NaN internally - self.data.AddSample('Test', 'fiesta') - # adding a real NaN - self.data.AddSample('Test', float('NaN')) - self.data.SetUnit('Test', 'test') - self.data.CalculateStats() - summary = self.data.GetSummary() - # assert that 'NaN' as added. - self.assertEqual(4, summary['Test']['count']) - # assert that mean, min, and max calculatings ignore the 'NaN' - self.assertEqual(10, summary['Test']['min']) - self.assertEqual(20, summary['Test']['max']) - self.assertEqual(15, summary['Test']['mean']) - - def test_AddSampleNoFloatNotAcceptNaN(self): - """Adding a non-number raises a StatsManagerError if accept_nan is False.""" - self.data = stats_manager.StatsManager(accept_nan=False) - with self.assertRaisesRegexp(stats_manager.StatsManagerError, - 'accept_nan is false. Cannot add NaN sample.'): - # adding a fake NaN: one that gets converted into NaN internally - self.data.AddSample('Test', 'fiesta') - with self.assertRaisesRegexp(stats_manager.StatsManagerError, - 'accept_nan is false. Cannot add NaN sample.'): - # adding a real NaN - self.data.AddSample('Test', float('NaN')) - - def test_AddSampleNoUnit(self): - """Not adding a unit does not cause an exception on CalculateStats().""" - self.data.AddSample('Test', 17) - self.data.CalculateStats() - summary = self.data.GetSummary() - self.assertEqual(1, summary['Test']['count']) - - def test_UnitSuffix(self): - """Unit gets appended as a suffix in the displayed summary.""" - self.data.AddSample('test', 250) - self.data.SetUnit('test', 'mw') - self.data.CalculateStats() - summary_str = self.data.SummaryToString() - self.assertIn('test_mw', summary_str) - - def test_DoubleUnitSuffix(self): - """If domain already ends in unit, verify that unit doesn't get appended.""" - self.data.AddSample('test_mw', 250) - self.data.SetUnit('test_mw', 'mw') - self.data.CalculateStats() - summary_str = self.data.SummaryToString() - self.assertIn('test_mw', summary_str) - self.assertNotIn('test_mw_mw', summary_str) - - def test_GetRawData(self): - """GetRawData returns exact same data as fed in.""" - self._populate_mock_stats() - raw_data = self.data.GetRawData() - self.assertListEqual([99999.5, 100000.5], raw_data['A']) - self.assertListEqual([1.5, 2.5, 3.5], raw_data['B']) - - def test_GetSummary(self): - """GetSummary returns expected stats about the data fed in.""" - self._populate_mock_stats() - summary = self.data.GetSummary() - self.assertEqual(2, summary['A']['count']) - self.assertAlmostEqual(100000.5, summary['A']['max']) - self.assertAlmostEqual(99999.5, summary['A']['min']) - self.assertAlmostEqual(0.5, summary['A']['stddev']) - self.assertAlmostEqual(100000.0, summary['A']['mean']) - self.assertEqual(3, summary['B']['count']) - self.assertAlmostEqual(3.5, summary['B']['max']) - self.assertAlmostEqual(1.5, summary['B']['min']) - self.assertAlmostEqual(0.81649658092773, summary['B']['stddev']) - self.assertAlmostEqual(2.5, summary['B']['mean']) - - def test_SaveRawData(self): - """SaveRawData stores same data as fed in.""" - self._populate_mock_stats() - dirname = 'unittest_raw_data' - expected_files = set(['A_mW.txt', 'B_mV.txt']) - fnames = self.data.SaveRawData(self.tempdir, dirname) - files_returned = set([os.path.basename(f) for f in fnames]) - # Assert that only the expected files got returned. - self.assertEqual(expected_files, files_returned) - # Assert that only the returned files are in the outdir. - self.assertEqual(set(os.listdir(os.path.join(self.tempdir, dirname))), - files_returned) - for fname in fnames: - with open(fname, 'r') as f: - if 'A_mW' in fname: - self.assertEqual('99999.50', f.readline().strip()) - self.assertEqual('100000.50', f.readline().strip()) - if 'B_mV' in fname: - self.assertEqual('1.50', f.readline().strip()) - self.assertEqual('2.50', f.readline().strip()) - self.assertEqual('3.50', f.readline().strip()) - - def test_SaveRawDataNoUnit(self): - """SaveRawData appends no unit suffix if the unit is not specified.""" - self._populate_mock_stats_no_unit() - self.data.CalculateStats() - outdir = 'unittest_raw_data' - files = self.data.SaveRawData(self.tempdir, outdir) - files = [os.path.basename(f) for f in files] - # Verify nothing gets appended to domain for filename if no unit exists. - self.assertIn('B.txt', files) - - def test_SaveRawDataSMID(self): - """SaveRawData uses the smid when creating output filename.""" - identifier = 'ec' - self.data = stats_manager.StatsManager(smid=identifier) - self._populate_mock_stats() - files = self.data.SaveRawData(self.tempdir) - for fname in files: - self.assertTrue(os.path.basename(fname).startswith(identifier)) - - def test_SummaryToStringNaNHelp(self): - """NaN containing row gets tagged with *, help banner gets added.""" - help_banner_exp = '%s %s' % (stats_manager.STATS_PREFIX, - stats_manager.NAN_DESCRIPTION) - nan_domain = 'A-domain' - nan_domain_exp = '%s%s' % (nan_domain, stats_manager.NAN_TAG) - # NaN helper banner is added when a NaN domain is found & domain gets tagged - data = stats_manager.StatsManager() - data.AddSample(nan_domain, float('NaN')) - data.AddSample(nan_domain, 17) - data.AddSample('B-domain', 17) - data.CalculateStats() - summarystr = data.SummaryToString() - self.assertIn(help_banner_exp, summarystr) - self.assertIn(nan_domain_exp, summarystr) - # NaN helper banner is not added when no NaN domain output, no tagging - data = stats_manager.StatsManager() - # nan_domain in this scenario does not contain any NaN - data.AddSample(nan_domain, 19) - data.AddSample('B-domain', 17) - data.CalculateStats() - summarystr = data.SummaryToString() - self.assertNotIn(help_banner_exp, summarystr) - self.assertNotIn(nan_domain_exp, summarystr) - - def test_SummaryToStringTitle(self): - """Title shows up in SummaryToString if title specified.""" - title = 'titulo' - data = stats_manager.StatsManager(title=title) - self._populate_mock_stats() - summary_str = data.SummaryToString() - self.assertIn(title, summary_str) - - def test_SummaryToStringHideDomains(self): - """Keys indicated in hide_domains are not printed in the summary.""" - data = stats_manager.StatsManager(hide_domains=['A-domain']) - data.AddSample('A-domain', 17) - data.AddSample('B-domain', 17) - data.CalculateStats() - summary_str = data.SummaryToString() - self.assertIn('B-domain', summary_str) - self.assertNotIn('A-domain', summary_str) - - def test_SummaryToStringOrder(self): - """Order passed into StatsManager is honoured when formatting summary.""" - # StatsManager that should print D & B first, and the subsequent elements - # are sorted. - d_b_a_c_regexp = re.compile('D-domain.*B-domain.*A-domain.*C-domain', - re.DOTALL) - data = stats_manager.StatsManager(order=['D-domain', 'B-domain']) - data.AddSample('A-domain', 17) - data.AddSample('B-domain', 17) - data.AddSample('C-domain', 17) - data.AddSample('D-domain', 17) - data.CalculateStats() - summary_str = data.SummaryToString() - self.assertRegexpMatches(summary_str, d_b_a_c_regexp) - - def test_MakeUniqueFName(self): - data = stats_manager.StatsManager() - testfile = os.path.join(self.tempdir, 'testfile.txt') - with open(testfile, 'w') as f: - f.write('') - expected_fname = os.path.join(self.tempdir, 'testfile0.txt') - self.assertEqual(expected_fname, data._MakeUniqueFName(testfile)) - - def test_SaveSummary(self): - """SaveSummary properly dumps the summary into a file.""" - self._populate_mock_stats() - fname = 'unittest_summary.txt' - expected_fname = os.path.join(self.tempdir, fname) - fname = self.data.SaveSummary(self.tempdir, fname) - # Assert the reported fname is the same as the expected fname - self.assertEqual(expected_fname, fname) - # Assert only the reported fname is output (in the tempdir) - self.assertEqual(set([os.path.basename(fname)]), - set(os.listdir(self.tempdir))) - with open(fname, 'r') as f: - self.assertEqual( - '@@ NAME COUNT MEAN STDDEV MAX MIN\n', - f.readline()) - self.assertEqual( - '@@ A_mW 2 100000.00 0.50 100000.50 99999.50\n', - f.readline()) - self.assertEqual( - '@@ B_mV 3 2.50 0.82 3.50 1.50\n', - f.readline()) - - def test_SaveSummarySMID(self): - """SaveSummary uses the smid when creating output filename.""" - identifier = 'ec' - self.data = stats_manager.StatsManager(smid=identifier) - self._populate_mock_stats() - fname = os.path.basename(self.data.SaveSummary(self.tempdir)) - self.assertTrue(fname.startswith(identifier)) - - def test_SaveSummaryJSON(self): - """SaveSummaryJSON saves the added data properly in JSON format.""" - self._populate_mock_stats() - fname = 'unittest_summary.json' - expected_fname = os.path.join(self.tempdir, fname) - fname = self.data.SaveSummaryJSON(self.tempdir, fname) - # Assert the reported fname is the same as the expected fname - self.assertEqual(expected_fname, fname) - # Assert only the reported fname is output (in the tempdir) - self.assertEqual(set([os.path.basename(fname)]), - set(os.listdir(self.tempdir))) - with open(fname, 'r') as f: - summary = json.load(f) - self.assertAlmostEqual(100000.0, summary['A']['mean']) - self.assertEqual('milliwatt', summary['A']['unit']) - self.assertAlmostEqual(2.5, summary['B']['mean']) - self.assertEqual('millivolt', summary['B']['unit']) - - def test_SaveSummaryJSONSMID(self): - """SaveSummaryJSON uses the smid when creating output filename.""" - identifier = 'ec' - self.data = stats_manager.StatsManager(smid=identifier) - self._populate_mock_stats() - fname = os.path.basename(self.data.SaveSummaryJSON(self.tempdir)) - self.assertTrue(fname.startswith(identifier)) - - def test_SaveSummaryJSONNoUnit(self): - """SaveSummaryJSON marks unknown units properly as N/A.""" - self._populate_mock_stats_no_unit() - self.data.CalculateStats() - fname = 'unittest_summary.json' - fname = self.data.SaveSummaryJSON(self.tempdir, fname) - with open(fname, 'r') as f: - summary = json.load(f) - self.assertEqual('blue', summary['A']['unit']) - # if no unit is specified, JSON should save 'N/A' as the unit. - self.assertEqual('N/A', summary['B']['unit']) - -if __name__ == '__main__': - unittest.main() + """Test to verify StatsManager methods work as expected. + + StatsManager should collect raw data, calculate their statistics, and save + them in expected format. + """ + + def _populate_mock_stats(self): + """Create a populated & processed StatsManager to test data retrieval.""" + self.data.AddSample("A", 99999.5) + self.data.AddSample("A", 100000.5) + self.data.SetUnit("A", "uW") + self.data.SetUnit("A", "mW") + self.data.AddSample("B", 1.5) + self.data.AddSample("B", 2.5) + self.data.AddSample("B", 3.5) + self.data.SetUnit("B", "mV") + self.data.CalculateStats() + + def _populate_mock_stats_no_unit(self): + self.data.AddSample("B", 1000) + self.data.AddSample("A", 200) + self.data.SetUnit("A", "blue") + + def setUp(self): + """Set up StatsManager and create a temporary directory for test.""" + self.tempdir = tempfile.mkdtemp() + self.data = stats_manager.StatsManager() + + def tearDown(self): + """Delete the temporary directory and its content.""" + shutil.rmtree(self.tempdir) + + def test_AddSample(self): + """Adding a sample successfully adds a sample.""" + self.data.AddSample("Test", 1000) + self.data.SetUnit("Test", "test") + self.data.CalculateStats() + summary = self.data.GetSummary() + self.assertEqual(1, summary["Test"]["count"]) + + def test_AddSampleNoFloatAcceptNaN(self): + """Adding a non-number adds 'NaN' and doesn't raise an exception.""" + self.data.AddSample("Test", 10) + self.data.AddSample("Test", 20) + # adding a fake NaN: one that gets converted into NaN internally + self.data.AddSample("Test", "fiesta") + # adding a real NaN + self.data.AddSample("Test", float("NaN")) + self.data.SetUnit("Test", "test") + self.data.CalculateStats() + summary = self.data.GetSummary() + # assert that 'NaN' as added. + self.assertEqual(4, summary["Test"]["count"]) + # assert that mean, min, and max calculatings ignore the 'NaN' + self.assertEqual(10, summary["Test"]["min"]) + self.assertEqual(20, summary["Test"]["max"]) + self.assertEqual(15, summary["Test"]["mean"]) + + def test_AddSampleNoFloatNotAcceptNaN(self): + """Adding a non-number raises a StatsManagerError if accept_nan is False.""" + self.data = stats_manager.StatsManager(accept_nan=False) + with self.assertRaisesRegexp( + stats_manager.StatsManagerError, + "accept_nan is false. Cannot add NaN sample.", + ): + # adding a fake NaN: one that gets converted into NaN internally + self.data.AddSample("Test", "fiesta") + with self.assertRaisesRegexp( + stats_manager.StatsManagerError, + "accept_nan is false. Cannot add NaN sample.", + ): + # adding a real NaN + self.data.AddSample("Test", float("NaN")) + + def test_AddSampleNoUnit(self): + """Not adding a unit does not cause an exception on CalculateStats().""" + self.data.AddSample("Test", 17) + self.data.CalculateStats() + summary = self.data.GetSummary() + self.assertEqual(1, summary["Test"]["count"]) + + def test_UnitSuffix(self): + """Unit gets appended as a suffix in the displayed summary.""" + self.data.AddSample("test", 250) + self.data.SetUnit("test", "mw") + self.data.CalculateStats() + summary_str = self.data.SummaryToString() + self.assertIn("test_mw", summary_str) + + def test_DoubleUnitSuffix(self): + """If domain already ends in unit, verify that unit doesn't get appended.""" + self.data.AddSample("test_mw", 250) + self.data.SetUnit("test_mw", "mw") + self.data.CalculateStats() + summary_str = self.data.SummaryToString() + self.assertIn("test_mw", summary_str) + self.assertNotIn("test_mw_mw", summary_str) + + def test_GetRawData(self): + """GetRawData returns exact same data as fed in.""" + self._populate_mock_stats() + raw_data = self.data.GetRawData() + self.assertListEqual([99999.5, 100000.5], raw_data["A"]) + self.assertListEqual([1.5, 2.5, 3.5], raw_data["B"]) + + def test_GetSummary(self): + """GetSummary returns expected stats about the data fed in.""" + self._populate_mock_stats() + summary = self.data.GetSummary() + self.assertEqual(2, summary["A"]["count"]) + self.assertAlmostEqual(100000.5, summary["A"]["max"]) + self.assertAlmostEqual(99999.5, summary["A"]["min"]) + self.assertAlmostEqual(0.5, summary["A"]["stddev"]) + self.assertAlmostEqual(100000.0, summary["A"]["mean"]) + self.assertEqual(3, summary["B"]["count"]) + self.assertAlmostEqual(3.5, summary["B"]["max"]) + self.assertAlmostEqual(1.5, summary["B"]["min"]) + self.assertAlmostEqual(0.81649658092773, summary["B"]["stddev"]) + self.assertAlmostEqual(2.5, summary["B"]["mean"]) + + def test_SaveRawData(self): + """SaveRawData stores same data as fed in.""" + self._populate_mock_stats() + dirname = "unittest_raw_data" + expected_files = set(["A_mW.txt", "B_mV.txt"]) + fnames = self.data.SaveRawData(self.tempdir, dirname) + files_returned = set([os.path.basename(f) for f in fnames]) + # Assert that only the expected files got returned. + self.assertEqual(expected_files, files_returned) + # Assert that only the returned files are in the outdir. + self.assertEqual( + set(os.listdir(os.path.join(self.tempdir, dirname))), files_returned + ) + for fname in fnames: + with open(fname, "r") as f: + if "A_mW" in fname: + self.assertEqual("99999.50", f.readline().strip()) + self.assertEqual("100000.50", f.readline().strip()) + if "B_mV" in fname: + self.assertEqual("1.50", f.readline().strip()) + self.assertEqual("2.50", f.readline().strip()) + self.assertEqual("3.50", f.readline().strip()) + + def test_SaveRawDataNoUnit(self): + """SaveRawData appends no unit suffix if the unit is not specified.""" + self._populate_mock_stats_no_unit() + self.data.CalculateStats() + outdir = "unittest_raw_data" + files = self.data.SaveRawData(self.tempdir, outdir) + files = [os.path.basename(f) for f in files] + # Verify nothing gets appended to domain for filename if no unit exists. + self.assertIn("B.txt", files) + + def test_SaveRawDataSMID(self): + """SaveRawData uses the smid when creating output filename.""" + identifier = "ec" + self.data = stats_manager.StatsManager(smid=identifier) + self._populate_mock_stats() + files = self.data.SaveRawData(self.tempdir) + for fname in files: + self.assertTrue(os.path.basename(fname).startswith(identifier)) + + def test_SummaryToStringNaNHelp(self): + """NaN containing row gets tagged with *, help banner gets added.""" + help_banner_exp = "%s %s" % ( + stats_manager.STATS_PREFIX, + stats_manager.NAN_DESCRIPTION, + ) + nan_domain = "A-domain" + nan_domain_exp = "%s%s" % (nan_domain, stats_manager.NAN_TAG) + # NaN helper banner is added when a NaN domain is found & domain gets tagged + data = stats_manager.StatsManager() + data.AddSample(nan_domain, float("NaN")) + data.AddSample(nan_domain, 17) + data.AddSample("B-domain", 17) + data.CalculateStats() + summarystr = data.SummaryToString() + self.assertIn(help_banner_exp, summarystr) + self.assertIn(nan_domain_exp, summarystr) + # NaN helper banner is not added when no NaN domain output, no tagging + data = stats_manager.StatsManager() + # nan_domain in this scenario does not contain any NaN + data.AddSample(nan_domain, 19) + data.AddSample("B-domain", 17) + data.CalculateStats() + summarystr = data.SummaryToString() + self.assertNotIn(help_banner_exp, summarystr) + self.assertNotIn(nan_domain_exp, summarystr) + + def test_SummaryToStringTitle(self): + """Title shows up in SummaryToString if title specified.""" + title = "titulo" + data = stats_manager.StatsManager(title=title) + self._populate_mock_stats() + summary_str = data.SummaryToString() + self.assertIn(title, summary_str) + + def test_SummaryToStringHideDomains(self): + """Keys indicated in hide_domains are not printed in the summary.""" + data = stats_manager.StatsManager(hide_domains=["A-domain"]) + data.AddSample("A-domain", 17) + data.AddSample("B-domain", 17) + data.CalculateStats() + summary_str = data.SummaryToString() + self.assertIn("B-domain", summary_str) + self.assertNotIn("A-domain", summary_str) + + def test_SummaryToStringOrder(self): + """Order passed into StatsManager is honoured when formatting summary.""" + # StatsManager that should print D & B first, and the subsequent elements + # are sorted. + d_b_a_c_regexp = re.compile( + "D-domain.*B-domain.*A-domain.*C-domain", re.DOTALL + ) + data = stats_manager.StatsManager(order=["D-domain", "B-domain"]) + data.AddSample("A-domain", 17) + data.AddSample("B-domain", 17) + data.AddSample("C-domain", 17) + data.AddSample("D-domain", 17) + data.CalculateStats() + summary_str = data.SummaryToString() + self.assertRegexpMatches(summary_str, d_b_a_c_regexp) + + def test_MakeUniqueFName(self): + data = stats_manager.StatsManager() + testfile = os.path.join(self.tempdir, "testfile.txt") + with open(testfile, "w") as f: + f.write("") + expected_fname = os.path.join(self.tempdir, "testfile0.txt") + self.assertEqual(expected_fname, data._MakeUniqueFName(testfile)) + + def test_SaveSummary(self): + """SaveSummary properly dumps the summary into a file.""" + self._populate_mock_stats() + fname = "unittest_summary.txt" + expected_fname = os.path.join(self.tempdir, fname) + fname = self.data.SaveSummary(self.tempdir, fname) + # Assert the reported fname is the same as the expected fname + self.assertEqual(expected_fname, fname) + # Assert only the reported fname is output (in the tempdir) + self.assertEqual( + set([os.path.basename(fname)]), set(os.listdir(self.tempdir)) + ) + with open(fname, "r") as f: + self.assertEqual( + "@@ NAME COUNT MEAN STDDEV MAX MIN\n", + f.readline(), + ) + self.assertEqual( + "@@ A_mW 2 100000.00 0.50 100000.50 99999.50\n", + f.readline(), + ) + self.assertEqual( + "@@ B_mV 3 2.50 0.82 3.50 1.50\n", + f.readline(), + ) + + def test_SaveSummarySMID(self): + """SaveSummary uses the smid when creating output filename.""" + identifier = "ec" + self.data = stats_manager.StatsManager(smid=identifier) + self._populate_mock_stats() + fname = os.path.basename(self.data.SaveSummary(self.tempdir)) + self.assertTrue(fname.startswith(identifier)) + + def test_SaveSummaryJSON(self): + """SaveSummaryJSON saves the added data properly in JSON format.""" + self._populate_mock_stats() + fname = "unittest_summary.json" + expected_fname = os.path.join(self.tempdir, fname) + fname = self.data.SaveSummaryJSON(self.tempdir, fname) + # Assert the reported fname is the same as the expected fname + self.assertEqual(expected_fname, fname) + # Assert only the reported fname is output (in the tempdir) + self.assertEqual( + set([os.path.basename(fname)]), set(os.listdir(self.tempdir)) + ) + with open(fname, "r") as f: + summary = json.load(f) + self.assertAlmostEqual(100000.0, summary["A"]["mean"]) + self.assertEqual("milliwatt", summary["A"]["unit"]) + self.assertAlmostEqual(2.5, summary["B"]["mean"]) + self.assertEqual("millivolt", summary["B"]["unit"]) + + def test_SaveSummaryJSONSMID(self): + """SaveSummaryJSON uses the smid when creating output filename.""" + identifier = "ec" + self.data = stats_manager.StatsManager(smid=identifier) + self._populate_mock_stats() + fname = os.path.basename(self.data.SaveSummaryJSON(self.tempdir)) + self.assertTrue(fname.startswith(identifier)) + + def test_SaveSummaryJSONNoUnit(self): + """SaveSummaryJSON marks unknown units properly as N/A.""" + self._populate_mock_stats_no_unit() + self.data.CalculateStats() + fname = "unittest_summary.json" + fname = self.data.SaveSummaryJSON(self.tempdir, fname) + with open(fname, "r") as f: + summary = json.load(f) + self.assertEqual("blue", summary["A"]["unit"]) + # if no unit is specified, JSON should save 'N/A' as the unit. + self.assertEqual("N/A", summary["B"]["unit"]) + + +if __name__ == "__main__": + unittest.main() |