summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Kraft <george.kraft@calxeda.com>2012-07-09 14:32:07 -0500
committerGeorge Kraft <george.kraft@calxeda.com>2012-07-09 14:32:07 -0500
commitee073453981379111db4997305c742d0b9aef3b1 (patch)
treee43e8436a406729c7743429e3243dc6ff932401f
parent50db3fafd0fd7a62390989956ce225456688745b (diff)
downloadcxmanage-ee073453981379111db4997305c742d0b9aef3b1.tar.gz
tests: Refactor target tests, add config_boot tests
-rw-r--r--cxmanage/target.py18
-rw-r--r--cxmanage_test/__init__.py39
-rw-r--r--cxmanage_test/image_test.py9
-rw-r--r--cxmanage_test/target_test.py529
-rwxr-xr-xrun_tests2
5 files changed, 292 insertions, 305 deletions
diff --git a/cxmanage/target.py b/cxmanage/target.py
index cc2bb56..907076a 100644
--- a/cxmanage/target.py
+++ b/cxmanage/target.py
@@ -18,15 +18,16 @@ class Target:
""" Contains info for a single target. A target consists of a hostname,
an username, and a password. """
- def __init__(self, address, username="admin",
- password="admin", verbosity=0):
+ def __init__(self, address, username="admin", password="admin",
+ verbosity=0, bmc_class=LanBMC, ubootenv_class=UbootEnv):
self.address = address
self.username = username
self.password = password
self.verbosity = verbosity
+ self.ubootenv_class = ubootenv_class
verbose = verbosity >= 2
- self.bmc = make_bmc(LanBMC, hostname=address,
+ self.bmc = make_bmc(bmc_class, hostname=address,
username=username, password=password, verbose=verbose)
def get_ipinfo(self, work_dir, tftp):
@@ -186,7 +187,7 @@ class Target:
except IpmiError:
raise CxmanageError("Failed to retrieve firmware info")
- def update_firmware(self, work_dir, tftp, images, slot_arg):
+ def update_firmware(self, work_dir, tftp, images, slot_arg="INACTIVE"):
""" Update firmware on this target. """
fwinfo = self.get_firmware_info()
@@ -204,10 +205,11 @@ class Target:
old_ubootenv = self._download_ubootenv(work_dir,
tftp, active_slot)
bootcmd = old_ubootenv.get_variable("bootcmd0")
+ contents = open(image.filename).read()
if image.simg:
- ubootenv = UbootEnv(open(image.filename).read()[28:])
- else:
- ubootenv = UbootEnv(open(image.filename).read())
+ contents = contents[28:]
+ ubootenv = self.ubootenv_class(contents)
+
ubootenv.set_variable("bootcmd0", bootcmd)
self._upload_ubootenv(work_dir, tftp, active_slot, ubootenv)
@@ -396,7 +398,7 @@ class Target:
# Open the file
simg = open(image.filename).read()
- return UbootEnv(simg[28:])
+ return self.ubootenv_class(simg[28:])
def _wait_for_transfer(self, handle):
""" Wait for a firmware transfer to finish"""
diff --git a/cxmanage_test/__init__.py b/cxmanage_test/__init__.py
index 9e1c15e..25ea0dd 100644
--- a/cxmanage_test/__init__.py
+++ b/cxmanage_test/__init__.py
@@ -5,32 +5,39 @@
import random
import tempfile
+from cxmanage.image import Image
+
+def random_file(work_dir, size):
+ """ Create a random file """
+ contents = "".join([chr(random.randint(0, 255))
+ for a in range(size)])
+ filename = tempfile.mkstemp(prefix="%s/" % work_dir)[1]
+ open(filename, "w").write(contents)
+ return filename
+
+class TestImage(Image):
+ def valid_type(self):
+ return True
+
+class TestSensor:
+ """ Sensor result from bmc/target """
+ def __init__(self, sensor_name, sensor_reading):
+ self.sensor_name = sensor_name
+ self.sensor_reading = sensor_reading
+
class TestSlot:
""" Slot info for a partition """
- def __init__(self, slot=0, slot_type=2, offset=0,
+ def __init__(self, slot, slot_type, offset=0,
size=0, version=0, daddr=0, in_use=None):
self.slot = "%2i" % slot
self.type = {
2: "02 (S2_ELF)",
3: "03 (SOC_ELF)",
- 10: "0a (CDB)"
+ 10: "0a (CDB)",
+ 11: "0b (UBOOTENV)"
}[slot_type]
self.offset = "%8x" % offset
self.size = "%8x" % size
self.version = "%8x" % version
self.daddr = "%8x" % daddr
self.in_use = {None: "Unknown", True: "1", False: "0"}[in_use]
-
-class TestSensor:
- """ Sensor result from bmc/target """
- def __init__(self, sensor_name, sensor_reading):
- self.sensor_name = sensor_name
- self.sensor_reading = sensor_reading
-
-def random_file(work_dir, size):
- """ Create a random file """
- contents = "".join([chr(random.randint(0, 255))
- for a in range(size)])
- filename = tempfile.mkstemp(prefix="%s/" % work_dir)[1]
- open(filename, "w").write(contents)
- return filename
diff --git a/cxmanage_test/image_test.py b/cxmanage_test/image_test.py
index b60e0e7..4b8a89a 100644
--- a/cxmanage_test/image_test.py
+++ b/cxmanage_test/image_test.py
@@ -6,10 +6,9 @@ import tempfile
import unittest
from cxmanage.simg import get_simg_header
-from cxmanage.image import Image
from cxmanage.tftp import InternalTftp
-from cxmanage_test import TestSlot, random_file
+from cxmanage_test import random_file, TestImage, TestSlot
class ImageTest(unittest.TestCase):
""" Tests involving cxmanage images
@@ -29,10 +28,6 @@ class ImageTest(unittest.TestCase):
def test_upload(self):
""" Test image creation and upload """
- class TestImage(Image):
- def valid_type(self):
- return True
-
imglen = 1024
daddr = 12345
new_version = 1
@@ -43,7 +38,7 @@ class ImageTest(unittest.TestCase):
image = TestImage(filename, "RAW")
# Create slot
- slot = TestSlot(size=imglen + 28, daddr=daddr)
+ slot = TestSlot(0, 2, size=imglen + 28, daddr=daddr)
# Upload image and delete file
image_filename = image.upload(self.work_dir,
diff --git a/cxmanage_test/target_test.py b/cxmanage_test/target_test.py
index 5d83ed7..e949969 100644
--- a/cxmanage_test/target_test.py
+++ b/cxmanage_test/target_test.py
@@ -5,31 +5,26 @@ import shutil
import tempfile
import unittest
-from pyipmi import IpmiError
+from pyipmi.bmc import LanBMC
-from cxmanage import CxmanageError
-from cxmanage.image import Image
-from cxmanage.simg import create_simg, valid_simg, get_simg_header
+from cxmanage.simg import create_simg
from cxmanage.target import Target
from cxmanage.tftp import InternalTftp, ExternalTftp
+from cxmanage.ubootenv import UbootEnv
-from cxmanage_test import TestSlot, TestSensor, random_file
+from cxmanage_test import TestImage, TestSensor, TestSlot
+
+num_nodes = 4
+addresses = ["192.168.100.%i" % a for a in range(1, num_nodes+1)]
class TargetTest(unittest.TestCase):
""" Tests involving cxmanage targets """
def setUp(self):
- class TestTarget(Target):
- """ A target that uses a test BMC """
- def __init__(self, node):
- Target.__init__(self, node.address, "admin", "admin", 0)
- self.bmc = TestBMC(node)
-
self.work_dir = tempfile.mkdtemp()
- # Set up a cluster
- self.cluster = TestCluster(4)
- self.targets = [TestTarget(node) for node in self.cluster.nodes]
+ self.targets = [Target(x, verbosity=0, bmc_class=DummyBMC,
+ ubootenv_class=DummyUbootEnv) for x in addresses]
# Set up an internal server
self.tftp = InternalTftp()
@@ -38,347 +33,335 @@ class TargetTest(unittest.TestCase):
shutil.rmtree(self.work_dir)
self.tftp.kill()
- def test_ipinfo(self):
- """ Test ipinfo command """
- # Get IP info
- ipinfo = self.targets[0].get_ipinfo(self.work_dir, self.tftp)
+ def test_get_ipinfo(self):
+ """ Test get_ipinfo command """
+ for target in self.targets:
+ result = target.get_ipinfo(self.work_dir, self.tftp)
+
+ executed = target.bmc.executed
+ self.assertEqual(len(executed), 1)
+ self.assertEqual(executed[0], "get_fabric_ipinfo")
- # Verify
- for i in range(len(self.targets)):
- self.assertEqual(i, ipinfo[i][0])
- self.assertEqual(self.targets[i].address, ipinfo[i][1])
+ self.assertEqual(len(result), num_nodes)
+ for i in range(num_nodes):
+ self.assertEqual(result[i], (i, addresses[i]))
def test_power(self):
- """ Test power commands """
+ """ Test power command """
for target in self.targets:
- # Power should be off initially
- self.assertEqual(target.power_status(), "off")
-
- # Turn power on
- target.power("on")
- self.assertEqual(target.power_status(), "on")
-
- # Reset
- target.power("reset")
- self.assertEqual(target.power_status(), "on")
-
- # Turn power off
- target.power("off")
- self.assertEqual(target.power_status(), "off")
-
- # Try to reset -- should raise an error
- try:
- target.power("reset")
- self.fail()
- except CxmanageError:
- self.assertEqual(target.power_status(), "off")
-
- def test_policy(self):
- """ Test power policy commands """
+ modes = ["off", "on", "reset", "off"]
+ for mode in modes:
+ target.power(mode)
+
+ executed = target.bmc.executed
+ self.assertEqual(len(executed), len(modes))
+ for i in range(len(modes)):
+ self.assertEqual(executed[i], ("set_chassis_power", modes[i]))
+
+ def test_power_status(self):
+ """ Test power_status command """
for target in self.targets:
- self.assertEqual(target.power_policy_status(), "always-off")
+ result = target.power_status()
- target.power_policy("always-on")
- self.assertEqual(target.power_policy_status(), "always-on")
+ executed = target.bmc.executed
+ self.assertEqual(len(executed), 1)
+ self.assertEqual(executed[0], "get_chassis_status")
+ self.assertEqual(result, "off")
- target.power_policy("previous")
- self.assertEqual(target.power_policy_status(), "previous")
+ def test_power_policy(self):
+ """ Test power_policy command """
+ for target in self.targets:
+ modes = ["always-on", "previous", "always-off"]
+ for mode in modes:
+ target.power_policy(mode)
- target.power_policy("always-off")
- self.assertEqual(target.power_policy_status(), "always-off")
+ executed = target.bmc.executed
+ self.assertEqual(len(executed), len(modes))
+ for i in range(len(modes)):
+ self.assertEqual(executed[i], ("set_chassis_policy", modes[i]))
+
+ def test_power_policy_status(self):
+ """ Test power_policy_status command """
+ for target in self.targets:
+ result = target.power_policy_status()
+
+ executed = target.bmc.executed
+ self.assertEqual(len(executed), 1)
+ self.assertEqual(executed[0], "get_chassis_status")
+ self.assertEqual(result, "always-off")
def test_sensor(self):
""" Test sensor read command """
for target in self.targets:
- sensors = target.get_sensors()
-
- # Read node power
- reading = [x.sensor_reading for x in sensors
- if x.sensor_name == "Node Power"][0]
- value = float(reading.split()[0])
- suffix = reading.lstrip("%f " % value)
- self.assertEqual(suffix, "(+/- 0) Watts")
-
- # Read board temp
- reading = [x.sensor_reading for x in sensors
- if x.sensor_name == "Board Temp"][0]
- value = float(reading.split()[0])
- suffix = reading.lstrip("%f " % value)
- self.assertEqual(suffix, "(+/- 0) degrees C")
+ result = target.get_sensors()
- def test_config_reset(self):
- """ Test config reset command """
- for target in self.targets:
- # Write random stuff to the partition
- partition = [x for x in target.bmc.node.partitions
- if x.image_type == 10][-1]
- size = partition.size
- raw_contents = "".join([chr(random.randint(0, 255))
- for a in range(size - 28)])
- partition.contents = partition.contents[:28] + raw_contents
- self.assertEqual(partition.contents[28:], raw_contents)
-
- # Reset firmware and check
- target.config_reset()
- raw_contents = "".join([chr(0xFF) for a in range(size - 28)])
- self.assertEqual(partition.contents[28:], raw_contents)
+ executed = target.bmc.executed
+ self.assertEqual(len(executed), 1)
+ self.assertEqual(executed[0], "sdr_list")
+
+ self.assertEqual(len(result), 2)
+ self.assertEqual(result[0].sensor_name, "Node Power")
+ self.assertTrue(result[0].sensor_reading.endswith("Watts"))
+ self.assertEqual(result[1].sensor_name, "Board Temp")
+ self.assertTrue(result[1].sensor_reading.endswith("degrees C"))
def test_update_firmware(self):
""" Test firmware update command """
- class TestImage(Image):
- def valid_type(self):
- return True
+ filename = "%s/%s" % (self.work_dir, "image.bin")
+ open(filename, "w").write("")
- def create_image(partition, image_type):
- filename = random_file(self.work_dir, partition.size - 28)
- contents = open(filename).read()
- return contents, TestImage(filename, image_type, False)
+ images = [
+ TestImage(filename, "SOC_ELF"),
+ TestImage(filename, "CDB"),
+ TestImage(filename, "UBOOTENV")
+ ]
for target in self.targets:
- # Create random images
- s2_partition = [x for x in target.bmc.node.partitions
- if x.image_type == 2][1]
- soc_partition = [x for x in target.bmc.node.partitions
- if x.image_type == 3][1]
- cdb_partition = [x for x in target.bmc.node.partitions
- if x.image_type == 10][1]
- s2_contents, s2_image = create_image(s2_partition, "S2_ELF")
- soc_contents, soc_image = create_image(soc_partition, "SOC_ELF")
- cdb_contents, cdb_image = create_image(cdb_partition, "CDB")
-
- # Execute firmware update
- images = [s2_image, soc_image, cdb_image]
- target.update_firmware(self.work_dir,
- self.tftp, images, "INACTIVE")
-
- # Examine headers
- changed_partitions = [s2_partition, soc_partition, cdb_partition]
- unchanged_partitions = [x for x in target.bmc.node.partitions
- if not x in changed_partitions]
+ target.update_firmware(self.work_dir, self.tftp, images)
+
+ partitions = target.bmc.partitions
+ unchanged_partitions = [partitions[x] for x in [0, 1, 4]]
+ changed_partitions = [partitions[x] for x in [2, 3, 6]]
+ ubootenv_partition = partitions[5]
+
+ for partition in unchanged_partitions:
+ self.assertEqual(partition.updates, 0)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 0)
+ self.assertEqual(partition.activates, 0)
+
for partition in changed_partitions:
- header = get_simg_header(partition.contents)
- self.assertTrue(valid_simg(partition.contents))
- self.assertEqual(header.version, 1)
+ self.assertEqual(partition.updates, 1)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 1)
+ self.assertEqual(partition.activates, 1)
+
+ self.assertEqual(ubootenv_partition.updates, 1)
+ self.assertEqual(ubootenv_partition.retrieves, 1)
+ self.assertEqual(ubootenv_partition.checks, 1)
+ self.assertEqual(ubootenv_partition.activates, 1)
+
+ def test_config_reset(self):
+ """ Test config_reset command """
+ for target in self.targets:
+ target.config_reset(self.work_dir, self.tftp)
+
+ # Assert config reset
+ executed = target.bmc.executed
+ self.assertEqual(
+ len([x for x in executed if x == "reset_firmware"]), 1)
+
+ # Assert sel clear
+ self.assertEqual(
+ len([x for x in executed if x == "sel_clear"]), 1)
+
+ # Assert ubootenv changes
+ active = target.bmc.partitions[5]
+ inactive = target.bmc.partitions[6]
+ self.assertEqual(active.updates, 1)
+ self.assertEqual(active.retrieves, 0)
+ self.assertEqual(active.checks, 1)
+ self.assertEqual(active.activates, 1)
+ self.assertEqual(inactive.updates, 0)
+ self.assertEqual(inactive.retrieves, 1)
+ self.assertEqual(inactive.checks, 0)
+ self.assertEqual(inactive.activates, 0)
+
+ def test_config_boot(self):
+ """ Test config_boot command """
+ boot_args = ["disk", "pxe", "retry"]
+ for target in self.targets:
+ target.config_boot(self.work_dir, self.tftp, boot_args)
+
+ partitions = target.bmc.partitions
+ ubootenv_partition = partitions[5]
+ unchanged_partitions = [x for x in partitions
+ if x != ubootenv_partition]
+
+ self.assertEqual(ubootenv_partition.updates, 1)
+ self.assertEqual(ubootenv_partition.retrieves, 1)
+ self.assertEqual(ubootenv_partition.checks, 1)
+ self.assertEqual(ubootenv_partition.activates, 1)
+
for partition in unchanged_partitions:
- header = get_simg_header(partition.contents)
- self.assertTrue(valid_simg(partition.contents))
- self.assertEqual(header.version, 0)
-
- # Examine contents
- self.assertEqual(s2_contents, s2_partition.contents[28:])
- self.assertEqual(soc_contents, soc_partition.contents[28:])
- self.assertEqual(cdb_contents, cdb_partition.contents[28:])
+ self.assertEqual(partition.updates, 0)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 0)
+ self.assertEqual(partition.activates, 0)
+
+ def test_config_boot_status(self):
+ """ Test config_boot_status command """
+ for target in self.targets:
+ result = target.config_boot_status(self.work_dir, self.tftp)
+
+ partitions = target.bmc.partitions
+ ubootenv_partition = partitions[5]
+ unchanged_partitions = [x for x in partitions
+ if x != ubootenv_partition]
+
+ self.assertEqual(ubootenv_partition.updates, 0)
+ self.assertEqual(ubootenv_partition.retrieves, 1)
+ self.assertEqual(ubootenv_partition.checks, 0)
+ self.assertEqual(ubootenv_partition.activates, 0)
+
for partition in unchanged_partitions:
- size = partition.size - 28
- contents = "".join([chr(0xFF) for a in range(size)])
- self.assertEqual(partition.contents[28:], contents)
-
-
-class TestPartition:
- """ A partition on a node """
- def __init__(self, image_type, offset, size, in_use=None, contents=None):
- self.image_type = image_type
- self.offset = offset
- self.size = size
- self.in_use = in_use
- if contents == None:
- raw_contents = "".join([chr(0xFF) for a in range(size - 28)])
- self.contents = create_simg(raw_contents)
- else:
- self.contents = contents
-
-
-class TestNode:
- """ A virtual node, with an IP address, partitions, and chassis power """
-
- def __init__(self, address="192.168.100.250", cluster=None):
- self.address = address
- self.cluster = cluster
- self.partitions = []
- self.power = "off"
- self.policy = "always-off"
-
- def add_partition(image_type, size, in_use=None):
- """ Add a partition to this node """
- if len(self.partitions) == 0:
- offset = 0
- else:
- offset = self.partitions[-1].offset + self.partitions[-1].size
-
- partition = TestPartition(image_type, offset, size, in_use)
-
- self.partitions.append(partition)
-
- # Add partitions: two S2_ELF, two SOC_ELF/CDB pairs, one running CDB
- add_partition(image_type=2, size=20480)
- add_partition(image_type=2, size=20480)
- add_partition(image_type=3, size=393216, in_use=True)
- add_partition(image_type=10, size=196608)
- add_partition(image_type=3, size=393216, in_use=False)
- add_partition(image_type=10, size=196608)
- add_partition(image_type=10, size=196608)
-
-
-class TestCluster:
- """ A cluster of nodes """
- def __init__(self, num_nodes = 1):
- self.nodes = []
- for a in range(num_nodes):
- address = "192.168.100.%i" % (100 + a)
- node = TestNode(address, self)
- self.nodes.append(node)
-
-
-class TestBMC:
- """ BMC handle for a virtual node """
- def __init__(self, node):
- self.node = node
+ self.assertEqual(partition.updates, 0)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 0)
+ self.assertEqual(partition.activates, 0)
+
+ self.assertEqual(result, ["disk", "pxe"])
+
+class DummyBMC(LanBMC):
+ """ Dummy BMC for the target tests """
+ def __init__(self, **kwargs):
+ LanBMC.__init__(self, **kwargs)
+
+ self.executed = []
+
+ class Partition:
+ def __init__(self, slot, slot_type, offset=0,
+ size=0, version=0, daddr=0, in_use=None):
+ self.updates = 0
+ self.retrieves = 0
+ self.checks = 0
+ self.activates = 0
+ self.slot = TestSlot(slot, slot_type, offset,
+ size, version, daddr, in_use)
+
+ self.partitions = [
+ Partition(0, 3, 0, 393216, in_use=True), # socman
+ Partition(1, 10, 393216, 196608), # factory cdb
+ Partition(2, 3, 589824, 393216, in_use=False), # socman
+ Partition(3, 10, 983040, 196608), # factory cdb
+ Partition(4, 10, 1179648, 196608), # running cdb
+ Partition(5, 11, 1376256, 12288), # ubootenv
+ Partition(6, 11, 1388544, 12288) # ubootenv
+ ]
def get_fabric_ipinfo(self, filename, tftp_address):
""" Upload an ipinfo file from the node to TFTP"""
- # Set up TFTP client
+ self.executed.append("get_fabric_ipinfo")
+
+ work_dir = tempfile.mkdtemp()
+
+ # Create IP info file
+ ipinfo = open("%s/%s" % (work_dir, filename), "w")
+ for i in range(len(addresses)):
+ ipinfo.write("Node %i: %s\n" % (i, addresses[i]))
+ ipinfo.close()
+
+ # Upload to tftp
address, port = tftp_address.split(":")
port = int(port)
tftp = ExternalTftp(address, port)
-
- # Create file
- work_dir = tempfile.mkdtemp()
- ipinfo_file = open("%s/%s" % (work_dir, filename), "w")
- nodes = self.node.cluster.nodes
- for i in range(len(nodes)):
- address = nodes[i].address
- ipinfo_file.write("Node %i: %s\n" % (i, address))
- ipinfo_file.close()
-
- # Upload file
tftp.put_file("%s/%s" % (work_dir, filename), filename)
- # Clean up
shutil.rmtree(work_dir)
def set_chassis_power(self, mode):
""" Set chassis power """
- if mode == "reset":
- if self.node.power == "off":
- raise IpmiError
- else:
- self.node.power = mode
+ self.executed.append(("set_chassis_power", mode))
def get_chassis_status(self):
- """ Get chassis power """
- class Result:
- def __init__(self, power, policy):
- self.power_on = (power == "on")
- self.power_restore_policy = policy
+ """ Get chassis status """
+ self.executed.append("get_chassis_status")
- return Result(self.node.power, self.node.policy)
+ class Result:
+ def __init__(self):
+ self.power_on = False
+ self.power_restore_policy = "always-off"
+ return Result()
def set_chassis_policy(self, mode):
""" Set chassis restore policy """
- self.node.policy = mode
+ self.executed.append(("set_chassis_policy", mode))
def mc_reset(self, mode):
""" Reset the MC """
- if self.node.policy == "always-off":
- self.node.power = "off"
- elif self.node.policy == "always-on":
- self.node.power = "on"
-
- class Result:
- pass
-
- return Result()
+ self.executed.append(("mc_reset", mode))
def reset_firmware(self):
""" Reset the running CDB """
- partition = [x for x in self.node.partitions if x.image_type == 10][-1]
- size = partition.size
- raw_contents = "".join([chr(0xFF) for a in range(size - 28)])
- partition.contents = (partition.contents[:28] + raw_contents)
+ self.executed.append("reset_firmware")
def sel_clear(self):
""" Clear SEL """
- # TODO
- pass
+ self.executed.append("sel_clear")
def get_firmware_info(self):
""" Get partition and simg info """
- results = []
- for a in range(len(self.node.partitions)):
- partition = self.node.partitions[a]
- header = get_simg_header(partition.contents)
- result = TestSlot(a, partition.image_type, partition.offset,
- partition.size, header.version, header.daddr,
- partition.in_use)
- results.append(result)
+ self.executed.append("get_firmware_info")
- return results
+ return [x.slot for x in self.partitions]
def update_firmware(self, filename, slot_id, image_type, tftp_address):
""" Download a file from a TFTP server to a given slot.
Make sure the image type matches. """
+ self.executed.append(("update_firmware", filename,
+ slot_id, image_type, tftp_address))
+ self.partitions[slot_id].updates += 1
- work_dir = tempfile.mkdtemp()
+ class Result:
+ def __init__(self):
+ self.tftp_handle_id = 0
+ return Result()
- partition = self.node.partitions[slot_id]
- image_type = {"S2_ELF": 2, "SOC_ELF": 3, "CDB": 10}[image_type]
- if partition.image_type != image_type:
- raise IpmiError
+ def retrieve_firmware(self, filename, slot_id, image_type, tftp_address):
+ self.executed.append(("retrieve_firmware", filename,
+ slot_id, image_type, tftp_address))
+ self.partitions[slot_id].retrieves += 1
- # Download from TFTP server
+ # Upload blank image to tftp
+ work_dir = tempfile.mkdtemp()
+ open("%s/%s" % (work_dir, filename), "w").write(create_simg(""))
address, port = tftp_address.split(":")
port = int(port)
tftp = ExternalTftp(address, port)
- tftp.get_file(filename, "%s/%s" % (work_dir, filename))
-
- # Update partition and clean up
- partition.contents = open("%s/%s" % (work_dir, filename)).read()
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
shutil.rmtree(work_dir)
- # Return result
class Result:
- def __init__(self, handle=0):
- self.tftp_handle_id = handle
+ def __init__(self):
+ self.tftp_handle_id = 0
return Result()
def get_firmware_status(self, handle):
+ self.executed.append("get_firmware_status")
+
class Result:
- def __init__(self, status=None):
- if status == None:
- self.status = random.choice(["In progress", "Complete"])
- else:
- self.status = status
+ def __init__(self):
+ self.status = "Complete"
return Result()
def check_firmware(self, slot_id):
- class Result:
- def __init__(self, partition):
- header = get_simg_header(partition.contents)
- if valid_simg(partition.contents):
- self.crc32 = header.crc32
- self.error = None
- else:
- # TODO: what's the real error?
- self.error = True
+ self.executed.append(("check_firmware", slot_id))
+ self.partitions[slot_id].checks += 1
- return Result(self.node.partitions[slot_id])
+ class Result:
+ def __init__(self):
+ self.crc32 = 0
+ self.error = None
+ return Result()
def activate_firmware(self, slot_id):
- partition = self.node.partitions[slot_id]
- header = get_simg_header(partition.contents)
- header.flags = header.flags & 0xFFFFFFFE
- partition.contents = str(header) + partition.contents[28:]
+ self.executed.append(("activate_firmware", slot_id))
+ self.partitions[slot_id].activates += 1
def sdr_list(self):
- """ Get sensor info from the node.
+ """ Get sensor info from the node. """
+ self.executed.append("sdr_list")
- Virtual node doesn't currently have any sensors, so just make some up.
- """
- power_value = "%f (+/- 0) Watts" % random.uniform(0.0, 10.0)
- temp_value = "%f (+/- 0) degrees C" % random.uniform(30.0, 50.0)
+ power_value = "%f (+/- 0) Watts" % random.uniform(0, 10)
+ temp_value = "%f (+/- 0) degrees C" % random.uniform(30, 50)
sensors = [
TestSensor("Node Power", power_value),
TestSensor("Board Temp", temp_value)
]
+
return sensors
+
+class DummyUbootEnv(UbootEnv):
+ def get_boot_order(self):
+ return ["disk", "pxe"]
diff --git a/run_tests b/run_tests
index 51266c5..1d90495 100755
--- a/run_tests
+++ b/run_tests
@@ -5,7 +5,7 @@
import unittest
from cxmanage_test import tftp_test, image_test, target_test, controller_test
-test_modules = [tftp_test, image_test, controller_test]
+test_modules = [tftp_test, image_test, target_test, controller_test]
def main():
""" Load and run tests """