summaryrefslogtreecommitdiff
path: root/cxmanage_api/tests
diff options
context:
space:
mode:
authorGeorge Kraft <george.kraft@calxeda.com>2013-12-04 10:39:57 -0600
committerGeorge Kraft <george.kraft@calxeda.com>2013-12-04 10:39:57 -0600
commitbcebe16d59cd1a24321ac4b2bd2931f3df5ce102 (patch)
tree73cff0eaf90927ec673b910bf28ebd407be3bbd4 /cxmanage_api/tests
parentd96b517e34b02becab36d692803617bc39fdfee3 (diff)
downloadcxmanage-bcebe16d59cd1a24321ac4b2bd2931f3df5ce102.tar.gz
CXMAN-264: Rename cxmanage_test to cxmanage_api.tests
So everything is a subpackage of cxmanage_api! Whoo
Diffstat (limited to 'cxmanage_api/tests')
-rw-r--r--cxmanage_api/tests/__init__.py60
-rw-r--r--cxmanage_api/tests/fabric_test.py712
-rw-r--r--cxmanage_api/tests/image_test.py92
-rw-r--r--cxmanage_api/tests/node_test.py1107
-rw-r--r--cxmanage_api/tests/tasks_test.py81
-rw-r--r--cxmanage_api/tests/tftp_test.py142
6 files changed, 2194 insertions, 0 deletions
diff --git a/cxmanage_api/tests/__init__.py b/cxmanage_api/tests/__init__.py
new file mode 100644
index 0000000..d8d5307
--- /dev/null
+++ b/cxmanage_api/tests/__init__.py
@@ -0,0 +1,60 @@
+"""Calxeda: __init__.py"""
+
+
+# Copyright (c) 2012-2013, Calxeda Inc.
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# * Neither the name of Calxeda Inc. nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+
+
+import os
+import random
+import tempfile
+
+from cxmanage_api.image import Image
+
+def random_file(size):
+ """ Create a random file """
+ contents = "".join([chr(random.randint(0, 255)) for _ in range(size)])
+ file_, filename = tempfile.mkstemp(prefix='cxmanage_test-')
+ with os.fdopen(file_, "w") as file_handle:
+ file_handle.write(contents)
+
+ return filename
+
+class TestImage(Image):
+ """TestImage Class."""
+ def verify(self):
+ return True
+
+# pylint: disable=R0903
+class TestSensor(object):
+ """ Sensor result from bmc/target """
+ def __init__(self, sensor_name, sensor_reading):
+ self.sensor_name = sensor_name
+ self.sensor_reading = sensor_reading
diff --git a/cxmanage_api/tests/fabric_test.py b/cxmanage_api/tests/fabric_test.py
new file mode 100644
index 0000000..03a253c
--- /dev/null
+++ b/cxmanage_api/tests/fabric_test.py
@@ -0,0 +1,712 @@
+"""Calxeda: fabric_test.py """
+
+# Copyright (c) 2012-2013, Calxeda Inc.
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# * Neither the name of Calxeda Inc. nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+
+import random
+import time
+import unittest
+
+from cxmanage_api.fabric import Fabric
+from cxmanage_api.tftp import InternalTftp, ExternalTftp
+from cxmanage_api.firmware_package import FirmwarePackage
+from cxmanage_api.ubootenv import UbootEnv
+from cxmanage_api.cx_exceptions import CommandFailedError
+from cxmanage_api.tests import TestSensor
+from cxmanage_api.tests.node_test import DummyBMC
+from pyipmi import make_bmc
+
+NUM_NODES = 128
+ADDRESSES = ["192.168.100.%i" % x for x in range(1, NUM_NODES + 1)]
+
+
+# pylint: disable=R0904
+class FabricTest(unittest.TestCase):
+ """ Test the various Fabric commands """
+ def setUp(self):
+ # Set up the controller and add targets
+ self.fabric = Fabric("192.168.100.1", node=DummyNode)
+ self.nodes = [DummyNode(i) for i in ADDRESSES]
+ # pylint: disable=W0212
+ self.fabric._nodes = dict((i, self.nodes[i])
+ for i in xrange(NUM_NODES))
+
+ def test_tftp(self):
+ """ Test the tftp property """
+ tftp = InternalTftp()
+ self.fabric.tftp = tftp
+ self.assertTrue(self.fabric.tftp is tftp)
+ for node in self.nodes:
+ self.assertTrue(node.tftp is tftp)
+
+ tftp = ExternalTftp("127.0.0.1")
+ self.fabric.tftp = tftp
+ self.assertTrue(self.fabric.tftp is tftp)
+ for node in self.nodes:
+ self.assertTrue(node.tftp is tftp)
+
+ def test_get_mac_addresses(self):
+ """ Test get_mac_addresses command """
+ self.fabric.get_mac_addresses()
+ self.assertEqual(self.nodes[0].executed, ["get_fabric_macaddrs"])
+ for node in self.nodes[1:]:
+ self.assertEqual(node.executed, [])
+
+ def test_get_uplink_info(self):
+ """ Test get_uplink_info command """
+ self.fabric.get_uplink_info()
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["get_uplink_info"])
+
+ def test_get_uplink_speed(self):
+ """ Test get_uplink_speed command """
+ self.fabric.get_uplink_speed()
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["get_uplink_speed"])
+
+ def test_get_uplink(self):
+ """ Test get_uplink command """
+ self.assertEqual(self.fabric.get_uplink(iface=0), 0)
+
+ def test_set_uplink(self):
+ """ Test set_uplink command """
+ iface, uplink = 0, 0
+ self.fabric.set_uplink(iface=iface, uplink=uplink)
+ self.assertEqual(
+ [("fabric_config_set_uplink", iface, uplink)],
+ self.nodes[0].bmc.executed
+ )
+
+ def test_get_sensors(self):
+ """ Test get_sensors command """
+ self.fabric.get_sensors()
+ self.fabric.get_sensors("Node Power")
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["get_sensors", "get_sensors"])
+
+ def test_get_firmware_info(self):
+ """ Test get_firmware_info command """
+ self.fabric.get_firmware_info()
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["get_firmware_info"])
+
+ def test_is_updatable(self):
+ """ Test is_updatable command """
+ package = FirmwarePackage()
+ self.fabric.is_updatable(package)
+ for node in self.nodes:
+ self.assertEqual(node.executed, [("is_updatable", package)])
+
+ def test_update_firmware(self):
+ """ Test update_firmware command """
+ package = FirmwarePackage()
+ self.fabric.update_firmware(package)
+ for node in self.nodes:
+ self.assertEqual(node.executed, [("update_firmware", package)])
+
+ def test_config_reset(self):
+ """ Test config_reset command """
+ self.fabric.config_reset()
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["config_reset"])
+
+ def test_set_boot_order(self):
+ """ Test set_boot_order command """
+ boot_args = "disk0,pxe,retry"
+ self.fabric.set_boot_order(boot_args)
+ for node in self.nodes:
+ self.assertEqual(node.executed, [("set_boot_order", boot_args)])
+
+ def test_get_boot_order(self):
+ """ Test get_boot_order command """
+ self.fabric.get_boot_order()
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["get_boot_order"])
+
+ def test_set_pxe_interface(self):
+ """ Test set_pxe_interface command """
+ self.fabric.set_pxe_interface("eth0")
+ for node in self.nodes:
+ self.assertEqual(node.executed, [("set_pxe_interface", "eth0")])
+
+ def test_get_pxe_interface(self):
+ """ Test get_pxe_interface command """
+ self.fabric.get_pxe_interface()
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["get_pxe_interface"])
+
+ def test_get_versions(self):
+ """ Test get_versions command """
+ self.fabric.get_versions()
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["get_versions"])
+
+ def test_get_ubootenv(self):
+ """ Test get_ubootenv command """
+ self.fabric.get_ubootenv()
+ for node in self.nodes:
+ self.assertEqual(node.executed, ["get_ubootenv"])
+
+ def test_ipmitool_command(self):
+ """ Test ipmitool_command command """
+ ipmitool_args = "power status"
+ self.fabric.ipmitool_command(ipmitool_args)
+ for node in self.nodes:
+ self.assertEqual(
+ node.executed, [("ipmitool_command", ipmitool_args)]
+ )
+
+ def test_get_server_ip(self):
+ """ Test get_server_ip command """
+ self.fabric.get_server_ip("interface", "ipv6", "user", "password",
+ "aggressive")
+ for node in self.nodes:
+ self.assertEqual(node.executed, [("get_server_ip", "interface",
+ "ipv6", "user", "password", "aggressive")])
+
+ def test_failed_command(self):
+ """ Test a failed command """
+ fail_nodes = [DummyFailNode(i) for i in ADDRESSES]
+ # pylint: disable=W0212
+ self.fabric._nodes = dict(
+ (i, fail_nodes[i]) for i in xrange(NUM_NODES)
+ )
+ try:
+ self.fabric.get_power()
+ self.fail()
+ except CommandFailedError:
+ for node in fail_nodes:
+ self.assertEqual(node.executed, ["get_power"])
+
+ def test_primary_node(self):
+ """Test the primary_node property
+
+ Currently it should always return node 0.
+ """
+ self.assertEqual(self.fabric.primary_node, self.nodes[0])
+
+ def test_get_ipsrc(self):
+ """Test the get_ipsrc method
+
+ """
+ self.fabric.get_ipsrc()
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_get_ip_src', bmc.executed)
+
+ def test_set_ipsrc(self):
+ """Test the set_ipsrc method"""
+
+ ipsrc = random.randint(1, 2)
+
+ self.fabric.set_ipsrc(ipsrc)
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_set_ip_src', bmc.executed)
+
+ # fabric_ipsrc is just part of DummyBMC - not a real bmc attribute
+ # it's there to make sure the ipsrc_mode value gets passed to the bmc.
+ self.assertEqual(bmc.fabric_ipsrc, ipsrc)
+
+ def test_apply_fdc(self):
+ """Test the apply_factory_default_config method"""
+
+ self.fabric.apply_factory_default_config()
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_factory_default', bmc.executed)
+
+ def test_get_ipaddr_base(self):
+ """Test the get_ipaddr_base method"""
+ ipaddr_base = self.fabric.get_ipaddr_base()
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_get_ip_addr_base', bmc.executed)
+ self.assertEqual(bmc.ipaddr_base, ipaddr_base)
+
+ def test_update_config(self):
+ """Test the update_config method
+
+ """
+ self.fabric.update_config()
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_update_config', bmc.executed)
+
+ def test_get_linkspeed(self):
+ """Test the get_linkspeed method
+
+ """
+ self.fabric.get_linkspeed()
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_get_linkspeed', bmc.executed)
+
+ def test_set_linkspeed(self):
+ """Test the set_linkspeed method"""
+
+ valid_linkspeeds = [1, 2.5, 5, 7.5, 10]
+ linkspeed = random.choice(valid_linkspeeds)
+
+ self.fabric.set_linkspeed(linkspeed)
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_set_linkspeed', bmc.executed)
+
+ # fabric_linkspeed is just part of DummyBMC - not a real bmc attribute
+ # it's there to make sure the ipsrc_mode value gets passed to the bmc.
+ self.assertEqual(bmc.fabric_linkspeed, linkspeed)
+
+ def test_get_linkspeed_policy(self):
+ """Test the get_linkspeed_policy method
+
+ """
+ self.fabric.get_linkspeed_policy()
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_get_linkspeed_policy', bmc.executed)
+
+ def test_set_linkspeed_policy(self):
+ """Test the set_linkspeed_policy method"""
+
+ ls_policy = random.randint(0, 1)
+
+ self.fabric.set_linkspeed_policy(ls_policy)
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_set_linkspeed_policy', bmc.executed)
+
+ # fabric_ls_policy is just part of DummyBMC - not a real bmc attribute
+ # it's there to make sure the ipsrc_mode value gets passed to the bmc.
+ self.assertEqual(bmc.fabric_ls_policy, ls_policy)
+
+ def test_get_link_stats(self):
+ """Test the get_link_stats() method."""
+ for i in range(0, 5):
+ self.fabric.get_link_stats(i)
+ for node in self.fabric.nodes.values():
+ self.assertIn(('get_link_stats', i), node.executed)
+
+ def test_get_linkmap(self):
+ """Test the get_linkmap method"""
+ self.fabric.get_linkmap()
+ for node in self.fabric.nodes.values():
+ self.assertIn('get_linkmap', node.executed)
+
+ def test_get_routing_table(self):
+ """Test the get_routing_table method"""
+ self.fabric.get_routing_table()
+ for node in self.fabric.nodes.values():
+ self.assertIn('get_routing_table', node.executed)
+
+ def test_get_depth_chart(self):
+ """Test the depth_chart method"""
+ self.fabric.get_depth_chart()
+ for node in self.fabric.nodes.values():
+ self.assertIn('get_depth_chart', node.executed)
+
+ def test_get_link_users_factor(self):
+ """Test the get_link_users_factor method
+
+ """
+ self.fabric.get_link_users_factor()
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_get_link_users_factor', bmc.executed)
+
+ def test_set_link_users_factor(self):
+ """Test the set_link_users_factor method"""
+
+ lu_factor = random.randint(5, 50)
+
+ self.fabric.set_link_users_factor(lu_factor)
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn('fabric_config_set_link_users_factor', bmc.executed)
+
+ # fabric_lu_factor is just part of DummyBMC - not a real bmc attribute
+ # it's there to make sure the ipsrc_mode value gets passed to the bmc.
+ self.assertEqual(bmc.fabric_lu_factor, lu_factor)
+
+ def test_add_macaddr (self):
+ """Test the add_macaddr method"""
+
+ valid_nodeids = [0, 1, 2, 3]
+ t_nodeid = random.choice(valid_nodeids)
+
+ valid_ifaces = [0, 1, 2]
+ t_iface = random.choice(valid_ifaces)
+
+ t_macaddr = "66:55:44:33:22:11"
+
+ self.fabric.add_macaddr (t_nodeid, t_iface, t_macaddr)
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn ('fabric_add_macaddr', bmc.executed)
+
+ def test_rm_macaddr (self):
+ """Test the rm_macaddr method"""
+
+ valid_nodeids = [0, 1, 2, 3]
+ t_nodeid = random.choice(valid_nodeids)
+
+ valid_ifaces = [0, 1, 2]
+ t_iface = random.choice(valid_ifaces)
+
+ t_macaddr = "66:55:44:33:22:11"
+
+ self.fabric.rm_macaddr (t_nodeid, t_iface, t_macaddr)
+ bmc = self.fabric.primary_node.bmc
+ self.assertIn ('fabric_rm_macaddr', bmc.executed)
+
+ def test_set_macaddr_base(self):
+ """Test the set_macaddr_base method"""
+ self.fabric.set_macaddr_base("00:11:22:33:44:55")
+ for node in self.fabric.nodes.values():
+ if node == self.fabric.primary_node:
+ self.assertEqual(
+ node.bmc.executed,
+ [("fabric_config_set_macaddr_base", "00:11:22:33:44:55")]
+ )
+ else:
+ self.assertEqual(node.bmc.executed, [])
+
+ def test_get_macaddr_base(self):
+ """Test the get_macaddr_base method"""
+ self.assertEqual(self.fabric.get_macaddr_base(), "00:00:00:00:00:00")
+ for node in self.fabric.nodes.values():
+ if node == self.fabric.primary_node:
+ self.assertEqual(
+ node.bmc.executed,
+ ["fabric_config_get_macaddr_base"]
+ )
+ else:
+ self.assertEqual(node.bmc.executed, [])
+
+ def test_set_macaddr_mask(self):
+ """Test the set_macaddr_mask method"""
+ self.fabric.set_macaddr_mask("00:11:22:33:44:55")
+ for node in self.fabric.nodes.values():
+ if node == self.fabric.primary_node:
+ self.assertEqual(
+ node.bmc.executed,
+ [("fabric_config_set_macaddr_mask", "00:11:22:33:44:55")]
+ )
+ else:
+ self.assertEqual(node.bmc.executed, [])
+
+ def test_get_macaddr_mask(self):
+ """Test the get_macaddr_mask method"""
+ self.assertEqual(self.fabric.get_macaddr_mask(), "00:00:00:00:00:00")
+ for node in self.fabric.nodes.values():
+ if node == self.fabric.primary_node:
+ self.assertEqual(
+ node.bmc.executed,
+ ["fabric_config_get_macaddr_mask"]
+ )
+ else:
+ self.assertEqual(node.bmc.executed, [])
+
+ def test_composite_bmc(self):
+ """ Test the CompositeBMC member """
+ with self.assertRaises(AttributeError):
+ self.fabric.cbmc.fake_method()
+
+ self.fabric.cbmc.set_chassis_power("off")
+ results = self.fabric.cbmc.get_chassis_status()
+
+ self.assertEqual(len(results), len(self.fabric.nodes))
+ for node_id in self.fabric.nodes:
+ self.assertFalse(results[node_id].power_on)
+
+ for node in self.fabric.nodes.values():
+ self.assertEqual(node.bmc.executed, [
+ ("set_chassis_power", "off"),
+ "get_chassis_status"
+ ])
+
+
+class DummyNode(object):
+ """ Dummy node for the nodemanager tests """
+
+ # pylint: disable=W0613
+ def __init__(self, ip_address, username="admin", password="admin",
+ tftp=None, *args, **kwargs):
+ self.executed = []
+ self.power_state = False
+ self.ip_address = ip_address
+ self.tftp = tftp
+ self.sel = []
+ self.bmc = make_bmc(DummyBMC, hostname=ip_address, username=username,
+ password=password, verbose=False)
+ #
+ # For now, we hard code this to 0 ...
+ #
+ self._chassis_id = 0
+
+ @property
+ def guid(self):
+ """Returns the node GUID"""
+ return self.bmc.guid().system_guid
+
+ @property
+ def chassis_id(self):
+ """Returns the chasis ID."""
+ return self._chassis_id
+
+ def get_sel(self):
+ """Simulate get_sel()"""
+ self.executed.append('get_sel')
+ return self.sel
+
+ def get_power(self):
+ """Simulate get_power(). """
+ self.executed.append("get_power")
+ return self.power_state
+
+ def set_power(self, mode):
+ """Simulate set_power(). """
+ self.executed.append(("set_power", mode))
+
+ def get_power_policy(self):
+ """Simulate get_power_policy(). """
+ self.executed.append("get_power_policy")
+ return "always-off"
+
+ def set_power_policy(self, mode):
+ """Simulate set_power_policy(). """
+ self.executed.append(("set_power_policy", mode))
+
+ def mc_reset(self):
+ """Simulate mc_reset(). """
+ self.executed.append("mc_reset")
+
+ def get_firmware_info(self):
+ """Simulate get_firmware_info(). """
+ self.executed.append("get_firmware_info")
+
+ def is_updatable(self, package, partition_arg="INACTIVE", priority=None):
+ """Simulate is_updateable(). """
+ self.executed.append(("is_updatable", package))
+
+ def update_firmware(self, package, partition_arg="INACTIVE",
+ priority=None):
+ """Simulate update_firmware(). """
+ self.executed.append(("update_firmware", package))
+ time.sleep(random.randint(0, 2))
+
+ def get_sensors(self, name=""):
+ """Simulate get_sensors(). """
+ self.executed.append("get_sensors")
+ power_value = "%f (+/- 0) Watts" % random.uniform(0, 10)
+ temp_value = "%f (+/- 0) degrees C" % random.uniform(30, 50)
+ sensors = [
+ TestSensor("Node Power", power_value),
+ TestSensor("Board Temp", temp_value)
+ ]
+ return [s for s in sensors if name.lower() in s.sensor_name.lower()]
+
+ def config_reset(self):
+ """Simulate config_reset(). """
+ self.executed.append("config_reset")
+
+ def set_boot_order(self, boot_args):
+ """Simulate set_boot_order()."""
+ self.executed.append(("set_boot_order", boot_args))
+
+ def get_boot_order(self):
+ """Simulate get_boot_order(). """
+ self.executed.append("get_boot_order")
+ return ["disk", "pxe"]
+
+ def set_pxe_interface(self, interface):
+ """Simulate set_pxe_interface(). """
+ self.executed.append(("set_pxe_interface", interface))
+
+ def get_pxe_interface(self):
+ """Simulate get_pxe_interface(). """
+ self.executed.append("get_pxe_interface")
+ return "eth0"
+
+ def get_versions(self):
+ """Simulate get_versions(). """
+ self.executed.append("get_versions")
+
+ # pylint: disable=R0902, R0903
+ class Result(object):
+ """Result Class. """
+ def __init__(self):
+ self.header = "Calxeda SoC (0x0096CD)"
+ self.hardware_version = "TestBoard X00"
+ self.firmware_version = "v0.0.0"
+ self.ecme_version = "v0.0.0"
+ self.ecme_timestamp = "0"
+ self.a9boot_version = "v0.0.0"
+ self.uboot_version = "v0.0.0"
+ self.chip_name = "Unknown"
+ return Result()
+
+ def ipmitool_command(self, ipmitool_args):
+ """Simulate ipmitool_command(). """
+ self.executed.append(("ipmitool_command", ipmitool_args))
+ return "Dummy output"
+
+ def get_ubootenv(self):
+ """Simulate get_ubootenv(). """
+ self.executed.append("get_ubootenv")
+
+ ubootenv = UbootEnv()
+ ubootenv.variables["bootcmd0"] = "run bootcmd_default"
+ ubootenv.variables["bootcmd_default"] = "run bootcmd_sata"
+ return ubootenv
+
+ @staticmethod
+ def get_fabric_ipinfo():
+ """Simulates get_fabric_ipinfo(). """
+ return {}
+
+ # pylint: disable=R0913
+ def get_server_ip(self, interface=None, ipv6=False, user="user1",
+ password="1Password", aggressive=False):
+ """Simulate get_server_ip(). """
+ self.executed.append(("get_server_ip", interface, ipv6, user, password,
+ aggressive))
+ return "192.168.200.1"
+
+ def get_fabric_macaddrs(self):
+ """Simulate get_fabric_macaddrs(). """
+ self.executed.append("get_fabric_macaddrs")
+ result = {}
+ for node in range(NUM_NODES):
+ result[node] = {}
+ for port in range(3):
+ address = "00:00:00:00:%02x:%02x" % (node, port)
+ result[node][port] = address
+ return result
+
+ def get_fabric_uplink_info(self):
+ """Simulate get_fabric_uplink_info(). """
+ self.executed.append('get_fabric_uplink_info')
+ results = {}
+ for nid in range(1, NUM_NODES):
+ results[nid] = {'eth0': 0, 'eth1': 0, 'mgmt': 0}
+ return results
+
+ def get_uplink_info(self):
+ """Simulate get_uplink_info(). """
+ self.executed.append('get_uplink_info')
+ return 'Node 0: eth0 0, eth1 0, mgmt 0'
+
+ def get_uplink_speed(self):
+ """Simulate get_uplink_speed(). """
+ self.executed.append('get_uplink_speed')
+ return 1
+
+ def get_link_stats(self, link=0):
+ """Simulate get_link_stats(). """
+ self.executed.append(('get_link_stats', link))
+ return {
+ 'FS_LC%s_BYTE_CNT_0' % link: '0x0',
+ 'FS_LC%s_BYTE_CNT_1' % link: '0x0',
+ 'FS_LC%s_CFG_0' % link: '0x1030107f',
+ 'FS_LC%s_CFG_1' % link: '0x104f',
+ 'FS_LC%s_CM_RXDATA_0' % link: '0x0',
+ 'FS_LC%s_CM_RXDATA_1' % link: '0x0',
+ 'FS_LC%s_CM_TXDATA_0' % link: '0x0',
+ 'FS_LC%s_CM_TXDATA_1' % link: '0x0',
+ 'FS_LC%s_PKT_CNT_0' % link: '0x0',
+ 'FS_LC%s_PKT_CNT_1' % link: '0x0',
+ 'FS_LC%s_RDRPSCNT' % link: '0x0',
+ 'FS_LC%s_RERRSCNT' % link: '0x0',
+ 'FS_LC%sRMCSCNT' % link: '0x0',
+ 'FS_LC%s_RPKTSCNT' % link: '0x0',
+ 'FS_LC%s_RUCSCNT' % link: '0x0',
+ 'FS_LC%s_SC_STAT' % link: '0x0',
+ 'FS_LC%s_STATE' % link: '0x1033',
+ 'FS_LC%s_TDRPSCNT' % link: '0x0',
+ 'FS_LC%s_TPKTSCNT' % link: '0x1'
+ }
+
+ def get_linkmap(self):
+ """Simulate get_linkmap(). """
+ self.executed.append('get_linkmap')
+ results = {}
+ for nid in range(0, NUM_NODES):
+ results[nid] = {nid: {1: 2, 3: 1, 4: 3}}
+ return results
+
+ def get_routing_table(self):
+ """Simulate get_routing_table(). """
+ self.executed.append('get_routing_table')
+ results = {}
+ for nid in range(0, NUM_NODES):
+ results[nid] = {nid: {1: [0, 0, 0, 3, 0],
+ 2: [0, 3, 0, 0, 2],
+ 3: [0, 2, 0, 0, 3]}}
+ return results
+
+ def get_depth_chart(self):
+ """Simulate get_depth_chart(). """
+ self.executed.append('get_depth_chart')
+ results = {}
+ for nid in range(0, NUM_NODES):
+ results[nid] = {nid: {1: {'shortest': (0, 0)},
+ 2: {'hops': [(3, 1)], 'shortest': (0, 0)},
+ 3: {'hops': [(2, 1)], 'shortest': (0, 0)}}}
+ return results
+
+ def get_uplink(self, iface):
+ """Simulate get_uplink(). """
+ self.executed.append(('get_uplink', iface))
+ return 0
+
+ def set_uplink(self, uplink, iface):
+ """Simulate set_uplink(). """
+ self.executed.append(('set_uplink', uplink, iface))
+
+ def get_node_fru_version(self):
+ """Simulate get_node_fru_version(). """
+ self.executed.append("get_node_fru_version")
+ return "0.0"
+
+ def get_slot_fru_version(self):
+ """Simulate get_slot_fru_version(). """
+ self.executed.append("get_slot_fru_version")
+ return "0.0"
+
+
+class DummyFailNode(DummyNode):
+ """ Dummy node that should fail on some commands """
+
+ class DummyFailError(Exception):
+ """Dummy Fail Error class."""
+ pass
+
+ def get_power(self):
+ """Simulate get_power(). """
+ self.executed.append("get_power")
+ raise DummyFailNode.DummyFailError
+
+# pylint: disable=R0903
+class DummyImage(object):
+ """Dummy Image class."""
+
+ def __init__(self, filename, image_type, *args):
+ self.filename = filename
+ self.type = image_type
+ self.args = args
diff --git a/cxmanage_api/tests/image_test.py b/cxmanage_api/tests/image_test.py
new file mode 100644
index 0000000..f84f5c9
--- /dev/null
+++ b/cxmanage_api/tests/image_test.py
@@ -0,0 +1,92 @@
+"""Calxeda: image_test.py"""
+
+
+# Copyright (c) 2012-2013, Calxeda Inc.
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# * Neither the name of Calxeda Inc. nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+
+
+import os
+import shutil
+import tempfile
+import unittest
+
+from cxmanage_api.simg import get_simg_header
+from cxmanage_api.tftp import InternalTftp
+from cxmanage_api.tests import random_file, TestImage
+
+
+# pylint: disable=R0904
+class ImageTest(unittest.TestCase):
+ """ Tests involving cxmanage images
+
+ These will rely on an internally hosted TFTP server. """
+
+ def setUp(self):
+ # Set up an internal server
+ self.tftp = InternalTftp()
+ self.work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+
+ def tearDown(self):
+ shutil.rmtree(self.work_dir)
+
+ def test_render_to_simg(self):
+ """ Test image creation and upload """
+ imglen = 1024
+ priority = 1
+ daddr = 12345
+
+ # Create image
+ filename = random_file(imglen)
+ contents = open(filename).read()
+ image = TestImage(filename, "RAW")
+
+ # Render and examine image
+ filename = image.render_to_simg(priority, daddr)
+ simg = open(filename).read()
+ header = get_simg_header(simg)
+ self.assertEqual(header.priority, priority)
+ self.assertEqual(header.imglen, imglen)
+ self.assertEqual(header.daddr, daddr)
+ self.assertEqual(simg[header.imgoff:], contents)
+
+ @staticmethod
+ def test_multiple_uploads():
+ """ Test to make sure FDs are being closed """
+ # Create image
+ filename = random_file(1024)
+ image = TestImage(filename, "RAW")
+
+ for _ in xrange(2048):
+ image.render_to_simg(0, 0)
+
+ os.remove(filename)
+
+# End of file: ./image_test.py
+
diff --git a/cxmanage_api/tests/node_test.py b/cxmanage_api/tests/node_test.py
new file mode 100644
index 0000000..cdd2dae
--- /dev/null
+++ b/cxmanage_api/tests/node_test.py
@@ -0,0 +1,1107 @@
+# pylint: disable=C0302
+"""Unit tests for the Node class."""
+
+
+# Copyright (c) 2012-2013, Calxeda Inc.
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# * Neither the name of Calxeda Inc. nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+
+
+import random
+import shutil
+import tempfile
+import unittest
+
+from pyipmi import IpmiError
+from pyipmi.bmc import LanBMC
+
+from cxmanage_api.tests import TestImage, TestSensor, random_file
+from cxmanage_api import temp_file
+from cxmanage_api.simg import create_simg, get_simg_header
+from cxmanage_api.node import Node
+from cxmanage_api.tftp import InternalTftp, ExternalTftp
+from cxmanage_api.ubootenv import UbootEnv
+from cxmanage_api.firmware_package import FirmwarePackage
+from cxmanage_api.cx_exceptions import IPDiscoveryError
+
+
+NUM_NODES = 4
+ADDRESSES = ["192.168.100.%i" % n for n in range(1, NUM_NODES + 1)]
+TFTP = InternalTftp()
+
+
+# pylint: disable=R0904, W0201
+class NodeTest(unittest.TestCase):
+ """ Tests involving cxmanage Nodes """
+
+ def setUp(self):
+ self.nodes = [Node(ip_address=ip, tftp=TFTP, bmc=DummyBMC,
+ image=TestImage, ubootenv=DummyUbootEnv,
+ ipretriever=DummyIPRetriever, verbose=True)
+ for ip in ADDRESSES]
+
+ self.server_ip = None
+ self.fabric_ipsrc = None
+ self.fabric_ls_policy = None
+ self.fabric_linkspeed = None
+ self.fabric_lu_factor = None
+
+ # Give each node a node_id
+ count = 0
+ for node in self.nodes:
+ node.node_id = count
+ count = count + 1
+
+ # Set up an internal server
+ self.work_dir = tempfile.mkdtemp(prefix="cxmanage_node_test-")
+
+ def tearDown(self):
+ shutil.rmtree(self.work_dir, ignore_errors=True)
+
+ def test_get_power(self):
+ """ Test node.get_power method """
+ for node in self.nodes:
+ result = node.get_power()
+
+ self.assertEqual(node.bmc.executed, ["get_chassis_status"])
+ self.assertEqual(result, False)
+
+ def test_set_power(self):
+ """ Test node.set_power method """
+ for node in self.nodes:
+ modes = ["off", "on", "reset", "off"]
+ for mode in modes:
+ node.set_power(mode)
+
+ self.assertEqual(node.bmc.executed,
+ [("set_chassis_power", x) for x in modes])
+
+ def test_get_power_policy(self):
+ """ Test node.get_power_policy method """
+ for node in self.nodes:
+ result = node.get_power_policy()
+
+ self.assertEqual(node.bmc.executed, ["get_chassis_status"])
+ self.assertEqual(result, "always-off")
+
+ def test_set_power_policy(self):
+ """ Test node.set_power_policy method """
+ for node in self.nodes:
+ modes = ["always-on", "previous", "always-off"]
+ for mode in modes:
+ node.set_power_policy(mode)
+
+ self.assertEqual(node.bmc.executed,
+ [("set_chassis_policy", x) for x in modes])
+
+ def test_get_sensors(self):
+ """ Test node.get_sensors method """
+ for node in self.nodes:
+ result = node.get_sensors()
+
+ self.assertEqual(node.bmc.executed, ["sdr_list"])
+
+ self.assertEqual(len(result), 2)
+ self.assertTrue(
+ result["Node Power"].sensor_reading.endswith("Watts")
+ )
+ self.assertTrue(
+ result["Board Temp"].sensor_reading.endswith("degrees C")
+ )
+
+ def test_is_updatable(self):
+ """ Test node.is_updatable method """
+ for node in self.nodes:
+ max_size = 12288 - 60
+ filename = random_file(max_size)
+ images = [
+ TestImage(filename, "SOC_ELF"),
+ TestImage(filename, "CDB"),
+ TestImage(filename, "UBOOTENV")
+ ]
+
+ # should pass
+ package = FirmwarePackage()
+ package.images = images
+ self.assertTrue(node.is_updatable(package))
+
+ # should fail if the firmware version is wrong
+ package = FirmwarePackage()
+ package.images = images
+ package.version = "ECX-31415-v0.0.0"
+ self.assertFalse(node.is_updatable(package))
+
+ # should fail if we specify a socman version
+ package = FirmwarePackage()
+ package.images = images
+ package.required_socman_version = "0.0.1"
+ self.assertFalse(node.is_updatable(package))
+
+ # should fail if we try to upload a slot2
+ package = FirmwarePackage()
+ package.images = images
+ package.config = "slot2"
+ self.assertFalse(node.is_updatable(package))
+
+ # should fail if we upload an image that's too large
+ package = FirmwarePackage()
+ package.images = [TestImage(random_file(max_size + 1), "UBOOTENV")]
+ self.assertFalse(node.is_updatable(package))
+
+ # should fail if we upload to a CDB partition that's in use
+ package = FirmwarePackage()
+ package.images = images
+ self.assertFalse(node.is_updatable(package, partition_arg="ACTIVE"))
+
+ def test_update_firmware(self):
+ """ Test node.update_firmware method """
+ filename = "%s/%s" % (self.work_dir, "image.bin")
+ open(filename, "w").write("")
+
+ package = FirmwarePackage()
+ package.images = [
+ TestImage(filename, "SOC_ELF"),
+ TestImage(filename, "CDB"),
+ TestImage(filename, "UBOOTENV")
+ ]
+ package.version = "0.0.1"
+
+ for node in self.nodes:
+ node.update_firmware(package)
+
+ partitions = node.bmc.partitions
+ unchanged_partitions = [partitions[x] for x in [0, 1, 4]]
+ changed_partitions = [partitions[x] for x in [2, 3, 6]]
+ ubootenv_partition = partitions[5]
+
+ for partition in unchanged_partitions:
+ self.assertEqual(partition.updates, 0)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 0)
+ self.assertEqual(partition.activates, 0)
+
+ for partition in changed_partitions:
+ self.assertEqual(partition.updates, 1)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 2)
+ self.assertEqual(partition.activates, 1)
+
+ self.assertEqual(ubootenv_partition.updates, 1)
+ self.assertEqual(ubootenv_partition.retrieves, 1)
+ self.assertEqual(ubootenv_partition.checks, 2)
+ self.assertEqual(ubootenv_partition.activates, 1)
+
+ self.assertTrue(("set_firmware_version", "0.0.1")
+ in node.bmc.executed)
+
+ def test_config_reset(self):
+ """ Test node.config_reset method """
+ for node in self.nodes:
+ node.config_reset()
+
+ # Assert config reset
+ executed = node.bmc.executed
+ self.assertEqual(
+ len([x for x in executed if x == "reset_firmware"]), 1)
+
+ # Assert sel clear
+ self.assertEqual(
+ len([x for x in executed if x == "sel_clear"]), 1)
+
+ # Assert ubootenv changes
+ active = node.bmc.partitions[5]
+ inactive = node.bmc.partitions[6]
+ self.assertEqual(active.updates, 1)
+ self.assertEqual(active.retrieves, 0)
+ self.assertEqual(active.checks, 1)
+ self.assertEqual(active.activates, 1)
+ self.assertEqual(inactive.updates, 0)
+ self.assertEqual(inactive.retrieves, 1)
+ self.assertEqual(inactive.checks, 0)
+ self.assertEqual(inactive.activates, 0)
+
+ def test_set_boot_order(self):
+ """ Test node.set_boot_order method """
+ boot_args = ["disk", "pxe", "retry"]
+ for node in self.nodes:
+ node.set_boot_order(boot_args)
+
+ partitions = node.bmc.partitions
+ ubootenv_partition = partitions[5]
+ unchanged_partitions = [x for x in partitions
+ if x != ubootenv_partition]
+
+ self.assertEqual(ubootenv_partition.updates, 1)
+ self.assertEqual(ubootenv_partition.retrieves, 1)
+ self.assertEqual(ubootenv_partition.checks, 1)
+ self.assertEqual(ubootenv_partition.activates, 1)
+
+ for partition in unchanged_partitions:
+ self.assertEqual(partition.updates, 0)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 0)
+ self.assertEqual(partition.activates, 0)
+
+ def test_get_boot_order(self):
+ """ Test node.get_boot_order method """
+ for node in self.nodes:
+ result = node.get_boot_order()
+
+ partitions = node.bmc.partitions
+ ubootenv_partition = partitions[5]
+ unchanged_partitions = [x for x in partitions
+ if x != ubootenv_partition]
+
+ self.assertEqual(ubootenv_partition.updates, 0)
+ self.assertEqual(ubootenv_partition.retrieves, 1)
+ self.assertEqual(ubootenv_partition.checks, 0)
+ self.assertEqual(ubootenv_partition.activates, 0)
+
+ for partition in unchanged_partitions:
+ self.assertEqual(partition.updates, 0)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 0)
+ self.assertEqual(partition.activates, 0)
+
+ self.assertEqual(result, ["disk", "pxe"])
+
+ def test_set_pxe_interface(self):
+ """ Test node.set_pxe_interface method """
+ for node in self.nodes:
+ node.set_pxe_interface("eth0")
+
+ partitions = node.bmc.partitions
+ ubootenv_partition = partitions[5]
+ unchanged_partitions = [x for x in partitions
+ if x != ubootenv_partition]
+
+ self.assertEqual(ubootenv_partition.updates, 1)
+ self.assertEqual(ubootenv_partition.retrieves, 1)
+ self.assertEqual(ubootenv_partition.checks, 1)
+ self.assertEqual(ubootenv_partition.activates, 1)
+
+ for partition in unchanged_partitions:
+ self.assertEqual(partition.updates, 0)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 0)
+ self.assertEqual(partition.activates, 0)
+
+ def test_get_pxe_interface(self):
+ """ Test node.get_pxe_interface method """
+ for node in self.nodes:
+ result = node.get_pxe_interface()
+
+ partitions = node.bmc.partitions
+ ubootenv_partition = partitions[5]
+ unchanged_partitions = [x for x in partitions
+ if x != ubootenv_partition]
+
+ self.assertEqual(ubootenv_partition.updates, 0)
+ self.assertEqual(ubootenv_partition.retrieves, 1)
+ self.assertEqual(ubootenv_partition.checks, 0)
+ self.assertEqual(ubootenv_partition.activates, 0)
+
+ for partition in unchanged_partitions:
+ self.assertEqual(partition.updates, 0)
+ self.assertEqual(partition.retrieves, 0)
+ self.assertEqual(partition.checks, 0)
+ self.assertEqual(partition.activates, 0)
+
+ self.assertEqual(result, "eth0")
+
+ def test_get_versions(self):
+ """ Test node.get_versions method """
+ for node in self.nodes:
+ result = node.get_versions()
+
+ self.assertEqual(node.bmc.executed, ["get_info_basic",
+ "get_firmware_info", "info_card"])
+ for attr in ["iana", "firmware_version", "ecme_version",
+ "ecme_timestamp"]:
+ self.assertTrue(hasattr(result, attr))
+
+ def test_get_fabric_ipinfo(self):
+ """ Test node.get_fabric_ipinfo method """
+ for node in self.nodes:
+ result = node.get_fabric_ipinfo()
+
+ for found in node.bmc.executed:
+ self.assertEqual(found, "fabric_config_get_ip_info")
+
+ self.assertEqual(result, dict([(i, ADDRESSES[i])
+ for i in range(NUM_NODES)]))
+
+ def test_get_fabric_macaddrs(self):
+ """ Test node.get_fabric_macaddrs method """
+ for node in self.nodes:
+ result = node.get_fabric_macaddrs()
+
+ for found in node.bmc.executed:
+ self.assertEqual(found, "fabric_config_get_mac_addresses")
+
+ self.assertEqual(len(result), NUM_NODES)
+ for node_id in xrange(NUM_NODES):
+ self.assertEqual(len(result[node_id]), 3)
+ for port in result[node_id]:
+ expected_macaddr = "00:00:00:00:%x:%x" % (node_id, port)
+ self.assertEqual(result[node_id][port], [expected_macaddr])
+
+ def test_get_fabric_uplink_info(self):
+ """ Test node.get_fabric_uplink_info method """
+ for node in self.nodes:
+ node.get_fabric_uplink_info()
+
+ for found in node.bmc.executed:
+ self.assertEqual(found, "fabric_config_get_uplink_info")
+
+ def test_get_uplink_info(self):
+ """ Test node.get_uplink_info method """
+ for node in self.nodes:
+ node.get_uplink_info()
+
+ for found in node.bmc.executed:
+ self.assertEqual(found, "get_uplink_info")
+
+ def test_get_uplink_speed(self):
+ """ Test node.get_uplink_info method """
+ for node in self.nodes:
+ node.get_uplink_speed()
+
+ for found in node.bmc.executed:
+ self.assertEqual(found, "get_uplink_speed")
+
+
+ def test_get_linkmap(self):
+ """ Test node.get_linkmap method """
+ for node in self.nodes:
+ node.get_linkmap()
+
+ for found in node.bmc.executed:
+ self.assertEqual(found, "fabric_info_get_link_map")
+
+ def test_get_routing_table(self):
+ """ Test node.get_routing_table method """
+ for node in self.nodes:
+ node.get_routing_table()
+
+ for found in node.bmc.executed:
+ self.assertEqual(found, "fabric_info_get_routing_table")
+
+ def test_get_depth_chart(self):
+ """ Test node.get_depth_chart method """
+ for node in self.nodes:
+ node.get_depth_chart()
+
+ for found in node.bmc.executed:
+ self.assertEqual(found, "fabric_info_get_depth_chart")
+
+ def test_get_link_stats(self):
+ """ Test node.get_link_stats() """
+ for node in self.nodes:
+ node.get_link_stats()
+ self.assertEqual(node.bmc.executed[0], ('fabric_get_linkstats', 0))
+
+ def test_get_server_ip(self):
+ """ Test node.get_server_ip method """
+ for node in self.nodes:
+ result = node.get_server_ip()
+ self.assertEqual(result, "192.168.200.1")
+
+ def test_get_linkspeed(self):
+ """ Test node.get_linkspeed method """
+ for node in self.nodes:
+ result = node.get_linkspeed()
+ self.assertEqual(result, 1)
+
+ def test_get_uplink(self):
+ """ Test node.get_uplink method"""
+ for node in self.nodes:
+ result = node.get_uplink(iface=0)
+ self.assertEqual(result, 0)
+
+ def test_set_uplink(self):
+ """ Test node.set_uplink method """
+ for node in self.nodes:
+ node.set_uplink(iface=0, uplink=0)
+ self.assertEqual(node.get_uplink(iface=0), 0)
+
+# pylint: disable=R0902
+class DummyBMC(LanBMC):
+ """ Dummy BMC for the node tests """
+
+
+ GUID_UNIQUE = 0
+
+
+ def __init__(self, **kwargs):
+ super(DummyBMC, self).__init__(**kwargs)
+ self.executed = []
+ self.partitions = [
+ Partition(0, 3, 0, 393216, in_use=True), # socman
+ Partition(1, 10, 393216, 196608, in_use=True), # factory cdb
+ Partition(2, 3, 589824, 393216, in_use=False), # socman
+ Partition(3, 10, 983040, 196608, in_use=False), # factory cdb
+ Partition(4, 10, 1179648, 196608, in_use=True), # running cdb
+ Partition(5, 11, 1376256, 12288), # ubootenv
+ Partition(6, 11, 1388544, 12288) # ubootenv
+ ]
+ self.ipaddr_base = '192.168.100.1'
+ self.unique_guid = 'FAKEGUID%s' % DummyBMC.GUID_UNIQUE
+ self.sel = self.generate_sel(with_errors=False)
+
+ DummyBMC.GUID_UNIQUE += 1
+
+ def guid(self):
+ """Returns the GUID"""
+ self.executed.append("guid")
+
+ # pylint: disable=R0903
+ class Result(object):
+ """Results class."""
+ def __init__(self, dummybmc):
+ self.system_guid = dummybmc.unique_guid
+ self.time_stamp = None
+ return Result(self)
+
+ def set_chassis_power(self, mode):
+ """ Set chassis power """
+ self.executed.append(("set_chassis_power", mode))
+
+ def get_chassis_status(self):
+ """ Get chassis status """
+ self.executed.append("get_chassis_status")
+
+ # pylint: disable=R0903
+ class Result(object):
+ """Results class."""
+ def __init__(self):
+ self.power_on = False
+ self.power_restore_policy = "always-off"
+ return Result()
+
+ def set_chassis_policy(self, mode):
+ """ Set chassis restore policy """
+ self.executed.append(("set_chassis_policy", mode))
+
+ def mc_reset(self, mode):
+ """ Reset the MC """
+ self.executed.append(("mc_reset", mode))
+
+ def reset_firmware(self):
+ """ Reset the running CDB """
+ self.executed.append("reset_firmware")
+
+ def sel_clear(self):
+ """ Clear SEL """
+ self.executed.append("sel_clear")
+
+ def sel_elist(self):
+ """ List SEL. with_errors=True simulates a SEL that contains errors """
+ self.executed.append("sel_elist")
+ return self.sel
+
+ @staticmethod
+ def generate_sel(with_errors=False):
+ """ Generates a SEL table for a Node """
+ if (with_errors):
+ return [
+ '1 | 11/20/2013 | 20:26:18 | Memory | Correctable ECC | Asserted',
+ '2 | 11/20/2013 | 20:26:43 | Processor | IERR | Asserted',
+ '83 | 11/14/2013 | 18:01:35 | OS Stop/Shutdown OS Stop Reason | ' +
+ 'Error during system startup | Asserted'
+ ]
+ else:
+ return [
+ '88 | 11/14/2013 | 18:02:29 | System Boot Initiated OS Boot ' +
+ 'Reason | Initiated by power up | Asserted',
+ '91 | 11/14/2013 | 19:24:25 | System Event BMC Status |',
+ ]
+
+ def get_firmware_info(self):
+ """ Get partition and simg info """
+ self.executed.append("get_firmware_info")
+
+ return [x.fwinfo for x in self.partitions]
+
+ def update_firmware(self, filename, partition, image_type, tftp_addr):
+ """ Download a file from a TFTP server to a given partition.
+
+ Make sure the image type matches. """
+ self.executed.append(("update_firmware", filename,
+ partition, image_type, tftp_addr))
+ self.partitions[partition].updates += 1
+
+ localfile = temp_file()
+ TFTP.get_file(filename, localfile)
+
+ contents = open(localfile).read()
+ simg = get_simg_header(contents)
+ self.partitions[partition].fwinfo.offset = "%8x" % simg.imgoff
+ self.partitions[partition].fwinfo.size = "%8x" % simg.imglen
+ self.partitions[partition].fwinfo.priority = "%8x" % simg.priority
+ self.partitions[partition].fwinfo.daddr = "%8x" % simg.daddr
+
+ # pylint: disable=R0903
+ class Result(object):
+ """Results class."""
+ def __init__(self):
+ """Default constructor for the Result class."""
+ self.tftp_handle_id = 0
+ return Result()
+
+ def retrieve_firmware(self, filename, partition, image_type, tftp_addr):
+ self.executed.append(("retrieve_firmware", filename,
+ partition, image_type, tftp_addr))
+ self.partitions[partition].retrieves += 1
+
+ # Upload blank image to tftp
+ work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+ open("%s/%s" % (work_dir, filename), "w").write(create_simg(""))
+ address, port = tftp_addr.split(":")
+ port = int(port)
+ tftp = ExternalTftp(address, port)
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
+ shutil.rmtree(work_dir)
+
+ # pylint: disable=R0903
+ class Result(object):
+ """Results class."""
+ def __init__(self):
+ self.tftp_handle_id = 0
+ return Result()
+
+ def register_firmware_read(self, filename, partition, image_type):
+ self.executed.append(("register_firmware_read", filename, partition,
+ image_type))
+ raise IpmiError()
+
+ def register_firmware_write(self, filename, partition, image_type):
+ self.executed.append(("register_firmware_write", filename, partition,
+ image_type))
+ raise IpmiError()
+
+ def get_firmware_status(self, handle):
+ self.executed.append("get_firmware_status")
+
+ # pylint: disable=R0903
+ class Result(object):
+ """Results class."""
+ def __init__(self):
+ self.status = "Complete"
+
+ del handle
+
+ return Result()
+
+ def check_firmware(self, partition):
+ self.executed.append(("check_firmware", partition))
+ self.partitions[partition].checks += 1
+
+ # pylint: disable=R0903
+ class Result(object):
+ """Results class."""
+ def __init__(self):
+ self.crc32 = 0
+ self.error = None
+ return Result()
+
+ def activate_firmware(self, partition):
+ self.executed.append(("activate_firmware", partition))
+ self.partitions[partition].activates += 1
+
+ def set_firmware_version(self, version):
+ self.executed.append(("set_firmware_version", version))
+
+ def sdr_list(self):
+ """ Get sensor info from the node. """
+ self.executed.append("sdr_list")
+
+ power_value = "%f (+/- 0) Watts" % random.uniform(0, 10)
+ temp_value = "%f (+/- 0) degrees C" % random.uniform(30, 50)
+ sensors = [
+ TestSensor("Node Power", power_value),
+ TestSensor("Board Temp", temp_value)
+ ]
+
+ return sensors
+
+ def get_info_basic(self):
+ """ Get basic SoC info from this node """
+ self.executed.append("get_info_basic")
+
+ # pylint: disable=R0903
+ class Result(object):
+ """Results class."""
+ def __init__(self):
+ self.iana = int("0x0096CD", 16)
+ self.firmware_version = "ECX-0000-v0.0.0"
+ self.ecme_version = "v0.0.0"
+ self.ecme_timestamp = 0
+ return Result()
+
+ def get_info_card(self):
+ self.executed.append("info_card")
+
+ # pylint: disable=R0903
+ class Result(object):
+ """Results class."""
+ def __init__(self):
+ self.type = "TestBoard"
+ self.revision = "0"
+ return Result()
+
+ node_count = 0
+ def fabric_get_node_id(self):
+ self.executed.append('get_node_id')
+ result = DummyBMC.node_count
+ DummyBMC.node_count += 1
+ return result
+
+ def fabric_info_get_link_map(self, filename, tftp_addr=None):
+ """Upload a link_map file from the node to TFTP"""
+ self.executed.append('fabric_info_get_link_map')
+
+ if not(tftp_addr):
+ raise IpmiError('No tftp address!')
+
+ link_map = []
+ link_map.append('Link 1: Node 2')
+ link_map.append('Link 3: Node 1')
+ link_map.append('Link 4: Node 3')
+
+ work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+ with open('%s/%s' % (work_dir, filename), 'w') as lm_file:
+ for lmap in link_map:
+ lm_file.write(lmap + '\n')
+ lm_file.close()
+
+ # Upload to tftp
+ address, port = tftp_addr.split(":")
+ port = int(port)
+ tftp = ExternalTftp(address, port)
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
+
+ shutil.rmtree(work_dir)
+
+ def fabric_info_get_routing_table(self, filename, tftp_addr=None):
+ """Upload a routing_table file from the node to TFTP"""
+ self.executed.append('fabric_info_get_routing_table')
+
+ if not(tftp_addr):
+ raise IpmiError('No tftp address!')
+
+ routing_table = []
+ routing_table.append('Node 1: rt - 0.2.0.3.2')
+ routing_table.append('Node 2: rt - 0.3.0.1.2')
+ routing_table.append('Node 3: rt - 0.2.0.1.3')
+ routing_table.append('Node 12: rt - 0.2.0.0.1')
+ routing_table.append('Node 13: rt - 0.2.0.0.1')
+ routing_table.append('Node 14: rt - 0.2.0.0.1')
+ routing_table.append('Node 15: rt - 0.2.0.0.1')
+
+ work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+ with open('%s/%s' % (work_dir, filename), 'w') as rt_file:
+ for rtable in routing_table:
+ rt_file.write(rtable + '\n')
+ rt_file.close()
+
+ # Upload to tftp
+ address, port = tftp_addr.split(":")
+ port = int(port)
+ tftp = ExternalTftp(address, port)
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
+
+ shutil.rmtree(work_dir)
+
+ def fabric_info_get_depth_chart(self, filename, tftp_addr=None):
+ """Upload a depth_chart file from the node to TFTP"""
+ self.executed.append('fabric_info_get_depth_chart')
+
+ if not(tftp_addr):
+ raise IpmiError('No tftp address!')
+
+ depth_chart = []
+ depth_chart.append(
+ 'Node 1: Shortest Distance 0 hops via neighbor 0: ' +
+ 'other hops/neighbors -'
+ )
+ depth_chart.append(
+ 'Node 2: Shortest Distance 0 hops via neighbor 0: ' +
+ 'other hops/neighbors - 1/3'
+ )
+ depth_chart.append(
+ 'Node 3: Shortest Distance 0 hops via neighbor 0: ' +
+ 'other hops/neighbors - 1/2'
+ )
+ depth_chart.append(
+ 'Node 4: Shortest Distance 2 hops via neighbor 6: ' +
+ 'other hops/neighbors - 3/7'
+ )
+ depth_chart.append(
+ 'Node 5: Shortest Distance 3 hops via neighbor 4: ' +
+ 'other hops/neighbors -'
+ )
+ depth_chart.append(
+ 'Node 6: Shortest Distance 1 hops via neighbor 2: ' +
+ 'other hops/neighbors -'
+ )
+ depth_chart.append(
+ 'Node 7: Shortest Distance 2 hops via neighbor 6: ' +
+ 'other hops/neighbors - 3/4'
+ )
+ depth_chart.append(
+ 'Node 8: Shortest Distance 3 hops via neighbor 10: ' +
+ 'other hops/neighbors - 4/11'
+ )
+ depth_chart.append(
+ 'Node 9: Shortest Distance 4 hops via neighbor 8: ' +
+ 'other hops/neighbors -'
+ )
+ depth_chart.append(
+ 'Node 10: Shortest Distance 2 hops via neighbor 6: ' +
+ 'other hops/neighbors -'
+ )
+ depth_chart.append(
+ 'Node 11: Shortest Distance 3 hops via neighbor 10: ' +
+ 'other hops/neighbors - 4/8'
+ )
+ depth_chart.append(
+ 'Node 12: Shortest Distance 4 hops via neighbor 14: ' +
+ 'other hops/neighbors - 5/15'
+ )
+ depth_chart.append(
+ 'Node 13: Shortest Distance 5 hops via neighbor 12: ' +
+ 'other hops/neighbors -'
+ )
+ depth_chart.append(
+ 'Node 14: Shortest Distance 3 hops via neighbor 10: ' +
+ 'other hops/neighbors -'
+ )
+ depth_chart.append(
+ 'Node 15: Shortest Distance 4 hops via neighbor 14: ' +
+ 'other hops/neighbors - 5/12'
+ )
+
+
+ work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+ with open('%s/%s' % (work_dir, filename), 'w') as dc_file:
+ for dchart in depth_chart:
+ dc_file.write(dchart + '\n')
+ dc_file.close()
+
+ # Upload to tftp
+ address, port = tftp_addr.split(":")
+ port = int(port)
+ tftp = ExternalTftp(address, port)
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
+
+ shutil.rmtree(work_dir)
+
+ # pylint: disable=W0222
+ def fabric_get_linkstats(self, filename, tftp_addr=None,
+ link=None):
+ """Upload a link_stats file from the node to TFTP"""
+ self.executed.append(('fabric_get_linkstats', link))
+
+ if not(tftp_addr):
+ raise IpmiError('No tftp address!')
+
+ link_stats = []
+ link_stats.append('Packet Counts for Link %s:' % link)
+ link_stats.append('Link0 StatspFS_LCn_CFG_0(link) = 0x1030d07f')
+ link_stats.append('pFS_LCn_CFG_1 = 0x105f')
+ link_stats.append('pFS_LCn_STATE = 0x1033')
+ link_stats.append('pFS_LCn_SC_STAT = 0x0')
+ link_stats.append('pFS_LCn_PKT_CNT_0 = 0x0')
+ link_stats.append('pFS_LCn_PKT_CNT_1 = 0x0')
+ link_stats.append('pFS_LCn_BYTE_CNT_0 = 0x0')
+ link_stats.append('pFS_LCn_BYTE_CNT_1 = 0x0')
+ link_stats.append('pFS_LCn_CM_TXDATA_0 = 0x82000000')
+ link_stats.append('pFS_LCn_CM_TXDATA_1 = 0x0')
+ link_stats.append('pFS_LCn_CM_RXDATA_0 = 0x0')
+ link_stats.append('pFS_LCn_CM_RXDATA_1 = 0x0')
+ link_stats.append('pFS_LCn_PKT_CNT_0 = 0x0')
+ link_stats.append('pFS_LCn_PKT_CNT_1 = 0x0')
+ link_stats.append('pFS_LCn_RMCSCNT = 0x1428')
+ link_stats.append('pFS_LCn_RUCSCNT = 0x116')
+ link_stats.append('pFS_LCn_RERRSCNT = 0x0')
+ link_stats.append('pFS_LCn_RDRPSCNT = 0xb4')
+ link_stats.append('pFS_LCn_RPKTSCNT = 0x0')
+ link_stats.append('pFS_LCn_TPKTSCNT = 0x1')
+ link_stats.append('pFS_LCn_TDRPSCNT = 0x0')
+
+ work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+ with open('%s/%s' % (work_dir, filename), 'w') as ls_file:
+ for stat in link_stats:
+ ls_file.write(stat + '\n')
+ ls_file.close()
+
+ # Upload to tftp
+ address, port = tftp_addr.split(":")
+ port = int(port)
+ tftp = ExternalTftp(address, port)
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
+
+ shutil.rmtree(work_dir)
+
+ def fabric_config_get_ip_info(self, filename, tftp_addr=None):
+ """ Upload an ipinfo file from the node to TFTP"""
+ self.executed.append("fabric_config_get_ip_info")
+
+ if not(tftp_addr):
+ raise IpmiError('No tftp address!')
+
+ work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+
+ # Create IP info file
+ ipinfo = open("%s/%s" % (work_dir, filename), "w")
+ for i in range(len(ADDRESSES)):
+ ipinfo.write("Node %i: %s\n" % (i, ADDRESSES[i]))
+ ipinfo.close()
+
+ # Upload to tftp
+ address, port = tftp_addr.split(":")
+ port = int(port)
+ tftp = ExternalTftp(address, port)
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
+
+ shutil.rmtree(work_dir)
+
+ def fabric_config_get_uplink_info(self, filename, tftp_addr=None):
+ self.executed.append("fabric_config_get_uplink_info")
+
+ if not(tftp_addr):
+ raise IpmiError('No tftp address!')
+
+ work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+ # Create uplink info file
+ ulinfo = open("%s/%s" % (work_dir, filename), "w")
+ for i in range(1, NUM_NODES):
+ ulinfo.write("Node %i: eth0 0, eth1 0, mgmt 0\n" % i)
+ ulinfo.close()
+
+ # Upload to tftp
+ address, port = tftp_addr.split(":")
+ port = int(port)
+ tftp = ExternalTftp(address, port)
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
+
+ shutil.rmtree(work_dir)
+
+ def fabric_config_get_uplink(self, iface):
+ self.executed.append(("fabric_config_get_uplink", iface))
+ return 0
+
+ def fabric_config_set_uplink(self, uplink, iface):
+ self.executed.append(("fabric_config_set_uplink", uplink, iface))
+
+ def fabric_config_get_mac_addresses(self, filename, tftp_addr=None):
+ """ Upload a macaddrs file from the node to TFTP"""
+ self.executed.append("fabric_config_get_mac_addresses")
+
+ if not(tftp_addr):
+ raise IpmiError('No tftp address!')
+
+ work_dir = tempfile.mkdtemp(prefix="cxmanage_test-")
+
+ # Create macaddrs file
+ macaddrs = open("%s/%s" % (work_dir, filename), "w")
+ for i in range(len(ADDRESSES)):
+ for port in range(3):
+ macaddr = "00:00:00:00:%x:%x" % (i, port)
+ macaddrs.write("Node %i, Port %i: %s\n" % (i, port, macaddr))
+ macaddrs.close()
+
+ # Upload to tftp
+ address, port = tftp_addr.split(":")
+ port = int(port)
+ tftp = ExternalTftp(address, port)
+ tftp.put_file("%s/%s" % (work_dir, filename), filename)
+
+ shutil.rmtree(work_dir)
+
+ def fabric_config_get_ip_src(self):
+ self.executed.append('fabric_config_get_ip_src')
+ return 2
+
+ def fabric_config_set_ip_src(self, ipsrc_mode):
+ self.fabric_ipsrc = ipsrc_mode
+ self.executed.append('fabric_config_set_ip_src')
+
+ def fabric_config_factory_default(self):
+ self.executed.append('fabric_config_factory_default')
+
+ def fabric_config_get_ip_addr_base(self):
+ """Provide a fake base IP addr"""
+ self.executed.append('fabric_config_get_ip_addr_base')
+ return self.ipaddr_base
+
+ def fabric_config_update_config(self):
+ self.executed.append('fabric_config_update_config')
+
+ def fabric_get_linkspeed(self, link="", actual=""):
+ self.executed.append('fabric_get_linkspeed')
+ return 1
+
+ def fabric_config_get_linkspeed(self):
+ self.executed.append('fabric_config_get_linkspeed')
+ return 1
+
+ def fabric_config_set_linkspeed(self, linkspeed):
+ self.fabric_linkspeed = linkspeed
+ self.executed.append('fabric_config_set_linkspeed')
+
+ def fabric_config_get_linkspeed_policy(self):
+ self.executed.append('fabric_config_get_linkspeed_policy')
+ return 1
+
+ def fabric_config_set_linkspeed_policy(self, ls_policy):
+ self.fabric_ls_policy = ls_policy
+ self.executed.append('fabric_config_set_linkspeed_policy')
+
+ def fabric_config_get_link_users_factor(self):
+ self.executed.append('fabric_config_get_link_users_factor')
+ return 1
+
+ def fabric_config_set_link_users_factor(self, lu_factor):
+ self.fabric_lu_factor = lu_factor
+ self.executed.append('fabric_config_set_link_users_factor')
+
+ def fabric_config_set_macaddr_base(self, macaddr):
+ self.executed.append(('fabric_config_set_macaddr_base', macaddr))
+
+ def fabric_config_get_macaddr_base(self):
+ self.executed.append('fabric_config_get_macaddr_base')
+ return "00:00:00:00:00:00"
+
+ def fabric_config_set_macaddr_mask(self, mask):
+ self.executed.append(('fabric_config_set_macaddr_mask', mask))
+
+ def fabric_config_get_macaddr_mask(self):
+ self.executed.append('fabric_config_get_macaddr_mask')
+ return "00:00:00:00:00:00"
+
+ def fabric_add_macaddr(self, nodeid=0, iface=0, macaddr=None):
+ self.executed.append('fabric_add_macaddr')
+
+ def fabric_rm_macaddr(self, nodeid=0, iface=0, macaddr=None):
+ self.executed.append('fabric_rm_macaddr')
+
+ def fabric_get_uplink_info(self):
+ """Corresponds to Node.get_uplink_info()"""
+ self.executed.append('get_uplink_info')
+ return 'Node 0: eth0 0, eth1 0, mgmt 0'
+
+ def fabric_get_uplink_speed(self):
+ """Corresponds to Node.get_uplink_speed()"""
+ self.executed.append('get_uplink_speed')
+ return 1
+
+ def fru_read(self, fru_number, filename):
+ if fru_number == 81:
+ self.executed.append('node_fru_read')
+ elif fru_number == 82:
+ self.executed.append('slot_fru_read')
+ else:
+ self.executed.append('fru_read')
+
+ with open(filename, "w") as fru_image:
+ # Writes a fake FRU image with version "0.0"
+ fru_image.write("x00" * 516 + "0.0" + "x00"*7673)
+
+ def pmic_get_version(self):
+ return "0"
+
+
+# pylint: disable=R0913, R0903
+class Partition(object):
+ """Partition class."""
+ def __init__(self, partition, type_, offset=0,
+ size=0, priority=0, daddr=0, in_use=None):
+ self.updates = 0
+ self.retrieves = 0
+ self.checks = 0
+ self.activates = 0
+ self.fwinfo = FWInfoEntry(partition, type_, offset, size, priority,
+ daddr, in_use)
+
+
+class FWInfoEntry(object):
+ """ Firmware info for a single partition """
+
+ def __init__(self, partition, type_, offset=0, size=0, priority=0, daddr=0,
+ in_use=None):
+ self.partition = "%2i" % partition
+ self.type = {
+ 2: "02 (S2_ELF)",
+ 3: "03 (SOC_ELF)",
+ 10: "0a (CDB)",
+ 11: "0b (UBOOTENV)"
+ }[type_]
+ self.offset = "%8x" % offset
+ self.size = "%8x" % size
+ self.priority = "%8x" % priority
+ self.daddr = "%8x" % daddr
+ self.in_use = {None: "Unknown", True: "1", False: "0"}[in_use]
+ self.flags = "fffffffd"
+ self.version = "v0.0.0"
+
+
+class DummyUbootEnv(UbootEnv):
+ """UbootEnv info."""
+
+ def get_boot_order(self):
+ """Hard coded boot order for testing."""
+ return ["disk", "pxe"]
+
+ def set_boot_order(self, boot_args):
+ """ Do nothing """
+ pass
+
+# pylint: disable=R0903
+class DummyIPRetriever(object):
+ """ Dummy IP retriever """
+
+ def __init__(self, ecme_ip, aggressive=False, verbosity=0, **kwargs):
+ self.executed = False
+ self.ecme_ip = ecme_ip
+ self.aggressive = aggressive
+ self.verbosity = verbosity
+ for name, value in kwargs.iteritems():
+ setattr(self, name, value)
+
+ def run(self):
+ """ Set the server_ip variable. Raises an error if called more than
+ once. """
+ if self.executed:
+ raise IPDiscoveryError("DummyIPRetriever.run() was called twice!")
+ self.executed = True
+ self.server_ip = "192.168.200.1"
+
+# End of file: node_test.py
diff --git a/cxmanage_api/tests/tasks_test.py b/cxmanage_api/tests/tasks_test.py
new file mode 100644
index 0000000..2d7b9a3
--- /dev/null
+++ b/cxmanage_api/tests/tasks_test.py
@@ -0,0 +1,81 @@
+"""Calxeda: task_test.py"""
+
+
+# Copyright (c) 2012-2013, Calxeda Inc.
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# * Neither the name of Calxeda Inc. nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+
+
+import unittest
+import time
+
+from cxmanage_api.tasks import TaskQueue
+
+
+# pylint: disable=R0904
+class TaskTest(unittest.TestCase):
+ """Test for the TaskQueue Class."""
+
+ def test_task_queue(self):
+ """ Test the task queue """
+ task_queue = TaskQueue()
+ counters = [Counter() for _ in xrange(128)]
+ tasks = [task_queue.put(counters[i].add, i) for i in xrange(128)]
+
+ for task in tasks:
+ task.join()
+
+ for i in xrange(128):
+ self.assertEqual(counters[i].value, i)
+
+ def test_sequential_delay(self):
+ """ Test that a single thread delays between tasks """
+ task_queue = TaskQueue(threads=1, delay=0.25)
+ counters = [Counter() for x in xrange(8)]
+
+ start = time.time()
+
+ tasks = [task_queue.put(x.add, 1) for x in counters]
+ for task in tasks:
+ task.join()
+
+ finish = time.time()
+
+ self.assertGreaterEqual(finish - start, 2.0)
+
+
+# pylint: disable=R0903
+class Counter(object):
+ """ Simple counter object for testing purposes """
+ def __init__(self):
+ self.value = 0
+
+ def add(self, value):
+ """ Increment this counter's value by some amount """
+ self.value += value
diff --git a/cxmanage_api/tests/tftp_test.py b/cxmanage_api/tests/tftp_test.py
new file mode 100644
index 0000000..ccf2859
--- /dev/null
+++ b/cxmanage_api/tests/tftp_test.py
@@ -0,0 +1,142 @@
+"""Calxeda: tftp_test.py"""
+
+
+# Copyright (c) 2012-2013, Calxeda Inc.
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# * Neither the name of Calxeda Inc. nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+
+
+#
+# Unit tests for the Internal and External Tftp classes.
+#
+
+
+import os
+import socket
+import unittest
+
+from cxmanage_api.tests import random_file
+from cxmanage_api.tftp import InternalTftp, ExternalTftp
+
+
+def _get_relative_host():
+ """Returns the test machine ip as a relative host to pass to the
+ InternalTftp server.
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ # RFC863 defines port 9 as the UDP discard port, so we use it to
+ # find out our local ip to pass as a relative_host
+ return sock.getsockname()[0]
+
+ except socket.error:
+ raise
+
+# pylint: disable=R0904
+class InternalTftpTest(unittest.TestCase):
+ """ Tests the functions of the InternalTftp class."""
+
+ def setUp(self):
+ """Create local Internal TFTP objects to test with."""
+ self.tftp1 = InternalTftp()
+ self.tftp2 = InternalTftp(ip_address='127.0.0.254')
+
+ def test_put_and_get(self):
+ """ Test file transfers on an internal host """
+
+ # Create file
+ filename = random_file(1024)
+ contents = open(filename).read()
+
+ # Upload and remove
+ basename = os.path.basename(filename)
+ self.tftp1.put_file(filename, basename)
+ os.remove(filename)
+ self.assertFalse(os.path.exists(filename))
+
+ # Download
+ self.tftp1.get_file(basename, filename)
+
+ # Verify match
+ self.assertEqual(open(filename).read(), contents)
+ os.remove(filename)
+
+ def test_get_address_with_relhost(self):
+ """Tests the get_address(relative_host) function with a relative_host
+ specified.
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ # RFC863 defines port 9 as the UDP discard port, so we use it to
+ # find out our local ip to pass as a relative_host
+ sock.connect((socket.gethostname(), 9))
+ relative_host = sock.getsockname()[0]
+
+ except socket.error:
+ raise
+ self.assertEqual(self.tftp2.ip_address,
+ self.tftp2.get_address(relative_host=relative_host))
+ sock.close()
+
+
+# pylint: disable=R0904
+class ExternalTftpTest(unittest.TestCase):
+ """Tests the ExternalTftp class.
+ ..note:
+ * For testing purposes the 'external' server points to an internally
+ hosted one, but it allows us to make sure the actual TFTP protocol is
+ working.
+ """
+
+ def setUp(self):
+ """Create an ExternalTftp object to test with."""
+ self.itftp = InternalTftp(ip_address='127.0.0.250')
+ self.etftp = ExternalTftp(
+ self.itftp.get_address(relative_host=_get_relative_host()),
+ self.itftp.port)
+
+ def test_put_and_get(self):
+ """Test the put_file(src, dest) function. Test the get_file(src,dest)
+ function by movign local files around using the TFT Protocol.
+ """
+ # Create file
+ filename = random_file(1024)
+ contents = open(filename).read()
+ # Upload and remove original file
+ basename = os.path.basename(filename)
+ self.etftp.put_file(src=filename, dest=basename)
+ os.remove(filename)
+ self.assertFalse(os.path.exists(filename))
+ # Download
+ self.etftp.get_file(src=basename, dest=filename)
+ # Verify match
+ self.assertEqual(open(filename).read(), contents)
+ os.remove(filename)
+
+# End of file: ./tftp_test.py