diff options
author | George Kraft <george.kraft@calxeda.com> | 2012-07-09 14:32:07 -0500 |
---|---|---|
committer | George Kraft <george.kraft@calxeda.com> | 2012-07-09 14:32:07 -0500 |
commit | ee073453981379111db4997305c742d0b9aef3b1 (patch) | |
tree | e43e8436a406729c7743429e3243dc6ff932401f | |
parent | 50db3fafd0fd7a62390989956ce225456688745b (diff) | |
download | cxmanage-ee073453981379111db4997305c742d0b9aef3b1.tar.gz |
tests: Refactor target tests, add config_boot tests
-rw-r--r-- | cxmanage/target.py | 18 | ||||
-rw-r--r-- | cxmanage_test/__init__.py | 39 | ||||
-rw-r--r-- | cxmanage_test/image_test.py | 9 | ||||
-rw-r--r-- | cxmanage_test/target_test.py | 529 | ||||
-rwxr-xr-x | run_tests | 2 |
5 files changed, 292 insertions, 305 deletions
diff --git a/cxmanage/target.py b/cxmanage/target.py index cc2bb56..907076a 100644 --- a/cxmanage/target.py +++ b/cxmanage/target.py @@ -18,15 +18,16 @@ class Target: """ Contains info for a single target. A target consists of a hostname, an username, and a password. """ - def __init__(self, address, username="admin", - password="admin", verbosity=0): + def __init__(self, address, username="admin", password="admin", + verbosity=0, bmc_class=LanBMC, ubootenv_class=UbootEnv): self.address = address self.username = username self.password = password self.verbosity = verbosity + self.ubootenv_class = ubootenv_class verbose = verbosity >= 2 - self.bmc = make_bmc(LanBMC, hostname=address, + self.bmc = make_bmc(bmc_class, hostname=address, username=username, password=password, verbose=verbose) def get_ipinfo(self, work_dir, tftp): @@ -186,7 +187,7 @@ class Target: except IpmiError: raise CxmanageError("Failed to retrieve firmware info") - def update_firmware(self, work_dir, tftp, images, slot_arg): + def update_firmware(self, work_dir, tftp, images, slot_arg="INACTIVE"): """ Update firmware on this target. """ fwinfo = self.get_firmware_info() @@ -204,10 +205,11 @@ class Target: old_ubootenv = self._download_ubootenv(work_dir, tftp, active_slot) bootcmd = old_ubootenv.get_variable("bootcmd0") + contents = open(image.filename).read() if image.simg: - ubootenv = UbootEnv(open(image.filename).read()[28:]) - else: - ubootenv = UbootEnv(open(image.filename).read()) + contents = contents[28:] + ubootenv = self.ubootenv_class(contents) + ubootenv.set_variable("bootcmd0", bootcmd) self._upload_ubootenv(work_dir, tftp, active_slot, ubootenv) @@ -396,7 +398,7 @@ class Target: # Open the file simg = open(image.filename).read() - return UbootEnv(simg[28:]) + return self.ubootenv_class(simg[28:]) def _wait_for_transfer(self, handle): """ Wait for a firmware transfer to finish""" diff --git a/cxmanage_test/__init__.py b/cxmanage_test/__init__.py index 9e1c15e..25ea0dd 100644 --- a/cxmanage_test/__init__.py +++ b/cxmanage_test/__init__.py @@ -5,32 +5,39 @@ import random import tempfile +from cxmanage.image import Image + +def random_file(work_dir, size): + """ Create a random file """ + contents = "".join([chr(random.randint(0, 255)) + for a in range(size)]) + filename = tempfile.mkstemp(prefix="%s/" % work_dir)[1] + open(filename, "w").write(contents) + return filename + +class TestImage(Image): + def valid_type(self): + return True + +class TestSensor: + """ Sensor result from bmc/target """ + def __init__(self, sensor_name, sensor_reading): + self.sensor_name = sensor_name + self.sensor_reading = sensor_reading + class TestSlot: """ Slot info for a partition """ - def __init__(self, slot=0, slot_type=2, offset=0, + def __init__(self, slot, slot_type, offset=0, size=0, version=0, daddr=0, in_use=None): self.slot = "%2i" % slot self.type = { 2: "02 (S2_ELF)", 3: "03 (SOC_ELF)", - 10: "0a (CDB)" + 10: "0a (CDB)", + 11: "0b (UBOOTENV)" }[slot_type] self.offset = "%8x" % offset self.size = "%8x" % size self.version = "%8x" % version self.daddr = "%8x" % daddr self.in_use = {None: "Unknown", True: "1", False: "0"}[in_use] - -class TestSensor: - """ Sensor result from bmc/target """ - def __init__(self, sensor_name, sensor_reading): - self.sensor_name = sensor_name - self.sensor_reading = sensor_reading - -def random_file(work_dir, size): - """ Create a random file """ - contents = "".join([chr(random.randint(0, 255)) - for a in range(size)]) - filename = tempfile.mkstemp(prefix="%s/" % work_dir)[1] - open(filename, "w").write(contents) - return filename diff --git a/cxmanage_test/image_test.py b/cxmanage_test/image_test.py index b60e0e7..4b8a89a 100644 --- a/cxmanage_test/image_test.py +++ b/cxmanage_test/image_test.py @@ -6,10 +6,9 @@ import tempfile import unittest from cxmanage.simg import get_simg_header -from cxmanage.image import Image from cxmanage.tftp import InternalTftp -from cxmanage_test import TestSlot, random_file +from cxmanage_test import random_file, TestImage, TestSlot class ImageTest(unittest.TestCase): """ Tests involving cxmanage images @@ -29,10 +28,6 @@ class ImageTest(unittest.TestCase): def test_upload(self): """ Test image creation and upload """ - class TestImage(Image): - def valid_type(self): - return True - imglen = 1024 daddr = 12345 new_version = 1 @@ -43,7 +38,7 @@ class ImageTest(unittest.TestCase): image = TestImage(filename, "RAW") # Create slot - slot = TestSlot(size=imglen + 28, daddr=daddr) + slot = TestSlot(0, 2, size=imglen + 28, daddr=daddr) # Upload image and delete file image_filename = image.upload(self.work_dir, diff --git a/cxmanage_test/target_test.py b/cxmanage_test/target_test.py index 5d83ed7..e949969 100644 --- a/cxmanage_test/target_test.py +++ b/cxmanage_test/target_test.py @@ -5,31 +5,26 @@ import shutil import tempfile import unittest -from pyipmi import IpmiError +from pyipmi.bmc import LanBMC -from cxmanage import CxmanageError -from cxmanage.image import Image -from cxmanage.simg import create_simg, valid_simg, get_simg_header +from cxmanage.simg import create_simg from cxmanage.target import Target from cxmanage.tftp import InternalTftp, ExternalTftp +from cxmanage.ubootenv import UbootEnv -from cxmanage_test import TestSlot, TestSensor, random_file +from cxmanage_test import TestImage, TestSensor, TestSlot + +num_nodes = 4 +addresses = ["192.168.100.%i" % a for a in range(1, num_nodes+1)] class TargetTest(unittest.TestCase): """ Tests involving cxmanage targets """ def setUp(self): - class TestTarget(Target): - """ A target that uses a test BMC """ - def __init__(self, node): - Target.__init__(self, node.address, "admin", "admin", 0) - self.bmc = TestBMC(node) - self.work_dir = tempfile.mkdtemp() - # Set up a cluster - self.cluster = TestCluster(4) - self.targets = [TestTarget(node) for node in self.cluster.nodes] + self.targets = [Target(x, verbosity=0, bmc_class=DummyBMC, + ubootenv_class=DummyUbootEnv) for x in addresses] # Set up an internal server self.tftp = InternalTftp() @@ -38,347 +33,335 @@ class TargetTest(unittest.TestCase): shutil.rmtree(self.work_dir) self.tftp.kill() - def test_ipinfo(self): - """ Test ipinfo command """ - # Get IP info - ipinfo = self.targets[0].get_ipinfo(self.work_dir, self.tftp) + def test_get_ipinfo(self): + """ Test get_ipinfo command """ + for target in self.targets: + result = target.get_ipinfo(self.work_dir, self.tftp) + + executed = target.bmc.executed + self.assertEqual(len(executed), 1) + self.assertEqual(executed[0], "get_fabric_ipinfo") - # Verify - for i in range(len(self.targets)): - self.assertEqual(i, ipinfo[i][0]) - self.assertEqual(self.targets[i].address, ipinfo[i][1]) + self.assertEqual(len(result), num_nodes) + for i in range(num_nodes): + self.assertEqual(result[i], (i, addresses[i])) def test_power(self): - """ Test power commands """ + """ Test power command """ for target in self.targets: - # Power should be off initially - self.assertEqual(target.power_status(), "off") - - # Turn power on - target.power("on") - self.assertEqual(target.power_status(), "on") - - # Reset - target.power("reset") - self.assertEqual(target.power_status(), "on") - - # Turn power off - target.power("off") - self.assertEqual(target.power_status(), "off") - - # Try to reset -- should raise an error - try: - target.power("reset") - self.fail() - except CxmanageError: - self.assertEqual(target.power_status(), "off") - - def test_policy(self): - """ Test power policy commands """ + modes = ["off", "on", "reset", "off"] + for mode in modes: + target.power(mode) + + executed = target.bmc.executed + self.assertEqual(len(executed), len(modes)) + for i in range(len(modes)): + self.assertEqual(executed[i], ("set_chassis_power", modes[i])) + + def test_power_status(self): + """ Test power_status command """ for target in self.targets: - self.assertEqual(target.power_policy_status(), "always-off") + result = target.power_status() - target.power_policy("always-on") - self.assertEqual(target.power_policy_status(), "always-on") + executed = target.bmc.executed + self.assertEqual(len(executed), 1) + self.assertEqual(executed[0], "get_chassis_status") + self.assertEqual(result, "off") - target.power_policy("previous") - self.assertEqual(target.power_policy_status(), "previous") + def test_power_policy(self): + """ Test power_policy command """ + for target in self.targets: + modes = ["always-on", "previous", "always-off"] + for mode in modes: + target.power_policy(mode) - target.power_policy("always-off") - self.assertEqual(target.power_policy_status(), "always-off") + executed = target.bmc.executed + self.assertEqual(len(executed), len(modes)) + for i in range(len(modes)): + self.assertEqual(executed[i], ("set_chassis_policy", modes[i])) + + def test_power_policy_status(self): + """ Test power_policy_status command """ + for target in self.targets: + result = target.power_policy_status() + + executed = target.bmc.executed + self.assertEqual(len(executed), 1) + self.assertEqual(executed[0], "get_chassis_status") + self.assertEqual(result, "always-off") def test_sensor(self): """ Test sensor read command """ for target in self.targets: - sensors = target.get_sensors() - - # Read node power - reading = [x.sensor_reading for x in sensors - if x.sensor_name == "Node Power"][0] - value = float(reading.split()[0]) - suffix = reading.lstrip("%f " % value) - self.assertEqual(suffix, "(+/- 0) Watts") - - # Read board temp - reading = [x.sensor_reading for x in sensors - if x.sensor_name == "Board Temp"][0] - value = float(reading.split()[0]) - suffix = reading.lstrip("%f " % value) - self.assertEqual(suffix, "(+/- 0) degrees C") + result = target.get_sensors() - def test_config_reset(self): - """ Test config reset command """ - for target in self.targets: - # Write random stuff to the partition - partition = [x for x in target.bmc.node.partitions - if x.image_type == 10][-1] - size = partition.size - raw_contents = "".join([chr(random.randint(0, 255)) - for a in range(size - 28)]) - partition.contents = partition.contents[:28] + raw_contents - self.assertEqual(partition.contents[28:], raw_contents) - - # Reset firmware and check - target.config_reset() - raw_contents = "".join([chr(0xFF) for a in range(size - 28)]) - self.assertEqual(partition.contents[28:], raw_contents) + executed = target.bmc.executed + self.assertEqual(len(executed), 1) + self.assertEqual(executed[0], "sdr_list") + + self.assertEqual(len(result), 2) + self.assertEqual(result[0].sensor_name, "Node Power") + self.assertTrue(result[0].sensor_reading.endswith("Watts")) + self.assertEqual(result[1].sensor_name, "Board Temp") + self.assertTrue(result[1].sensor_reading.endswith("degrees C")) def test_update_firmware(self): """ Test firmware update command """ - class TestImage(Image): - def valid_type(self): - return True + filename = "%s/%s" % (self.work_dir, "image.bin") + open(filename, "w").write("") - def create_image(partition, image_type): - filename = random_file(self.work_dir, partition.size - 28) - contents = open(filename).read() - return contents, TestImage(filename, image_type, False) + images = [ + TestImage(filename, "SOC_ELF"), + TestImage(filename, "CDB"), + TestImage(filename, "UBOOTENV") + ] for target in self.targets: - # Create random images - s2_partition = [x for x in target.bmc.node.partitions - if x.image_type == 2][1] - soc_partition = [x for x in target.bmc.node.partitions - if x.image_type == 3][1] - cdb_partition = [x for x in target.bmc.node.partitions - if x.image_type == 10][1] - s2_contents, s2_image = create_image(s2_partition, "S2_ELF") - soc_contents, soc_image = create_image(soc_partition, "SOC_ELF") - cdb_contents, cdb_image = create_image(cdb_partition, "CDB") - - # Execute firmware update - images = [s2_image, soc_image, cdb_image] - target.update_firmware(self.work_dir, - self.tftp, images, "INACTIVE") - - # Examine headers - changed_partitions = [s2_partition, soc_partition, cdb_partition] - unchanged_partitions = [x for x in target.bmc.node.partitions - if not x in changed_partitions] + target.update_firmware(self.work_dir, self.tftp, images) + + partitions = target.bmc.partitions + unchanged_partitions = [partitions[x] for x in [0, 1, 4]] + changed_partitions = [partitions[x] for x in [2, 3, 6]] + ubootenv_partition = partitions[5] + + for partition in unchanged_partitions: + self.assertEqual(partition.updates, 0) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 0) + self.assertEqual(partition.activates, 0) + for partition in changed_partitions: - header = get_simg_header(partition.contents) - self.assertTrue(valid_simg(partition.contents)) - self.assertEqual(header.version, 1) + self.assertEqual(partition.updates, 1) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 1) + self.assertEqual(partition.activates, 1) + + self.assertEqual(ubootenv_partition.updates, 1) + self.assertEqual(ubootenv_partition.retrieves, 1) + self.assertEqual(ubootenv_partition.checks, 1) + self.assertEqual(ubootenv_partition.activates, 1) + + def test_config_reset(self): + """ Test config_reset command """ + for target in self.targets: + target.config_reset(self.work_dir, self.tftp) + + # Assert config reset + executed = target.bmc.executed + self.assertEqual( + len([x for x in executed if x == "reset_firmware"]), 1) + + # Assert sel clear + self.assertEqual( + len([x for x in executed if x == "sel_clear"]), 1) + + # Assert ubootenv changes + active = target.bmc.partitions[5] + inactive = target.bmc.partitions[6] + self.assertEqual(active.updates, 1) + self.assertEqual(active.retrieves, 0) + self.assertEqual(active.checks, 1) + self.assertEqual(active.activates, 1) + self.assertEqual(inactive.updates, 0) + self.assertEqual(inactive.retrieves, 1) + self.assertEqual(inactive.checks, 0) + self.assertEqual(inactive.activates, 0) + + def test_config_boot(self): + """ Test config_boot command """ + boot_args = ["disk", "pxe", "retry"] + for target in self.targets: + target.config_boot(self.work_dir, self.tftp, boot_args) + + partitions = target.bmc.partitions + ubootenv_partition = partitions[5] + unchanged_partitions = [x for x in partitions + if x != ubootenv_partition] + + self.assertEqual(ubootenv_partition.updates, 1) + self.assertEqual(ubootenv_partition.retrieves, 1) + self.assertEqual(ubootenv_partition.checks, 1) + self.assertEqual(ubootenv_partition.activates, 1) + for partition in unchanged_partitions: - header = get_simg_header(partition.contents) - self.assertTrue(valid_simg(partition.contents)) - self.assertEqual(header.version, 0) - - # Examine contents - self.assertEqual(s2_contents, s2_partition.contents[28:]) - self.assertEqual(soc_contents, soc_partition.contents[28:]) - self.assertEqual(cdb_contents, cdb_partition.contents[28:]) + self.assertEqual(partition.updates, 0) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 0) + self.assertEqual(partition.activates, 0) + + def test_config_boot_status(self): + """ Test config_boot_status command """ + for target in self.targets: + result = target.config_boot_status(self.work_dir, self.tftp) + + partitions = target.bmc.partitions + ubootenv_partition = partitions[5] + unchanged_partitions = [x for x in partitions + if x != ubootenv_partition] + + self.assertEqual(ubootenv_partition.updates, 0) + self.assertEqual(ubootenv_partition.retrieves, 1) + self.assertEqual(ubootenv_partition.checks, 0) + self.assertEqual(ubootenv_partition.activates, 0) + for partition in unchanged_partitions: - size = partition.size - 28 - contents = "".join([chr(0xFF) for a in range(size)]) - self.assertEqual(partition.contents[28:], contents) - - -class TestPartition: - """ A partition on a node """ - def __init__(self, image_type, offset, size, in_use=None, contents=None): - self.image_type = image_type - self.offset = offset - self.size = size - self.in_use = in_use - if contents == None: - raw_contents = "".join([chr(0xFF) for a in range(size - 28)]) - self.contents = create_simg(raw_contents) - else: - self.contents = contents - - -class TestNode: - """ A virtual node, with an IP address, partitions, and chassis power """ - - def __init__(self, address="192.168.100.250", cluster=None): - self.address = address - self.cluster = cluster - self.partitions = [] - self.power = "off" - self.policy = "always-off" - - def add_partition(image_type, size, in_use=None): - """ Add a partition to this node """ - if len(self.partitions) == 0: - offset = 0 - else: - offset = self.partitions[-1].offset + self.partitions[-1].size - - partition = TestPartition(image_type, offset, size, in_use) - - self.partitions.append(partition) - - # Add partitions: two S2_ELF, two SOC_ELF/CDB pairs, one running CDB - add_partition(image_type=2, size=20480) - add_partition(image_type=2, size=20480) - add_partition(image_type=3, size=393216, in_use=True) - add_partition(image_type=10, size=196608) - add_partition(image_type=3, size=393216, in_use=False) - add_partition(image_type=10, size=196608) - add_partition(image_type=10, size=196608) - - -class TestCluster: - """ A cluster of nodes """ - def __init__(self, num_nodes = 1): - self.nodes = [] - for a in range(num_nodes): - address = "192.168.100.%i" % (100 + a) - node = TestNode(address, self) - self.nodes.append(node) - - -class TestBMC: - """ BMC handle for a virtual node """ - def __init__(self, node): - self.node = node + self.assertEqual(partition.updates, 0) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 0) + self.assertEqual(partition.activates, 0) + + self.assertEqual(result, ["disk", "pxe"]) + +class DummyBMC(LanBMC): + """ Dummy BMC for the target tests """ + def __init__(self, **kwargs): + LanBMC.__init__(self, **kwargs) + + self.executed = [] + + class Partition: + def __init__(self, slot, slot_type, offset=0, + size=0, version=0, daddr=0, in_use=None): + self.updates = 0 + self.retrieves = 0 + self.checks = 0 + self.activates = 0 + self.slot = TestSlot(slot, slot_type, offset, + size, version, daddr, in_use) + + self.partitions = [ + Partition(0, 3, 0, 393216, in_use=True), # socman + Partition(1, 10, 393216, 196608), # factory cdb + Partition(2, 3, 589824, 393216, in_use=False), # socman + Partition(3, 10, 983040, 196608), # factory cdb + Partition(4, 10, 1179648, 196608), # running cdb + Partition(5, 11, 1376256, 12288), # ubootenv + Partition(6, 11, 1388544, 12288) # ubootenv + ] def get_fabric_ipinfo(self, filename, tftp_address): """ Upload an ipinfo file from the node to TFTP""" - # Set up TFTP client + self.executed.append("get_fabric_ipinfo") + + work_dir = tempfile.mkdtemp() + + # Create IP info file + ipinfo = open("%s/%s" % (work_dir, filename), "w") + for i in range(len(addresses)): + ipinfo.write("Node %i: %s\n" % (i, addresses[i])) + ipinfo.close() + + # Upload to tftp address, port = tftp_address.split(":") port = int(port) tftp = ExternalTftp(address, port) - - # Create file - work_dir = tempfile.mkdtemp() - ipinfo_file = open("%s/%s" % (work_dir, filename), "w") - nodes = self.node.cluster.nodes - for i in range(len(nodes)): - address = nodes[i].address - ipinfo_file.write("Node %i: %s\n" % (i, address)) - ipinfo_file.close() - - # Upload file tftp.put_file("%s/%s" % (work_dir, filename), filename) - # Clean up shutil.rmtree(work_dir) def set_chassis_power(self, mode): """ Set chassis power """ - if mode == "reset": - if self.node.power == "off": - raise IpmiError - else: - self.node.power = mode + self.executed.append(("set_chassis_power", mode)) def get_chassis_status(self): - """ Get chassis power """ - class Result: - def __init__(self, power, policy): - self.power_on = (power == "on") - self.power_restore_policy = policy + """ Get chassis status """ + self.executed.append("get_chassis_status") - return Result(self.node.power, self.node.policy) + class Result: + def __init__(self): + self.power_on = False + self.power_restore_policy = "always-off" + return Result() def set_chassis_policy(self, mode): """ Set chassis restore policy """ - self.node.policy = mode + self.executed.append(("set_chassis_policy", mode)) def mc_reset(self, mode): """ Reset the MC """ - if self.node.policy == "always-off": - self.node.power = "off" - elif self.node.policy == "always-on": - self.node.power = "on" - - class Result: - pass - - return Result() + self.executed.append(("mc_reset", mode)) def reset_firmware(self): """ Reset the running CDB """ - partition = [x for x in self.node.partitions if x.image_type == 10][-1] - size = partition.size - raw_contents = "".join([chr(0xFF) for a in range(size - 28)]) - partition.contents = (partition.contents[:28] + raw_contents) + self.executed.append("reset_firmware") def sel_clear(self): """ Clear SEL """ - # TODO - pass + self.executed.append("sel_clear") def get_firmware_info(self): """ Get partition and simg info """ - results = [] - for a in range(len(self.node.partitions)): - partition = self.node.partitions[a] - header = get_simg_header(partition.contents) - result = TestSlot(a, partition.image_type, partition.offset, - partition.size, header.version, header.daddr, - partition.in_use) - results.append(result) + self.executed.append("get_firmware_info") - return results + return [x.slot for x in self.partitions] def update_firmware(self, filename, slot_id, image_type, tftp_address): """ Download a file from a TFTP server to a given slot. Make sure the image type matches. """ + self.executed.append(("update_firmware", filename, + slot_id, image_type, tftp_address)) + self.partitions[slot_id].updates += 1 - work_dir = tempfile.mkdtemp() + class Result: + def __init__(self): + self.tftp_handle_id = 0 + return Result() - partition = self.node.partitions[slot_id] - image_type = {"S2_ELF": 2, "SOC_ELF": 3, "CDB": 10}[image_type] - if partition.image_type != image_type: - raise IpmiError + def retrieve_firmware(self, filename, slot_id, image_type, tftp_address): + self.executed.append(("retrieve_firmware", filename, + slot_id, image_type, tftp_address)) + self.partitions[slot_id].retrieves += 1 - # Download from TFTP server + # Upload blank image to tftp + work_dir = tempfile.mkdtemp() + open("%s/%s" % (work_dir, filename), "w").write(create_simg("")) address, port = tftp_address.split(":") port = int(port) tftp = ExternalTftp(address, port) - tftp.get_file(filename, "%s/%s" % (work_dir, filename)) - - # Update partition and clean up - partition.contents = open("%s/%s" % (work_dir, filename)).read() + tftp.put_file("%s/%s" % (work_dir, filename), filename) shutil.rmtree(work_dir) - # Return result class Result: - def __init__(self, handle=0): - self.tftp_handle_id = handle + def __init__(self): + self.tftp_handle_id = 0 return Result() def get_firmware_status(self, handle): + self.executed.append("get_firmware_status") + class Result: - def __init__(self, status=None): - if status == None: - self.status = random.choice(["In progress", "Complete"]) - else: - self.status = status + def __init__(self): + self.status = "Complete" return Result() def check_firmware(self, slot_id): - class Result: - def __init__(self, partition): - header = get_simg_header(partition.contents) - if valid_simg(partition.contents): - self.crc32 = header.crc32 - self.error = None - else: - # TODO: what's the real error? - self.error = True + self.executed.append(("check_firmware", slot_id)) + self.partitions[slot_id].checks += 1 - return Result(self.node.partitions[slot_id]) + class Result: + def __init__(self): + self.crc32 = 0 + self.error = None + return Result() def activate_firmware(self, slot_id): - partition = self.node.partitions[slot_id] - header = get_simg_header(partition.contents) - header.flags = header.flags & 0xFFFFFFFE - partition.contents = str(header) + partition.contents[28:] + self.executed.append(("activate_firmware", slot_id)) + self.partitions[slot_id].activates += 1 def sdr_list(self): - """ Get sensor info from the node. + """ Get sensor info from the node. """ + self.executed.append("sdr_list") - Virtual node doesn't currently have any sensors, so just make some up. - """ - power_value = "%f (+/- 0) Watts" % random.uniform(0.0, 10.0) - temp_value = "%f (+/- 0) degrees C" % random.uniform(30.0, 50.0) + power_value = "%f (+/- 0) Watts" % random.uniform(0, 10) + temp_value = "%f (+/- 0) degrees C" % random.uniform(30, 50) sensors = [ TestSensor("Node Power", power_value), TestSensor("Board Temp", temp_value) ] + return sensors + +class DummyUbootEnv(UbootEnv): + def get_boot_order(self): + return ["disk", "pxe"] @@ -5,7 +5,7 @@ import unittest from cxmanage_test import tftp_test, image_test, target_test, controller_test -test_modules = [tftp_test, image_test, controller_test] +test_modules = [tftp_test, image_test, target_test, controller_test] def main(): """ Load and run tests """ |