diff options
author | George Kraft <george.kraft@calxeda.com> | 2013-12-04 10:39:57 -0600 |
---|---|---|
committer | George Kraft <george.kraft@calxeda.com> | 2013-12-04 10:39:57 -0600 |
commit | bcebe16d59cd1a24321ac4b2bd2931f3df5ce102 (patch) | |
tree | 73cff0eaf90927ec673b910bf28ebd407be3bbd4 /cxmanage_api/tests | |
parent | d96b517e34b02becab36d692803617bc39fdfee3 (diff) | |
download | cxmanage-bcebe16d59cd1a24321ac4b2bd2931f3df5ce102.tar.gz |
CXMAN-264: Rename cxmanage_test to cxmanage_api.tests
So everything is a subpackage of cxmanage_api! Whoo
Diffstat (limited to 'cxmanage_api/tests')
-rw-r--r-- | cxmanage_api/tests/__init__.py | 60 | ||||
-rw-r--r-- | cxmanage_api/tests/fabric_test.py | 712 | ||||
-rw-r--r-- | cxmanage_api/tests/image_test.py | 92 | ||||
-rw-r--r-- | cxmanage_api/tests/node_test.py | 1107 | ||||
-rw-r--r-- | cxmanage_api/tests/tasks_test.py | 81 | ||||
-rw-r--r-- | cxmanage_api/tests/tftp_test.py | 142 |
6 files changed, 2194 insertions, 0 deletions
diff --git a/cxmanage_api/tests/__init__.py b/cxmanage_api/tests/__init__.py new file mode 100644 index 0000000..d8d5307 --- /dev/null +++ b/cxmanage_api/tests/__init__.py @@ -0,0 +1,60 @@ +"""Calxeda: __init__.py""" + + +# Copyright (c) 2012-2013, Calxeda Inc. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of Calxeda Inc. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. + + +import os +import random +import tempfile + +from cxmanage_api.image import Image + +def random_file(size): + """ Create a random file """ + contents = "".join([chr(random.randint(0, 255)) for _ in range(size)]) + file_, filename = tempfile.mkstemp(prefix='cxmanage_test-') + with os.fdopen(file_, "w") as file_handle: + file_handle.write(contents) + + return filename + +class TestImage(Image): + """TestImage Class.""" + def verify(self): + return True + +# pylint: disable=R0903 +class TestSensor(object): + """ Sensor result from bmc/target """ + def __init__(self, sensor_name, sensor_reading): + self.sensor_name = sensor_name + self.sensor_reading = sensor_reading diff --git a/cxmanage_api/tests/fabric_test.py b/cxmanage_api/tests/fabric_test.py new file mode 100644 index 0000000..03a253c --- /dev/null +++ b/cxmanage_api/tests/fabric_test.py @@ -0,0 +1,712 @@ +"""Calxeda: fabric_test.py """ + +# Copyright (c) 2012-2013, Calxeda Inc. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of Calxeda Inc. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. + +import random +import time +import unittest + +from cxmanage_api.fabric import Fabric +from cxmanage_api.tftp import InternalTftp, ExternalTftp +from cxmanage_api.firmware_package import FirmwarePackage +from cxmanage_api.ubootenv import UbootEnv +from cxmanage_api.cx_exceptions import CommandFailedError +from cxmanage_api.tests import TestSensor +from cxmanage_api.tests.node_test import DummyBMC +from pyipmi import make_bmc + +NUM_NODES = 128 +ADDRESSES = ["192.168.100.%i" % x for x in range(1, NUM_NODES + 1)] + + +# pylint: disable=R0904 +class FabricTest(unittest.TestCase): + """ Test the various Fabric commands """ + def setUp(self): + # Set up the controller and add targets + self.fabric = Fabric("192.168.100.1", node=DummyNode) + self.nodes = [DummyNode(i) for i in ADDRESSES] + # pylint: disable=W0212 + self.fabric._nodes = dict((i, self.nodes[i]) + for i in xrange(NUM_NODES)) + + def test_tftp(self): + """ Test the tftp property """ + tftp = InternalTftp() + self.fabric.tftp = tftp + self.assertTrue(self.fabric.tftp is tftp) + for node in self.nodes: + self.assertTrue(node.tftp is tftp) + + tftp = ExternalTftp("127.0.0.1") + self.fabric.tftp = tftp + self.assertTrue(self.fabric.tftp is tftp) + for node in self.nodes: + self.assertTrue(node.tftp is tftp) + + def test_get_mac_addresses(self): + """ Test get_mac_addresses command """ + self.fabric.get_mac_addresses() + self.assertEqual(self.nodes[0].executed, ["get_fabric_macaddrs"]) + for node in self.nodes[1:]: + self.assertEqual(node.executed, []) + + def test_get_uplink_info(self): + """ Test get_uplink_info command """ + self.fabric.get_uplink_info() + for node in self.nodes: + self.assertEqual(node.executed, ["get_uplink_info"]) + + def test_get_uplink_speed(self): + """ Test get_uplink_speed command """ + self.fabric.get_uplink_speed() + for node in self.nodes: + self.assertEqual(node.executed, ["get_uplink_speed"]) + + def test_get_uplink(self): + """ Test get_uplink command """ + self.assertEqual(self.fabric.get_uplink(iface=0), 0) + + def test_set_uplink(self): + """ Test set_uplink command """ + iface, uplink = 0, 0 + self.fabric.set_uplink(iface=iface, uplink=uplink) + self.assertEqual( + [("fabric_config_set_uplink", iface, uplink)], + self.nodes[0].bmc.executed + ) + + def test_get_sensors(self): + """ Test get_sensors command """ + self.fabric.get_sensors() + self.fabric.get_sensors("Node Power") + for node in self.nodes: + self.assertEqual(node.executed, ["get_sensors", "get_sensors"]) + + def test_get_firmware_info(self): + """ Test get_firmware_info command """ + self.fabric.get_firmware_info() + for node in self.nodes: + self.assertEqual(node.executed, ["get_firmware_info"]) + + def test_is_updatable(self): + """ Test is_updatable command """ + package = FirmwarePackage() + self.fabric.is_updatable(package) + for node in self.nodes: + self.assertEqual(node.executed, [("is_updatable", package)]) + + def test_update_firmware(self): + """ Test update_firmware command """ + package = FirmwarePackage() + self.fabric.update_firmware(package) + for node in self.nodes: + self.assertEqual(node.executed, [("update_firmware", package)]) + + def test_config_reset(self): + """ Test config_reset command """ + self.fabric.config_reset() + for node in self.nodes: + self.assertEqual(node.executed, ["config_reset"]) + + def test_set_boot_order(self): + """ Test set_boot_order command """ + boot_args = "disk0,pxe,retry" + self.fabric.set_boot_order(boot_args) + for node in self.nodes: + self.assertEqual(node.executed, [("set_boot_order", boot_args)]) + + def test_get_boot_order(self): + """ Test get_boot_order command """ + self.fabric.get_boot_order() + for node in self.nodes: + self.assertEqual(node.executed, ["get_boot_order"]) + + def test_set_pxe_interface(self): + """ Test set_pxe_interface command """ + self.fabric.set_pxe_interface("eth0") + for node in self.nodes: + self.assertEqual(node.executed, [("set_pxe_interface", "eth0")]) + + def test_get_pxe_interface(self): + """ Test get_pxe_interface command """ + self.fabric.get_pxe_interface() + for node in self.nodes: + self.assertEqual(node.executed, ["get_pxe_interface"]) + + def test_get_versions(self): + """ Test get_versions command """ + self.fabric.get_versions() + for node in self.nodes: + self.assertEqual(node.executed, ["get_versions"]) + + def test_get_ubootenv(self): + """ Test get_ubootenv command """ + self.fabric.get_ubootenv() + for node in self.nodes: + self.assertEqual(node.executed, ["get_ubootenv"]) + + def test_ipmitool_command(self): + """ Test ipmitool_command command """ + ipmitool_args = "power status" + self.fabric.ipmitool_command(ipmitool_args) + for node in self.nodes: + self.assertEqual( + node.executed, [("ipmitool_command", ipmitool_args)] + ) + + def test_get_server_ip(self): + """ Test get_server_ip command """ + self.fabric.get_server_ip("interface", "ipv6", "user", "password", + "aggressive") + for node in self.nodes: + self.assertEqual(node.executed, [("get_server_ip", "interface", + "ipv6", "user", "password", "aggressive")]) + + def test_failed_command(self): + """ Test a failed command """ + fail_nodes = [DummyFailNode(i) for i in ADDRESSES] + # pylint: disable=W0212 + self.fabric._nodes = dict( + (i, fail_nodes[i]) for i in xrange(NUM_NODES) + ) + try: + self.fabric.get_power() + self.fail() + except CommandFailedError: + for node in fail_nodes: + self.assertEqual(node.executed, ["get_power"]) + + def test_primary_node(self): + """Test the primary_node property + + Currently it should always return node 0. + """ + self.assertEqual(self.fabric.primary_node, self.nodes[0]) + + def test_get_ipsrc(self): + """Test the get_ipsrc method + + """ + self.fabric.get_ipsrc() + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_get_ip_src', bmc.executed) + + def test_set_ipsrc(self): + """Test the set_ipsrc method""" + + ipsrc = random.randint(1, 2) + + self.fabric.set_ipsrc(ipsrc) + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_set_ip_src', bmc.executed) + + # fabric_ipsrc is just part of DummyBMC - not a real bmc attribute + # it's there to make sure the ipsrc_mode value gets passed to the bmc. + self.assertEqual(bmc.fabric_ipsrc, ipsrc) + + def test_apply_fdc(self): + """Test the apply_factory_default_config method""" + + self.fabric.apply_factory_default_config() + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_factory_default', bmc.executed) + + def test_get_ipaddr_base(self): + """Test the get_ipaddr_base method""" + ipaddr_base = self.fabric.get_ipaddr_base() + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_get_ip_addr_base', bmc.executed) + self.assertEqual(bmc.ipaddr_base, ipaddr_base) + + def test_update_config(self): + """Test the update_config method + + """ + self.fabric.update_config() + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_update_config', bmc.executed) + + def test_get_linkspeed(self): + """Test the get_linkspeed method + + """ + self.fabric.get_linkspeed() + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_get_linkspeed', bmc.executed) + + def test_set_linkspeed(self): + """Test the set_linkspeed method""" + + valid_linkspeeds = [1, 2.5, 5, 7.5, 10] + linkspeed = random.choice(valid_linkspeeds) + + self.fabric.set_linkspeed(linkspeed) + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_set_linkspeed', bmc.executed) + + # fabric_linkspeed is just part of DummyBMC - not a real bmc attribute + # it's there to make sure the ipsrc_mode value gets passed to the bmc. + self.assertEqual(bmc.fabric_linkspeed, linkspeed) + + def test_get_linkspeed_policy(self): + """Test the get_linkspeed_policy method + + """ + self.fabric.get_linkspeed_policy() + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_get_linkspeed_policy', bmc.executed) + + def test_set_linkspeed_policy(self): + """Test the set_linkspeed_policy method""" + + ls_policy = random.randint(0, 1) + + self.fabric.set_linkspeed_policy(ls_policy) + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_set_linkspeed_policy', bmc.executed) + + # fabric_ls_policy is just part of DummyBMC - not a real bmc attribute + # it's there to make sure the ipsrc_mode value gets passed to the bmc. + self.assertEqual(bmc.fabric_ls_policy, ls_policy) + + def test_get_link_stats(self): + """Test the get_link_stats() method.""" + for i in range(0, 5): + self.fabric.get_link_stats(i) + for node in self.fabric.nodes.values(): + self.assertIn(('get_link_stats', i), node.executed) + + def test_get_linkmap(self): + """Test the get_linkmap method""" + self.fabric.get_linkmap() + for node in self.fabric.nodes.values(): + self.assertIn('get_linkmap', node.executed) + + def test_get_routing_table(self): + """Test the get_routing_table method""" + self.fabric.get_routing_table() + for node in self.fabric.nodes.values(): + self.assertIn('get_routing_table', node.executed) + + def test_get_depth_chart(self): + """Test the depth_chart method""" + self.fabric.get_depth_chart() + for node in self.fabric.nodes.values(): + self.assertIn('get_depth_chart', node.executed) + + def test_get_link_users_factor(self): + """Test the get_link_users_factor method + + """ + self.fabric.get_link_users_factor() + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_get_link_users_factor', bmc.executed) + + def test_set_link_users_factor(self): + """Test the set_link_users_factor method""" + + lu_factor = random.randint(5, 50) + + self.fabric.set_link_users_factor(lu_factor) + bmc = self.fabric.primary_node.bmc + self.assertIn('fabric_config_set_link_users_factor', bmc.executed) + + # fabric_lu_factor is just part of DummyBMC - not a real bmc attribute + # it's there to make sure the ipsrc_mode value gets passed to the bmc. + self.assertEqual(bmc.fabric_lu_factor, lu_factor) + + def test_add_macaddr (self): + """Test the add_macaddr method""" + + valid_nodeids = [0, 1, 2, 3] + t_nodeid = random.choice(valid_nodeids) + + valid_ifaces = [0, 1, 2] + t_iface = random.choice(valid_ifaces) + + t_macaddr = "66:55:44:33:22:11" + + self.fabric.add_macaddr (t_nodeid, t_iface, t_macaddr) + bmc = self.fabric.primary_node.bmc + self.assertIn ('fabric_add_macaddr', bmc.executed) + + def test_rm_macaddr (self): + """Test the rm_macaddr method""" + + valid_nodeids = [0, 1, 2, 3] + t_nodeid = random.choice(valid_nodeids) + + valid_ifaces = [0, 1, 2] + t_iface = random.choice(valid_ifaces) + + t_macaddr = "66:55:44:33:22:11" + + self.fabric.rm_macaddr (t_nodeid, t_iface, t_macaddr) + bmc = self.fabric.primary_node.bmc + self.assertIn ('fabric_rm_macaddr', bmc.executed) + + def test_set_macaddr_base(self): + """Test the set_macaddr_base method""" + self.fabric.set_macaddr_base("00:11:22:33:44:55") + for node in self.fabric.nodes.values(): + if node == self.fabric.primary_node: + self.assertEqual( + node.bmc.executed, + [("fabric_config_set_macaddr_base", "00:11:22:33:44:55")] + ) + else: + self.assertEqual(node.bmc.executed, []) + + def test_get_macaddr_base(self): + """Test the get_macaddr_base method""" + self.assertEqual(self.fabric.get_macaddr_base(), "00:00:00:00:00:00") + for node in self.fabric.nodes.values(): + if node == self.fabric.primary_node: + self.assertEqual( + node.bmc.executed, + ["fabric_config_get_macaddr_base"] + ) + else: + self.assertEqual(node.bmc.executed, []) + + def test_set_macaddr_mask(self): + """Test the set_macaddr_mask method""" + self.fabric.set_macaddr_mask("00:11:22:33:44:55") + for node in self.fabric.nodes.values(): + if node == self.fabric.primary_node: + self.assertEqual( + node.bmc.executed, + [("fabric_config_set_macaddr_mask", "00:11:22:33:44:55")] + ) + else: + self.assertEqual(node.bmc.executed, []) + + def test_get_macaddr_mask(self): + """Test the get_macaddr_mask method""" + self.assertEqual(self.fabric.get_macaddr_mask(), "00:00:00:00:00:00") + for node in self.fabric.nodes.values(): + if node == self.fabric.primary_node: + self.assertEqual( + node.bmc.executed, + ["fabric_config_get_macaddr_mask"] + ) + else: + self.assertEqual(node.bmc.executed, []) + + def test_composite_bmc(self): + """ Test the CompositeBMC member """ + with self.assertRaises(AttributeError): + self.fabric.cbmc.fake_method() + + self.fabric.cbmc.set_chassis_power("off") + results = self.fabric.cbmc.get_chassis_status() + + self.assertEqual(len(results), len(self.fabric.nodes)) + for node_id in self.fabric.nodes: + self.assertFalse(results[node_id].power_on) + + for node in self.fabric.nodes.values(): + self.assertEqual(node.bmc.executed, [ + ("set_chassis_power", "off"), + "get_chassis_status" + ]) + + +class DummyNode(object): + """ Dummy node for the nodemanager tests """ + + # pylint: disable=W0613 + def __init__(self, ip_address, username="admin", password="admin", + tftp=None, *args, **kwargs): + self.executed = [] + self.power_state = False + self.ip_address = ip_address + self.tftp = tftp + self.sel = [] + self.bmc = make_bmc(DummyBMC, hostname=ip_address, username=username, + password=password, verbose=False) + # + # For now, we hard code this to 0 ... + # + self._chassis_id = 0 + + @property + def guid(self): + """Returns the node GUID""" + return self.bmc.guid().system_guid + + @property + def chassis_id(self): + """Returns the chasis ID.""" + return self._chassis_id + + def get_sel(self): + """Simulate get_sel()""" + self.executed.append('get_sel') + return self.sel + + def get_power(self): + """Simulate get_power(). """ + self.executed.append("get_power") + return self.power_state + + def set_power(self, mode): + """Simulate set_power(). """ + self.executed.append(("set_power", mode)) + + def get_power_policy(self): + """Simulate get_power_policy(). """ + self.executed.append("get_power_policy") + return "always-off" + + def set_power_policy(self, mode): + """Simulate set_power_policy(). """ + self.executed.append(("set_power_policy", mode)) + + def mc_reset(self): + """Simulate mc_reset(). """ + self.executed.append("mc_reset") + + def get_firmware_info(self): + """Simulate get_firmware_info(). """ + self.executed.append("get_firmware_info") + + def is_updatable(self, package, partition_arg="INACTIVE", priority=None): + """Simulate is_updateable(). """ + self.executed.append(("is_updatable", package)) + + def update_firmware(self, package, partition_arg="INACTIVE", + priority=None): + """Simulate update_firmware(). """ + self.executed.append(("update_firmware", package)) + time.sleep(random.randint(0, 2)) + + def get_sensors(self, name=""): + """Simulate get_sensors(). """ + self.executed.append("get_sensors") + power_value = "%f (+/- 0) Watts" % random.uniform(0, 10) + temp_value = "%f (+/- 0) degrees C" % random.uniform(30, 50) + sensors = [ + TestSensor("Node Power", power_value), + TestSensor("Board Temp", temp_value) + ] + return [s for s in sensors if name.lower() in s.sensor_name.lower()] + + def config_reset(self): + """Simulate config_reset(). """ + self.executed.append("config_reset") + + def set_boot_order(self, boot_args): + """Simulate set_boot_order().""" + self.executed.append(("set_boot_order", boot_args)) + + def get_boot_order(self): + """Simulate get_boot_order(). """ + self.executed.append("get_boot_order") + return ["disk", "pxe"] + + def set_pxe_interface(self, interface): + """Simulate set_pxe_interface(). """ + self.executed.append(("set_pxe_interface", interface)) + + def get_pxe_interface(self): + """Simulate get_pxe_interface(). """ + self.executed.append("get_pxe_interface") + return "eth0" + + def get_versions(self): + """Simulate get_versions(). """ + self.executed.append("get_versions") + + # pylint: disable=R0902, R0903 + class Result(object): + """Result Class. """ + def __init__(self): + self.header = "Calxeda SoC (0x0096CD)" + self.hardware_version = "TestBoard X00" + self.firmware_version = "v0.0.0" + self.ecme_version = "v0.0.0" + self.ecme_timestamp = "0" + self.a9boot_version = "v0.0.0" + self.uboot_version = "v0.0.0" + self.chip_name = "Unknown" + return Result() + + def ipmitool_command(self, ipmitool_args): + """Simulate ipmitool_command(). """ + self.executed.append(("ipmitool_command", ipmitool_args)) + return "Dummy output" + + def get_ubootenv(self): + """Simulate get_ubootenv(). """ + self.executed.append("get_ubootenv") + + ubootenv = UbootEnv() + ubootenv.variables["bootcmd0"] = "run bootcmd_default" + ubootenv.variables["bootcmd_default"] = "run bootcmd_sata" + return ubootenv + + @staticmethod + def get_fabric_ipinfo(): + """Simulates get_fabric_ipinfo(). """ + return {} + + # pylint: disable=R0913 + def get_server_ip(self, interface=None, ipv6=False, user="user1", + password="1Password", aggressive=False): + """Simulate get_server_ip(). """ + self.executed.append(("get_server_ip", interface, ipv6, user, password, + aggressive)) + return "192.168.200.1" + + def get_fabric_macaddrs(self): + """Simulate get_fabric_macaddrs(). """ + self.executed.append("get_fabric_macaddrs") + result = {} + for node in range(NUM_NODES): + result[node] = {} + for port in range(3): + address = "00:00:00:00:%02x:%02x" % (node, port) + result[node][port] = address + return result + + def get_fabric_uplink_info(self): + """Simulate get_fabric_uplink_info(). """ + self.executed.append('get_fabric_uplink_info') + results = {} + for nid in range(1, NUM_NODES): + results[nid] = {'eth0': 0, 'eth1': 0, 'mgmt': 0} + return results + + def get_uplink_info(self): + """Simulate get_uplink_info(). """ + self.executed.append('get_uplink_info') + return 'Node 0: eth0 0, eth1 0, mgmt 0' + + def get_uplink_speed(self): + """Simulate get_uplink_speed(). """ + self.executed.append('get_uplink_speed') + return 1 + + def get_link_stats(self, link=0): + """Simulate get_link_stats(). """ + self.executed.append(('get_link_stats', link)) + return { + 'FS_LC%s_BYTE_CNT_0' % link: '0x0', + 'FS_LC%s_BYTE_CNT_1' % link: '0x0', + 'FS_LC%s_CFG_0' % link: '0x1030107f', + 'FS_LC%s_CFG_1' % link: '0x104f', + 'FS_LC%s_CM_RXDATA_0' % link: '0x0', + 'FS_LC%s_CM_RXDATA_1' % link: '0x0', + 'FS_LC%s_CM_TXDATA_0' % link: '0x0', + 'FS_LC%s_CM_TXDATA_1' % link: '0x0', + 'FS_LC%s_PKT_CNT_0' % link: '0x0', + 'FS_LC%s_PKT_CNT_1' % link: '0x0', + 'FS_LC%s_RDRPSCNT' % link: '0x0', + 'FS_LC%s_RERRSCNT' % link: '0x0', + 'FS_LC%sRMCSCNT' % link: '0x0', + 'FS_LC%s_RPKTSCNT' % link: '0x0', + 'FS_LC%s_RUCSCNT' % link: '0x0', + 'FS_LC%s_SC_STAT' % link: '0x0', + 'FS_LC%s_STATE' % link: '0x1033', + 'FS_LC%s_TDRPSCNT' % link: '0x0', + 'FS_LC%s_TPKTSCNT' % link: '0x1' + } + + def get_linkmap(self): + """Simulate get_linkmap(). """ + self.executed.append('get_linkmap') + results = {} + for nid in range(0, NUM_NODES): + results[nid] = {nid: {1: 2, 3: 1, 4: 3}} + return results + + def get_routing_table(self): + """Simulate get_routing_table(). """ + self.executed.append('get_routing_table') + results = {} + for nid in range(0, NUM_NODES): + results[nid] = {nid: {1: [0, 0, 0, 3, 0], + 2: [0, 3, 0, 0, 2], + 3: [0, 2, 0, 0, 3]}} + return results + + def get_depth_chart(self): + """Simulate get_depth_chart(). """ + self.executed.append('get_depth_chart') + results = {} + for nid in range(0, NUM_NODES): + results[nid] = {nid: {1: {'shortest': (0, 0)}, + 2: {'hops': [(3, 1)], 'shortest': (0, 0)}, + 3: {'hops': [(2, 1)], 'shortest': (0, 0)}}} + return results + + def get_uplink(self, iface): + """Simulate get_uplink(). """ + self.executed.append(('get_uplink', iface)) + return 0 + + def set_uplink(self, uplink, iface): + """Simulate set_uplink(). """ + self.executed.append(('set_uplink', uplink, iface)) + + def get_node_fru_version(self): + """Simulate get_node_fru_version(). """ + self.executed.append("get_node_fru_version") + return "0.0" + + def get_slot_fru_version(self): + """Simulate get_slot_fru_version(). """ + self.executed.append("get_slot_fru_version") + return "0.0" + + +class DummyFailNode(DummyNode): + """ Dummy node that should fail on some commands """ + + class DummyFailError(Exception): + """Dummy Fail Error class.""" + pass + + def get_power(self): + """Simulate get_power(). """ + self.executed.append("get_power") + raise DummyFailNode.DummyFailError + +# pylint: disable=R0903 +class DummyImage(object): + """Dummy Image class.""" + + def __init__(self, filename, image_type, *args): + self.filename = filename + self.type = image_type + self.args = args diff --git a/cxmanage_api/tests/image_test.py b/cxmanage_api/tests/image_test.py new file mode 100644 index 0000000..f84f5c9 --- /dev/null +++ b/cxmanage_api/tests/image_test.py @@ -0,0 +1,92 @@ +"""Calxeda: image_test.py""" + + +# Copyright (c) 2012-2013, Calxeda Inc. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of Calxeda Inc. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. + + +import os +import shutil +import tempfile +import unittest + +from cxmanage_api.simg import get_simg_header +from cxmanage_api.tftp import InternalTftp +from cxmanage_api.tests import random_file, TestImage + + +# pylint: disable=R0904 +class ImageTest(unittest.TestCase): + """ Tests involving cxmanage images + + These will rely on an internally hosted TFTP server. """ + + def setUp(self): + # Set up an internal server + self.tftp = InternalTftp() + self.work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + + def tearDown(self): + shutil.rmtree(self.work_dir) + + def test_render_to_simg(self): + """ Test image creation and upload """ + imglen = 1024 + priority = 1 + daddr = 12345 + + # Create image + filename = random_file(imglen) + contents = open(filename).read() + image = TestImage(filename, "RAW") + + # Render and examine image + filename = image.render_to_simg(priority, daddr) + simg = open(filename).read() + header = get_simg_header(simg) + self.assertEqual(header.priority, priority) + self.assertEqual(header.imglen, imglen) + self.assertEqual(header.daddr, daddr) + self.assertEqual(simg[header.imgoff:], contents) + + @staticmethod + def test_multiple_uploads(): + """ Test to make sure FDs are being closed """ + # Create image + filename = random_file(1024) + image = TestImage(filename, "RAW") + + for _ in xrange(2048): + image.render_to_simg(0, 0) + + os.remove(filename) + +# End of file: ./image_test.py + diff --git a/cxmanage_api/tests/node_test.py b/cxmanage_api/tests/node_test.py new file mode 100644 index 0000000..cdd2dae --- /dev/null +++ b/cxmanage_api/tests/node_test.py @@ -0,0 +1,1107 @@ +# pylint: disable=C0302 +"""Unit tests for the Node class.""" + + +# Copyright (c) 2012-2013, Calxeda Inc. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of Calxeda Inc. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. + + +import random +import shutil +import tempfile +import unittest + +from pyipmi import IpmiError +from pyipmi.bmc import LanBMC + +from cxmanage_api.tests import TestImage, TestSensor, random_file +from cxmanage_api import temp_file +from cxmanage_api.simg import create_simg, get_simg_header +from cxmanage_api.node import Node +from cxmanage_api.tftp import InternalTftp, ExternalTftp +from cxmanage_api.ubootenv import UbootEnv +from cxmanage_api.firmware_package import FirmwarePackage +from cxmanage_api.cx_exceptions import IPDiscoveryError + + +NUM_NODES = 4 +ADDRESSES = ["192.168.100.%i" % n for n in range(1, NUM_NODES + 1)] +TFTP = InternalTftp() + + +# pylint: disable=R0904, W0201 +class NodeTest(unittest.TestCase): + """ Tests involving cxmanage Nodes """ + + def setUp(self): + self.nodes = [Node(ip_address=ip, tftp=TFTP, bmc=DummyBMC, + image=TestImage, ubootenv=DummyUbootEnv, + ipretriever=DummyIPRetriever, verbose=True) + for ip in ADDRESSES] + + self.server_ip = None + self.fabric_ipsrc = None + self.fabric_ls_policy = None + self.fabric_linkspeed = None + self.fabric_lu_factor = None + + # Give each node a node_id + count = 0 + for node in self.nodes: + node.node_id = count + count = count + 1 + + # Set up an internal server + self.work_dir = tempfile.mkdtemp(prefix="cxmanage_node_test-") + + def tearDown(self): + shutil.rmtree(self.work_dir, ignore_errors=True) + + def test_get_power(self): + """ Test node.get_power method """ + for node in self.nodes: + result = node.get_power() + + self.assertEqual(node.bmc.executed, ["get_chassis_status"]) + self.assertEqual(result, False) + + def test_set_power(self): + """ Test node.set_power method """ + for node in self.nodes: + modes = ["off", "on", "reset", "off"] + for mode in modes: + node.set_power(mode) + + self.assertEqual(node.bmc.executed, + [("set_chassis_power", x) for x in modes]) + + def test_get_power_policy(self): + """ Test node.get_power_policy method """ + for node in self.nodes: + result = node.get_power_policy() + + self.assertEqual(node.bmc.executed, ["get_chassis_status"]) + self.assertEqual(result, "always-off") + + def test_set_power_policy(self): + """ Test node.set_power_policy method """ + for node in self.nodes: + modes = ["always-on", "previous", "always-off"] + for mode in modes: + node.set_power_policy(mode) + + self.assertEqual(node.bmc.executed, + [("set_chassis_policy", x) for x in modes]) + + def test_get_sensors(self): + """ Test node.get_sensors method """ + for node in self.nodes: + result = node.get_sensors() + + self.assertEqual(node.bmc.executed, ["sdr_list"]) + + self.assertEqual(len(result), 2) + self.assertTrue( + result["Node Power"].sensor_reading.endswith("Watts") + ) + self.assertTrue( + result["Board Temp"].sensor_reading.endswith("degrees C") + ) + + def test_is_updatable(self): + """ Test node.is_updatable method """ + for node in self.nodes: + max_size = 12288 - 60 + filename = random_file(max_size) + images = [ + TestImage(filename, "SOC_ELF"), + TestImage(filename, "CDB"), + TestImage(filename, "UBOOTENV") + ] + + # should pass + package = FirmwarePackage() + package.images = images + self.assertTrue(node.is_updatable(package)) + + # should fail if the firmware version is wrong + package = FirmwarePackage() + package.images = images + package.version = "ECX-31415-v0.0.0" + self.assertFalse(node.is_updatable(package)) + + # should fail if we specify a socman version + package = FirmwarePackage() + package.images = images + package.required_socman_version = "0.0.1" + self.assertFalse(node.is_updatable(package)) + + # should fail if we try to upload a slot2 + package = FirmwarePackage() + package.images = images + package.config = "slot2" + self.assertFalse(node.is_updatable(package)) + + # should fail if we upload an image that's too large + package = FirmwarePackage() + package.images = [TestImage(random_file(max_size + 1), "UBOOTENV")] + self.assertFalse(node.is_updatable(package)) + + # should fail if we upload to a CDB partition that's in use + package = FirmwarePackage() + package.images = images + self.assertFalse(node.is_updatable(package, partition_arg="ACTIVE")) + + def test_update_firmware(self): + """ Test node.update_firmware method """ + filename = "%s/%s" % (self.work_dir, "image.bin") + open(filename, "w").write("") + + package = FirmwarePackage() + package.images = [ + TestImage(filename, "SOC_ELF"), + TestImage(filename, "CDB"), + TestImage(filename, "UBOOTENV") + ] + package.version = "0.0.1" + + for node in self.nodes: + node.update_firmware(package) + + partitions = node.bmc.partitions + unchanged_partitions = [partitions[x] for x in [0, 1, 4]] + changed_partitions = [partitions[x] for x in [2, 3, 6]] + ubootenv_partition = partitions[5] + + for partition in unchanged_partitions: + self.assertEqual(partition.updates, 0) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 0) + self.assertEqual(partition.activates, 0) + + for partition in changed_partitions: + self.assertEqual(partition.updates, 1) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 2) + self.assertEqual(partition.activates, 1) + + self.assertEqual(ubootenv_partition.updates, 1) + self.assertEqual(ubootenv_partition.retrieves, 1) + self.assertEqual(ubootenv_partition.checks, 2) + self.assertEqual(ubootenv_partition.activates, 1) + + self.assertTrue(("set_firmware_version", "0.0.1") + in node.bmc.executed) + + def test_config_reset(self): + """ Test node.config_reset method """ + for node in self.nodes: + node.config_reset() + + # Assert config reset + executed = node.bmc.executed + self.assertEqual( + len([x for x in executed if x == "reset_firmware"]), 1) + + # Assert sel clear + self.assertEqual( + len([x for x in executed if x == "sel_clear"]), 1) + + # Assert ubootenv changes + active = node.bmc.partitions[5] + inactive = node.bmc.partitions[6] + self.assertEqual(active.updates, 1) + self.assertEqual(active.retrieves, 0) + self.assertEqual(active.checks, 1) + self.assertEqual(active.activates, 1) + self.assertEqual(inactive.updates, 0) + self.assertEqual(inactive.retrieves, 1) + self.assertEqual(inactive.checks, 0) + self.assertEqual(inactive.activates, 0) + + def test_set_boot_order(self): + """ Test node.set_boot_order method """ + boot_args = ["disk", "pxe", "retry"] + for node in self.nodes: + node.set_boot_order(boot_args) + + partitions = node.bmc.partitions + ubootenv_partition = partitions[5] + unchanged_partitions = [x for x in partitions + if x != ubootenv_partition] + + self.assertEqual(ubootenv_partition.updates, 1) + self.assertEqual(ubootenv_partition.retrieves, 1) + self.assertEqual(ubootenv_partition.checks, 1) + self.assertEqual(ubootenv_partition.activates, 1) + + for partition in unchanged_partitions: + self.assertEqual(partition.updates, 0) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 0) + self.assertEqual(partition.activates, 0) + + def test_get_boot_order(self): + """ Test node.get_boot_order method """ + for node in self.nodes: + result = node.get_boot_order() + + partitions = node.bmc.partitions + ubootenv_partition = partitions[5] + unchanged_partitions = [x for x in partitions + if x != ubootenv_partition] + + self.assertEqual(ubootenv_partition.updates, 0) + self.assertEqual(ubootenv_partition.retrieves, 1) + self.assertEqual(ubootenv_partition.checks, 0) + self.assertEqual(ubootenv_partition.activates, 0) + + for partition in unchanged_partitions: + self.assertEqual(partition.updates, 0) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 0) + self.assertEqual(partition.activates, 0) + + self.assertEqual(result, ["disk", "pxe"]) + + def test_set_pxe_interface(self): + """ Test node.set_pxe_interface method """ + for node in self.nodes: + node.set_pxe_interface("eth0") + + partitions = node.bmc.partitions + ubootenv_partition = partitions[5] + unchanged_partitions = [x for x in partitions + if x != ubootenv_partition] + + self.assertEqual(ubootenv_partition.updates, 1) + self.assertEqual(ubootenv_partition.retrieves, 1) + self.assertEqual(ubootenv_partition.checks, 1) + self.assertEqual(ubootenv_partition.activates, 1) + + for partition in unchanged_partitions: + self.assertEqual(partition.updates, 0) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 0) + self.assertEqual(partition.activates, 0) + + def test_get_pxe_interface(self): + """ Test node.get_pxe_interface method """ + for node in self.nodes: + result = node.get_pxe_interface() + + partitions = node.bmc.partitions + ubootenv_partition = partitions[5] + unchanged_partitions = [x for x in partitions + if x != ubootenv_partition] + + self.assertEqual(ubootenv_partition.updates, 0) + self.assertEqual(ubootenv_partition.retrieves, 1) + self.assertEqual(ubootenv_partition.checks, 0) + self.assertEqual(ubootenv_partition.activates, 0) + + for partition in unchanged_partitions: + self.assertEqual(partition.updates, 0) + self.assertEqual(partition.retrieves, 0) + self.assertEqual(partition.checks, 0) + self.assertEqual(partition.activates, 0) + + self.assertEqual(result, "eth0") + + def test_get_versions(self): + """ Test node.get_versions method """ + for node in self.nodes: + result = node.get_versions() + + self.assertEqual(node.bmc.executed, ["get_info_basic", + "get_firmware_info", "info_card"]) + for attr in ["iana", "firmware_version", "ecme_version", + "ecme_timestamp"]: + self.assertTrue(hasattr(result, attr)) + + def test_get_fabric_ipinfo(self): + """ Test node.get_fabric_ipinfo method """ + for node in self.nodes: + result = node.get_fabric_ipinfo() + + for found in node.bmc.executed: + self.assertEqual(found, "fabric_config_get_ip_info") + + self.assertEqual(result, dict([(i, ADDRESSES[i]) + for i in range(NUM_NODES)])) + + def test_get_fabric_macaddrs(self): + """ Test node.get_fabric_macaddrs method """ + for node in self.nodes: + result = node.get_fabric_macaddrs() + + for found in node.bmc.executed: + self.assertEqual(found, "fabric_config_get_mac_addresses") + + self.assertEqual(len(result), NUM_NODES) + for node_id in xrange(NUM_NODES): + self.assertEqual(len(result[node_id]), 3) + for port in result[node_id]: + expected_macaddr = "00:00:00:00:%x:%x" % (node_id, port) + self.assertEqual(result[node_id][port], [expected_macaddr]) + + def test_get_fabric_uplink_info(self): + """ Test node.get_fabric_uplink_info method """ + for node in self.nodes: + node.get_fabric_uplink_info() + + for found in node.bmc.executed: + self.assertEqual(found, "fabric_config_get_uplink_info") + + def test_get_uplink_info(self): + """ Test node.get_uplink_info method """ + for node in self.nodes: + node.get_uplink_info() + + for found in node.bmc.executed: + self.assertEqual(found, "get_uplink_info") + + def test_get_uplink_speed(self): + """ Test node.get_uplink_info method """ + for node in self.nodes: + node.get_uplink_speed() + + for found in node.bmc.executed: + self.assertEqual(found, "get_uplink_speed") + + + def test_get_linkmap(self): + """ Test node.get_linkmap method """ + for node in self.nodes: + node.get_linkmap() + + for found in node.bmc.executed: + self.assertEqual(found, "fabric_info_get_link_map") + + def test_get_routing_table(self): + """ Test node.get_routing_table method """ + for node in self.nodes: + node.get_routing_table() + + for found in node.bmc.executed: + self.assertEqual(found, "fabric_info_get_routing_table") + + def test_get_depth_chart(self): + """ Test node.get_depth_chart method """ + for node in self.nodes: + node.get_depth_chart() + + for found in node.bmc.executed: + self.assertEqual(found, "fabric_info_get_depth_chart") + + def test_get_link_stats(self): + """ Test node.get_link_stats() """ + for node in self.nodes: + node.get_link_stats() + self.assertEqual(node.bmc.executed[0], ('fabric_get_linkstats', 0)) + + def test_get_server_ip(self): + """ Test node.get_server_ip method """ + for node in self.nodes: + result = node.get_server_ip() + self.assertEqual(result, "192.168.200.1") + + def test_get_linkspeed(self): + """ Test node.get_linkspeed method """ + for node in self.nodes: + result = node.get_linkspeed() + self.assertEqual(result, 1) + + def test_get_uplink(self): + """ Test node.get_uplink method""" + for node in self.nodes: + result = node.get_uplink(iface=0) + self.assertEqual(result, 0) + + def test_set_uplink(self): + """ Test node.set_uplink method """ + for node in self.nodes: + node.set_uplink(iface=0, uplink=0) + self.assertEqual(node.get_uplink(iface=0), 0) + +# pylint: disable=R0902 +class DummyBMC(LanBMC): + """ Dummy BMC for the node tests """ + + + GUID_UNIQUE = 0 + + + def __init__(self, **kwargs): + super(DummyBMC, self).__init__(**kwargs) + self.executed = [] + self.partitions = [ + Partition(0, 3, 0, 393216, in_use=True), # socman + Partition(1, 10, 393216, 196608, in_use=True), # factory cdb + Partition(2, 3, 589824, 393216, in_use=False), # socman + Partition(3, 10, 983040, 196608, in_use=False), # factory cdb + Partition(4, 10, 1179648, 196608, in_use=True), # running cdb + Partition(5, 11, 1376256, 12288), # ubootenv + Partition(6, 11, 1388544, 12288) # ubootenv + ] + self.ipaddr_base = '192.168.100.1' + self.unique_guid = 'FAKEGUID%s' % DummyBMC.GUID_UNIQUE + self.sel = self.generate_sel(with_errors=False) + + DummyBMC.GUID_UNIQUE += 1 + + def guid(self): + """Returns the GUID""" + self.executed.append("guid") + + # pylint: disable=R0903 + class Result(object): + """Results class.""" + def __init__(self, dummybmc): + self.system_guid = dummybmc.unique_guid + self.time_stamp = None + return Result(self) + + def set_chassis_power(self, mode): + """ Set chassis power """ + self.executed.append(("set_chassis_power", mode)) + + def get_chassis_status(self): + """ Get chassis status """ + self.executed.append("get_chassis_status") + + # pylint: disable=R0903 + class Result(object): + """Results class.""" + def __init__(self): + self.power_on = False + self.power_restore_policy = "always-off" + return Result() + + def set_chassis_policy(self, mode): + """ Set chassis restore policy """ + self.executed.append(("set_chassis_policy", mode)) + + def mc_reset(self, mode): + """ Reset the MC """ + self.executed.append(("mc_reset", mode)) + + def reset_firmware(self): + """ Reset the running CDB """ + self.executed.append("reset_firmware") + + def sel_clear(self): + """ Clear SEL """ + self.executed.append("sel_clear") + + def sel_elist(self): + """ List SEL. with_errors=True simulates a SEL that contains errors """ + self.executed.append("sel_elist") + return self.sel + + @staticmethod + def generate_sel(with_errors=False): + """ Generates a SEL table for a Node """ + if (with_errors): + return [ + '1 | 11/20/2013 | 20:26:18 | Memory | Correctable ECC | Asserted', + '2 | 11/20/2013 | 20:26:43 | Processor | IERR | Asserted', + '83 | 11/14/2013 | 18:01:35 | OS Stop/Shutdown OS Stop Reason | ' + + 'Error during system startup | Asserted' + ] + else: + return [ + '88 | 11/14/2013 | 18:02:29 | System Boot Initiated OS Boot ' + + 'Reason | Initiated by power up | Asserted', + '91 | 11/14/2013 | 19:24:25 | System Event BMC Status |', + ] + + def get_firmware_info(self): + """ Get partition and simg info """ + self.executed.append("get_firmware_info") + + return [x.fwinfo for x in self.partitions] + + def update_firmware(self, filename, partition, image_type, tftp_addr): + """ Download a file from a TFTP server to a given partition. + + Make sure the image type matches. """ + self.executed.append(("update_firmware", filename, + partition, image_type, tftp_addr)) + self.partitions[partition].updates += 1 + + localfile = temp_file() + TFTP.get_file(filename, localfile) + + contents = open(localfile).read() + simg = get_simg_header(contents) + self.partitions[partition].fwinfo.offset = "%8x" % simg.imgoff + self.partitions[partition].fwinfo.size = "%8x" % simg.imglen + self.partitions[partition].fwinfo.priority = "%8x" % simg.priority + self.partitions[partition].fwinfo.daddr = "%8x" % simg.daddr + + # pylint: disable=R0903 + class Result(object): + """Results class.""" + def __init__(self): + """Default constructor for the Result class.""" + self.tftp_handle_id = 0 + return Result() + + def retrieve_firmware(self, filename, partition, image_type, tftp_addr): + self.executed.append(("retrieve_firmware", filename, + partition, image_type, tftp_addr)) + self.partitions[partition].retrieves += 1 + + # Upload blank image to tftp + work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + open("%s/%s" % (work_dir, filename), "w").write(create_simg("")) + address, port = tftp_addr.split(":") + port = int(port) + tftp = ExternalTftp(address, port) + tftp.put_file("%s/%s" % (work_dir, filename), filename) + shutil.rmtree(work_dir) + + # pylint: disable=R0903 + class Result(object): + """Results class.""" + def __init__(self): + self.tftp_handle_id = 0 + return Result() + + def register_firmware_read(self, filename, partition, image_type): + self.executed.append(("register_firmware_read", filename, partition, + image_type)) + raise IpmiError() + + def register_firmware_write(self, filename, partition, image_type): + self.executed.append(("register_firmware_write", filename, partition, + image_type)) + raise IpmiError() + + def get_firmware_status(self, handle): + self.executed.append("get_firmware_status") + + # pylint: disable=R0903 + class Result(object): + """Results class.""" + def __init__(self): + self.status = "Complete" + + del handle + + return Result() + + def check_firmware(self, partition): + self.executed.append(("check_firmware", partition)) + self.partitions[partition].checks += 1 + + # pylint: disable=R0903 + class Result(object): + """Results class.""" + def __init__(self): + self.crc32 = 0 + self.error = None + return Result() + + def activate_firmware(self, partition): + self.executed.append(("activate_firmware", partition)) + self.partitions[partition].activates += 1 + + def set_firmware_version(self, version): + self.executed.append(("set_firmware_version", version)) + + def sdr_list(self): + """ Get sensor info from the node. """ + self.executed.append("sdr_list") + + power_value = "%f (+/- 0) Watts" % random.uniform(0, 10) + temp_value = "%f (+/- 0) degrees C" % random.uniform(30, 50) + sensors = [ + TestSensor("Node Power", power_value), + TestSensor("Board Temp", temp_value) + ] + + return sensors + + def get_info_basic(self): + """ Get basic SoC info from this node """ + self.executed.append("get_info_basic") + + # pylint: disable=R0903 + class Result(object): + """Results class.""" + def __init__(self): + self.iana = int("0x0096CD", 16) + self.firmware_version = "ECX-0000-v0.0.0" + self.ecme_version = "v0.0.0" + self.ecme_timestamp = 0 + return Result() + + def get_info_card(self): + self.executed.append("info_card") + + # pylint: disable=R0903 + class Result(object): + """Results class.""" + def __init__(self): + self.type = "TestBoard" + self.revision = "0" + return Result() + + node_count = 0 + def fabric_get_node_id(self): + self.executed.append('get_node_id') + result = DummyBMC.node_count + DummyBMC.node_count += 1 + return result + + def fabric_info_get_link_map(self, filename, tftp_addr=None): + """Upload a link_map file from the node to TFTP""" + self.executed.append('fabric_info_get_link_map') + + if not(tftp_addr): + raise IpmiError('No tftp address!') + + link_map = [] + link_map.append('Link 1: Node 2') + link_map.append('Link 3: Node 1') + link_map.append('Link 4: Node 3') + + work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + with open('%s/%s' % (work_dir, filename), 'w') as lm_file: + for lmap in link_map: + lm_file.write(lmap + '\n') + lm_file.close() + + # Upload to tftp + address, port = tftp_addr.split(":") + port = int(port) + tftp = ExternalTftp(address, port) + tftp.put_file("%s/%s" % (work_dir, filename), filename) + + shutil.rmtree(work_dir) + + def fabric_info_get_routing_table(self, filename, tftp_addr=None): + """Upload a routing_table file from the node to TFTP""" + self.executed.append('fabric_info_get_routing_table') + + if not(tftp_addr): + raise IpmiError('No tftp address!') + + routing_table = [] + routing_table.append('Node 1: rt - 0.2.0.3.2') + routing_table.append('Node 2: rt - 0.3.0.1.2') + routing_table.append('Node 3: rt - 0.2.0.1.3') + routing_table.append('Node 12: rt - 0.2.0.0.1') + routing_table.append('Node 13: rt - 0.2.0.0.1') + routing_table.append('Node 14: rt - 0.2.0.0.1') + routing_table.append('Node 15: rt - 0.2.0.0.1') + + work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + with open('%s/%s' % (work_dir, filename), 'w') as rt_file: + for rtable in routing_table: + rt_file.write(rtable + '\n') + rt_file.close() + + # Upload to tftp + address, port = tftp_addr.split(":") + port = int(port) + tftp = ExternalTftp(address, port) + tftp.put_file("%s/%s" % (work_dir, filename), filename) + + shutil.rmtree(work_dir) + + def fabric_info_get_depth_chart(self, filename, tftp_addr=None): + """Upload a depth_chart file from the node to TFTP""" + self.executed.append('fabric_info_get_depth_chart') + + if not(tftp_addr): + raise IpmiError('No tftp address!') + + depth_chart = [] + depth_chart.append( + 'Node 1: Shortest Distance 0 hops via neighbor 0: ' + + 'other hops/neighbors -' + ) + depth_chart.append( + 'Node 2: Shortest Distance 0 hops via neighbor 0: ' + + 'other hops/neighbors - 1/3' + ) + depth_chart.append( + 'Node 3: Shortest Distance 0 hops via neighbor 0: ' + + 'other hops/neighbors - 1/2' + ) + depth_chart.append( + 'Node 4: Shortest Distance 2 hops via neighbor 6: ' + + 'other hops/neighbors - 3/7' + ) + depth_chart.append( + 'Node 5: Shortest Distance 3 hops via neighbor 4: ' + + 'other hops/neighbors -' + ) + depth_chart.append( + 'Node 6: Shortest Distance 1 hops via neighbor 2: ' + + 'other hops/neighbors -' + ) + depth_chart.append( + 'Node 7: Shortest Distance 2 hops via neighbor 6: ' + + 'other hops/neighbors - 3/4' + ) + depth_chart.append( + 'Node 8: Shortest Distance 3 hops via neighbor 10: ' + + 'other hops/neighbors - 4/11' + ) + depth_chart.append( + 'Node 9: Shortest Distance 4 hops via neighbor 8: ' + + 'other hops/neighbors -' + ) + depth_chart.append( + 'Node 10: Shortest Distance 2 hops via neighbor 6: ' + + 'other hops/neighbors -' + ) + depth_chart.append( + 'Node 11: Shortest Distance 3 hops via neighbor 10: ' + + 'other hops/neighbors - 4/8' + ) + depth_chart.append( + 'Node 12: Shortest Distance 4 hops via neighbor 14: ' + + 'other hops/neighbors - 5/15' + ) + depth_chart.append( + 'Node 13: Shortest Distance 5 hops via neighbor 12: ' + + 'other hops/neighbors -' + ) + depth_chart.append( + 'Node 14: Shortest Distance 3 hops via neighbor 10: ' + + 'other hops/neighbors -' + ) + depth_chart.append( + 'Node 15: Shortest Distance 4 hops via neighbor 14: ' + + 'other hops/neighbors - 5/12' + ) + + + work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + with open('%s/%s' % (work_dir, filename), 'w') as dc_file: + for dchart in depth_chart: + dc_file.write(dchart + '\n') + dc_file.close() + + # Upload to tftp + address, port = tftp_addr.split(":") + port = int(port) + tftp = ExternalTftp(address, port) + tftp.put_file("%s/%s" % (work_dir, filename), filename) + + shutil.rmtree(work_dir) + + # pylint: disable=W0222 + def fabric_get_linkstats(self, filename, tftp_addr=None, + link=None): + """Upload a link_stats file from the node to TFTP""" + self.executed.append(('fabric_get_linkstats', link)) + + if not(tftp_addr): + raise IpmiError('No tftp address!') + + link_stats = [] + link_stats.append('Packet Counts for Link %s:' % link) + link_stats.append('Link0 StatspFS_LCn_CFG_0(link) = 0x1030d07f') + link_stats.append('pFS_LCn_CFG_1 = 0x105f') + link_stats.append('pFS_LCn_STATE = 0x1033') + link_stats.append('pFS_LCn_SC_STAT = 0x0') + link_stats.append('pFS_LCn_PKT_CNT_0 = 0x0') + link_stats.append('pFS_LCn_PKT_CNT_1 = 0x0') + link_stats.append('pFS_LCn_BYTE_CNT_0 = 0x0') + link_stats.append('pFS_LCn_BYTE_CNT_1 = 0x0') + link_stats.append('pFS_LCn_CM_TXDATA_0 = 0x82000000') + link_stats.append('pFS_LCn_CM_TXDATA_1 = 0x0') + link_stats.append('pFS_LCn_CM_RXDATA_0 = 0x0') + link_stats.append('pFS_LCn_CM_RXDATA_1 = 0x0') + link_stats.append('pFS_LCn_PKT_CNT_0 = 0x0') + link_stats.append('pFS_LCn_PKT_CNT_1 = 0x0') + link_stats.append('pFS_LCn_RMCSCNT = 0x1428') + link_stats.append('pFS_LCn_RUCSCNT = 0x116') + link_stats.append('pFS_LCn_RERRSCNT = 0x0') + link_stats.append('pFS_LCn_RDRPSCNT = 0xb4') + link_stats.append('pFS_LCn_RPKTSCNT = 0x0') + link_stats.append('pFS_LCn_TPKTSCNT = 0x1') + link_stats.append('pFS_LCn_TDRPSCNT = 0x0') + + work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + with open('%s/%s' % (work_dir, filename), 'w') as ls_file: + for stat in link_stats: + ls_file.write(stat + '\n') + ls_file.close() + + # Upload to tftp + address, port = tftp_addr.split(":") + port = int(port) + tftp = ExternalTftp(address, port) + tftp.put_file("%s/%s" % (work_dir, filename), filename) + + shutil.rmtree(work_dir) + + def fabric_config_get_ip_info(self, filename, tftp_addr=None): + """ Upload an ipinfo file from the node to TFTP""" + self.executed.append("fabric_config_get_ip_info") + + if not(tftp_addr): + raise IpmiError('No tftp address!') + + work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + + # Create IP info file + ipinfo = open("%s/%s" % (work_dir, filename), "w") + for i in range(len(ADDRESSES)): + ipinfo.write("Node %i: %s\n" % (i, ADDRESSES[i])) + ipinfo.close() + + # Upload to tftp + address, port = tftp_addr.split(":") + port = int(port) + tftp = ExternalTftp(address, port) + tftp.put_file("%s/%s" % (work_dir, filename), filename) + + shutil.rmtree(work_dir) + + def fabric_config_get_uplink_info(self, filename, tftp_addr=None): + self.executed.append("fabric_config_get_uplink_info") + + if not(tftp_addr): + raise IpmiError('No tftp address!') + + work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + # Create uplink info file + ulinfo = open("%s/%s" % (work_dir, filename), "w") + for i in range(1, NUM_NODES): + ulinfo.write("Node %i: eth0 0, eth1 0, mgmt 0\n" % i) + ulinfo.close() + + # Upload to tftp + address, port = tftp_addr.split(":") + port = int(port) + tftp = ExternalTftp(address, port) + tftp.put_file("%s/%s" % (work_dir, filename), filename) + + shutil.rmtree(work_dir) + + def fabric_config_get_uplink(self, iface): + self.executed.append(("fabric_config_get_uplink", iface)) + return 0 + + def fabric_config_set_uplink(self, uplink, iface): + self.executed.append(("fabric_config_set_uplink", uplink, iface)) + + def fabric_config_get_mac_addresses(self, filename, tftp_addr=None): + """ Upload a macaddrs file from the node to TFTP""" + self.executed.append("fabric_config_get_mac_addresses") + + if not(tftp_addr): + raise IpmiError('No tftp address!') + + work_dir = tempfile.mkdtemp(prefix="cxmanage_test-") + + # Create macaddrs file + macaddrs = open("%s/%s" % (work_dir, filename), "w") + for i in range(len(ADDRESSES)): + for port in range(3): + macaddr = "00:00:00:00:%x:%x" % (i, port) + macaddrs.write("Node %i, Port %i: %s\n" % (i, port, macaddr)) + macaddrs.close() + + # Upload to tftp + address, port = tftp_addr.split(":") + port = int(port) + tftp = ExternalTftp(address, port) + tftp.put_file("%s/%s" % (work_dir, filename), filename) + + shutil.rmtree(work_dir) + + def fabric_config_get_ip_src(self): + self.executed.append('fabric_config_get_ip_src') + return 2 + + def fabric_config_set_ip_src(self, ipsrc_mode): + self.fabric_ipsrc = ipsrc_mode + self.executed.append('fabric_config_set_ip_src') + + def fabric_config_factory_default(self): + self.executed.append('fabric_config_factory_default') + + def fabric_config_get_ip_addr_base(self): + """Provide a fake base IP addr""" + self.executed.append('fabric_config_get_ip_addr_base') + return self.ipaddr_base + + def fabric_config_update_config(self): + self.executed.append('fabric_config_update_config') + + def fabric_get_linkspeed(self, link="", actual=""): + self.executed.append('fabric_get_linkspeed') + return 1 + + def fabric_config_get_linkspeed(self): + self.executed.append('fabric_config_get_linkspeed') + return 1 + + def fabric_config_set_linkspeed(self, linkspeed): + self.fabric_linkspeed = linkspeed + self.executed.append('fabric_config_set_linkspeed') + + def fabric_config_get_linkspeed_policy(self): + self.executed.append('fabric_config_get_linkspeed_policy') + return 1 + + def fabric_config_set_linkspeed_policy(self, ls_policy): + self.fabric_ls_policy = ls_policy + self.executed.append('fabric_config_set_linkspeed_policy') + + def fabric_config_get_link_users_factor(self): + self.executed.append('fabric_config_get_link_users_factor') + return 1 + + def fabric_config_set_link_users_factor(self, lu_factor): + self.fabric_lu_factor = lu_factor + self.executed.append('fabric_config_set_link_users_factor') + + def fabric_config_set_macaddr_base(self, macaddr): + self.executed.append(('fabric_config_set_macaddr_base', macaddr)) + + def fabric_config_get_macaddr_base(self): + self.executed.append('fabric_config_get_macaddr_base') + return "00:00:00:00:00:00" + + def fabric_config_set_macaddr_mask(self, mask): + self.executed.append(('fabric_config_set_macaddr_mask', mask)) + + def fabric_config_get_macaddr_mask(self): + self.executed.append('fabric_config_get_macaddr_mask') + return "00:00:00:00:00:00" + + def fabric_add_macaddr(self, nodeid=0, iface=0, macaddr=None): + self.executed.append('fabric_add_macaddr') + + def fabric_rm_macaddr(self, nodeid=0, iface=0, macaddr=None): + self.executed.append('fabric_rm_macaddr') + + def fabric_get_uplink_info(self): + """Corresponds to Node.get_uplink_info()""" + self.executed.append('get_uplink_info') + return 'Node 0: eth0 0, eth1 0, mgmt 0' + + def fabric_get_uplink_speed(self): + """Corresponds to Node.get_uplink_speed()""" + self.executed.append('get_uplink_speed') + return 1 + + def fru_read(self, fru_number, filename): + if fru_number == 81: + self.executed.append('node_fru_read') + elif fru_number == 82: + self.executed.append('slot_fru_read') + else: + self.executed.append('fru_read') + + with open(filename, "w") as fru_image: + # Writes a fake FRU image with version "0.0" + fru_image.write("x00" * 516 + "0.0" + "x00"*7673) + + def pmic_get_version(self): + return "0" + + +# pylint: disable=R0913, R0903 +class Partition(object): + """Partition class.""" + def __init__(self, partition, type_, offset=0, + size=0, priority=0, daddr=0, in_use=None): + self.updates = 0 + self.retrieves = 0 + self.checks = 0 + self.activates = 0 + self.fwinfo = FWInfoEntry(partition, type_, offset, size, priority, + daddr, in_use) + + +class FWInfoEntry(object): + """ Firmware info for a single partition """ + + def __init__(self, partition, type_, offset=0, size=0, priority=0, daddr=0, + in_use=None): + self.partition = "%2i" % partition + self.type = { + 2: "02 (S2_ELF)", + 3: "03 (SOC_ELF)", + 10: "0a (CDB)", + 11: "0b (UBOOTENV)" + }[type_] + self.offset = "%8x" % offset + self.size = "%8x" % size + self.priority = "%8x" % priority + self.daddr = "%8x" % daddr + self.in_use = {None: "Unknown", True: "1", False: "0"}[in_use] + self.flags = "fffffffd" + self.version = "v0.0.0" + + +class DummyUbootEnv(UbootEnv): + """UbootEnv info.""" + + def get_boot_order(self): + """Hard coded boot order for testing.""" + return ["disk", "pxe"] + + def set_boot_order(self, boot_args): + """ Do nothing """ + pass + +# pylint: disable=R0903 +class DummyIPRetriever(object): + """ Dummy IP retriever """ + + def __init__(self, ecme_ip, aggressive=False, verbosity=0, **kwargs): + self.executed = False + self.ecme_ip = ecme_ip + self.aggressive = aggressive + self.verbosity = verbosity + for name, value in kwargs.iteritems(): + setattr(self, name, value) + + def run(self): + """ Set the server_ip variable. Raises an error if called more than + once. """ + if self.executed: + raise IPDiscoveryError("DummyIPRetriever.run() was called twice!") + self.executed = True + self.server_ip = "192.168.200.1" + +# End of file: node_test.py diff --git a/cxmanage_api/tests/tasks_test.py b/cxmanage_api/tests/tasks_test.py new file mode 100644 index 0000000..2d7b9a3 --- /dev/null +++ b/cxmanage_api/tests/tasks_test.py @@ -0,0 +1,81 @@ +"""Calxeda: task_test.py""" + + +# Copyright (c) 2012-2013, Calxeda Inc. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of Calxeda Inc. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. + + +import unittest +import time + +from cxmanage_api.tasks import TaskQueue + + +# pylint: disable=R0904 +class TaskTest(unittest.TestCase): + """Test for the TaskQueue Class.""" + + def test_task_queue(self): + """ Test the task queue """ + task_queue = TaskQueue() + counters = [Counter() for _ in xrange(128)] + tasks = [task_queue.put(counters[i].add, i) for i in xrange(128)] + + for task in tasks: + task.join() + + for i in xrange(128): + self.assertEqual(counters[i].value, i) + + def test_sequential_delay(self): + """ Test that a single thread delays between tasks """ + task_queue = TaskQueue(threads=1, delay=0.25) + counters = [Counter() for x in xrange(8)] + + start = time.time() + + tasks = [task_queue.put(x.add, 1) for x in counters] + for task in tasks: + task.join() + + finish = time.time() + + self.assertGreaterEqual(finish - start, 2.0) + + +# pylint: disable=R0903 +class Counter(object): + """ Simple counter object for testing purposes """ + def __init__(self): + self.value = 0 + + def add(self, value): + """ Increment this counter's value by some amount """ + self.value += value diff --git a/cxmanage_api/tests/tftp_test.py b/cxmanage_api/tests/tftp_test.py new file mode 100644 index 0000000..ccf2859 --- /dev/null +++ b/cxmanage_api/tests/tftp_test.py @@ -0,0 +1,142 @@ +"""Calxeda: tftp_test.py""" + + +# Copyright (c) 2012-2013, Calxeda Inc. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of Calxeda Inc. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. + + +# +# Unit tests for the Internal and External Tftp classes. +# + + +import os +import socket +import unittest + +from cxmanage_api.tests import random_file +from cxmanage_api.tftp import InternalTftp, ExternalTftp + + +def _get_relative_host(): + """Returns the test machine ip as a relative host to pass to the + InternalTftp server. + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # RFC863 defines port 9 as the UDP discard port, so we use it to + # find out our local ip to pass as a relative_host + return sock.getsockname()[0] + + except socket.error: + raise + +# pylint: disable=R0904 +class InternalTftpTest(unittest.TestCase): + """ Tests the functions of the InternalTftp class.""" + + def setUp(self): + """Create local Internal TFTP objects to test with.""" + self.tftp1 = InternalTftp() + self.tftp2 = InternalTftp(ip_address='127.0.0.254') + + def test_put_and_get(self): + """ Test file transfers on an internal host """ + + # Create file + filename = random_file(1024) + contents = open(filename).read() + + # Upload and remove + basename = os.path.basename(filename) + self.tftp1.put_file(filename, basename) + os.remove(filename) + self.assertFalse(os.path.exists(filename)) + + # Download + self.tftp1.get_file(basename, filename) + + # Verify match + self.assertEqual(open(filename).read(), contents) + os.remove(filename) + + def test_get_address_with_relhost(self): + """Tests the get_address(relative_host) function with a relative_host + specified. + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # RFC863 defines port 9 as the UDP discard port, so we use it to + # find out our local ip to pass as a relative_host + sock.connect((socket.gethostname(), 9)) + relative_host = sock.getsockname()[0] + + except socket.error: + raise + self.assertEqual(self.tftp2.ip_address, + self.tftp2.get_address(relative_host=relative_host)) + sock.close() + + +# pylint: disable=R0904 +class ExternalTftpTest(unittest.TestCase): + """Tests the ExternalTftp class. + ..note: + * For testing purposes the 'external' server points to an internally + hosted one, but it allows us to make sure the actual TFTP protocol is + working. + """ + + def setUp(self): + """Create an ExternalTftp object to test with.""" + self.itftp = InternalTftp(ip_address='127.0.0.250') + self.etftp = ExternalTftp( + self.itftp.get_address(relative_host=_get_relative_host()), + self.itftp.port) + + def test_put_and_get(self): + """Test the put_file(src, dest) function. Test the get_file(src,dest) + function by movign local files around using the TFT Protocol. + """ + # Create file + filename = random_file(1024) + contents = open(filename).read() + # Upload and remove original file + basename = os.path.basename(filename) + self.etftp.put_file(src=filename, dest=basename) + os.remove(filename) + self.assertFalse(os.path.exists(filename)) + # Download + self.etftp.get_file(src=basename, dest=filename) + # Verify match + self.assertEqual(open(filename).read(), contents) + os.remove(filename) + +# End of file: ./tftp_test.py |