#!/usr/bin/env python # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. # # Ignore indention messages, since legacy scripts use 2 spaces instead of 4. # pylint: disable=bad-indentation,docstring-section-indent # pylint: disable=docstring-trailing-quotes """Program to fetch power logging data from a sweetberry device or other usb device that exports a USB power logging interface. """ # Note: This is a py2/3 compatible file. from __future__ import print_function import argparse import array from distutils import sysconfig import json import logging import os import pprint import struct import sys import time import traceback import usb from stats_manager import StatsManager # Directory where hdctools installs configuration files into. LIB_DIR = os.path.join(sysconfig.get_python_lib(standard_lib=False), 'servo', 'data') # Potential config file locations: current working directory, the same directory # as powerlog.py file or LIB_DIR. CONFIG_LOCATIONS = [os.getcwd(), os.path.dirname(os.path.realpath(__file__)), LIB_DIR] def logoutput(msg): print(msg) sys.stdout.flush() def process_filename(filename): """Find the file path from the filename. If filename is already the complete path, return that directly. If filename is just the short name, look for the file in the current working directory, in the directory of the current .py file, and then in the directory installed by hdctools. If the file is found, return the complete path of the file. Args: filename: complete file path or short file name. Returns: a complete file path. Raises: IOError if filename does not exist. """ # Check if filename is absolute path. if os.path.isabs(filename) and os.path.isfile(filename): return filename # Check if filename is relative to a known config location. for dirname in CONFIG_LOCATIONS: file_at_dir = os.path.join(dirname, filename) if os.path.isfile(file_at_dir): return file_at_dir raise IOError('No such file or directory: \'%s\'' % filename) class Spower(object): """Power class to access devices on the bus. Usage: bus = Spower() Instance Variables: _dev: pyUSB device object _read_ep: pyUSB read endpoint for this interface _write_ep: pyUSB write endpoint for this interface """ # INA interface type. INA_POWER = 1 INA_BUSV = 2 INA_CURRENT = 3 INA_SHUNTV = 4 # INA_SUFFIX is used to differentiate multiple ina types for the same power # rail. No suffix for when ina type is 0 (non-existent) and when ina type is 1 # (power, no suffix for backward compatibility). INA_SUFFIX = ['', '', '_busv', '_cur', '_shuntv'] # usb power commands CMD_RESET = 0x0000 CMD_STOP = 0x0001 CMD_ADDINA = 0x0002 CMD_START = 0x0003 CMD_NEXT = 0x0004 CMD_SETTIME = 0x0005 # Map between header channel number (0-47) # and INA I2C bus/addr on sweetberry. CHMAP = { 0: (3, 0x40), 1: (1, 0x40), 2: (2, 0x40), 3: (0, 0x40), 4: (3, 0x41), 5: (1, 0x41), 6: (2, 0x41), 7: (0, 0x41), 8: (3, 0x42), 9: (1, 0x42), 10: (2, 0x42), 11: (0, 0x42), 12: (3, 0x43), 13: (1, 0x43), 14: (2, 0x43), 15: (0, 0x43), 16: (3, 0x44), 17: (1, 0x44), 18: (2, 0x44), 19: (0, 0x44), 20: (3, 0x45), 21: (1, 0x45), 22: (2, 0x45), 23: (0, 0x45), 24: (3, 0x46), 25: (1, 0x46), 26: (2, 0x46), 27: (0, 0x46), 28: (3, 0x47), 29: (1, 0x47), 30: (2, 0x47), 31: (0, 0x47), 32: (3, 0x48), 33: (1, 0x48), 34: (2, 0x48), 35: (0, 0x48), 36: (3, 0x49), 37: (1, 0x49), 38: (2, 0x49), 39: (0, 0x49), 40: (3, 0x4a), 41: (1, 0x4a), 42: (2, 0x4a), 43: (0, 0x4a), 44: (3, 0x4b), 45: (1, 0x4b), 46: (2, 0x4b), 47: (0, 0x4b), } def __init__(self, board, vendor=0x18d1, product=0x5020, interface=1, serialname=None): self._logger = logging.getLogger(__name__) self._board = board # Find the stm32. dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True) dev_list = list(dev_g) if dev_list is None: raise Exception("Power", "USB device not found") # Check if we have multiple stm32s and we've specified the serial. dev = None if serialname: for d in dev_list: dev_serial = "PyUSB dioesn't have a stable interface" try: dev_serial = usb.util.get_string(d, 256, d.iSerialNumber) except ValueError: # Incompatible pyUsb version. dev_serial = usb.util.get_string(d, d.iSerialNumber) if dev_serial == serialname: dev = d break if dev is None: raise Exception("Power", "USB device(%s) not found" % serialname) else: try: dev = dev_list[0] except TypeError: # Incompatible pyUsb version. dev = dev_list.next() self._logger.debug("Found USB device: %04x:%04x", vendor, product) self._dev = dev # Get an endpoint instance. try: dev.set_configuration() except usb.USBError: pass cfg = dev.get_active_configuration() intf = usb.util.find_descriptor(cfg, custom_match=lambda i: \ i.bInterfaceClass==255 and i.bInterfaceSubClass==0x54) self._intf = intf self._logger.debug("InterfaceNumber: %s", intf.bInterfaceNumber) read_ep = usb.util.find_descriptor( intf, # match the first IN endpoint custom_match = \ lambda e: \ usb.util.endpoint_direction(e.bEndpointAddress) == \ usb.util.ENDPOINT_IN ) self._read_ep = read_ep self._logger.debug("Reader endpoint: 0x%x", read_ep.bEndpointAddress) write_ep = usb.util.find_descriptor( intf, # match the first OUT endpoint custom_match = \ lambda e: \ usb.util.endpoint_direction(e.bEndpointAddress) == \ usb.util.ENDPOINT_OUT ) self._write_ep = write_ep self._logger.debug("Writer endpoint: 0x%x", write_ep.bEndpointAddress) self.clear_ina_struct() self._logger.debug("Found power logging USB endpoint.") def clear_ina_struct(self): """ Clear INA description struct.""" self._inas = [] def append_ina_struct(self, name, rs, port, addr, data=None, ina_type=INA_POWER): """Add an INA descriptor into the list of active INAs. Args: name: Readable name of this channel. rs: Sense resistor value in ohms, floating point. port: I2C channel this INA is connected to. addr: I2C addr of this INA. data: Misc data for special handling, board specific. ina_type: INA function to use, power, voltage, etc. """ ina = {} ina['name'] = name ina['rs'] = rs ina['port'] = port ina['addr'] = addr ina['type'] = ina_type # Calculate INA231 Calibration register # (see INA231 spec p.15) # CurrentLSB = uA per div = 80mV / (Rsh * 2^15) # CurrentLSB uA = 80000000nV / (Rsh mOhm * 0x8000) ina['uAscale'] = 80000000. / (rs * 0x8000); ina['uWscale'] = 25. * ina['uAscale']; ina['mVscale'] = 1.25 ina['uVscale'] = 2.5 ina['data'] = data self._inas.append(ina) def wr_command(self, write_list, read_count=1, wtimeout=100, rtimeout=1000): """Write command to logger logic. This function writes byte command values list to stm, then reads byte status. Args: write_list: list of command byte values [0~255]. read_count: number of status byte values to read. Interface: write: [command, data ... ] read: [status ] Returns: bytes read, or None on failure. """ self._logger.debug("Spower.wr_command(write_list=[%s] (%d), read_count=%s)", list(bytearray(write_list)), len(write_list), read_count) # Clean up args from python style to correct types. write_length = 0 if write_list: write_length = len(write_list) if not read_count: read_count = 0 # Send command to stm32. if write_list: cmd = write_list ret = self._write_ep.write(cmd, wtimeout) self._logger.debug("RET: %s ", ret) # Read back response if necessary. if read_count: bytesread = self._read_ep.read(512, rtimeout) self._logger.debug("BYTES: [%s]", bytesread) if len(bytesread) != read_count: pass self._logger.debug("STATUS: 0x%02x", int(bytesread[0])) if read_count == 1: return bytesread[0] else: return bytesread return None def clear(self): """Clear pending reads on the stm32""" try: while True: ret = self.wr_command(b"", read_count=512, rtimeout=100, wtimeout=50) self._logger.debug("Try Clear: read %s", "success" if ret == 0 else "failure") except: pass def send_reset(self): """Reset the power interface on the stm32""" cmd = struct.pack(" time.time(): if (integration_us > 5000): time.sleep((integration_us / 1000000.) * sync_speed) for key in self._pwr: records = self._pwr[key].read_line() if not records: continue for record in records: pending_records.append(record) pending_records.sort(key=lambda r: r['ts']) aggregate_record = {"boards": set()} for record in pending_records: if record["berry"] not in aggregate_record["boards"]: for rkey in record.keys(): aggregate_record[rkey] = record[rkey] aggregate_record["boards"].add(record["berry"]) else: self._logger.info("break %s, %s", record["berry"], aggregate_record["boards"]) break if aggregate_record["boards"] == set(self._pwr.keys()): csv = "%f" % aggregate_record["ts"] for name in self._names: if name in aggregate_record: multiplier = 0.001 if (self._use_mW and name[1]==Spower.INA_POWER) else 1 value = aggregate_record[name] * multiplier csv += ", %.2f" % value name_type = name[0] + Spower.INA_SUFFIX[name[1]] self._data.AddSample(name_type, value) else: csv += ", " csv += ", %d" % aggregate_record["status"] if self._print_raw_data: logoutput(csv) aggregate_record = {"boards": set()} for r in range(0, len(self._pwr)): pending_records.pop(0) except KeyboardInterrupt: self._logger.info('\nCTRL+C caught.') finally: for key in self._pwr: self._pwr[key].stop() self._data.CalculateStats() if self._print_stats: print(self._data.SummaryToString()) save_dir = 'sweetberry%s' % time.time() if self._stats_dir: stats_dir = os.path.join(self._stats_dir, save_dir) self._data.SaveSummary(stats_dir) if self._stats_json_dir: stats_json_dir = os.path.join(self._stats_json_dir, save_dir) self._data.SaveSummaryJSON(stats_json_dir) if self._raw_data_dir: raw_data_dir = os.path.join(self._raw_data_dir, save_dir) self._data.SaveRawData(raw_data_dir) def main(argv=None): if argv is None: argv = sys.argv[1:] # Command line argument description. parser = argparse.ArgumentParser( description="Gather CSV data from sweetberry") parser.add_argument('-b', '--board', type=str, help="Board configuration file, eg. my.board", default="") parser.add_argument('-c', '--config', type=str, help="Rail config to monitor, eg my.scenario", default="") parser.add_argument('-A', '--serial', type=str, help="Serial number of sweetberry A", default="") parser.add_argument('-B', '--serial_b', type=str, help="Serial number of sweetberry B", default="") parser.add_argument('-t', '--integration_us', type=int, help="Target integration time for samples", default=100000) parser.add_argument('-s', '--seconds', type=float, help="Seconds to run capture", default=0.) parser.add_argument('--date', default=False, help="Sync logged timestamp to host date", action="store_true") parser.add_argument('--ms', default=False, help="Print timestamp as milliseconds", action="store_true") parser.add_argument('--mW', default=False, help="Print power as milliwatts, otherwise default to microwatts", action="store_true") parser.add_argument('--slow', default=False, help="Intentionally overflow", action="store_true") parser.add_argument('--print_stats', default=False, action="store_true", help="Print statistics for sweetberry readings at the end") parser.add_argument('--save_stats', type=str, nargs='?', dest='stats_dir', metavar='STATS_DIR', const=os.path.dirname(os.path.abspath(__file__)), default=None, help="Save statistics for sweetberry readings to %(metavar)s if " "%(metavar)s is specified, %(metavar)s will be created if it does " "not exist; if %(metavar)s is not specified but the flag is set, " "stats will be saved to where %(prog)s is located; if this flag is " "not set, then do not save stats") parser.add_argument('--save_stats_json', type=str, nargs='?', dest='stats_json_dir', metavar='STATS_JSON_DIR', const=os.path.dirname(os.path.abspath(__file__)), default=None, help="Save means for sweetberry readings in json to %(metavar)s if " "%(metavar)s is specified, %(metavar)s will be created if it does " "not exist; if %(metavar)s is not specified but the flag is set, " "stats will be saved to where %(prog)s is located; if this flag is " "not set, then do not save stats") parser.add_argument('--no_print_raw_data', dest='print_raw_data', default=True, action="store_false", help="Not print raw sweetberry readings at real time, default is to " "print") parser.add_argument('--save_raw_data', type=str, nargs='?', dest='raw_data_dir', metavar='RAW_DATA_DIR', const=os.path.dirname(os.path.abspath(__file__)), default=None, help="Save raw data for sweetberry readings to %(metavar)s if " "%(metavar)s is specified, %(metavar)s will be created if it does " "not exist; if %(metavar)s is not specified but the flag is set, " "raw data will be saved to where %(prog)s is located; if this flag " "is not set, then do not save raw data") parser.add_argument('-v', '--verbose', default=False, help="Very chatty printout", action="store_true") args = parser.parse_args(argv) root_logger = logging.getLogger(__name__) if args.verbose: root_logger.setLevel(logging.DEBUG) else: root_logger.setLevel(logging.INFO) # if powerlog is used through main, log to sys.stdout if __name__ == "__main__": stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) root_logger.addHandler(stdout_handler) integration_us_request = args.integration_us if not args.board: raise Exception("Power", "No board file selected, see board.README") if not args.config: raise Exception("Power", "No config file selected, see board.README") brdfile = args.board cfgfile = args.config seconds = args.seconds serial_a = args.serial serial_b = args.serial_b sync_date = args.date use_ms = args.ms use_mW = args.mW print_stats = args.print_stats stats_dir = args.stats_dir stats_json_dir = args.stats_json_dir print_raw_data = args.print_raw_data raw_data_dir = args.raw_data_dir boards = [] sync_speed = .8 if args.slow: sync_speed = 1.2 # Set up logging interface. powerlogger = powerlog(brdfile, cfgfile, serial_a=serial_a, serial_b=serial_b, sync_date=sync_date, use_ms=use_ms, use_mW=use_mW, print_stats=print_stats, stats_dir=stats_dir, stats_json_dir=stats_json_dir, print_raw_data=print_raw_data,raw_data_dir=raw_data_dir) # Start logging. powerlogger.start(integration_us_request, seconds, sync_speed=sync_speed) if __name__ == "__main__": main()