From 5f1832666f71df85bb3c451f514c6319a9c4dacb Mon Sep 17 00:00:00 2001 From: Vadim Sukhomlinov Date: Mon, 8 Jun 2020 18:49:27 -0700 Subject: test/tpm_test: fix cros lint complains BUG=b:158533918 TEST=tpmtest.py Signed-off-by: Vadim Sukhomlinov Change-Id: Ia6b59c49afc7ed19507fab254cab44b2a5c1953b Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/2236588 Reviewed-by: Vadim Sukhomlinov Reviewed-by: Vadim Bendebury Tested-by: Vadim Sukhomlinov Commit-Queue: Vadim Sukhomlinov Auto-Submit: Vadim Sukhomlinov (cherry picked from commit e1b8aaed2a60b88dd047bc6e341327636d0f0212) Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/2314113 Tested-by: Mary Ruthven Reviewed-by: Mary Ruthven Commit-Queue: Mary Ruthven (cherry picked from commit 69e127fb8e0235596d82e18df4a0a9d89997279e) Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/2350275 --- test/tpm_test/crypto_test.py | 413 ++++++++++++++++++------------------- test/tpm_test/drbg_test.py | 101 +++++----- test/tpm_test/ecc_test.py | 265 ++++++++++++------------ test/tpm_test/ecies_test.py | 122 +++++------ test/tpm_test/genvectors.py | 16 +- test/tpm_test/hash_test.py | 193 +++++++++--------- test/tpm_test/hkdf_test.py | 51 ++--- test/tpm_test/rsa_test.py | 458 +++++++++++++++++++++--------------------- test/tpm_test/subcmd.py | 4 +- test/tpm_test/tpmtest.py | 298 ++++++++++++++------------- test/tpm_test/trng_test.py | 45 +++-- test/tpm_test/upgrade_test.py | 95 ++++----- test/tpm_test/utils.py | 41 ++-- 13 files changed, 1062 insertions(+), 1040 deletions(-) mode change 100644 => 100755 test/tpm_test/genvectors.py diff --git a/test/tpm_test/crypto_test.py b/test/tpm_test/crypto_test.py index 36253952c5..c11528c8f2 100644 --- a/test/tpm_test/crypto_test.py +++ b/test/tpm_test/crypto_test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -7,7 +7,6 @@ from __future__ import print_function -import binascii import struct import xml.etree.ElementTree as ET @@ -19,225 +18,211 @@ DECRYPT = 0 ENCRYPT = 1 def get_attribute(tdesc, attr_name, required=True): - """Retrieve an attribute value from an XML node. - - Args: - tdesc: an Element of the ElementTree, a test descriptor containing - necessary information to run a single encryption/description - session. - attr_name: a string, the name of the attribute to retrieve. - required: a Boolean, if True - the attribute must be present in the - descriptor, otherwise it is considered optional - Returns: - The attribute value as a string (ascii or binary) - Raises: - subcmd.TpmTestError: on various format errors, or in case a required - attribute is not found, the error message describes the problem. - - """ - # Fields stored in hex format by default. - default_hex = ('aad', 'cipher_text', 'iv', 'key', 'tag') - - data = tdesc.find(attr_name) - if data is None: - if required: - raise subcmd.TpmTestError('node "%s" does not have attribute "%s"' % - (tdesc.get('name'), attr_name)) - return '' - - # Attribute is present, does it have to be decoded from hex? - cell_format = data.get('format') - if not cell_format: - if attr_name in default_hex: - cell_format = 'hex' - else: - cell_format = 'ascii' - elif cell_format not in ('hex', 'ascii'): - raise subcmd.TpmTestError('%s:%s, unrecognizable format "%s"' % - (tdesc.get('name'), attr_name, cell_format)) - - text = ' '.join(x.strip() for x in data.text.splitlines() if x) - if cell_format == 'ascii': - return text - - # Drop spaces from hex representation. - text = text.replace(' ', '') - - # Convert hex-text to little-endian binary (in 4-byte word chunks) - value = '' - for x in range(len(text)/8): - try: - value += struct.pack(': ....", where is - the major encryption mode (say AED or DES) and submode - a variant of - the major scheme, if exists. - - op_type: an int, encodes the operation to perform (encrypt/decrypt), passed - directly to the device as a field in the extended command - key: a binary string - iv: a binary string, might be empty - aad: additional authenticated data - in_text: a binary string, the input of the encrypt/decrypt operation - out_text: a binary string, might be empty, the expected output of the - operation. Note that it could be shorter than actual output (padded to - integer number of blocks), in which case only its length of bytes is - compared debug_mode: a Boolean, if True - enables tracing on the console - tpm: a TPM object to send extended commands to an initialized TPM - - Returns: - The actual binary string, result of the operation, if the - comparison with the expected value was successful. - - Raises: - subcmd.TpmTestError: in case there were problems parsing the node name, or - verifying the operation results. - - """ - mode_name, submode_name = node_name.split(':') - submode_name = submode_name[:3].upper() - - mode = SUPPORTED_MODES.get(mode_name.upper()) - if not mode: - raise subcmd.TpmTestError('unrecognizable mode in node "%s"' % node_name) - - submode = mode.submodes.get(submode_name, 0) - cmd = '%c' % op_type # Encrypt or decrypt - cmd += '%c' % submode # A particular type of a generic algorithm. - cmd += '%c' % len(key) - cmd += key - cmd += '%c' % len(iv) - if iv: - cmd += iv - cmd += '%c' % len(aad) - if aad: - cmd += aad - cmd += struct.pack('>H', len(in_text)) - cmd += in_text - if tpm.debug_enabled(): - print('%d:%d cmd size' % (op_type, mode.subcmd), - len(cmd), utils.hex_dump(cmd)) - wrapped_response = tpm.command(tpm.wrap_ext_command(mode.subcmd, cmd)) - real_out_text = tpm.unwrap_ext_response(mode.subcmd, wrapped_response) - if out_text: - if len(real_out_text) > len(out_text): - real_out_text = real_out_text[:len(out_text)] # Ignore padding - if real_out_text != out_text: - if tpm.debug_enabled(): - print('Out text mismatch in node %s:\n' % node_name) - else: - raise subcmd.TpmTestError( - 'Out text mismatch in node %s, operation %s:\n' - 'In text:%sExpected out text:%sReal out text:%s' % ( - node_name, 'ENCRYPT' if op_type == ENCRYPT else 'DECRYPT', - utils.hex_dump(in_text), - utils.hex_dump(out_text), - utils.hex_dump(real_out_text))) - return real_out_text +# pylint: disable=too-many-arguments, too-many-locals +def crypto_run(node_name, op_type, key, init_vec, aad, in_text, out_text, tpm): + """Perform a basic operation(encrypt or decrypt). + + This function creates an extended command with the requested parameters, + sends it to the device, and then compares the response to the expected + value. + + Args: + node_name: a string, the name of the XML node this data comes from. The + format of the name is ": ....", where is + the major encryption mode (say AED or DES) and submode - a variant of + the major scheme, if exists. + op_type: an int, encodes the operation to perform (encrypt/decrypt), + passed directly to the device as a field in the extended command + key: a binary string + init_vec: a binary string, might be empty + aad: additional authenticated data + in_text: a binary string, the input of the encrypt/decrypt operation + out_text: a binary string, might be empty, the expected output of the + operation. Note that it could be shorter than actual output (padded to + integer number of blocks), in which case only its length of bytes is + compared debug_mode: a Boolean, if True - enables tracing on the console + tpm: a TPM object to send extended commands to an initialized TPM + + Returns: + The actual binary string, result of the operation, if the + comparison with the expected value was successful. + + Raises: + subcmd.TpmTestError: in case there were problems parsing the node name, + or verifying the operation results. + """ + mode_name, submode_name = node_name.split(':') + submode_name = submode_name[:3].upper() + + # commands below will raise exception if incorrect mode/submode used + try: + mode_cmd, submodes = SUPPORTED_MODES[mode_name.upper()] + submode = submodes[submode_name] + except KeyError: + raise subcmd.TpmTestError('unrecognizable mode in node "%s"' % + node_name) + + cmd = '%c' % op_type # Encrypt or decrypt + cmd += '%c' % submode # A particular type of a generic algorithm. + cmd += '%c' % len(key) + cmd += key + cmd += '%c' % len(init_vec) + if init_vec: + cmd += init_vec + cmd += '%c' % len(aad) + if aad: + cmd += aad + cmd += struct.pack('>H', len(in_text)) + cmd += in_text + if tpm.debug_enabled(): + print('%d:%d cmd size' % (op_type, mode_cmd), + len(cmd), utils.hex_dump(cmd)) + wrapped_response = tpm.command(tpm.wrap_ext_command(mode_cmd, cmd)) + real_out_text = tpm.unwrap_ext_response(mode_cmd, wrapped_response) + if out_text: + if len(real_out_text) > len(out_text): + real_out_text = real_out_text[:len(out_text)] # Ignore padding + if real_out_text != out_text: + if tpm.debug_enabled(): + print('Out text mismatch in node %s:\n' % node_name) + else: + raise subcmd.TpmTestError( + 'Out text mismatch in node %s, operation %s:\n' + 'In text:%sExpected out text:%sReal out text:%s' % ( + node_name, 'ENCRYPT' if op_type == ENCRYPT else 'DECRYPT', + utils.hex_dump(in_text), + utils.hex_dump(out_text), + utils.hex_dump(real_out_text))) + return real_out_text def crypto_test(tdesc, tpm): - """Perform a single test described in the xml file. - - The xml node contains all pertinent information about the test inputs and - outputs. - - Args: - tdesc: an Element of the ElementTree, a test descriptor containing - necessary information to run a single encryption/description - session. - tpm: a TPM object to send extended commands to an initialized TPM - Raises: - subcmd.TpmTestError: on various execution errors, the details are included - in the error message. - - """ - node_name = tdesc.get('name') - key = get_attribute(tdesc, 'key') - if len(key) not in (16, 24, 32): - raise subcmd.TpmTestError('wrong key size "%s:%s"' % ( - node_name, - ''.join('%2.2x' % ord(x) for x in key))) - iv = get_attribute(tdesc, 'iv', required=False) - if iv and not node_name.startswith('AES:GCM') and len(iv) != 16: - raise subcmd.TpmTestError('wrong iv size "%s:%s"' % ( - node_name, - ''.join('%2.2x' % ord(x) for x in iv))) - clear_text = get_attribute(tdesc, 'clear_text', required=False) - if clear_text: - clear_text_len = get_attribute(tdesc, 'clear_text_len', required=False) + """Perform a single test described in the xml file. + + The xml node contains all pertinent information about the test inputs and + outputs. + + Args: + tdesc: an Element of the ElementTree, a test descriptor containing + necessary information to run a single encryption/description session. + tpm: a TPM object to send extended commands to an initialized TPM + + Raises: + subcmd.TpmTestError: on various execution errors, the details are + included in the error message. + """ + node_name = tdesc.get('name') + key = get_attribute(tdesc, 'key') + if len(key) not in (16, 24, 32): + raise subcmd.TpmTestError('wrong key size "%s:%s"' % ( + node_name, + ''.join('%2.2x' % ord(x) for x in key))) + init_vec = get_attribute(tdesc, 'iv', required=False) + if init_vec and not node_name.startswith('AES:GCM') and len(init_vec) != 16: + raise subcmd.TpmTestError('wrong iv size "%s:%s"' % ( + node_name, + ''.join('%2.2x' % ord(x) for x in init_vec))) + clear_text = get_attribute(tdesc, 'clear_text', required=False) + if clear_text: + clear_text_len = get_attribute(tdesc, 'clear_text_len', required=False) + if clear_text_len: + clear_text = clear_text[:int(clear_text_len)] + else: + clear_text_len = None + if tpm.debug_enabled(): + print('clear text size', len(clear_text)) + cipher_text = get_attribute(tdesc, 'cipher_text', required=False) if clear_text_len: - clear_text = clear_text[:int(clear_text_len)] - else: - clear_text_len = None - if tpm.debug_enabled(): - print('clear text size', len(clear_text)) - cipher_text = get_attribute(tdesc, 'cipher_text', required=False) - if clear_text_len: - cipher_text = cipher_text[:int(clear_text_len)] - tag = get_attribute(tdesc, 'tag', required=False) - aad = get_attribute(tdesc, 'aad', required=False) - if aad: - aad_len = get_attribute(tdesc, 'aad_len', required=False) - if aad_len: - aad = aad[:int(aad_len)] - real_cipher_text = crypto_run(node_name, ENCRYPT, key, iv, - aad or '', clear_text, cipher_text + tag, tpm) - crypto_run(node_name, DECRYPT, key, iv, aad or '', - real_cipher_text[:len(real_cipher_text) - len(tag)], - clear_text + tag, tpm) - print(utils.cursor_back() + 'SUCCESS: %s' % node_name) + cipher_text = cipher_text[:int(clear_text_len)] + tag = get_attribute(tdesc, 'tag', required=False) + aad = get_attribute(tdesc, 'aad', required=False) + if aad: + aad_len = get_attribute(tdesc, 'aad_len', required=False) + if aad_len: + aad = aad[:int(aad_len)] + real_cipher_text = crypto_run(node_name, ENCRYPT, key, init_vec, + aad or '', clear_text, cipher_text + tag, tpm) + crypto_run(node_name, DECRYPT, key, init_vec, aad or '', + real_cipher_text[:len(real_cipher_text) - len(tag)], + clear_text + tag, tpm) + print(utils.cursor_back() + 'SUCCESS: %s' % node_name) def crypto_tests(tpm, xml_file): - tree = ET.parse(xml_file) - root = tree.getroot() - for child in root: - crypto_test(child, tpm) + """Run AES cryptographic tests""" + tree = ET.parse(xml_file) + root = tree.getroot() + for child in root: + crypto_test(child, tpm) diff --git a/test/tpm_test/drbg_test.py b/test/tpm_test/drbg_test.py index 097a222923..e85e841d7c 100644 --- a/test/tpm_test/drbg_test.py +++ b/test/tpm_test/drbg_test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2019 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -24,7 +24,7 @@ DRBG_INIT = 0 DRBG_RESEED = 1 DRBG_GENERATE = 2 -test_inputs = ( +TEST_INPUTS = ( (DRBG_INIT, ('C40894D0C37712140924115BF8A3110C7258532365BB598F81B127A5E4CB8EB0', 'FBB1EDAF92D0C2699F5C0A7418D308B09AC679FFBB0D8918C8E62D35091DD2B9', @@ -52,57 +52,60 @@ test_inputs = ( ) _DRBG_INIT_FORMAT = '{op:c}{p0l:s}{p0}{p1l:s}{p1}{p2l:s}{p2}' -def _drbg_init_cmd(op, entropy, nonce, perso): - return _DRBG_INIT_FORMAT.format(op=op, - p0l=pack('>H', len(entropy)), p0=entropy, - p1l=pack('>H', len(nonce)), p1=nonce, - p2l=pack('>H', len(perso)), p2=perso) +def _drbg_init_cmd(operation, entropy, nonce, perso): + return _DRBG_INIT_FORMAT.format(op=operation, + p0l=pack('>H', len(entropy)), p0=entropy, + p1l=pack('>H', len(nonce)), p1=nonce, + p2l=pack('>H', len(perso)), p2=perso) _DRBG_GEN_FORMAT = '{op:c}{p0l:s}{p0}{p1l:s}' def _drbg_gen_cmd(inp, out): - outlen = len(out) - if outlen == 0: - outlen = 32 # if we don't care about output value, still need to have it - return _DRBG_GEN_FORMAT.format(op=DRBG_GENERATE, - p0l=pack('>H', len(inp)), p0=inp, - p1l=pack('>H', outlen)) + outlen = len(out) + if outlen == 0: + outlen = 32 # if we don't care about output value, still need to have it + return _DRBG_GEN_FORMAT.format(op=DRBG_GENERATE, + p0l=pack('>H', len(inp)), p0=inp, + p1l=pack('>H', outlen)) def drbg_test(tpm): - """Runs DRBG test case. - - Args: - tpm: a tpm object used to communicate with the device - - Raises: - subcmd.TpmTestError: on unexpected target responses - """ - - for test in test_inputs: - drbg_op, drbg_params = test - if drbg_op == DRBG_INIT: - entropy, nonce, perso = drbg_params - cmd = _drbg_init_cmd(drbg_op, a2b(entropy), a2b(nonce), a2b(perso)) - response = tpm.command(tpm.wrap_ext_command(subcmd.DRBG_TEST, cmd)) - if response != EMPTY_DRBG_RESPONSE: - raise subcmd.TpmTestError("Unexpected response to DRBG_INIT: %s" % - (utils.hex_dump(wrapped_response))) - elif drbg_op == DRBG_RESEED: - entropy, inp1, inp2 = drbg_params - cmd = _drbg_init_cmd(drbg_op, a2b(entropy), a2b(inp1), a2b(inp2)) - response = tpm.command(tpm.wrap_ext_command(subcmd.DRBG_TEST, cmd)) - if response != EMPTY_DRBG_RESPONSE: - raise subcmd.TpmTestError("Unexpected response to DRBG_RESEED: %s" % - (utils.hex_dump(wrapped_response))) - elif drbg_op == DRBG_GENERATE: - inp, expected = drbg_params - cmd = _drbg_gen_cmd(a2b(inp), a2b(expected)) - response = tpm.command(tpm.wrap_ext_command(subcmd.DRBG_TEST, cmd)) - if expected != '': - result = response[12:] - if a2b(expected) != result: - raise subcmd.TpmTestError('error:\nexpected %s\nreceived %s' % - (utils.hex_dump(a2b(expected)), - utils.hex_dump(result))) - print('%sSUCCESS: %s' % (utils.cursor_back(), 'DRBG test')) + """Runs DRBG test case. + + Args: + tpm: a tpm object used to communicate with the device + + Raises: + subcmd.TpmTestError: on unexpected target responses + """ + + for test in TEST_INPUTS: + drbg_op, drbg_params = test + if drbg_op == DRBG_INIT: + entropy, nonce, perso = drbg_params + cmd = _drbg_init_cmd(drbg_op, a2b(entropy), a2b(nonce), a2b(perso)) + response = tpm.command(tpm.wrap_ext_command(subcmd.DRBG_TEST, cmd)) + if response != EMPTY_DRBG_RESPONSE: + raise subcmd.TpmTestError('Unexpected response to ' + 'DRBG_INIT: %s' % + (utils.hex_dump(response))) + elif drbg_op == DRBG_RESEED: + entropy, inp1, inp2 = drbg_params + cmd = _drbg_init_cmd(drbg_op, a2b(entropy), a2b(inp1), a2b(inp2)) + response = tpm.command(tpm.wrap_ext_command(subcmd.DRBG_TEST, cmd)) + if response != EMPTY_DRBG_RESPONSE: + raise subcmd.TpmTestError('Unexpected response to ' + 'DRBG_RESEED: %s' % + (utils.hex_dump(response))) + elif drbg_op == DRBG_GENERATE: + inp, expected = drbg_params + cmd = _drbg_gen_cmd(a2b(inp), a2b(expected)) + response = tpm.command(tpm.wrap_ext_command(subcmd.DRBG_TEST, cmd)) + if expected != '': + result = response[12:] + if a2b(expected) != result: + raise subcmd.TpmTestError('error:\nexpected %s' + '\nreceived %s' % + (utils.hex_dump(a2b(expected)), + utils.hex_dump(result))) + print('%sSUCCESS: %s' % (utils.cursor_back(), 'DRBG test')) diff --git a/test/tpm_test/ecc_test.py b/test/tpm_test/ecc_test.py index aebaa7c4b3..a20c6e0e4b 100644 --- a/test/tpm_test/ecc_test.py +++ b/test/tpm_test/ecc_test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -29,9 +29,7 @@ _ECC_CURVES = { # TPM2 signature codes. _SIGN_MODE = { 'NONE': 0x00, - 'ECDSA': 0x18, - # TODO(ngm): add support for SCHNORR. - # 'SCHNORR': 0x1c + 'ECDSA': 0x18 } # TPM2 ALG codes. @@ -42,7 +40,7 @@ _HASH = { } _HASH_FUNC = { - 'NIST-P256': hashlib.sha256 + 'NIST-P256': hashlib.sha256 } NIST_P256_QX = ('12c3d6a2679ca8ee3c4d927f204ed5bc' @@ -53,7 +51,7 @@ NIST_P256_QY = ('5c85ad7413971172fca5738fee9d0e7b' PKEY = ('fc441e07744e48f109b7e66b29482f7b' '7e3ec91fa27fd4870991b289fea0d20a') -## +# # Field size: # FIELD LENGTH # OP 1 @@ -120,59 +118,61 @@ _TEST_POINT = '{o:c}{c:c}{qxl:s}{qx}{qyl:s}{qy}' _TEST_KEYGEN = '{o:c}{c:c}' def _sign_cmd(curve_id, hash_func, sign_mode, msg): - op = _ECC_OPCODES['SIGN'] - digest = hash_func(msg).digest() - digest_len = len(digest) - return _TEST_SIGN.format(o=op, c=curve_id, s=sign_mode, h=_HASH['SHA256'], - dl=struct.pack('>H', digest_len), dig=digest) + ecc_op = _ECC_OPCODES['SIGN'] + digest = hash_func(msg).digest() + digest_len = len(digest) + return _TEST_SIGN.format(o=ecc_op, c=curve_id, s=sign_mode, + h=_HASH['SHA256'], + dl=struct.pack('>H', digest_len), dig=digest) def _sign_any_cmd(curve_id, hash_func, sign_mode, msg, pkey): - op = _ECC_OPCODES['SIGN_ANY'] - digest = hash_func(msg).digest() - digest_len = len(digest) - return _TEST_SIGN_ANY.format(o=op, c=curve_id, s=sign_mode, - h=_HASH['SHA256'], - dl=struct.pack('>H', digest_len), dig=digest, - pl=struct.pack('>H', len(pkey)), pk=pkey) - - -def _verify_cmd(curve_id, hash_func, sign_mode, msg, sig): - op = _ECC_OPCODES['VERIFY'] - digest = hash_func(msg).digest() - digest_len = len(digest) - return _TEST_VERIFY.format(o=op, c=curve_id, sm=sign_mode, h=_HASH['SHA256'], - rs=sig, - dl=struct.pack('>H', digest_len), - dig=digest) - - -def _verify_any_cmd(curve_id, hash_func, sign_mode, msg, sig, qx, qy): - op = _ECC_OPCODES['VERIFY_ANY'] - digest = hash_func(msg).digest() - digest_len = len(digest) - return _TEST_VERIFY_ANY.format(o=op, c=curve_id, sm=sign_mode, + ecc_op = _ECC_OPCODES['SIGN_ANY'] + digest = hash_func(msg).digest() + digest_len = len(digest) + return _TEST_SIGN_ANY.format(o=ecc_op, c=curve_id, s=sign_mode, h=_HASH['SHA256'], - rs=sig, dl=struct.pack('>H', digest_len), dig=digest, - qxl=struct.pack('>H', len(qx)), qx=qx, - qyl=struct.pack('>H', len(qy)), qy=qy) + pl=struct.pack('>H', len(pkey)), pk=pkey) + -def _test_point_cmd(curve_id, qx, qy): - op = _ECC_OPCODES['TEST_POINT'] - return _TEST_POINT.format(o=op, c=curve_id, - qxl=struct.pack('>H', len(qx)), qx=qx, - qyl=struct.pack('>H', len(qy)), qy=qy) +def _verify_cmd(curve_id, hash_func, sign_mode, msg, sig): + ecc_op = _ECC_OPCODES['VERIFY'] + digest = hash_func(msg).digest() + digest_len = len(digest) + return _TEST_VERIFY.format(o=ecc_op, c=curve_id, sm=sign_mode, + h=_HASH['SHA256'], + rs=sig, + dl=struct.pack('>H', digest_len), + dig=digest) + +# pylint: disable=too-many-arguments +def _verify_any_cmd(curve_id, hash_func, sign_mode, msg, sig, q_x, q_y): + ecc_op = _ECC_OPCODES['VERIFY_ANY'] + digest = hash_func(msg).digest() + digest_len = len(digest) + return _TEST_VERIFY_ANY.format(o=ecc_op, c=curve_id, sm=sign_mode, + h=_HASH['SHA256'], + rs=sig, + dl=struct.pack('>H', digest_len), dig=digest, + qxl=struct.pack('>H', len(q_x)), qx=q_x, + qyl=struct.pack('>H', len(q_y)), qy=q_y) + +def _test_point_cmd(curve_id, q_x, q_y): + ecc_op = _ECC_OPCODES['TEST_POINT'] + return _TEST_POINT.format(o=ecc_op, c=curve_id, + qxl=struct.pack('>H', len(q_x)), qx=q_x, + qyl=struct.pack('>H', len(q_y)), qy=q_y) def _keygen_cmd(curve_id): - op = _ECC_OPCODES['KEYGEN'] - return _TEST_KEYGEN.format(o=op, c=curve_id) + ecc_op = _ECC_OPCODES['KEYGEN'] + return _TEST_KEYGEN.format(o=ecc_op, c=curve_id) def _keyderive_cmd(curve_id, seed): - op = _ECC_OPCODES['KEYDERIVE'] - seed_len = len(seed) - return _TEST_KEYDERIVE.format(o=op, c=curve_id, - ml=struct.pack('>H', seed_len), msg=seed) + ecc_op = _ECC_OPCODES['KEYDERIVE'] + seed_len = len(seed) + return _TEST_KEYDERIVE.format(o=ecc_op, c=curve_id, + ml=struct.pack('>H', seed_len), msg=seed) _SIGN_INPUTS = ( @@ -192,98 +192,103 @@ _KEYDERIVE_INPUTS = ( def _sign_test(tpm): - msg = 'Hello CR50' - - for data in _SIGN_INPUTS: - curve_id, sign_mode = data - test_name = 'ECC-SIGN:%s:%s' % data - cmd = _sign_cmd(_ECC_CURVES[curve_id], _HASH_FUNC[curve_id], - _SIGN_MODE[sign_mode], msg) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) - expected = '\x01' - signature = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) - if signature[:1] != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(signature[:1]), utils.hex_dump(expected))) - cmd = _verify_cmd(_ECC_CURVES[curve_id], _HASH_FUNC[curve_id], - _SIGN_MODE[sign_mode], msg, signature[1:]) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) - verified = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) - - if verified[:1] != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(verified[:1]), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + msg = 'Hello CR50' + + for data in _SIGN_INPUTS: + curve_id, sign_mode = data + test_name = 'ECC-SIGN:%s:%s' % data + cmd = _sign_cmd(_ECC_CURVES[curve_id], _HASH_FUNC[curve_id], + _SIGN_MODE[sign_mode], msg) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) + expected = '\x01' + signature = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) + if signature[:1] != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % ( + test_name, utils.hex_dump(signature[:1]), + utils.hex_dump(expected))) + cmd = _verify_cmd(_ECC_CURVES[curve_id], _HASH_FUNC[curve_id], + _SIGN_MODE[sign_mode], msg, signature[1:]) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) + verified = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) + + if verified[:1] != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % ( + test_name, utils.hex_dump(verified[:1]), + utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _sign_test_any(tpm): - msg = 'Hello CR50' + msg = 'Hello CR50' + + for data in _SIGN_INPUTS: + curve_id, sign_mode = data + test_name = 'ECC-SIGN, Q:%s:%s' % data + cmd = _sign_any_cmd(_ECC_CURVES[curve_id], _HASH_FUNC[curve_id], + _SIGN_MODE[sign_mode], msg, a2b(PKEY)) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) + expected = '\x01' + signature = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) + if signature[:1] != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % ( + test_name, utils.hex_dump(signature[:1]), + utils.hex_dump(expected))) + # make sure properly supplied Q.x, Q.y works + cmd = _verify_any_cmd(_ECC_CURVES[curve_id], _HASH_FUNC[curve_id], + _SIGN_MODE[sign_mode], msg, signature[1:], + a2b(NIST_P256_QX), a2b(NIST_P256_QY)) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) + verified = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) + if verified[:1] != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % ( + test_name, utils.hex_dump(verified[:1]), + utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) - for data in _SIGN_INPUTS: - curve_id, sign_mode = data - test_name = 'ECC-SIGN, Q:%s:%s' % data - cmd = _sign_any_cmd(_ECC_CURVES[curve_id], _HASH_FUNC[curve_id], - _SIGN_MODE[sign_mode], msg, a2b(PKEY)) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) - expected = '\x01' - signature = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) - if signature[:1] != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(signature[:1]), utils.hex_dump(expected))) - # make sure properly supplied Q.x, Q.y works - cmd = _verify_any_cmd(_ECC_CURVES[curve_id], _HASH_FUNC[curve_id], - _SIGN_MODE[sign_mode], msg, signature[1:], - a2b(NIST_P256_QX), a2b(NIST_P256_QY)) +def _point_test(tpm): + test_name = 'POINT-TEST: NIST-P256' + cmd = _test_point_cmd(_ECC_CURVES['NIST-P256'], + a2b(NIST_P256_QX), a2b(NIST_P256_QY)) wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) verified = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) - if verified[:1] != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(verified[:1]), utils.hex_dump(expected))) + expected = '\x01' + if verified != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % ( + test_name, utils.hex_dump(verified), utils.hex_dump(expected))) print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) -def _point_test(tpm): - test_name = 'POINT-TEST: NIST-P256' - cmd = _test_point_cmd(_ECC_CURVES['NIST-P256'], - a2b(NIST_P256_QX), a2b(NIST_P256_QY)) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) - verified = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) - expected = '\x01' - if verified != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(verified), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) - def _keygen_test(tpm): - for data in _KEYGEN_INPUTS: - curve_id, = data - test_name = 'ECC-KEYGEN:%s' % data - cmd = _keygen_cmd(_ECC_CURVES[curve_id]) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) - valid = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) - expected = '\x01' - if valid[:1] != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(valid[:1]), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + for data in _KEYGEN_INPUTS: + curve_id, = data + test_name = 'ECC-KEYGEN:%s' % data + cmd = _keygen_cmd(_ECC_CURVES[curve_id]) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) + valid = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) + expected = '\x01' + if valid[:1] != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % ( + test_name, utils.hex_dump(valid[:1]), utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _keyderive_test(tpm): - for data in _KEYDERIVE_INPUTS: - curve_id, seed_bytes = data - seed = os.urandom(seed_bytes) - test_name = 'ECC-KEYDERIVE:%s' % data[0] - cmd = _keyderive_cmd(_ECC_CURVES[curve_id], seed) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) - valid = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) - expected = '\x01' - if valid != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(valid), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + for data in _KEYDERIVE_INPUTS: + curve_id, seed_bytes = data + seed = os.urandom(seed_bytes) + test_name = 'ECC-KEYDERIVE:%s' % data[0] + cmd = _keyderive_cmd(_ECC_CURVES[curve_id], seed) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECC, cmd)) + valid = tpm.unwrap_ext_response(subcmd.ECC, wrapped_response) + expected = '\x01' + if valid != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % ( + test_name, utils.hex_dump(valid), utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def ecc_test(tpm): - _sign_test(tpm) - _sign_test_any(tpm) - _point_test(tpm) - _keygen_test(tpm) - _keyderive_test(tpm) + """Run ECDSA tests""" + _sign_test(tpm) + _sign_test_any(tpm) + _point_test(tpm) + _keygen_test(tpm) + _keyderive_test(tpm) diff --git a/test/tpm_test/ecies_test.py b/test/tpm_test/ecies_test.py index 96620c14b5..04d5963310 100644 --- a/test/tpm_test/ecies_test.py +++ b/test/tpm_test/ecies_test.py @@ -1,10 +1,9 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Module for testing ECIES using extended commands.""" -from binascii import b2a_hex as b2a from binascii import a2b_hex as a2b from struct import pack @@ -19,7 +18,7 @@ _ECIES_OPCODES = { # # Command format. # -# WIDTH FIELD +# WIDTH FIELD # 1 OP # 1 MSB IN LEN # 1 LSB IN LEN @@ -40,7 +39,8 @@ _ECIES_OPCODES = { # 1 LSB INFO LEN # INFO_LEN INFO # -_ECIES_CMD_FORMAT = '{o:c}{inl:s}{input}{al:s}{iv}{xl:s}{x}{yl:s}{y}{sl:s}{s}{il:s}{i}' +_ECIES_CMD_FORMAT = ('{o:c}{inl:s}{input}{al:s}{iv}' + '{xl:s}{x}{yl:s}{y}{sl:s}{s}{il:s}{i}') _DEFAULT_SALT = 'Salt!' @@ -168,66 +168,72 @@ _ECIES_COMPAT_INPUTS = ( ) ) - -def _encrypt_cmd(auth, input, iv, pubx, puby, salt, info): - op = _ECIES_OPCODES['ENCRYPT'] - return _ECIES_CMD_FORMAT.format(o=op, inl=pack('>H', len(auth+input)), input=auth+input, - al=pack('>H', len(auth)), iv=iv, - xl=pack('>H', len(pubx)), x=pubx, - yl=pack('>H', len(puby)), y=puby, - sl=pack('>H', len(salt)), s=salt, - il=pack('>H', len(info)), i=info) - - -def _decrypt_cmd(auth, input, iv, d, salt, info): - op = _ECIES_OPCODES['DECRYPT'] - return _ECIES_CMD_FORMAT.format(o=op, inl=pack('>H', len(input)), input=input, - al=pack('>H', len(auth)), iv=iv, - xl=pack('>H', len(d)), x=d, - yl=pack('>H', 0), y='', - sl=pack('>H', len(salt)), s=salt, - il=pack('>H', len(info)), i=info) - - +# pylint: disable=too-many-arguments +def _encrypt_cmd(auth, in_data, init_vec, pubx, puby, salt, info): + ecies_op = _ECIES_OPCODES['ENCRYPT'] + return _ECIES_CMD_FORMAT.format(o=ecies_op, + inl=pack('>H', len(auth+in_data)), + input=auth+in_data, + al=pack('>H', len(auth)), iv=init_vec, + xl=pack('>H', len(pubx)), x=pubx, + yl=pack('>H', len(puby)), y=puby, + sl=pack('>H', len(salt)), s=salt, + il=pack('>H', len(info)), i=info) + +# pylint: disable=too-many-arguments +def _decrypt_cmd(auth, in_data, init_vec, tag, salt, info): + ecies_op = _ECIES_OPCODES['DECRYPT'] + return _ECIES_CMD_FORMAT.format(o=ecies_op, inl=pack('>H', len(in_data)), + input=in_data, + al=pack('>H', len(auth)), iv=init_vec, + xl=pack('>H', len(tag)), x=tag, + yl=pack('>H', 0), y='', + sl=pack('>H', len(salt)), s=salt, + il=pack('>H', len(info)), i=info) + +# pylint: disable=too-many-locals def _ecies_test(tpm): - for data in _ECIES_INPUTS: - auth, input, iv, d, pubx, puby, salt, info = data[:-1] - test_name = 'ECIES-TEST:%s' % data[-1] - cmd = _encrypt_cmd(auth, input, iv, pubx, puby, salt, info) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECIES, cmd)) - encrypted = tpm.unwrap_ext_response(subcmd.ECIES, wrapped_response) - # check length of encrypted. - if not encrypted: - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, 'null encrypted')) - - cmd = _decrypt_cmd(auth, encrypted, iv, d, salt, info) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECIES, cmd)) - decrypted = tpm.unwrap_ext_response(subcmd.ECIES, wrapped_response) - - expected = auth + input - if decrypted != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(decrypted), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + for data in _ECIES_INPUTS: + auth, in_data, init_vec, tag, pubx, puby, salt, info = data[:-1] + test_name = 'ECIES-TEST:%s' % data[-1] + cmd = _encrypt_cmd(auth, in_data, init_vec, pubx, puby, salt, info) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECIES, cmd)) + encrypted = tpm.unwrap_ext_response(subcmd.ECIES, wrapped_response) + # check length of encrypted. + if not encrypted: + raise subcmd.TpmTestError('%s error:%s' % + (test_name, 'null encrypted')) + + cmd = _decrypt_cmd(auth, encrypted, init_vec, tag, salt, info) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECIES, cmd)) + decrypted = tpm.unwrap_ext_response(subcmd.ECIES, wrapped_response) + + expected = auth + in_data + if decrypted != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % + (test_name, utils.hex_dump(decrypted), + utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _compat_test(tpm): - for data in _ECIES_COMPAT_INPUTS: - auth, plaintext, iv, ciphertext, d, salt, info = data[:-1] - test_name = 'ECIES-TEST:%s' % data[-1] + for data in _ECIES_COMPAT_INPUTS: + auth, plaintext, init_vec, ciphertext, tag, salt, info = data[:-1] + test_name = 'ECIES-TEST:%s' % data[-1] - cmd = _decrypt_cmd(auth, ciphertext, iv, d, salt, info) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECIES, cmd)) - decrypted = tpm.unwrap_ext_response(subcmd.ECIES, wrapped_response) + cmd = _decrypt_cmd(auth, ciphertext, init_vec, tag, salt, info) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.ECIES, cmd)) + decrypted = tpm.unwrap_ext_response(subcmd.ECIES, wrapped_response) - expected = auth + plaintext - if decrypted != expected: - raise subcmd.TpmTestError('%s error:%s:%s' % ( - test_name, utils.hex_dump(decrypted), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + expected = auth + plaintext + if decrypted != expected: + raise subcmd.TpmTestError('%s error:%s:%s' % + (test_name, utils.hex_dump(decrypted), + utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def ecies_test(tpm): - _ecies_test(tpm) - _compat_test(tpm) + """Run ECIES cryptographic tests""" + _ecies_test(tpm) + _compat_test(tpm) diff --git a/test/tpm_test/genvectors.py b/test/tpm_test/genvectors.py old mode 100644 new mode 100755 index 593a6aab71..85c9b8a200 --- a/test/tpm_test/genvectors.py +++ b/test/tpm_test/genvectors.py @@ -1,4 +1,5 @@ #!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -6,9 +7,9 @@ """Module for generating AES test vectors.""" from binascii import b2a_hex as b2a -from Crypto.Cipher import AES from itertools import izip_longest import os +from Crypto.Cipher import AES modes = { AES.MODE_CBC: 'CBC', @@ -17,7 +18,7 @@ modes = { } template = \ -''' +""" {pt} @@ -32,11 +33,14 @@ template = \ {iv} -''' +""" def h2be(v): - # Convert input big-endian byte-string to 4-byte segmented - # little-endian words. Pad-bytes (if necessary) are the empty string. + """Convert input big-endian byte-string to 4-byte segmented + + Convert input big-endian byte-string to 4-byte segmented + little-endian words. Pad-bytes (if necessary) are the empty string. + """ word = [iter(v)] * 4 return ''.join([ ''.join(b[::-1]) for b in izip_longest(*word, fillvalue='') @@ -73,5 +77,3 @@ for mode in [AES.MODE_CBC, AES.MODE_CFB, AES.MODE_OFB]: key=b2a(h2be(key)), ct=b2a(h2be(ct[:actual_pt_len])), iv=b2a(h2be(iv))), - - diff --git a/test/tpm_test/hash_test.py b/test/tpm_test/hash_test.py index 9f10894d2e..a518be043c 100644 --- a/test/tpm_test/hash_test.py +++ b/test/tpm_test/hash_test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -32,10 +32,10 @@ ALG_SHA512 = 3 # A standard empty response to HASH extended commands. EMPTY_RESPONSE = ''.join('%c' % x for x in (0x80, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01)) -test_inputs = ( +TEST_INPUTS = ( # Hash cmd alg handle hmac_key text (CMD_HMAC_SW, ALG_SHA256, 0, 'hmac_key1', 'some text, this time for sw hmac'), - (CMD_HMAC_SW, ALG_SHA1, 0, 'hmac_key2', 'some text, this time for sw hmac'), + (CMD_HMAC_SW, ALG_SHA1, 0, 'hmac_key2', 'some text, this time for sw hmac'), (CMD_HMAC_SW, ALG_SHA384, 0, 'hmac_key3', 'some text, this time for sw hmac'), (CMD_HMAC_SW, ALG_SHA512, 0, 'hmac_key4', 'some text, this time for sw hmac'), (CMD_HMAC_HW, ALG_SHA256, 0, 'hmac_key5', 'some text, this time for hw hmac'), @@ -43,37 +43,38 @@ test_inputs = ( (CMD_HMAC_HW, ALG_SHA256, 0, 'very long hmac_key 123456789012345', ' text'), (CMD_HMAC_SW, ALG_SHA384, 0, 'very long hmac_key 456456789012345', ' text'), (CMD_HMAC_SW, ALG_SHA512, 0, 'very long hmac_key 456456789012345', ' text'), - (CMD_HASH, ALG_SHA1, 0, '', ''), - (CMD_HASH, ALG_SHA256, 0, '', ''), - (CMD_HASH, ALG_SHA1, 0, '', 'anything really will work here'), - (CMD_HASH, ALG_SHA256, 0, '', 'some more text, this time for sha256'), - (CMD_HASH_START, ALG_SHA256, 1, '', 'some more text, this time for sha256'), - (CMD_HASH_CONT, ALG_SHA256, 1, '', 'some more text, this time for sha256'), - (CMD_HASH_START, ALG_SHA256, 2, '', 'this could be anything here'), - (CMD_HASH, ALG_SHA1, 3, '', 'interleave a SHA1 single shot'), - (CMD_HASH, ALG_SHA256, 3, '', 'interleave a SHA256 single shot'), - (CMD_HASH_START, ALG_SHA1, 3, '', 'let\'s interleave a sha1 calculation'), - (CMD_HASH_CONT, ALG_SHA256, 2, '', 'fill up a second context with data'), - (CMD_HASH_CONT, ALG_SHA256, 1, '', 'let\'s feed some more into context 1'), + (CMD_HASH, ALG_SHA1, 0, '', ''), + (CMD_HASH, ALG_SHA256, 0, '', ''), + (CMD_HASH, ALG_SHA1, 0, '', 'anything really will work here'), + (CMD_HASH, ALG_SHA256, 0, '', 'some more text, this time for sha256'), + (CMD_HASH_START, ALG_SHA256, 1, '', 'some more text, this time for sha256'), + (CMD_HASH_CONT, ALG_SHA256, 1, '', 'some more text, this time for sha256'), + (CMD_HASH_START, ALG_SHA256, 2, '', 'this could be anything here'), + (CMD_HASH, ALG_SHA1, 3, '', 'interleave a SHA1 single shot'), + (CMD_HASH, ALG_SHA256, 3, '', 'interleave a SHA256 single shot'), + (CMD_HASH_START, ALG_SHA1, 3, '', "let's interleave a sha1 calculation"), + (CMD_HASH_CONT, ALG_SHA256, 2, '', 'fill up a second context with data'), + (CMD_HASH_CONT, ALG_SHA256, 1, '', "let's feed some more into context 1"), (CMD_HASH_FINISH, ALG_SHA256, 1, '', 'some more text, this time for sha256'), - (CMD_HASH_CONT, ALG_SHA1, 3, '', 'with two active sha256 calculations'), - (CMD_HASH_FINISH, ALG_SHA1, 3, '', 'this should be enough'), + (CMD_HASH_CONT, ALG_SHA1, 3, '', 'with two active sha256 calculations'), + (CMD_HASH_FINISH, ALG_SHA1, 3, '', 'this should be enough'), (CMD_HASH_FINISH, ALG_SHA256, 2, '', 'it does not really matter what'), - (CMD_HASH, ALG_SHA384, 0, '', 'some more text, this time for sha384'), - (CMD_HASH, ALG_SHA512, 0, '', 'some more text, this time for sha512'), - (CMD_HASH_START, ALG_SHA256, 0, '', 'some more text, this time for sha256'), - (CMD_HASH_START, ALG_SHA384, 1, '', 'some more text, this time for sha384'), - (CMD_HASH_CONT, ALG_SHA384, 1, '', 'some more text, this time for sha384'), - (CMD_HASH_CONT, ALG_SHA256, 0, '', 'some more text, this time for sha256'), - (CMD_HASH_START, ALG_SHA512, 2, '', 'some more text, this time for sha512'), - (CMD_HASH_CONT, ALG_SHA512, 2, '', 'some more text, this time for sha512'), + (CMD_HASH, ALG_SHA384, 0, '', 'some more text, this time for sha384'), + (CMD_HASH, ALG_SHA512, 0, '', 'some more text, this time for sha512'), + (CMD_HASH_START, ALG_SHA256, 0, '', 'some more text, this time for sha256'), + (CMD_HASH_START, ALG_SHA384, 1, '', 'some more text, this time for sha384'), + (CMD_HASH_CONT, ALG_SHA384, 1, '', 'some more text, this time for sha384'), + (CMD_HASH_CONT, ALG_SHA256, 0, '', 'some more text, this time for sha256'), + (CMD_HASH_START, ALG_SHA512, 2, '', 'some more text, this time for sha512'), + (CMD_HASH_CONT, ALG_SHA512, 2, '', 'some more text, this time for sha512'), (CMD_HASH_FINISH, ALG_SHA512, 2, '', 'this should be enough'), (CMD_HASH_FINISH, ALG_SHA256, 0, '', 'this should be enough'), (CMD_HASH_FINISH, ALG_SHA384, 1, '', 'this should be enough'), ) def hash_test(tpm): - """Exercise multiple hash threads simultaneously. +# pylint: disable=too-many-locals + """Exercise multiple hash threads simultaneously. Command structure, shared out of band with the test running on the target: @@ -89,72 +90,74 @@ def hash_test(tpm): for HMAC single shot only: key_len | 2 | size of the key for HMAC, big endian key | key_len | key for HMAC single shot - Args: - tpm: a tpm object used to communicate with the device - - Raises: - subcmd.TpmTestError: on unexpected target responses - """ - - contexts = {} - - alg_map = { - ALG_SHA1: ('sha1', hashlib.sha1), - ALG_SHA256: ('sha256', hashlib.sha256), - ALG_SHA384: ('sha384', hashlib.sha384), - ALG_SHA512: ('sha512', hashlib.sha512), - } - - cmd_map = { - CMD_HASH_START: 'hash start', - CMD_HASH_CONT: 'hash cont', - CMD_HASH_FINISH: 'hash finish', - CMD_HASH: 'hash', - CMD_HMAC_SW: 'hmac sw', - CMD_HMAC_HW: 'hmac hw' - } - - for test in test_inputs: - hash_cmd, hash_alg, handle, hmac_key, text = test - mode_name = cmd_map[hash_cmd] - alg_name, hash_func = alg_map[hash_alg] - - test_name = '%s:%s:%d' % (mode_name, alg_name, handle) - - cmd = '%c' % hash_cmd - cmd += '%c' % hash_alg - cmd += '%c' % handle # Ignored for single shots - - cmd += struct.pack('>H', len(text)) - cmd += text - # for HMAC add key - if hash_cmd in (CMD_HMAC_SW, CMD_HMAC_HW): - cmd += struct.pack('>H', len(hmac_key)) - cmd += hmac_key - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.HASH, cmd)) - if hash_cmd in (CMD_HASH_START, CMD_HASH_CONT): - if hash_cmd == CMD_HASH_START: - contexts[handle] = hash_func() - h = contexts[handle] - h.update(text) - if wrapped_response != EMPTY_RESPONSE: - raise subcmd.TpmTestError("Unexpected response to '%s': %s" % - (test_name, utils.hex_dump(wrapped_response))) - continue - if hash_cmd == CMD_HASH_FINISH: - h = contexts[handle] - elif hash_cmd == CMD_HASH: - h = hash_func() - elif hash_cmd in (CMD_HMAC_SW, CMD_HMAC_HW): - h = hmac.new(bytes(hmac_key), digestmod=hash_func) - else: - raise subcmd.TpmTestError('Unknown command %d' % hash_cmd) - h.update(text) - digest = h.digest() - result = wrapped_response[12:] - if result != h.digest(): - raise subcmd.TpmTestError('%s error:%s%s' % (test_name, - utils.hex_dump(digest), - utils.hex_dump(result))) - - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + + Args: + tpm: a tpm object used to communicate with the device + + Raises: + subcmd.TpmTestError: on unexpected target responses + """ + + contexts = {} + + alg_map = { + ALG_SHA1: ('sha1', hashlib.sha1), + ALG_SHA256: ('sha256', hashlib.sha256), + ALG_SHA384: ('sha384', hashlib.sha384), + ALG_SHA512: ('sha512', hashlib.sha512), + } + + cmd_map = { + CMD_HASH_START: 'hash start', + CMD_HASH_CONT: 'hash cont', + CMD_HASH_FINISH: 'hash finish', + CMD_HASH: 'hash', + CMD_HMAC_SW: 'hmac sw', + CMD_HMAC_HW: 'hmac hw' + } + + for test in TEST_INPUTS: + hash_cmd, hash_alg, handle, hmac_key, text = test + mode_name = cmd_map[hash_cmd] + alg_name, hash_func = alg_map[hash_alg] + + test_name = '%s:%s:%d' % (mode_name, alg_name, handle) + + cmd = '%c' % hash_cmd + cmd += '%c' % hash_alg + cmd += '%c' % handle # Ignored for single shots + + cmd += struct.pack('>H', len(text)) + cmd += text + # for HMAC add key + if hash_cmd in (CMD_HMAC_SW, CMD_HMAC_HW): + cmd += struct.pack('>H', len(hmac_key)) + cmd += hmac_key + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.HASH, cmd)) + if hash_cmd in (CMD_HASH_START, CMD_HASH_CONT): + if hash_cmd == CMD_HASH_START: + contexts[handle] = hash_func() + hash_context = contexts[handle] + hash_context.update(text) + if wrapped_response != EMPTY_RESPONSE: + raise subcmd.TpmTestError("Unexpected response to '%s': %s" % + (test_name, + utils.hex_dump(wrapped_response))) + continue + if hash_cmd == CMD_HASH_FINISH: + hash_context = contexts[handle] + elif hash_cmd == CMD_HASH: + hash_context = hash_func() + elif hash_cmd in (CMD_HMAC_SW, CMD_HMAC_HW): + hash_context = hmac.new(bytes(hmac_key), digestmod=hash_func) + else: + raise subcmd.TpmTestError('Unknown command %d' % hash_cmd) + hash_context.update(text) + digest = hash_context.digest() + result = wrapped_response[12:] + if result != hash_context.digest(): + raise subcmd.TpmTestError('%s error:%s%s' % + (test_name, utils.hex_dump(digest), + utils.hex_dump(result))) + + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) diff --git a/test/tpm_test/hkdf_test.py b/test/tpm_test/hkdf_test.py index 3330d50105..8fe8fe8b56 100644 --- a/test/tpm_test/hkdf_test.py +++ b/test/tpm_test/hkdf_test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -13,7 +13,7 @@ import utils _HKDF_OPCODES = { - 'TEST_RFC': 0x00, + 'TEST_RFC': 0x00, } @@ -37,12 +37,12 @@ _HKDF_CMD_FORMAT = '{op:c}{sl:s}{salt}{ikml:s}{ikm}{infol:s}{info}{okml:s}' def _rfc_test_cmd(salt, ikm, info, okml): - op = _HKDF_OPCODES['TEST_RFC'] - return _HKDF_CMD_FORMAT.format(op=op, - sl=pack('>H', len(salt)), salt=salt, - ikml=pack('>H', len(ikm)), ikm=ikm, - infol=pack('>H', len(info)), info=info, - okml=pack('>H', okml)) + hkdf_op = _HKDF_OPCODES['TEST_RFC'] + return _HKDF_CMD_FORMAT.format(op=hkdf_op, + sl=pack('>H', len(salt)), salt=salt, + ikml=pack('>H', len(ikm)), ikm=ikm, + infol=pack('>H', len(info)), info=info, + okml=pack('>H', okml)) # @@ -56,8 +56,8 @@ _RFC_TEST_INPUTS = ( ('3cb25f25faacd57a90434f64d0362f2a' '2d2d0a90cf1a5a4c5db02d56ecc4c5bf' '34007208d5b887185865'), - 'BASIC', - ), + 'BASIC', + ), ( ('000102030405060708090a0b0c0d0e0f' '101112131415161718191a1b1c1d1e1f' @@ -80,8 +80,8 @@ _RFC_TEST_INPUTS = ( 'da3275600c2f09b8367793a9aca3db71' 'cc30c58179ec3e87c14c01d5c1f3434f' '1d87'), - 'LONG INPUTS', - ), + 'LONG INPUTS', + ), ( '0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b', '', @@ -89,24 +89,25 @@ _RFC_TEST_INPUTS = ( ('8da4e775a563c18f715f802a063c5a31' 'b8a11f5c5ee1879ec3454e5f3c738d2d' '9d201395faa4b61a96c8'), - 'ZERO SALT/INFO', - ) + 'ZERO SALT/INFO', + ) ) def _rfc_tests(tpm): - for data in _RFC_TEST_INPUTS: - IKM, salt, info, OKM = map(a2b, data[:-1]) - test_name = 'HKDF:SHA256:%s' % data[-1] - cmd = _rfc_test_cmd(salt, IKM, info, len(OKM)) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.HKDF, cmd)) - result = tpm.unwrap_ext_response(subcmd.HKDF, wrapped_response) + for data in _RFC_TEST_INPUTS: + ikm, salt, info, okm = map(a2b, data[:-1]) + test_name = 'HKDF:SHA256:%s' % data[-1] + cmd = _rfc_test_cmd(salt, ikm, info, len(okm)) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.HKDF, cmd)) + result = tpm.unwrap_ext_response(subcmd.HKDF, wrapped_response) - if result != OKM: - raise subcmd.TpmTestError('%s error:%s%s' % ( - test_name, utils.hex_dump(result), utils.hex_dump(OKM))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + if result != okm: + raise subcmd.TpmTestError('%s error:%s%s' % ( + test_name, utils.hex_dump(result), utils.hex_dump(okm))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def hkdf_test(tpm): - _rfc_tests(tpm) + """Run HKDF tests""" + _rfc_tests(tpm) diff --git a/test/tpm_test/rsa_test.py b/test/tpm_test/rsa_test.py index e411df57b0..1d377b3ae2 100644 --- a/test/tpm_test/rsa_test.py +++ b/test/tpm_test/rsa_test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -6,6 +6,9 @@ """Module for testing rsa functions using extended commands.""" import binascii +import os +import struct + import Crypto import Crypto.Hash.SHA import Crypto.Hash.SHA256 @@ -14,10 +17,7 @@ import Crypto.Hash.SHA512 from Crypto.PublicKey import RSA import Crypto.Signature.PKCS1_PSS import Crypto.Signature.PKCS1_v1_5 -import hashlib -import os import rsa -import struct import subcmd import utils @@ -93,91 +93,93 @@ _RSA_CMD_FORMAT = '{o:c}{p:c}{h:c}{kl:s}{ml:s}{msg}{dl:s}{dig}' def _decrypt_cmd(padding, hashing, key_len, msg): - op = _RSA_OPCODES['DECRYPT'] - msg_len = len(msg) - return _RSA_CMD_FORMAT.format(o=op, p=padding, h=hashing, - kl=struct.pack('>H', key_len), - ml=struct.pack('>H', msg_len), msg=msg, - dl='', dig='') + rsa_op = _RSA_OPCODES['DECRYPT'] + msg_len = len(msg) + return _RSA_CMD_FORMAT.format(o=rsa_op, p=padding, h=hashing, + kl=struct.pack('>H', key_len), + ml=struct.pack('>H', msg_len), msg=msg, + dl='', dig='') def _encrypt_cmd(padding, hashing, key_len, msg): - op = _RSA_OPCODES['ENCRYPT'] - msg_len = len(msg) - return _RSA_CMD_FORMAT.format(o=op, p=padding, h=hashing, - kl=struct.pack('>H', key_len), - ml=struct.pack('>H', msg_len), msg=msg, - dl='', dig='') + rsa_op = _RSA_OPCODES['ENCRYPT'] + msg_len = len(msg) + return _RSA_CMD_FORMAT.format(o=rsa_op, p=padding, h=hashing, + kl=struct.pack('>H', key_len), + ml=struct.pack('>H', msg_len), msg=msg, + dl='', dig='') def _sign_cmd(padding, hashing, key_len, digest): - op = _RSA_OPCODES['SIGN'] - digest_len = len(digest) - return _RSA_CMD_FORMAT.format(o=op, p=padding, h=hashing, - kl=struct.pack('>H', key_len), - ml=struct.pack('>H', digest_len), msg=digest, - dl='', dig='') + rsa_op = _RSA_OPCODES['SIGN'] + digest_len = len(digest) + return _RSA_CMD_FORMAT.format(o=rsa_op, p=padding, h=hashing, + kl=struct.pack('>H', key_len), + ml=struct.pack('>H', digest_len), msg=digest, + dl='', dig='') def _verify_cmd(padding, hashing, key_len, sig, digest): - op = _RSA_OPCODES['VERIFY'] - sig_len = len(sig) - digest_len = len(digest) - return _RSA_CMD_FORMAT.format(o=op, p=padding, h=hashing, - kl=struct.pack('>H', key_len), - ml=struct.pack('>H', sig_len), msg=sig, - dl=struct.pack('>H', digest_len), dig=digest) + rsa_op = _RSA_OPCODES['VERIFY'] + sig_len = len(sig) + digest_len = len(digest) + return _RSA_CMD_FORMAT.format(o=rsa_op, p=padding, h=hashing, + kl=struct.pack('>H', key_len), + ml=struct.pack('>H', sig_len), msg=sig, + dl=struct.pack('>H', digest_len), dig=digest) def _keytest_cmd(key_len): - op = _RSA_OPCODES['KEYTEST'] - return _RSA_CMD_FORMAT.format(o=op, p=0, h=_HASH['NONE'], - kl=struct.pack('>H', key_len), - ml=struct.pack('>H', 0), msg='', - dl='', dig='') + rsa_op = _RSA_OPCODES['KEYTEST'] + return _RSA_CMD_FORMAT.format(o=rsa_op, p=0, h=_HASH['NONE'], + kl=struct.pack('>H', key_len), + ml=struct.pack('>H', 0), msg='', + dl='', dig='') -def _keygen_cmd(key_len, e, label): - op = _RSA_OPCODES['KEYGEN'] - padding = _RSA_PADDING['NONE'] - hashing = _HASH['NONE'] - return _RSA_CMD_FORMAT.format(o=op, p=padding, h=hashing, - kl=struct.pack('>H', key_len), - ml=struct.pack('>H', len(label)), msg=label, - dl=struct.pack('>H', 0), dig='') +def _keygen_cmd(key_len, exponent, label): + assert exponent == 65537 + rsa_op = _RSA_OPCODES['KEYGEN'] + padding = _RSA_PADDING['NONE'] + hashing = _HASH['NONE'] + return _RSA_CMD_FORMAT.format(o=rsa_op, p=padding, h=hashing, + kl=struct.pack('>H', key_len), + ml=struct.pack('>H', len(label)), msg=label, + dl=struct.pack('>H', 0), dig='') def _primegen_cmd(seed): - op = _RSA_OPCODES['PRIMEGEN'] - padding = _RSA_PADDING['NONE'] - hashing = _HASH['NONE'] - return _RSA_CMD_FORMAT.format(o=op, p=padding, h=hashing, - kl=struct.pack('>H', len(seed) * 8 * 2), - ml=struct.pack('>H', len(seed)), msg=seed, - dl=struct.pack('>H', 0), dig='') + rsa_op = _RSA_OPCODES['PRIMEGEN'] + padding = _RSA_PADDING['NONE'] + hashing = _HASH['NONE'] + return _RSA_CMD_FORMAT.format(o=rsa_op, p=padding, h=hashing, + kl=struct.pack('>H', len(seed) * 8 * 2), + ml=struct.pack('>H', len(seed)), msg=seed, + dl=struct.pack('>H', 0), dig='') def _x509_verify_cmd(key_len): - op = _RSA_OPCODES['X509_VERIFY'] - padding = _RSA_PADDING['NONE'] - hashing = _HASH['NONE'] - return _RSA_CMD_FORMAT.format(o=op, p=padding, h=hashing, - kl=struct.pack('>H', key_len), - ml=struct.pack('>H', 0), msg='', - dl=struct.pack('>H', 0), dig='') + rsa_op = _RSA_OPCODES['X509_VERIFY'] + padding = _RSA_PADDING['NONE'] + hashing = _HASH['NONE'] + return _RSA_CMD_FORMAT.format(o=rsa_op, p=padding, h=hashing, + kl=struct.pack('>H', key_len), + ml=struct.pack('>H', 0), msg='', + dl=struct.pack('>H', 0), dig='') _PRIMES = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, - 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, - 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, - 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, - 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, - 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, - 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, - 563, 569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, - 643, 647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, - 739, 743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, - 829, 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, - 937, 941, 947, 953, 967, 971, 977, 983, 991, 997, 1009, 1013, 1019, + 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, + 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, + 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, + 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, + 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, + 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, + 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593, 599, + 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661, 673, + 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757, 761, + 769, 773, 787, 797, 809, 811, 821, 823, 827, 829, 839, 853, 857, + 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937, 941, 947, + 953, 967, 971, 977, 983, 991, 997, 1009, 1013, 1019, 1021, 1031, 1033, 1039, 1049, 1051, 1061, 1063, 1069, 1087, 1091, 1093, 1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153, 1163, 1171, 1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, @@ -564,28 +566,28 @@ _PRIMES = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, def _prime_from_seed(seed): - ROUNDS = 7 - - def _window(s, primes): - w = [0] * 4096 - for i in primes: - rem = s % i - if rem != 0: - rem = i - rem - for j in range(rem, len(w), i): - w[j] = 1 - return w - - # Set LSB, and top two bits. - candidate = chr(ord(seed[0]) | 192) + seed[1:-1] + chr(ord(seed[-1]) | 1) - candidate = int(binascii.b2a_hex(candidate), 16) - assert len(bin(candidate)[2:]) == len(seed) * 8 - w = _window(candidate, _PRIMES[:4096]) - for i, bit in enumerate(w): - if not bit: - if rsa.prime.randomized_primality_testing(candidate + i, ROUNDS): - return candidate + i - return None + rounds = 7 + + def _window(candidate, primes): + window = [0] * 4096 + for i in primes: + rem = candidate % i + if rem != 0: + rem = i - rem + for j in range(rem, len(window), i): + window[j] = 1 + return window + + # Set LSB, and top two bits. + candidate = chr(ord(seed[0]) | 192) + seed[1:-1] + chr(ord(seed[-1]) | 1) + candidate = int(binascii.b2a_hex(candidate), 16) + assert len(bin(candidate)[2:]) == len(seed) * 8 + window = _window(candidate, _PRIMES[:4096]) + for i, bit in enumerate(window): + if not bit: + if rsa.prime.randomized_primality_testing(candidate + i, rounds): + return candidate + i + return None # @@ -633,7 +635,8 @@ _KEYTEST_INPUTS = ( _KEYGEN_INPUTS = ( (768, 65537, '', None), (1024, 65537, 'rsa_test', None), - (2048, 65537, 'RSA key by vendor', 20811475686431332186511278472307159547870512766846593830860105577496044159545322178313772755518365593670114793803805067608811418757734989708137784444223785391864604211835387393923163468734914392307047296990698533218399115126417934050463597455237478939601236799120239663591264311485133747167378663829046579164891864068853210530642835833947569643788911200934265596274935082689832626616967124524353322373059893974744194447740045242468136414689225322177212281193879756355471091445748150740871146034049776312457888356154834233819876846764944450478069436248506560967902863015152471662817623176815923756421011384149834497587L), + # pylint: disable=line-too-long + (2048, 65537, 'RSA key by vendor', 20811475686431332186511278472307159547870512766846593830860105577496044159545322178313772755518365593670114793803805067608811418757734989708137784444223785391864604211835387393923163468734914392307047296990698533218399115126417934050463597455237478939601236799120239663591264311485133747167378663829046579164891864068853210530642835833947569643788911200934265596274935082689832626616967124524353322373059893974744194447740045242468136414689225322177212281193879756355471091445748150740871146034049776312457888356154834233819876846764944450478069436248506560967902863015152471662817623176815923756421011384149834497587), (2048, 65537, '', None), ) @@ -650,169 +653,162 @@ _PRIMEGEN_INPUTS = ( ) def _encrypt_tests(tpm): - msg = 'Hello CR50!' - - for data in _ENCRYPT_INPUTS: - padding, hashing, key_len = data - test_name = 'RSA-ENC:%s:%s:%d' % data - cmd = _encrypt_cmd(_RSA_PADDING[padding], _HASH[hashing], key_len, msg) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) - ciphertext = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) - - cmd = _decrypt_cmd(_RSA_PADDING[padding], _HASH[hashing], - key_len, ciphertext) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) - plaintext = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) - if padding == 'NULL': - # Check for leading zeros. - if reduce(lambda x, y: x | y, - map(ord, plaintext[:len(plaintext) - len(msg)])): - raise subcmd.TpmTestError('%s error:%s%s' % ( - test_name, utils.hex_dump(msg), utils.hex_dump(plaintext))) - else: - plaintext = plaintext[len(plaintext) - len(msg):] - if msg != plaintext: - raise subcmd.TpmTestError('%s error:%s%s' % ( - test_name, utils.hex_dump(msg), utils.hex_dump(plaintext))) + msg = 'Hello CR50!' + + for data in _ENCRYPT_INPUTS: + padding, hashing, key_len = data + test_name = 'RSA-ENC:%s:%s:%d' % data + cmd = _encrypt_cmd(_RSA_PADDING[padding], _HASH[hashing], key_len, msg) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) + ciphertext = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) + + cmd = _decrypt_cmd(_RSA_PADDING[padding], _HASH[hashing], + key_len, ciphertext) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) + plaintext = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) + if padding == 'NULL' and msg != plaintext[-len(msg):]: + raise subcmd.TpmTestError('%s error:%s%s' % + (test_name, utils.hex_dump(msg), + utils.hex_dump(plaintext))) print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _sign_tests(tpm): - for data in _SIGN_INPUTS: - msg = rsa.randnum.read_random_bits(256) - padding, hashing, key_len = data - test_name = 'RSA-SIGN:%s:%s:%d' % data + for data in _SIGN_INPUTS: + msg = rsa.randnum.read_random_bits(256) + padding, hashing, key_len = data + test_name = 'RSA-SIGN:%s:%s:%d' % data - key = _KEYS[key_len] - verifier = _SIGNER[padding].new(key) - h = _HASHER[hashing].new() - h.update(msg) + key = _KEYS[key_len] + verifier = _SIGNER[padding].new(key) + msg_hash = _HASHER[hashing].new() + msg_hash.update(msg) - cmd = _sign_cmd(_RSA_PADDING[padding], _HASH[hashing], key_len, h.digest()) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) - signature = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) + cmd = _sign_cmd(_RSA_PADDING[padding], _HASH[hashing], + key_len, msg_hash.digest()) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) + signature = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) - signer = _SIGNER[padding].new(key) - expected_signature = signer.sign(h) - - if not verifier.verify(h, signature): - raise subcmd.TpmTestError('%s error' % ( - test_name,)) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + if not verifier.verify(msg_hash, signature): + raise subcmd.TpmTestError('%s error' % (test_name,)) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _verify_tests(tpm): - for data in _VERIFY_INPUTS: - msg = rsa.randnum.read_random_bits(256) - padding, hashing, key_len = data - test_name = 'RSA-VERIFY:%s:%s:%d' % data - - key = _KEYS[key_len] - signer = _SIGNER[padding].new(key) - h = _HASHER[hashing].new() - h.update(msg) - signature = signer.sign(h) - - cmd = _verify_cmd(_RSA_PADDING[padding], _HASH[hashing], - key_len, signature, h.digest()) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) - verified = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) - expected = '\x01' - if verified != expected: - raise subcmd.TpmTestError('%s error:%s%s' % ( - test_name, utils.hex_dump(verified), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + for data in _VERIFY_INPUTS: + msg = rsa.randnum.read_random_bits(256) + padding, hashing, key_len = data + test_name = 'RSA-VERIFY:%s:%s:%d' % data + + key = _KEYS[key_len] + signer = _SIGNER[padding].new(key) + msg_hash = _HASHER[hashing].new() + msg_hash.update(msg) + signature = signer.sign(msg_hash) + + cmd = _verify_cmd(_RSA_PADDING[padding], _HASH[hashing], + key_len, signature, msg_hash.digest()) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) + verified = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) + expected = '\x01' + if verified != expected: + raise subcmd.TpmTestError('%s error:%s%s' % ( + test_name, utils.hex_dump(verified), utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _keytest_tests(tpm): - for data in _KEYTEST_INPUTS: - key_len, = data - test_name = 'RSA-KEYTEST:%d' % data - cmd = _keytest_cmd(key_len) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) - valid = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) - expected = '\x01' - if valid != expected: - raise subcmd.TpmTestError('%s error:%s%s' % ( - test_name, utils.hex_dump(valid), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + for data in _KEYTEST_INPUTS: + key_len, = data + test_name = 'RSA-KEYTEST:%d' % data + cmd = _keytest_cmd(key_len) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) + valid = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) + expected = '\x01' + if valid != expected: + raise subcmd.TpmTestError('%s error:%s%s' % ( + test_name, utils.hex_dump(valid), utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _keygen_tests(tpm): - for data in _KEYGEN_INPUTS: - key_len, e, label, expected_N = data - test_name = 'RSA-KEYGEN:%d:%d:%s' % data[:-1] - cmd = _keygen_cmd(key_len, e, label) - - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) - result = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) - result_len = len(result) - if result_len != int(key_len / 8 * 1.5): - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, utils.hex_dump(result))) - - N = int(binascii.b2a_hex(result[0:result_len * 2 / 3]), 16) - if expected_N and N != expected_N: - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, utils.hex_dump(result))) - p = int(binascii.b2a_hex(result[result_len * 2 / 3:]), 16) - q = N / p - if not rsa.prime.is_prime(p): - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, utils.hex_dump(result))) - if not rsa.prime.is_prime(q): - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, utils.hex_dump(result))) - if p == q: - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, utils.hex_dump(result))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + for data in _KEYGEN_INPUTS: +# N, p, q - are common names for RSA, so allow it's use +# pylint: disable=invalid-name + key_len, exponent, label, expected_N = data + test_name = 'RSA-KEYGEN:%d:%d:%s' % data[:-1] + cmd = _keygen_cmd(key_len, exponent, label) + + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) + result = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) + result_len = len(result) + if result_len != int(key_len / 8 * 1.5): + raise subcmd.TpmTestError('%s error:%s' % ( + test_name, utils.hex_dump(result))) + + N = int(binascii.b2a_hex(result[0:result_len * 2 / 3]), 16) + if expected_N and N != expected_N: + raise subcmd.TpmTestError('%s error:%s' % + (test_name, utils.hex_dump(result))) + p = int(binascii.b2a_hex(result[result_len * 2 / 3:]), 16) + q = N / p + if not rsa.prime.is_prime(p): + raise subcmd.TpmTestError('%s error:%s' % + (test_name, utils.hex_dump(result))) + if not rsa.prime.is_prime(q): + raise subcmd.TpmTestError('%s error:%s' % + (test_name, utils.hex_dump(result))) + if p == q: + raise subcmd.TpmTestError('%s error:%s' % + (test_name, utils.hex_dump(result))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _primegen_tests(tpm): - for data in _PRIMEGEN_INPUTS: - key_len = data - test_name = 'RSA-PRIMEGEN:%d' % data - seed = rsa.randnum.read_random_bits(key_len / 2) - assert len(seed) == key_len / 16 - # dcrypto interface is little-endian. - cmd = _primegen_cmd(seed[::-1]) - - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) - result = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) - result_len = len(result) - if result_len != key_len / 16: - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, utils.hex_dump(result))) - - p = int(binascii.b2a_hex(result[::-1]), 16) - if not rsa.prime.is_prime(p): - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, utils.hex_dump(result))) - calculated = _prime_from_seed(seed) - if p != calculated: - raise subcmd.TpmTestError('%s error:%s' % ( - test_name, utils.hex_dump(result))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + for data in _PRIMEGEN_INPUTS: + key_len = data + test_name = 'RSA-PRIMEGEN:%d' % data + seed = rsa.randnum.read_random_bits(key_len / 2) + assert len(seed) == key_len / 16 + # dcrypto interface is little-endian. + cmd = _primegen_cmd(seed[::-1]) + + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) + result = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) + result_len = len(result) + if result_len != key_len / 16: + raise subcmd.TpmTestError('%s error:%s' % ( + test_name, utils.hex_dump(result))) + + prime = int(binascii.b2a_hex(result[::-1]), 16) + if not rsa.prime.is_prime(prime): + raise subcmd.TpmTestError('%s error:%s' % ( + test_name, utils.hex_dump(result))) + calculated = _prime_from_seed(seed) + if prime != calculated: + raise subcmd.TpmTestError('%s error:%s' % ( + test_name, utils.hex_dump(result))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def _x509_verify_tests(tpm): - test_name = 'RSA-X509-2048-VERIFY' - cmd = _x509_verify_cmd(2048) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) - valid = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) - expected = '\x01' - if valid != expected: - raise subcmd.TpmTestError('%s error:%s%s' % ( - test_name, utils.hex_dump(valid), utils.hex_dump(expected))) - print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) + test_name = 'RSA-X509-2048-VERIFY' + cmd = _x509_verify_cmd(2048) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.RSA, cmd)) + valid = tpm.unwrap_ext_response(subcmd.RSA, wrapped_response) + expected = '\x01' + if valid != expected: + raise subcmd.TpmTestError('%s error:%s%s' % ( + test_name, utils.hex_dump(valid), utils.hex_dump(expected))) + print('%sSUCCESS: %s' % (utils.cursor_back(), test_name)) def rsa_test(tpm): - _encrypt_tests(tpm) - _sign_tests(tpm) - _verify_tests(tpm) - _keytest_tests(tpm) - _keygen_tests(tpm) - _primegen_tests(tpm) - _x509_verify_tests(tpm) + """Run RSA tests""" + _encrypt_tests(tpm) + _sign_tests(tpm) + _verify_tests(tpm) + _keytest_tests(tpm) + _keygen_tests(tpm) + _primegen_tests(tpm) + _x509_verify_tests(tpm) diff --git a/test/tpm_test/subcmd.py b/test/tpm_test/subcmd.py index 7260df0dd5..9b3c3c9368 100644 --- a/test/tpm_test/subcmd.py +++ b/test/tpm_test/subcmd.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -16,4 +16,4 @@ ECIES = 6 DRBG_TEST = 50 # The same exception class used by all tpmtest modules. class TpmTestError(Exception): - pass + """TpmTestError exception class""" diff --git a/test/tpm_test/tpmtest.py b/test/tpm_test/tpmtest.py index 11218cbcc6..96f2587396 100755 --- a/test/tpm_test/tpmtest.py +++ b/test/tpm_test/tpmtest.py @@ -1,8 +1,8 @@ #!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. - """Module for initializing and driving a SPI TPM.""" from __future__ import print_function @@ -15,9 +15,9 @@ import traceback # Suppressing pylint warning about an import not at the top of the file. The # path needs to be set *before* the last import. -# pylint: disable=C6204 -root_dir = os.path.dirname(os.path.abspath(sys.argv[0])) -sys.path.append(os.path.join(root_dir, '..', '..', 'build', 'tpm_test')) +# pylint: disable=wrong-import-position +ROOT_DIR = os.path.dirname(os.path.abspath(sys.argv[0])) +sys.path.append(os.path.join(ROOT_DIR, '..', '..', 'build', 'tpm_test')) import crypto_test import drbg_test @@ -36,144 +36,162 @@ EXT_CMD = 0xbaccd00a class TPM(object): - """TPM accessor class. - - Object of this class allows to send valid and extended TPM commands (using - the command() method. The wrap_command/unwrap_response methods provide a - means of encapsulating extended commands in proper TPM data packets, as well - as extracting extended command responses. - - Attributes: - _handle: a ftdi_spi_tpm object, a USB/FTDI/SPI driver which allows - communicate with a TPM connected over USB dongle. - """ - - HEADER_FMT = '>H2IH' - STARTUP_CMD = '80 01 00 00 00 0c 00 00 01 44 00 00' - STARTUP_RSP = ('80 01 00 00 00 0a 00 00 00 00', - '80 01 00 00 00 0a 00 00 01 00') - - def __init__(self, freq=800*1000, debug_mode=False): - self._debug_enabled = debug_mode - self._handle = ftdi_spi_tpm - if not self._handle.FtdiSpiInit(freq, debug_mode): - raise subcmd.TpmTestError('Failed to connect') - - def validate(self, data_blob, response_mode=False): - """Check if a data blob complies with TPM command/response header format.""" - (tag, size, cmd_code, _) = struct.unpack_from( - self.HEADER_FMT, data_blob + ' ') - prefix = 'Misformatted blob: ' - if tag not in (0x8001, 0x8002): - raise subcmd.TpmTestError(prefix + 'bad tag value 0x%4.4x' % tag) - if size != len(data_blob): - raise subcmd.TpmTestError(prefix + 'size mismatch: header %d, actual %d' - % (size, len(data_blob))) - if size > 4096: - raise subcmd.TpmTestError(prefix + 'invalid size %d' % size) - if response_mode: - # Startup response code, extension or vendor command response code - if cmd_code == 0x100 or cmd_code == 0 or cmd_code == 0x500: - return - else: - raise subcmd.TpmTestError( - prefix + 'invalid response code 0x%x' % cmd_code) - if cmd_code >= 0x11f and cmd_code <= 0x18f: - return # This is a valid command - if cmd_code == EXT_CMD: - return # This is an extension command - if cmd_code >= 0x20000000 and cmd_code <= 0x200001ff: - return # this is vendor command - raise subcmd.TpmTestError(prefix + 'invalid command code 0x%x' % cmd_code) - - def command(self, cmd_data): - # Verify command header - self.validate(cmd_data) - response = self._handle.FtdiSendCommandAndWait(cmd_data) - self.validate(response, response_mode=True) - return response - - def wrap_ext_command(self, subcmd_code, cmd_body): - return struct.pack(self.HEADER_FMT, 0x8001, - len(cmd_body) + struct.calcsize(self.HEADER_FMT), - EXT_CMD, subcmd_code) + cmd_body - - def unwrap_ext_response(self, expected_subcmd, response): - """Verify basic validity and strip off TPM extended command header. - - Get the response generated by the device, as it came off the wire, verify - that header fields match expectations, then strip off the extension - command header and return the payload to the caller. - - Args: - expected_subcmd: an int, up to 16 bits in size, the extension command - this response is supposed to be for. - response: a binary string, the actual response received over the wire. - Returns: - the binary string of the response payload, if validation succeeded. - Raises: - subcmd.TpmTestError: in case there are any validation problems, the - error message describes the problem. + """TPM accessor class. + + Object of this class allows to send valid and extended TPM commands (using + the command() method. The wrap_command/unwrap_response methods provide a + means of encapsulating extended commands in proper TPM data packets, as well + as extracting extended command responses. + + Attributes: + _handle: a ftdi_spi_tpm object, a USB/FTDI/SPI driver which allows + communicate with a TPM connected over USB dongle. """ - header_size = struct.calcsize(self.HEADER_FMT) - tag, size, cmd, sub = struct.unpack(self.HEADER_FMT, - response[:header_size]) - if tag != 0x8001: - raise subcmd.TpmTestError('Wrong response tag: %4.4x' % tag) - if cmd: - raise subcmd.TpmTestError('Unexpected response command field: %8.8x' % - cmd) - if sub != expected_subcmd: - raise subcmd.TpmTestError('Unexpected response subcommand field: %2.2x' % - sub) - if size != len(response): - raise subcmd.TpmTestError('Size mismatch: header %d, actual %d' % ( - size, len(response))) - return response[header_size:] - - def debug_enabled(self): - return self._debug_enabled + + HEADER_FMT = '>H2IH' + STARTUP_CMD = '80 01 00 00 00 0c 00 00 01 44 00 00' + STARTUP_RSP = ('80 01 00 00 00 0a 00 00 00 00', + '80 01 00 00 00 0a 00 00 01 00') + + def __init__(self, freq=800*1000, debug_mode=False): + self._debug_enabled = debug_mode + self._handle = ftdi_spi_tpm + if not self._handle.FtdiSpiInit(freq, debug_mode): + raise subcmd.TpmTestError('Failed to connect') + + def validate(self, data_blob, response_mode=False): + """Validate TPM header format + + Check if a data blob complies with TPM command/response + header format. + """ + + (tag, size, cmd_code, _) = struct.unpack_from( + self.HEADER_FMT, data_blob + ' ') + prefix = 'Misformatted blob: ' + if tag not in (0x8001, 0x8002): + raise subcmd.TpmTestError(prefix + 'bad tag value 0x%4.4x' % tag) + if size != len(data_blob): + raise subcmd.TpmTestError(prefix + + 'size mismatch: header %d, actual %d' + % (size, len(data_blob))) + if size > 4096: + raise subcmd.TpmTestError(prefix + 'invalid size %d' % size) + if response_mode: + # Startup response code, extension or vendor command response code + if cmd_code not in (0, 0x100, 0x500): + raise subcmd.TpmTestError( + prefix + 'invalid response code 0x%x' % cmd_code) + return + if 0x11f <= cmd_code <= 0x18f: + return # This is a valid command + if cmd_code == EXT_CMD: + return # This is an extension command + if 0x20000000 <= cmd_code <= 0x200001ff: + return # this is vendor command + raise subcmd.TpmTestError(prefix + 'invalid command code 0x%x' + % cmd_code) + + def command(self, cmd_data): + """Verify command header""" + self.validate(cmd_data) + response = self._handle.FtdiSendCommandAndWait(cmd_data) + self.validate(response, response_mode=True) + return response + + def wrap_ext_command(self, subcmd_code, cmd_body): + """Wrap TPM command into extension command header""" + return struct.pack(self.HEADER_FMT, 0x8001, + len(cmd_body) + struct.calcsize(self.HEADER_FMT), + EXT_CMD, subcmd_code) + cmd_body + + def unwrap_ext_response(self, expected_subcmd, response): + """Verify basic validity and strip off TPM extended command header. + + Get the response generated by the device, as it came off the wire, + verify that header fields match expectations, then strip off the + extension command header and return the payload to the caller. + + Args: + expected_subcmd: an int, up to 16 bits in size, the extension + command this response is supposed to be for. + response: a binary string, the actual response received + over the wire. + + Returns: + the binary string of the response payload, + if validation succeeded. + + Raises: + subcmd.TpmTestError: in case there are any validation problems, + the error message describes the problem. + """ + header_size = struct.calcsize(self.HEADER_FMT) + tag, size, cmd, sub = struct.unpack(self.HEADER_FMT, + response[:header_size]) + if tag != 0x8001: + raise subcmd.TpmTestError('Wrong response tag: %4.4x' % tag) + if cmd: + raise subcmd.TpmTestError('Unexpected response command' + ' field: %8.8x' % cmd) + if sub != expected_subcmd: + raise subcmd.TpmTestError('Unexpected response subcommand' + ' field: %2.2x' % sub) + if size != len(response): + raise subcmd.TpmTestError('Size mismatch: header %d, actual %d' % + (size, len(response))) + return response[header_size:] + + def debug_enabled(self): + """Return status of debugging""" + return self._debug_enabled def usage(): - print ('Syntax: tpmtest.py [-d | -t | -h ]\n' - ' -d - prints additional debug information during tests\n' - ' -t - dump raw output from TRNG to /tmp/trng_output\n' - ' -h - this help\n') - return + """Print usage information""" + print('Syntax: tpmtest.py [-d | -t | -h ]\n' + ' -d - prints additional debug information during tests\n' + ' -t - dump raw output from TRNG to /tmp/trng_output\n' + ' -h - this help\n') + +def main(): + """Run TPM tests""" + try: + opts, _ = getopt.getopt(sys.argv[1:], 'dth', 'help') + except getopt.GetoptError as err: + print(str(err)) + usage() + sys.exit(2) + debug_needed = False + trng_only = False + for option, _ in opts: + if option == '-d': + debug_needed = True + elif option == '-t': + trng_only = True + elif option in ('-h', '--help'): + usage() + sys.exit(0) + try: + tpm_object = TPM(debug_mode=debug_needed) + if trng_only: + trng_test.trng_test(tpm_object) + sys.exit(1) + crypto_test.crypto_tests(tpm_object, os.path.join(ROOT_DIR, + 'crypto_test.xml')) + drbg_test.drbg_test(tpm_object) + ecc_test.ecc_test(tpm_object) + ecies_test.ecies_test(tpm_object) + hash_test.hash_test(tpm_object) + hkdf_test.hkdf_test(tpm_object) + rsa_test.rsa_test(tpm_object) + upgrade_test.upgrade(tpm_object) + except subcmd.TpmTestError as tpm_exc: + exc_file, exc_line = traceback.extract_tb(sys.exc_traceback)[-1][:2] + print('\nError in %s:%s: ' % (os.path.basename(exc_file), exc_line), + tpm_exc) + if debug_needed: + traceback.print_exc() + sys.exit(1) if __name__ == '__main__': - try: - opts, args = getopt.getopt(sys.argv[1:], 'dth','help') - except getopt.GetoptError as err: - print(str(err)) - usage() - sys.exit(2) - debug_needed = False - trng_only = False - for o, a in opts: - if o == '-d': - debug_needed = True - elif o == '-t': - trng_only = True - elif o == '-h' or o == '--help': - usage() - sys.exit(0) - try: - t = TPM(debug_mode=debug_needed) - if trng_only: - trng_test.trng_test(t) - sys.exit(1) - crypto_test.crypto_tests(t, os.path.join(root_dir, 'crypto_test.xml')) - drbg_test.drbg_test(t) - ecc_test.ecc_test(t) - ecies_test.ecies_test(t) - hash_test.hash_test(t) - hkdf_test.hkdf_test(t) - rsa_test.rsa_test(t) - upgrade_test.upgrade(t) - except subcmd.TpmTestError as e: - exc_file, exc_line = traceback.extract_tb(sys.exc_traceback)[-1][:2] - print('\nError in %s:%s: ' % (os.path.basename(exc_file), exc_line), e) - if debug_needed: - traceback.print_exc() - sys.exit(1) + main() diff --git a/test/tpm_test/trng_test.py b/test/tpm_test/trng_test.py index aac2803076..ddfc659339 100644 --- a/test/tpm_test/trng_test.py +++ b/test/tpm_test/trng_test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2019 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -16,15 +16,18 @@ TRNG_SAMPLE_SIZE = 1000 # minimal recommended by NIST is 1000 bytes per sample TRNG_SAMPLE_COUNT = 1000 # NIST require at least 1000000 of 8-bit samples def get_random_command(size): - return struct.pack(TRNG_TEST_FMT, size) + """Encode get_random command""" + return struct.pack(TRNG_TEST_FMT, size) def get_random_command_rsp(size): - return struct.pack(TRNG_TEST_RSP_FMT, 0x8001, - struct.calcsize(TRNG_TEST_RSP_FMT) + size, 0, TRNG_TEST_CC) + """Create expected response to get_random""" + return struct.pack(TRNG_TEST_RSP_FMT, 0x8001, + struct.calcsize(TRNG_TEST_RSP_FMT) + size, + 0, TRNG_TEST_CC) def trng_test(tpm): - """Download entropy samples from TRNG + """Download entropy samples from TRNG Command structure, shared out of band with the test running on the target: @@ -32,19 +35,19 @@ def trng_test(tpm): =================================================================== text_len | 2 | size of the text to process, big endian - Args: - tpm: a tpm object used to communicate with the device - - Raises: - subcmd.TpmTestError: on unexpected target responses - """ - with open('/tmp/trng_output', 'wb') as f: - for x in range(0, TRNG_SAMPLE_COUNT): - wrapped_response = tpm.command(tpm.wrap_ext_command(TRNG_TEST_CC, - get_random_command(TRNG_SAMPLE_SIZE))) - if wrapped_response[:12] != get_random_command_rsp(TRNG_SAMPLE_SIZE): - raise subcmd.TpmTestError("Unexpected response to '%s': %s" % - ("trng", utils.hex_dump(wrapped_response))) - f.write(wrapped_response[12:]) - print('%s %d%%\r' %( utils.cursor_back(), (x/10)), end=""), - print('%sSUCCESS: %s' % (utils.cursor_back(), 'trng')) + Args: + tpm: a tpm object used to communicate with the device + + Raises: + subcmd.TpmTestError: on unexpected target responses + """ + with open('/tmp/trng_output', 'wb') as out_file: + for block in range(0, TRNG_SAMPLE_COUNT): + response = tpm.command(tpm.wrap_ext_command(TRNG_TEST_CC, + get_random_command(TRNG_SAMPLE_SIZE))) + if response[:12] != get_random_command_rsp(TRNG_SAMPLE_SIZE): + raise subcmd.TpmTestError("Unexpected response to \'%s\': %s" % + ('trng', utils.hex_dump(response))) + out_file.write(response[12:]) + print('%s %d%%\r' % (utils.cursor_back(), (block/10)), end='') + print('%sSUCCESS: %s' % (utils.cursor_back(), 'trng')) diff --git a/test/tpm_test/upgrade_test.py b/test/tpm_test/upgrade_test.py index 35f8405ba7..2cf03cc8ea 100644 --- a/test/tpm_test/upgrade_test.py +++ b/test/tpm_test/upgrade_test.py @@ -1,7 +1,8 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. +"""Firmware upgrade tests""" from __future__ import print_function @@ -14,55 +15,55 @@ import utils def upgrade(tpm): - """Exercise the upgrade command. + """Exercise the upgrade command. - The target expect the upgrade extension command to have the following - structure: + The target expect the upgrade extension command to have the following + structure: - cmd 1 value of FW_UPGRADE - digest 4 first 4 bytes of sha1 of the remainder of the message - block_base 4 address of the block to write - data var + cmd 1 value of FW_UPGRADE + digest 4 first 4 bytes of sha1 of the remainder of the message + block_base 4 address of the block to write + data var + + Args: + tpm: a properly initialized tpmtest.TPM object - Args: - tpm: a properly initialized tpmtest.TPM object Raises: - subcmd.TpmTestError: In case of various test problems - """ - cmd = struct.pack('>I', 0) # address - cmd += struct.pack('>I', 0) # data (a noop) - wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.FW_UPGRADE, cmd)) - base_str = tpm.unwrap_ext_response(subcmd.FW_UPGRADE, wrapped_response) - if len(base_str) < 4: - raise subcmd.TpmTestError('Initialization error %d' % - ord(base_str[0])) - base = struct.unpack_from('>4I', base_str)[3] - if base == 0x44000: - fname = 'build/cr50/RW/ec.RW_B.flat' - elif base == 0x4000: - fname = 'build/cr50/RW/ec.RW.flat' - else: - raise subcmd.TpmTestError('Unknown base address 0x%x' % base) - fname = os.path.join(os.path.dirname(__file__), '../..', fname) - data = open(fname, 'r').read()[:2000] - transferred = 0 - block_size = 1024 + subcmd.TpmTestError: In case of various test problems + """ + cmd = struct.pack('>II', 0, 0) # address, data (a noop) + wrapped_response = tpm.command(tpm.wrap_ext_command(subcmd.FW_UPGRADE, cmd)) + base_str = tpm.unwrap_ext_response(subcmd.FW_UPGRADE, wrapped_response) + if len(base_str) < 4: + raise subcmd.TpmTestError('Initialization error %d' % + ord(base_str[0])) + base = struct.unpack_from('>4I', base_str)[3] + if base == 0x44000: + fname = 'build/cr50/RW/ec.RW_B.flat' + elif base == 0x4000: + fname = 'build/cr50/RW/ec.RW.flat' + else: + raise subcmd.TpmTestError('Unknown base address 0x%x' % base) + fname = os.path.join(os.path.dirname(__file__), '../..', fname) + data = open(fname, 'r').read()[:2000] + transferred = 0 + block_size = 1024 - while transferred < len(data): - tx_size = min(block_size, len(data) - transferred) - chunk = data[transferred:transferred+tx_size] - cmd = struct.pack('>I', base) # address - h = hashlib.sha1() - h.update(cmd) - h.update(chunk) - cmd = h.digest()[0:4] + cmd + chunk - resp = tpm.unwrap_ext_response(subcmd.FW_UPGRADE, - tpm.command(tpm.wrap_ext_command( - subcmd.FW_UPGRADE, cmd))) - code = ord(resp[0]) - if code: - raise subcmd.TpmTestError('%x - resp %d' % (base, code)) - base += tx_size - transferred += tx_size + while transferred < len(data): + tx_size = min(block_size, len(data) - transferred) + chunk = data[transferred:transferred+tx_size] + cmd = struct.pack('>I', base) # address + hash_block = hashlib.sha1() + hash_block.update(cmd) + hash_block.update(chunk) + cmd = hash_block.digest()[0:4] + cmd + chunk + resp = tpm.unwrap_ext_response(subcmd.FW_UPGRADE, + tpm.command(tpm.wrap_ext_command( + subcmd.FW_UPGRADE, cmd))) + code = ord(resp[0]) + if code: + raise subcmd.TpmTestError('%x - resp %d' % (base, code)) + base += tx_size + transferred += tx_size - print('%sSUCCESS: Firmware upgrade' % (utils.cursor_back())) + print('%sSUCCESS: Firmware upgrade' % (utils.cursor_back())) diff --git a/test/tpm_test/utils.py b/test/tpm_test/utils.py index 38cda2a1e2..7b42bcd508 100644 --- a/test/tpm_test/utils.py +++ b/test/tpm_test/utils.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +# -*- coding: utf-8 -*- # Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -8,31 +8,30 @@ import sys if hasattr(sys.stdout, 'isatty') and sys.stdout.isatty(): - cursor_back_cmd = '\x1b[1D' # Move one space to the left. + CURSOR_BACK_CMD = '\x1b[1D' # Move one space to the left. else: - cursor_back_cmd = '' + CURSOR_BACK_CMD = '' def cursor_back(): - """Return a string which would move cursor one space left, if available. + """Return a string which would move cursor one space left, if available. - This is used to remove the remaining 'spinner' character after the test - completes and its result is printed on the same line where the 'spinner' was - spinning. - - """ - return cursor_back_cmd + This is used to remove the remaining 'spinner' character after the test + completes and its result is printed on the same line where the 'spinner' was + spinning. + """ + return CURSOR_BACK_CMD def hex_dump(binstr): - """Convert binary string into its multiline hex representation.""" - - dump_lines = ['',] - i = 0 - while i < len(binstr): - strsize = min(16, len(binstr) - i) - hexstr = ' '.join('%2.2x' % ord(x) for x in binstr[i:i+strsize]) - dump_lines.append(hexstr) - i += strsize - dump_lines.append('') - return '\n'.join(dump_lines) + """Convert binary string into its multiline hex representation.""" + + dump_lines = ['',] + i = 0 + while i < len(binstr): + strsize = min(16, len(binstr) - i) + hexstr = ' '.join('%2.2x' % ord(x) for x in binstr[i:i+strsize]) + dump_lines.append(hexstr) + i += strsize + dump_lines.append('') + return '\n'.join(dump_lines) -- cgit v1.2.1