summaryrefslogtreecommitdiff
path: root/tests/unittests/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r--tests/unittests/test_util.py964
1 files changed, 492 insertions, 472 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 1290cbc6..3765511b 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -3,33 +3,30 @@
"""Tests for cloudinit.util"""
import base64
-import logging
-import json
-import platform
-import pytest
-
import io
+import json
+import logging
import os
+import platform
import re
import shutil
import stat
import tempfile
-import yaml
+from textwrap import dedent
from unittest import mock
-from cloudinit import subp
-from cloudinit import importer, util
-from tests.unittests import helpers
-
+import pytest
+import yaml
+from cloudinit import importer, subp, util
+from tests.unittests import helpers
from tests.unittests.helpers import CiTestCase
-from textwrap import dedent
LOG = logging.getLogger(__name__)
MOUNT_INFO = [
- '68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64',
- '153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2',
+ "68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64",
+ "153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2",
]
OS_RELEASE_SLES = dedent(
@@ -185,6 +182,25 @@ OS_RELEASE_EUROLINUX_8 = dedent(
"""
)
+OS_RELEASE_MIRACLELINUX_8 = dedent(
+ """\
+ NAME="MIRACLE LINUX"
+ VERSION="8.4 (Peony)"
+ ID="miraclelinux"
+ ID_LIKE="rhel fedora"
+ PLATFORM_ID="platform:el8"
+ VERSION_ID="8"
+ PRETTY_NAME="MIRACLE LINUX 8.4 (Peony)"
+ ANSI_COLOR="0;31"
+ CPE_NAME="cpe:/o:cybertrust_japan:miracle_linux:8"
+ HOME_URL="https://www.cybertrust.co.jp/miracle-linux/"
+ DOCUMENTATION_URL="https://www.miraclelinux.com/support/miraclelinux8"
+ BUG_REPORT_URL="https://bugzilla.asianux.com/"
+ MIRACLELINUX_SUPPORT_PRODUCT="MIRACLE LINUX"
+ MIRACLELINUX_SUPPORT_PRODUCT_VERSION="8"
+"""
+)
+
OS_RELEASE_ROCKY_8 = dedent(
"""\
NAME="Rocky Linux"
@@ -255,6 +271,7 @@ REDHAT_RELEASE_REDHAT_7 = "Red Hat Enterprise Linux Server release 7.5 (Maipo)"
REDHAT_RELEASE_ALMALINUX_8 = "AlmaLinux release 8.3 (Purple Manul)"
REDHAT_RELEASE_EUROLINUX_7 = "EuroLinux release 7.9 (Minsk)"
REDHAT_RELEASE_EUROLINUX_8 = "EuroLinux release 8.4 (Vaduz)"
+REDHAT_RELEASE_MIRACLELINUX_8 = "MIRACLE LINUX release 8.4 (Peony)"
REDHAT_RELEASE_ROCKY_8 = "Rocky Linux release 8.3 (Green Obsidian)"
REDHAT_RELEASE_VIRTUOZZO_8 = "Virtuozzo Linux release 8"
REDHAT_RELEASE_CLOUDLINUX_8 = "CloudLinux release 8.4 (Valery Rozhdestvensky)"
@@ -309,9 +326,9 @@ class FakeCloud(object):
def get_hostname(self, fqdn=None, metadata_only=None):
myargs = {}
if fqdn is not None:
- myargs['fqdn'] = fqdn
+ myargs["fqdn"] = fqdn
if metadata_only is not None:
- myargs['metadata_only'] = metadata_only
+ myargs["metadata_only"] = metadata_only
self.calls.append(myargs)
if fqdn:
return self.fqdn
@@ -320,34 +337,34 @@ class FakeCloud(object):
class TestUtil(CiTestCase):
def test_parse_mount_info_no_opts_no_arg(self):
- result = util.parse_mount_info('/home', MOUNT_INFO, LOG)
- self.assertEqual(('/dev/sda2', 'xfs', '/home'), result)
+ result = util.parse_mount_info("/home", MOUNT_INFO, LOG)
+ self.assertEqual(("/dev/sda2", "xfs", "/home"), result)
def test_parse_mount_info_no_opts_arg(self):
- result = util.parse_mount_info('/home', MOUNT_INFO, LOG, False)
- self.assertEqual(('/dev/sda2', 'xfs', '/home'), result)
+ result = util.parse_mount_info("/home", MOUNT_INFO, LOG, False)
+ self.assertEqual(("/dev/sda2", "xfs", "/home"), result)
def test_parse_mount_info_with_opts(self):
- result = util.parse_mount_info('/', MOUNT_INFO, LOG, True)
- self.assertEqual(('/dev/sda1', 'btrfs', '/', 'ro,relatime'), result)
+ result = util.parse_mount_info("/", MOUNT_INFO, LOG, True)
+ self.assertEqual(("/dev/sda1", "btrfs", "/", "ro,relatime"), result)
- @mock.patch('cloudinit.util.get_mount_info')
+ @mock.patch("cloudinit.util.get_mount_info")
def test_mount_is_rw(self, m_mount_info):
- m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'rw,relatime')
- is_rw = util.mount_is_read_write('/')
+ m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "rw,relatime")
+ is_rw = util.mount_is_read_write("/")
self.assertEqual(is_rw, True)
- @mock.patch('cloudinit.util.get_mount_info')
+ @mock.patch("cloudinit.util.get_mount_info")
def test_mount_is_ro(self, m_mount_info):
- m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'ro,relatime')
- is_rw = util.mount_is_read_write('/')
+ m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "ro,relatime")
+ is_rw = util.mount_is_read_write("/")
self.assertEqual(is_rw, False)
class TestUptime(CiTestCase):
- @mock.patch('cloudinit.util.boottime')
- @mock.patch('cloudinit.util.os.path.exists')
- @mock.patch('cloudinit.util.time.time')
+ @mock.patch("cloudinit.util.boottime")
+ @mock.patch("cloudinit.util.os.path.exists")
+ @mock.patch("cloudinit.util.time.time")
def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime):
boottime = 1000.0
uptime = 10.0
@@ -362,24 +379,24 @@ class TestShellify(CiTestCase):
def test_input_dict_raises_type_error(self):
self.assertRaisesRegex(
TypeError,
- 'Input.*was.*dict.*xpected',
+ "Input.*was.*dict.*xpected",
util.shellify,
- {'mykey': 'myval'},
+ {"mykey": "myval"},
)
def test_input_str_raises_type_error(self):
self.assertRaisesRegex(
- TypeError, 'Input.*was.*str.*xpected', util.shellify, "foobar"
+ TypeError, "Input.*was.*str.*xpected", util.shellify, "foobar"
)
def test_value_with_int_raises_type_error(self):
self.assertRaisesRegex(
- TypeError, 'shellify.*int', util.shellify, ["foo", 1]
+ TypeError, "shellify.*int", util.shellify, ["foo", 1]
)
def test_supports_strings_and_lists(self):
self.assertEqual(
- '\n'.join(
+ "\n".join(
[
"#!/bin/sh",
"echo hi mom",
@@ -389,13 +406,13 @@ class TestShellify(CiTestCase):
]
),
util.shellify(
- ["echo hi mom", ["echo", "hi dad"], ('echo', 'hi', 'sis')]
+ ["echo hi mom", ["echo", "hi dad"], ("echo", "hi", "sis")]
),
)
def test_supports_comments(self):
self.assertEqual(
- '\n'.join(["#!/bin/sh", "echo start", "echo end", ""]),
+ "\n".join(["#!/bin/sh", "echo start", "echo end", ""]),
util.shellify(["echo start", None, "echo end"]),
)
@@ -404,58 +421,58 @@ class TestGetHostnameFqdn(CiTestCase):
def test_get_hostname_fqdn_from_only_cfg_fqdn(self):
"""When cfg only has the fqdn key, derive hostname and fqdn from it."""
hostname, fqdn = util.get_hostname_fqdn(
- cfg={'fqdn': 'myhost.domain.com'}, cloud=None
+ cfg={"fqdn": "myhost.domain.com"}, cloud=None
)
- self.assertEqual('myhost', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
+ self.assertEqual("myhost", hostname)
+ self.assertEqual("myhost.domain.com", fqdn)
def test_get_hostname_fqdn_from_cfg_fqdn_and_hostname(self):
"""When cfg has both fqdn and hostname keys, return them."""
hostname, fqdn = util.get_hostname_fqdn(
- cfg={'fqdn': 'myhost.domain.com', 'hostname': 'other'}, cloud=None
+ cfg={"fqdn": "myhost.domain.com", "hostname": "other"}, cloud=None
)
- self.assertEqual('other', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
+ self.assertEqual("other", hostname)
+ self.assertEqual("myhost.domain.com", fqdn)
def test_get_hostname_fqdn_from_cfg_hostname_with_domain(self):
"""When cfg has only hostname key which represents a fqdn, use that."""
hostname, fqdn = util.get_hostname_fqdn(
- cfg={'hostname': 'myhost.domain.com'}, cloud=None
+ cfg={"hostname": "myhost.domain.com"}, cloud=None
)
- self.assertEqual('myhost', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
+ self.assertEqual("myhost", hostname)
+ self.assertEqual("myhost.domain.com", fqdn)
def test_get_hostname_fqdn_from_cfg_hostname_without_domain(self):
"""When cfg has a hostname without a '.' query cloud.get_hostname."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
+ mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com")
hostname, fqdn = util.get_hostname_fqdn(
- cfg={'hostname': 'myhost'}, cloud=mycloud
+ cfg={"hostname": "myhost"}, cloud=mycloud
)
- self.assertEqual('myhost', hostname)
- self.assertEqual('cloudhost.mycloud.com', fqdn)
+ self.assertEqual("myhost", hostname)
+ self.assertEqual("cloudhost.mycloud.com", fqdn)
self.assertEqual(
- [{'fqdn': True, 'metadata_only': False}], mycloud.calls
+ [{"fqdn": True, "metadata_only": False}], mycloud.calls
)
def test_get_hostname_fqdn_from_without_fqdn_or_hostname(self):
"""When cfg has neither hostname nor fqdn cloud.get_hostname."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
+ mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com")
hostname, fqdn = util.get_hostname_fqdn(cfg={}, cloud=mycloud)
- self.assertEqual('cloudhost', hostname)
- self.assertEqual('cloudhost.mycloud.com', fqdn)
+ self.assertEqual("cloudhost", hostname)
+ self.assertEqual("cloudhost.mycloud.com", fqdn)
self.assertEqual(
- [{'fqdn': True, 'metadata_only': False}, {'metadata_only': False}],
+ [{"fqdn": True, "metadata_only": False}, {"metadata_only": False}],
mycloud.calls,
)
def test_get_hostname_fqdn_from_passes_metadata_only_to_cloud(self):
"""Calls to cloud.get_hostname pass the metadata_only parameter."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
+ mycloud = FakeCloud("cloudhost", "cloudhost.mycloud.com")
_hn, _fqdn = util.get_hostname_fqdn(
cfg={}, cloud=mycloud, metadata_only=True
)
self.assertEqual(
- [{'fqdn': True, 'metadata_only': True}, {'metadata_only': True}],
+ [{"fqdn": True, "metadata_only": True}, {"metadata_only": True}],
mycloud.calls,
)
@@ -545,19 +562,19 @@ class TestBlkid(CiTestCase):
)
-@mock.patch('cloudinit.subp.subp')
+@mock.patch("cloudinit.subp.subp")
class TestUdevadmSettle(CiTestCase):
def test_with_no_params(self, m_subp):
"""called with no parameters."""
util.udevadm_settle()
- m_subp.called_once_with(mock.call(['udevadm', 'settle']))
+ m_subp.called_once_with(mock.call(["udevadm", "settle"]))
def test_with_exists_and_not_exists(self, m_subp):
"""with exists=file where file does not exist should invoke subp."""
mydev = self.tmp_path("mydev")
util.udevadm_settle(exists=mydev)
m_subp.called_once_with(
- ['udevadm', 'settle', '--exit-if-exists=%s' % mydev]
+ ["udevadm", "settle", "--exit-if-exists=%s" % mydev]
)
def test_with_exists_and_file_exists(self, m_subp):
@@ -572,7 +589,7 @@ class TestUdevadmSettle(CiTestCase):
timeout = 9
util.udevadm_settle(timeout=timeout)
m_subp.called_once_with(
- ['udevadm', 'settle', '--timeout=%s' % timeout]
+ ["udevadm", "settle", "--timeout=%s" % timeout]
)
def test_with_timeout_string(self, m_subp):
@@ -580,7 +597,7 @@ class TestUdevadmSettle(CiTestCase):
timeout = "555"
util.udevadm_settle(timeout=timeout)
m_subp.assert_called_once_with(
- ['udevadm', 'settle', '--timeout=%s' % timeout]
+ ["udevadm", "settle", "--timeout=%s" % timeout]
)
def test_with_exists_and_timeout(self, m_subp):
@@ -590,10 +607,10 @@ class TestUdevadmSettle(CiTestCase):
util.udevadm_settle(exists=mydev)
m_subp.called_once_with(
[
- 'udevadm',
- 'settle',
- '--exit-if-exists=%s' % mydev,
- '--timeout=%s' % timeout,
+ "udevadm",
+ "settle",
+ "--exit-if-exists=%s" % mydev,
+ "--timeout=%s" % timeout,
]
)
@@ -602,7 +619,7 @@ class TestUdevadmSettle(CiTestCase):
self.assertRaises(subp.ProcessExecutionError, util.udevadm_settle)
-@mock.patch('os.path.exists')
+@mock.patch("os.path.exists")
class TestGetLinuxDistro(CiTestCase):
def setUp(self):
# python2 has no lru_cache, and therefore, no cache_clear()
@@ -612,36 +629,36 @@ class TestGetLinuxDistro(CiTestCase):
@classmethod
def os_release_exists(self, path):
"""Side effect function"""
- if path == '/etc/os-release':
+ if path == "/etc/os-release":
return 1
@classmethod
def redhat_release_exists(self, path):
"""Side effect function"""
- if path == '/etc/redhat-release':
+ if path == "/etc/redhat-release":
return 1
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists):
"""Verify we get the correct name if the os-release file has
the distro name in quotes"""
m_os_release.return_value = OS_RELEASE_SLES
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('sles', '12.3', platform.machine()), dist)
+ self.assertEqual(("sles", "12.3", platform.machine()), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists):
"""Verify we get the correct name if the os-release file does not
have the distro name in quotes"""
m_os_release.return_value = OS_RELEASE_UBUNTU
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('ubuntu', '16.04', 'xenial'), dist)
+ self.assertEqual(("ubuntu", "16.04", "xenial"), dist)
- @mock.patch('platform.system')
- @mock.patch('platform.release')
- @mock.patch('cloudinit.util._parse_redhat_release')
+ @mock.patch("platform.system")
+ @mock.patch("platform.release")
+ @mock.patch("cloudinit.util._parse_redhat_release")
def test_get_linux_freebsd(
self,
m_parse_redhat_release,
@@ -651,174 +668,194 @@ class TestGetLinuxDistro(CiTestCase):
):
"""Verify we get the correct name and release name on FreeBSD."""
m_path_exists.return_value = False
- m_platform_release.return_value = '12.0-RELEASE-p10'
- m_platform_system.return_value = 'FreeBSD'
+ m_platform_release.return_value = "12.0-RELEASE-p10"
+ m_platform_system.return_value = "FreeBSD"
m_parse_redhat_release.return_value = {}
util.is_BSD.cache_clear()
dist = util.get_linux_distro()
- self.assertEqual(('freebsd', '12.0-RELEASE-p10', ''), dist)
+ self.assertEqual(("freebsd", "12.0-RELEASE-p10", ""), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_centos6(self, m_os_release, m_path_exists):
"""Verify we get the correct name and release name on CentOS 6."""
m_os_release.return_value = REDHAT_RELEASE_CENTOS_6
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('centos', '6.10', 'Final'), dist)
+ self.assertEqual(("centos", "6.10", "Final"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists):
"""Verify the correct release info on CentOS 7 without os-release."""
m_os_release.return_value = REDHAT_RELEASE_CENTOS_7
m_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('centos', '7.5.1804', 'Core'), dist)
+ self.assertEqual(("centos", "7.5.1804", "Core"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists):
"""Verify redhat 7 read from os-release."""
m_os_release.return_value = OS_RELEASE_REDHAT_7
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('redhat', '7.5', 'Maipo'), dist)
+ self.assertEqual(("redhat", "7.5", "Maipo"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists):
"""Verify redhat 7 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_REDHAT_7
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('redhat', '7.5', 'Maipo'), dist)
+ self.assertEqual(("redhat", "7.5", "Maipo"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists):
"""Verify redhat 6 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_REDHAT_6
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('redhat', '6.10', 'Santiago'), dist)
+ self.assertEqual(("redhat", "6.10", "Santiago"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_copr_centos(self, m_os_release, m_path_exists):
"""Verify we get the correct name and release name on COPR CentOS."""
m_os_release.return_value = OS_RELEASE_CENTOS
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('centos', '7', 'Core'), dist)
+ self.assertEqual(("centos", "7", "Core"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_almalinux8_rhrelease(self, m_os_release, m_path_exists):
"""Verify almalinux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_ALMALINUX_8
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('almalinux', '8.3', 'Purple Manul'), dist)
+ self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_almalinux8_osrelease(self, m_os_release, m_path_exists):
"""Verify almalinux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_ALMALINUX_8
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('almalinux', '8.3', 'Purple Manul'), dist)
+ self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_eurolinux7_rhrelease(self, m_os_release, m_path_exists):
"""Verify eurolinux 7 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_7
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('eurolinux', '7.9', 'Minsk'), dist)
+ self.assertEqual(("eurolinux", "7.9", "Minsk"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_eurolinux7_osrelease(self, m_os_release, m_path_exists):
"""Verify eurolinux 7 read from os-release."""
m_os_release.return_value = OS_RELEASE_EUROLINUX_7
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('eurolinux', '7.9', 'Minsk'), dist)
+ self.assertEqual(("eurolinux", "7.9", "Minsk"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_eurolinux8_rhrelease(self, m_os_release, m_path_exists):
"""Verify eurolinux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_8
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('eurolinux', '8.4', 'Vaduz'), dist)
+ self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_eurolinux8_osrelease(self, m_os_release, m_path_exists):
"""Verify eurolinux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_EUROLINUX_8
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('eurolinux', '8.4', 'Vaduz'), dist)
+ self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
+ def test_get_linux_miraclelinux8_rhrelease(
+ self, m_os_release, m_path_exists
+ ):
+ """Verify miraclelinux 8 read from redhat-release."""
+ m_os_release.return_value = REDHAT_RELEASE_MIRACLELINUX_8
+ m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
+ dist = util.get_linux_distro()
+ self.assertEqual(("miracle", "8.4", "Peony"), dist)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_get_linux_miraclelinux8_osrelease(
+ self, m_os_release, m_path_exists
+ ):
+ """Verify miraclelinux 8 read from os-release."""
+ m_os_release.return_value = OS_RELEASE_MIRACLELINUX_8
+ m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
+ dist = util.get_linux_distro()
+ self.assertEqual(("miraclelinux", "8", "Peony"), dist)
+
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_rocky8_rhrelease(self, m_os_release, m_path_exists):
"""Verify rocky linux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_ROCKY_8
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('rocky', '8.3', 'Green Obsidian'), dist)
+ self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_rocky8_osrelease(self, m_os_release, m_path_exists):
"""Verify rocky linux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_ROCKY_8
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('rocky', '8.3', 'Green Obsidian'), dist)
+ self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_virtuozzo8_rhrelease(self, m_os_release, m_path_exists):
"""Verify virtuozzo linux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_VIRTUOZZO_8
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('virtuozzo', '8', 'Virtuozzo Linux'), dist)
+ self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_virtuozzo8_osrelease(self, m_os_release, m_path_exists):
"""Verify virtuozzo linux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_VIRTUOZZO_8
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('virtuozzo', '8', 'Virtuozzo Linux'), dist)
+ self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_cloud8_rhrelease(self, m_os_release, m_path_exists):
"""Verify cloudlinux 8 read from redhat-release."""
m_os_release.return_value = REDHAT_RELEASE_CLOUDLINUX_8
m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('cloudlinux', '8.4', 'Valery Rozhdestvensky'), dist)
+ self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_cloud8_osrelease(self, m_os_release, m_path_exists):
"""Verify cloudlinux 8 read from os-release."""
m_os_release.return_value = OS_RELEASE_CLOUDLINUX_8
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('cloudlinux', '8.4', 'Valery Rozhdestvensky'), dist)
+ self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_debian(self, m_os_release, m_path_exists):
"""Verify we get the correct name and release name on Debian."""
m_os_release.return_value = OS_RELEASE_DEBIAN
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('debian', '9', 'stretch'), dist)
+ self.assertEqual(("debian", "9", "stretch"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_openeuler(self, m_os_release, m_path_exists):
"""Verify get the correct name and release name on Openeuler."""
m_os_release.return_value = OS_RELEASE_OPENEULER_20
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('openEuler', '20.03', 'LTS-SP2'), dist)
+ self.assertEqual(("openEuler", "20.03", "LTS-SP2"), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_opensuse(self, m_os_release, m_path_exists):
"""Verify we get the correct name and machine arch on openSUSE
prior to openSUSE Leap 15.
@@ -826,9 +863,9 @@ class TestGetLinuxDistro(CiTestCase):
m_os_release.return_value = OS_RELEASE_OPENSUSE
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('opensuse', '42.3', platform.machine()), dist)
+ self.assertEqual(("opensuse", "42.3", platform.machine()), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists):
"""Verify we get the correct name and machine arch on openSUSE
for openSUSE Leap 15.0 and later.
@@ -836,9 +873,9 @@ class TestGetLinuxDistro(CiTestCase):
m_os_release.return_value = OS_RELEASE_OPENSUSE_L15
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('opensuse-leap', '15.0', platform.machine()), dist)
+ self.assertEqual(("opensuse-leap", "15.0", platform.machine()), dist)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists):
"""Verify we get the correct name and machine arch on openSUSE
for openSUSE Tumbleweed
@@ -847,31 +884,31 @@ class TestGetLinuxDistro(CiTestCase):
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
self.assertEqual(
- ('opensuse-tumbleweed', '20180920', platform.machine()), dist
+ ("opensuse-tumbleweed", "20180920", platform.machine()), dist
)
- @mock.patch('cloudinit.util.load_file')
+ @mock.patch("cloudinit.util.load_file")
def test_get_linux_photon_os_release(self, m_os_release, m_path_exists):
"""Verify we get the correct name and machine arch on PhotonOS"""
m_os_release.return_value = OS_RELEASE_PHOTON
m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
dist = util.get_linux_distro()
- self.assertEqual(('photon', '4.0', 'VMware Photon OS/Linux'), dist)
+ self.assertEqual(("photon", "4.0", "VMware Photon OS/Linux"), dist)
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
+ @mock.patch("platform.system")
+ @mock.patch("platform.dist", create=True)
def test_get_linux_distro_no_data(
self, m_platform_dist, m_platform_system, m_path_exists
):
"""Verify we get no information if os-release does not exist"""
- m_platform_dist.return_value = ('', '', '')
+ m_platform_dist.return_value = ("", "", "")
m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
- self.assertEqual(('', '', ''), dist)
+ self.assertEqual(("", "", ""), dist)
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
+ @mock.patch("platform.system")
+ @mock.patch("platform.dist", create=True)
def test_get_linux_distro_no_impl(
self, m_platform_dist, m_platform_system, m_path_exists
):
@@ -881,55 +918,55 @@ class TestGetLinuxDistro(CiTestCase):
m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
- self.assertEqual(('', '', ''), dist)
+ self.assertEqual(("", "", ""), dist)
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
+ @mock.patch("platform.system")
+ @mock.patch("platform.dist", create=True)
def test_get_linux_distro_plat_data(
self, m_platform_dist, m_platform_system, m_path_exists
):
"""Verify we get the correct platform information"""
- m_platform_dist.return_value = ('foo', '1.1', 'aarch64')
+ m_platform_dist.return_value = ("foo", "1.1", "aarch64")
m_platform_system.return_value = "Linux"
m_path_exists.return_value = 0
dist = util.get_linux_distro()
- self.assertEqual(('foo', '1.1', 'aarch64'), dist)
+ self.assertEqual(("foo", "1.1", "aarch64"), dist)
class TestGetVariant:
@pytest.mark.parametrize(
- 'info, expected_variant',
+ "info, expected_variant",
[
- ({'system': 'Linux', 'dist': ('almalinux',)}, 'almalinux'),
- ({'system': 'linux', 'dist': ('alpine',)}, 'alpine'),
- ({'system': 'linux', 'dist': ('arch',)}, 'arch'),
- ({'system': 'linux', 'dist': ('centos',)}, 'centos'),
- ({'system': 'linux', 'dist': ('cloudlinux',)}, 'cloudlinux'),
- ({'system': 'linux', 'dist': ('debian',)}, 'debian'),
- ({'system': 'linux', 'dist': ('eurolinux',)}, 'eurolinux'),
- ({'system': 'linux', 'dist': ('fedora',)}, 'fedora'),
- ({'system': 'linux', 'dist': ('openEuler',)}, 'openeuler'),
- ({'system': 'linux', 'dist': ('photon',)}, 'photon'),
- ({'system': 'linux', 'dist': ('rhel',)}, 'rhel'),
- ({'system': 'linux', 'dist': ('rocky',)}, 'rocky'),
- ({'system': 'linux', 'dist': ('suse',)}, 'suse'),
- ({'system': 'linux', 'dist': ('virtuozzo',)}, 'virtuozzo'),
- ({'system': 'linux', 'dist': ('ubuntu',)}, 'ubuntu'),
- ({'system': 'linux', 'dist': ('linuxmint',)}, 'ubuntu'),
- ({'system': 'linux', 'dist': ('mint',)}, 'ubuntu'),
- ({'system': 'linux', 'dist': ('redhat',)}, 'rhel'),
- ({'system': 'linux', 'dist': ('opensuse',)}, 'suse'),
- ({'system': 'linux', 'dist': ('opensuse-tumbleweed',)}, 'suse'),
- ({'system': 'linux', 'dist': ('opensuse-leap',)}, 'suse'),
- ({'system': 'linux', 'dist': ('sles',)}, 'suse'),
- ({'system': 'linux', 'dist': ('sle_hpc',)}, 'suse'),
- ({'system': 'linux', 'dist': ('my_distro',)}, 'linux'),
- ({'system': 'Windows', 'dist': ('dontcare',)}, 'windows'),
- ({'system': 'Darwin', 'dist': ('dontcare',)}, 'darwin'),
- ({'system': 'Freebsd', 'dist': ('dontcare',)}, 'freebsd'),
- ({'system': 'Netbsd', 'dist': ('dontcare',)}, 'netbsd'),
- ({'system': 'Openbsd', 'dist': ('dontcare',)}, 'openbsd'),
- ({'system': 'Dragonfly', 'dist': ('dontcare',)}, 'dragonfly'),
+ ({"system": "Linux", "dist": ("almalinux",)}, "almalinux"),
+ ({"system": "linux", "dist": ("alpine",)}, "alpine"),
+ ({"system": "linux", "dist": ("arch",)}, "arch"),
+ ({"system": "linux", "dist": ("centos",)}, "centos"),
+ ({"system": "linux", "dist": ("cloudlinux",)}, "cloudlinux"),
+ ({"system": "linux", "dist": ("debian",)}, "debian"),
+ ({"system": "linux", "dist": ("eurolinux",)}, "eurolinux"),
+ ({"system": "linux", "dist": ("fedora",)}, "fedora"),
+ ({"system": "linux", "dist": ("openEuler",)}, "openeuler"),
+ ({"system": "linux", "dist": ("photon",)}, "photon"),
+ ({"system": "linux", "dist": ("rhel",)}, "rhel"),
+ ({"system": "linux", "dist": ("rocky",)}, "rocky"),
+ ({"system": "linux", "dist": ("suse",)}, "suse"),
+ ({"system": "linux", "dist": ("virtuozzo",)}, "virtuozzo"),
+ ({"system": "linux", "dist": ("ubuntu",)}, "ubuntu"),
+ ({"system": "linux", "dist": ("linuxmint",)}, "ubuntu"),
+ ({"system": "linux", "dist": ("mint",)}, "ubuntu"),
+ ({"system": "linux", "dist": ("redhat",)}, "rhel"),
+ ({"system": "linux", "dist": ("opensuse",)}, "suse"),
+ ({"system": "linux", "dist": ("opensuse-tumbleweed",)}, "suse"),
+ ({"system": "linux", "dist": ("opensuse-leap",)}, "suse"),
+ ({"system": "linux", "dist": ("sles",)}, "suse"),
+ ({"system": "linux", "dist": ("sle_hpc",)}, "suse"),
+ ({"system": "linux", "dist": ("my_distro",)}, "linux"),
+ ({"system": "Windows", "dist": ("dontcare",)}, "windows"),
+ ({"system": "Darwin", "dist": ("dontcare",)}, "darwin"),
+ ({"system": "Freebsd", "dist": ("dontcare",)}, "freebsd"),
+ ({"system": "Netbsd", "dist": ("dontcare",)}, "netbsd"),
+ ({"system": "Openbsd", "dist": ("dontcare",)}, "openbsd"),
+ ({"system": "Dragonfly", "dist": ("dontcare",)}, "dragonfly"),
],
)
def test_get_variant(self, info, expected_variant):
@@ -940,93 +977,96 @@ class TestGetVariant:
class TestJsonDumps(CiTestCase):
def test_is_str(self):
"""json_dumps should return a string."""
- self.assertTrue(isinstance(util.json_dumps({'abc': '123'}), str))
+ self.assertTrue(isinstance(util.json_dumps({"abc": "123"}), str))
def test_utf8(self):
- smiley = '\\ud83d\\ude03'
+ smiley = "\\ud83d\\ude03"
self.assertEqual(
- {'smiley': smiley}, json.loads(util.json_dumps({'smiley': smiley}))
+ {"smiley": smiley}, json.loads(util.json_dumps({"smiley": smiley}))
)
def test_non_utf8(self):
- blob = b'\xba\x03Qx-#y\xea'
+ blob = b"\xba\x03Qx-#y\xea"
self.assertEqual(
- {'blob': 'ci-b64:' + base64.b64encode(blob).decode('utf-8')},
- json.loads(util.json_dumps({'blob': blob})),
+ {"blob": "ci-b64:" + base64.b64encode(blob).decode("utf-8")},
+ json.loads(util.json_dumps({"blob": blob})),
)
-@mock.patch('os.path.exists')
+@mock.patch("os.path.exists")
class TestIsLXD(CiTestCase):
def test_is_lxd_true_on_sock_device(self, m_exists):
"""When lxd's /dev/lxd/sock exists, is_lxd returns true."""
m_exists.return_value = True
self.assertTrue(util.is_lxd())
- m_exists.assert_called_once_with('/dev/lxd/sock')
+ m_exists.assert_called_once_with("/dev/lxd/sock")
def test_is_lxd_false_when_sock_device_absent(self, m_exists):
"""When lxd's /dev/lxd/sock is absent, is_lxd returns false."""
m_exists.return_value = False
self.assertFalse(util.is_lxd())
- m_exists.assert_called_once_with('/dev/lxd/sock')
+ m_exists.assert_called_once_with("/dev/lxd/sock")
class TestReadCcFromCmdline:
+ if hasattr(pytest, "param"):
+ random_string = pytest.param(
+ CiTestCase.random_string(), None, id="random_string"
+ )
+ else:
+ random_string = (CiTestCase.random_string(), None)
+
@pytest.mark.parametrize(
"cmdline,expected_cfg",
[
# Return None if cmdline has no cc:<YAML>end_cc content.
- (CiTestCase.random_string(), None),
+ random_string,
# Return None if YAML content is empty string.
- ('foo cc: end_cc bar', None),
+ ("foo cc: end_cc bar", None),
# Return expected dictionary without trailing end_cc marker.
- ('foo cc: ssh_pwauth: true', {'ssh_pwauth': True}),
+ ("foo cc: ssh_pwauth: true", {"ssh_pwauth": True}),
# Return expected dictionary w escaped newline and no end_cc.
- ('foo cc: ssh_pwauth: true\\n', {'ssh_pwauth': True}),
+ ("foo cc: ssh_pwauth: true\\n", {"ssh_pwauth": True}),
# Return expected dictionary of yaml between cc: and end_cc.
- ('foo cc: ssh_pwauth: true end_cc bar', {'ssh_pwauth': True}),
+ ("foo cc: ssh_pwauth: true end_cc bar", {"ssh_pwauth": True}),
# Return dict with list value w escaped newline, no end_cc.
(
- 'cc: ssh_import_id: [smoser, kirkland]\\n',
- {'ssh_import_id': ['smoser', 'kirkland']},
+ "cc: ssh_import_id: [smoser, kirkland]\\n",
+ {"ssh_import_id": ["smoser", "kirkland"]},
),
# Parse urlencoded brackets in yaml content.
(
- 'cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc',
- {'ssh_import_id': ['smoser', 'kirkland']},
+ "cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc",
+ {"ssh_import_id": ["smoser", "kirkland"]},
),
# Parse complete urlencoded yaml content.
(
- 'cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc',
- {'ssh_import_id': ['user1', 'user2']},
+ "cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc",
+ {"ssh_import_id": ["user1", "user2"]},
),
# Parse nested dictionary in yaml content.
(
- 'cc: ntp: {enabled: true, ntp_client: myclient} end_cc',
- {'ntp': {'enabled': True, 'ntp_client': 'myclient'}},
+ "cc: ntp: {enabled: true, ntp_client: myclient} end_cc",
+ {"ntp": {"enabled": True, "ntp_client": "myclient"}},
),
# Parse single mapping value in yaml content.
- ('cc: ssh_import_id: smoser end_cc', {'ssh_import_id': 'smoser'}),
+ ("cc: ssh_import_id: smoser end_cc", {"ssh_import_id": "smoser"}),
# Parse multiline content with multiple mapping and nested lists.
(
- (
- 'cc: ssh_import_id: [smoser, bob]\\n'
- 'runcmd: [ [ ls, -l ], echo hi ] end_cc'
- ),
+ "cc: ssh_import_id: [smoser, bob]\\n"
+ "runcmd: [ [ ls, -l ], echo hi ] end_cc",
{
- 'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi'],
+ "ssh_import_id": ["smoser", "bob"],
+ "runcmd": [["ls", "-l"], "echo hi"],
},
),
# Parse multiline encoded content w/ mappings and nested lists.
(
- (
- 'cc: ssh_import_id: %5Bsmoser, bob%5D\\n'
- 'runcmd: [ [ ls, -l ], echo hi ] end_cc'
- ),
+ "cc: ssh_import_id: %5Bsmoser, bob%5D\\n"
+ "runcmd: [ [ ls, -l ], echo hi ] end_cc",
{
- 'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi'],
+ "ssh_import_id": ["smoser", "bob"],
+ "runcmd": [["ls", "-l"], "echo hi"],
},
),
# test encoded escaped newlines work.
@@ -1035,17 +1075,13 @@ class TestReadCcFromCmdline:
# 'ssh_import_id: [smoser, bob]\\nruncmd: [ [ ls, -l ], echo hi ]'
(
(
- 'cc: '
- + (
- 'ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn'
- 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
- '%20echo%20hi%20%5D'
- )
- + ' end_cc'
+ "cc: " + "ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn"
+ "runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C"
+ "%20echo%20hi%20%5D" + " end_cc"
),
{
- 'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi'],
+ "ssh_import_id": ["smoser", "bob"],
+ "runcmd": [["ls", "-l"], "echo hi"],
},
),
# test encoded newlines work.
@@ -1054,34 +1090,26 @@ class TestReadCcFromCmdline:
# 'ssh_import_id: [smoser, bob]\nruncmd: [ [ ls, -l ], echo hi ]'
(
(
- "cc: "
- + (
- 'ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A'
- 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
- '%20echo%20hi%20%5D'
- )
- + ' end_cc'
+ "cc: " + "ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A"
+ "runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C"
+ "%20echo%20hi%20%5D" + " end_cc"
),
{
- 'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi'],
+ "ssh_import_id": ["smoser", "bob"],
+ "runcmd": [["ls", "-l"], "echo hi"],
},
),
# Parse and merge multiple yaml content sections.
(
- (
- 'cc:ssh_import_id: [smoser, bob] end_cc '
- 'cc: runcmd: [ [ ls, -l ] ] end_cc'
- ),
- {'ssh_import_id': ['smoser', 'bob'], 'runcmd': [['ls', '-l']]},
+ "cc:ssh_import_id: [smoser, bob] end_cc "
+ "cc: runcmd: [ [ ls, -l ] ] end_cc",
+ {"ssh_import_id": ["smoser", "bob"], "runcmd": [["ls", "-l"]]},
),
# Parse and merge multiple encoded yaml content sections.
(
- (
- 'cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc '
- 'cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc'
- ),
- {'ssh_import_id': ['smoser'], 'runcmd': [['ls', '-l']]},
+ "cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc "
+ "cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc",
+ {"ssh_import_id": ["smoser"], "runcmd": [["ls", "-l"]]},
),
],
)
@@ -1099,7 +1127,7 @@ class TestMountCb:
TODO: Test the if/else branch that actually performs the mounting operation
"""
- @pytest.yield_fixture
+ @pytest.fixture
def already_mounted_device_and_mountdict(self):
"""Mock an already-mounted device, and yield (device, mount dict)"""
device = "/dev/fake0"
@@ -1145,7 +1173,7 @@ class TestMountCb:
)
callback = mock.Mock(autospec=True)
- util.mount_cb('/dev/fake0', callback, mtype=mtype)
+ util.mount_cb("/dev/fake0", callback, mtype=mtype)
assert (
mock.call(
[
@@ -1429,7 +1457,7 @@ class TestWriteFile(helpers.TestCase):
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
- open(path, 'w').close()
+ open(path, "w").close()
os.chmod(path, 0o666)
util.write_file(path, contents, preserve_mode=True)
@@ -1464,7 +1492,7 @@ class TestWriteFile(helpers.TestCase):
fake_se = FakeSelinux(my_file)
with mock.patch.object(
- importer, 'import_module', return_value=fake_se
+ importer, "import_module", return_value=fake_se
) as mockobj:
with util.SeLinuxGuard(my_file) as is_on:
self.assertTrue(is_on)
@@ -1472,7 +1500,7 @@ class TestWriteFile(helpers.TestCase):
self.assertEqual(1, len(fake_se.restored))
self.assertEqual(my_file, fake_se.restored[0])
- mockobj.assert_called_once_with('selinux')
+ mockobj.assert_called_once_with("selinux")
class TestDeleteDirContents(helpers.TestCase):
@@ -1543,7 +1571,7 @@ class TestDeleteDirContents(helpers.TestCase):
class TestKeyValStrings(helpers.TestCase):
def test_keyval_str_to_dict(self):
- expected = {'1': 'one', '2': 'one+one', 'ro': True}
+ expected = {"1": "one", "2": "one+one", "ro": True}
cmdline = "1=one ro 2=one+one"
self.assertEqual(expected, util.keyval_str_to_dict(cmdline))
@@ -1551,7 +1579,7 @@ class TestKeyValStrings(helpers.TestCase):
class TestGetCmdline(helpers.TestCase):
def test_cmdline_reads_debug_env(self):
with mock.patch.dict(
- "os.environ", values={'DEBUG_PROC_CMDLINE': 'abcd 123'}
+ "os.environ", values={"DEBUG_PROC_CMDLINE": "abcd 123"}
):
ret = util.get_cmdline()
self.assertEqual("abcd 123", ret)
@@ -1562,13 +1590,13 @@ class TestLoadYaml(helpers.CiTestCase):
with_logs = True
def test_simple(self):
- mydata = {'1': "one", '2': "two"}
+ mydata = {"1": "one", "2": "two"}
self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata)
def test_nonallowed_returns_default(self):
- '''Any unallowed types result in returning default; log the issue.'''
+ """Any unallowed types result in returning default; log the issue."""
# for now, anything not in the allowed list just returns the default.
- myyaml = yaml.dump({'1': "one"})
+ myyaml = yaml.dump({"1": "one"})
self.assertEqual(
util.load_yaml(
blob=myyaml, default=self.mydefault, allowed=(str,)
@@ -1576,37 +1604,37 @@ class TestLoadYaml(helpers.CiTestCase):
self.mydefault,
)
regex = re.compile(
- r'Yaml load allows \(<(class|type) \'str\'>,\) root types, but'
- r' got dict'
+ r"Yaml load allows \(<(class|type) \'str\'>,\) root types, but"
+ r" got dict"
)
self.assertTrue(
regex.search(self.logs.getvalue()),
- msg='Missing expected yaml load error',
+ msg="Missing expected yaml load error",
)
def test_bogus_scan_error_returns_default(self):
- '''On Yaml scan error, load_yaml returns the default and logs issue.'''
+ """On Yaml scan error, load_yaml returns the default and logs issue."""
badyaml = "1\n 2:"
self.assertEqual(
util.load_yaml(blob=badyaml, default=self.mydefault),
self.mydefault,
)
self.assertIn(
- 'Failed loading yaml blob. Invalid format at line 2 column 3:'
+ "Failed loading yaml blob. Invalid format at line 2 column 3:"
' "mapping values are not allowed here',
self.logs.getvalue(),
)
def test_bogus_parse_error_returns_default(self):
- '''On Yaml parse error, load_yaml returns default and logs issue.'''
+ """On Yaml parse error, load_yaml returns default and logs issue."""
badyaml = "{}}"
self.assertEqual(
util.load_yaml(blob=badyaml, default=self.mydefault),
self.mydefault,
)
self.assertIn(
- 'Failed loading yaml blob. Invalid format at line 1 column 3:'
- " \"expected \'<document start>\', but found \'}\'",
+ "Failed loading yaml blob. Invalid format at line 1 column 3:"
+ " \"expected '<document start>', but found '}'",
self.logs.getvalue(),
)
@@ -1626,7 +1654,7 @@ class TestLoadYaml(helpers.CiTestCase):
def test_python_unicode(self):
# complex type of python/unicode is explicitly allowed
- myobj = {'1': "FOOBAR"}
+ myobj = {"1": "FOOBAR"}
safe_yaml = yaml.dump(myobj)
self.assertEqual(
util.load_yaml(blob=safe_yaml, default=self.mydefault), myobj
@@ -1650,144 +1678,144 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
)
elements = line.split()
for i in range(len(elements) + 1):
- lines = [' '.join(elements[0:i])]
+ lines = [" ".join(elements[0:i])]
if i < 10:
expected = None
else:
- expected = ('/dev/mapper/vg0-root', 'ext4', '/')
- self.assertEqual(expected, util.parse_mount_info('/', lines))
+ expected = ("/dev/mapper/vg0-root", "ext4", "/")
+ self.assertEqual(expected, util.parse_mount_info("/", lines))
def test_precise_ext4_root(self):
- lines = helpers.readResource('mountinfo_precise_ext4.txt').splitlines()
+ lines = helpers.readResource("mountinfo_precise_ext4.txt").splitlines()
- expected = ('/dev/mapper/vg0-root', 'ext4', '/')
- self.assertEqual(expected, util.parse_mount_info('/', lines))
- self.assertEqual(expected, util.parse_mount_info('/usr', lines))
- self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines))
+ expected = ("/dev/mapper/vg0-root", "ext4", "/")
+ self.assertEqual(expected, util.parse_mount_info("/", lines))
+ self.assertEqual(expected, util.parse_mount_info("/usr", lines))
+ self.assertEqual(expected, util.parse_mount_info("/usr/bin", lines))
- expected = ('/dev/md0', 'ext4', '/boot')
- self.assertEqual(expected, util.parse_mount_info('/boot', lines))
- self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines))
+ expected = ("/dev/md0", "ext4", "/boot")
+ self.assertEqual(expected, util.parse_mount_info("/boot", lines))
+ self.assertEqual(expected, util.parse_mount_info("/boot/grub", lines))
- expected = ('/dev/mapper/vg0-root', 'ext4', '/')
- self.assertEqual(expected, util.parse_mount_info('/home', lines))
- self.assertEqual(expected, util.parse_mount_info('/home/me', lines))
+ expected = ("/dev/mapper/vg0-root", "ext4", "/")
+ self.assertEqual(expected, util.parse_mount_info("/home", lines))
+ self.assertEqual(expected, util.parse_mount_info("/home/me", lines))
- expected = ('tmpfs', 'tmpfs', '/run')
- self.assertEqual(expected, util.parse_mount_info('/run', lines))
+ expected = ("tmpfs", "tmpfs", "/run")
+ self.assertEqual(expected, util.parse_mount_info("/run", lines))
- expected = ('none', 'tmpfs', '/run/lock')
- self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
+ expected = ("none", "tmpfs", "/run/lock")
+ self.assertEqual(expected, util.parse_mount_info("/run/lock", lines))
def test_raring_btrfs_root(self):
- lines = helpers.readResource('mountinfo_raring_btrfs.txt').splitlines()
+ lines = helpers.readResource("mountinfo_raring_btrfs.txt").splitlines()
- expected = ('/dev/vda1', 'btrfs', '/')
- self.assertEqual(expected, util.parse_mount_info('/', lines))
- self.assertEqual(expected, util.parse_mount_info('/usr', lines))
- self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines))
- self.assertEqual(expected, util.parse_mount_info('/boot', lines))
- self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines))
+ expected = ("/dev/vda1", "btrfs", "/")
+ self.assertEqual(expected, util.parse_mount_info("/", lines))
+ self.assertEqual(expected, util.parse_mount_info("/usr", lines))
+ self.assertEqual(expected, util.parse_mount_info("/usr/bin", lines))
+ self.assertEqual(expected, util.parse_mount_info("/boot", lines))
+ self.assertEqual(expected, util.parse_mount_info("/boot/grub", lines))
- expected = ('/dev/vda1', 'btrfs', '/home')
- self.assertEqual(expected, util.parse_mount_info('/home', lines))
- self.assertEqual(expected, util.parse_mount_info('/home/me', lines))
+ expected = ("/dev/vda1", "btrfs", "/home")
+ self.assertEqual(expected, util.parse_mount_info("/home", lines))
+ self.assertEqual(expected, util.parse_mount_info("/home/me", lines))
- expected = ('tmpfs', 'tmpfs', '/run')
- self.assertEqual(expected, util.parse_mount_info('/run', lines))
+ expected = ("tmpfs", "tmpfs", "/run")
+ self.assertEqual(expected, util.parse_mount_info("/run", lines))
- expected = ('none', 'tmpfs', '/run/lock')
- self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
+ expected = ("none", "tmpfs", "/run/lock")
+ self.assertEqual(expected, util.parse_mount_info("/run/lock", lines))
- @mock.patch('cloudinit.util.os')
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.util.os")
+ @mock.patch("cloudinit.subp.subp")
def test_get_device_info_from_zpool(self, zpool_output, m_os):
# mock /dev/zfs exists
m_os.path.exists.return_value = True
# mock subp command from util.get_mount_info_fs_on_zpool
zpool_output.return_value = (
- helpers.readResource('zpool_status_simple.txt'),
- '',
+ helpers.readResource("zpool_status_simple.txt"),
+ "",
)
# save function return values and do asserts
- ret = util.get_device_info_from_zpool('vmzroot')
- self.assertEqual('gpt/system', ret)
+ ret = util.get_device_info_from_zpool("vmzroot")
+ self.assertEqual("gpt/system", ret)
self.assertIsNotNone(ret)
- m_os.path.exists.assert_called_with('/dev/zfs')
+ m_os.path.exists.assert_called_with("/dev/zfs")
- @mock.patch('cloudinit.util.os')
+ @mock.patch("cloudinit.util.os")
def test_get_device_info_from_zpool_no_dev_zfs(self, m_os):
# mock /dev/zfs missing
m_os.path.exists.return_value = False
# save function return values and do asserts
- ret = util.get_device_info_from_zpool('vmzroot')
+ ret = util.get_device_info_from_zpool("vmzroot")
self.assertIsNone(ret)
- @mock.patch('cloudinit.util.os')
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.util.os")
+ @mock.patch("cloudinit.subp.subp")
def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os):
"""Handle case where there is no zpool command"""
# mock /dev/zfs exists
m_os.path.exists.return_value = True
m_sub.side_effect = subp.ProcessExecutionError("No zpool cmd")
- ret = util.get_device_info_from_zpool('vmzroot')
+ ret = util.get_device_info_from_zpool("vmzroot")
self.assertIsNone(ret)
- @mock.patch('cloudinit.util.os')
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.util.os")
+ @mock.patch("cloudinit.subp.subp")
def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os):
# mock /dev/zfs exists
m_os.path.exists.return_value = True
# mock subp command from util.get_mount_info_fs_on_zpool
zpool_output.return_value = (
- helpers.readResource('zpool_status_simple.txt'),
- 'error',
+ helpers.readResource("zpool_status_simple.txt"),
+ "error",
)
# save function return values and do asserts
- ret = util.get_device_info_from_zpool('vmzroot')
+ ret = util.get_device_info_from_zpool("vmzroot")
self.assertIsNone(ret)
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.subp.subp")
def test_parse_mount_with_ext(self, mount_out):
mount_out.return_value = (
- helpers.readResource('mount_parse_ext.txt'),
- '',
+ helpers.readResource("mount_parse_ext.txt"),
+ "",
)
# this one is valid and exists in mount_parse_ext.txt
- ret = util.parse_mount('/var')
- self.assertEqual(('/dev/mapper/vg00-lv_var', 'ext4', '/var'), ret)
+ ret = util.parse_mount("/var")
+ self.assertEqual(("/dev/mapper/vg00-lv_var", "ext4", "/var"), ret)
# another one that is valid and exists
- ret = util.parse_mount('/')
- self.assertEqual(('/dev/mapper/vg00-lv_root', 'ext4', '/'), ret)
+ ret = util.parse_mount("/")
+ self.assertEqual(("/dev/mapper/vg00-lv_root", "ext4", "/"), ret)
# this one exists in mount_parse_ext.txt
- ret = util.parse_mount('/sys/kernel/debug')
+ ret = util.parse_mount("/sys/kernel/debug")
self.assertIsNone(ret)
# this one does not even exist in mount_parse_ext.txt
- ret = util.parse_mount('/not/existing/mount')
+ ret = util.parse_mount("/not/existing/mount")
self.assertIsNone(ret)
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.subp.subp")
def test_parse_mount_with_zfs(self, mount_out):
mount_out.return_value = (
- helpers.readResource('mount_parse_zfs.txt'),
- '',
+ helpers.readResource("mount_parse_zfs.txt"),
+ "",
)
# this one is valid and exists in mount_parse_zfs.txt
- ret = util.parse_mount('/var')
- self.assertEqual(('vmzroot/ROOT/freebsd/var', 'zfs', '/var'), ret)
+ ret = util.parse_mount("/var")
+ self.assertEqual(("vmzroot/ROOT/freebsd/var", "zfs", "/var"), ret)
# this one is the root, valid and also exists in mount_parse_zfs.txt
- ret = util.parse_mount('/')
- self.assertEqual(('vmzroot/ROOT/freebsd', 'zfs', '/'), ret)
+ ret = util.parse_mount("/")
+ self.assertEqual(("vmzroot/ROOT/freebsd", "zfs", "/"), ret)
# this one does not even exist in mount_parse_ext.txt
- ret = util.parse_mount('/not/existing/mount')
+ ret = util.parse_mount("/not/existing/mount")
self.assertIsNone(ret)
class TestIsX86(helpers.CiTestCase):
def test_is_x86_matches_x86_types(self):
"""is_x86 returns True if CPU architecture matches."""
- matched_arches = ['x86_64', 'i386', 'i586', 'i686']
+ matched_arches = ["x86_64", "i386", "i586", "i686"]
for arch in matched_arches:
self.assertTrue(
util.is_x86(arch), 'Expected is_x86 for arch "%s"' % arch
@@ -1795,16 +1823,16 @@ class TestIsX86(helpers.CiTestCase):
def test_is_x86_unmatched_types(self):
"""is_x86 returns Fale on non-intel x86 architectures."""
- unmatched_arches = ['ia64', '9000/800', 'arm64v71']
+ unmatched_arches = ["ia64", "9000/800", "arm64v71"]
for arch in unmatched_arches:
self.assertFalse(
util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch
)
- @mock.patch('cloudinit.util.os.uname')
+ @mock.patch("cloudinit.util.os.uname")
def test_is_x86_calls_uname_for_architecture(self, m_uname):
"""is_x86 returns True if platform from uname matches."""
- m_uname.return_value = [0, 1, 2, 3, 'x86_64']
+ m_uname.return_value = [0, 1, 2, 3, "x86_64"]
self.assertTrue(util.is_x86())
@@ -1817,18 +1845,18 @@ class TestGetConfigLogfiles(helpers.CiTestCase):
def test_default_log_file_present(self):
"""When default_log_file is set get_config_logfiles finds it."""
self.assertEqual(
- ['/my.log'], util.get_config_logfiles({'def_log_file': '/my.log'})
+ ["/my.log"], util.get_config_logfiles({"def_log_file": "/my.log"})
)
def test_output_logs_parsed_when_teeing_files(self):
"""When output configuration is parsed when teeing files."""
self.assertEqual(
- ['/himom.log', '/my.log'],
+ ["/himom.log", "/my.log"],
sorted(
util.get_config_logfiles(
{
- 'def_log_file': '/my.log',
- 'output': {'all': '|tee -a /himom.log'},
+ "def_log_file": "/my.log",
+ "output": {"all": "|tee -a /himom.log"},
}
)
),
@@ -1837,12 +1865,12 @@ class TestGetConfigLogfiles(helpers.CiTestCase):
def test_output_logs_parsed_when_redirecting(self):
"""When output configuration is parsed when redirecting to a file."""
self.assertEqual(
- ['/my.log', '/test.log'],
+ ["/my.log", "/test.log"],
sorted(
util.get_config_logfiles(
{
- 'def_log_file': '/my.log',
- 'output': {'all': '>/test.log'},
+ "def_log_file": "/my.log",
+ "output": {"all": ">/test.log"},
}
)
),
@@ -1851,12 +1879,12 @@ class TestGetConfigLogfiles(helpers.CiTestCase):
def test_output_logs_parsed_when_appending(self):
"""When output configuration is parsed when appending to a file."""
self.assertEqual(
- ['/my.log', '/test.log'],
+ ["/my.log", "/test.log"],
sorted(
util.get_config_logfiles(
{
- 'def_log_file': '/my.log',
- 'output': {'all': '>> /test.log'},
+ "def_log_file": "/my.log",
+ "output": {"all": ">> /test.log"},
}
)
),
@@ -1865,8 +1893,8 @@ class TestGetConfigLogfiles(helpers.CiTestCase):
class TestMultiLog(helpers.FilesystemMockingTestCase):
def _createConsole(self, root):
- os.mkdir(os.path.join(root, 'dev'))
- open(os.path.join(root, 'dev', 'console'), 'a').close()
+ os.mkdir(os.path.join(root, "dev"))
+ open(os.path.join(root, "dev", "console"), "a").close()
def setUp(self):
super(TestMultiLog, self).setUp()
@@ -1880,37 +1908,37 @@ class TestMultiLog(helpers.FilesystemMockingTestCase):
self.patchStdoutAndStderr(self.stdout, self.stderr)
def test_stderr_used_by_default(self):
- logged_string = 'test stderr output'
+ logged_string = "test stderr output"
util.multi_log(logged_string)
self.assertEqual(logged_string, self.stderr.getvalue())
def test_stderr_not_used_if_false(self):
- util.multi_log('should not see this', stderr=False)
- self.assertEqual('', self.stderr.getvalue())
+ util.multi_log("should not see this", stderr=False)
+ self.assertEqual("", self.stderr.getvalue())
def test_logs_go_to_console_by_default(self):
self._createConsole(self.root)
- logged_string = 'something very important'
+ logged_string = "something very important"
util.multi_log(logged_string)
- self.assertEqual(logged_string, open('/dev/console').read())
+ self.assertEqual(logged_string, open("/dev/console").read())
def test_logs_dont_go_to_stdout_if_console_exists(self):
self._createConsole(self.root)
- util.multi_log('something')
- self.assertEqual('', self.stdout.getvalue())
+ util.multi_log("something")
+ self.assertEqual("", self.stdout.getvalue())
def test_logs_go_to_stdout_if_console_does_not_exist(self):
- logged_string = 'something very important'
+ logged_string = "something very important"
util.multi_log(logged_string)
self.assertEqual(logged_string, self.stdout.getvalue())
def test_logs_dont_go_to_stdout_if_fallback_to_stdout_is_false(self):
- util.multi_log('something', fallback_to_stdout=False)
- self.assertEqual('', self.stdout.getvalue())
+ util.multi_log("something", fallback_to_stdout=False)
+ self.assertEqual("", self.stdout.getvalue())
def test_logs_go_to_log_if_given(self):
log = mock.MagicMock()
- logged_string = 'something very important'
+ logged_string = "something very important"
util.multi_log(logged_string, log=log)
self.assertEqual(
[((mock.ANY, logged_string), {})], log.log.call_args_list
@@ -1918,26 +1946,26 @@ class TestMultiLog(helpers.FilesystemMockingTestCase):
def test_newlines_stripped_from_log_call(self):
log = mock.MagicMock()
- expected_string = 'something very important'
- util.multi_log('{0}\n'.format(expected_string), log=log)
+ expected_string = "something very important"
+ util.multi_log("{0}\n".format(expected_string), log=log)
self.assertEqual((mock.ANY, expected_string), log.log.call_args[0])
def test_log_level_defaults_to_debug(self):
log = mock.MagicMock()
- util.multi_log('message', log=log)
+ util.multi_log("message", log=log)
self.assertEqual((logging.DEBUG, mock.ANY), log.log.call_args[0])
def test_given_log_level_used(self):
log = mock.MagicMock()
log_level = mock.Mock()
- util.multi_log('message', log=log, log_level=log_level)
+ util.multi_log("message", log=log, log_level=log_level)
self.assertEqual((log_level, mock.ANY), log.log.call_args[0])
class TestMessageFromString(helpers.TestCase):
def test_unicode_not_messed_up(self):
- roundtripped = util.message_from_string('\n').as_string()
- self.assertNotIn('\x00', roundtripped)
+ roundtripped = util.message_from_string("\n").as_string()
+ self.assertNotIn("\x00", roundtripped)
class TestReadSeeded(helpers.TestCase):
@@ -1951,12 +1979,12 @@ class TestReadSeeded(helpers.TestCase):
vd = b"vendordatablob"
helpers.populate_dir(
self.tmp,
- {'meta-data': "key1: val1", 'user-data': ud, 'vendor-data': vd},
+ {"meta-data": "key1: val1", "user-data": ud, "vendor-data": vd},
)
sdir = self.tmp + os.path.sep
(found_md, found_ud, found_vd) = util.read_seeded(sdir)
- self.assertEqual(found_md, {'key1': 'val1'})
+ self.assertEqual(found_md, {"key1": "val1"})
self.assertEqual(found_ud, ud)
self.assertEqual(found_vd, vd)
@@ -1971,12 +1999,12 @@ class TestReadSeededWithoutVendorData(helpers.TestCase):
ud = b"userdatablob"
vd = None
helpers.populate_dir(
- self.tmp, {'meta-data': "key1: val1", 'user-data': ud}
+ self.tmp, {"meta-data": "key1: val1", "user-data": ud}
)
sdir = self.tmp + os.path.sep
(found_md, found_ud, found_vd) = util.read_seeded(sdir)
- self.assertEqual(found_md, {'key1': 'val1'})
+ self.assertEqual(found_md, {"key1": "val1"})
self.assertEqual(found_ud, ud)
self.assertEqual(found_vd, vd)
@@ -1985,7 +2013,7 @@ class TestEncode(helpers.TestCase):
"""Test the encoding functions"""
def test_decode_binary_plain_text_with_hex(self):
- blob = 'BOOTABLE_FLAG=\x80init=/bin/systemd'
+ blob = "BOOTABLE_FLAG=\x80init=/bin/systemd"
text = util.decode_binary(blob)
self.assertEqual(text, blob)
@@ -1993,20 +2021,20 @@ class TestEncode(helpers.TestCase):
class TestProcessExecutionError(helpers.TestCase):
template = (
- '{description}\n'
- 'Command: {cmd}\n'
- 'Exit code: {exit_code}\n'
- 'Reason: {reason}\n'
- 'Stdout: {stdout}\n'
- 'Stderr: {stderr}'
+ "{description}\n"
+ "Command: {cmd}\n"
+ "Exit code: {exit_code}\n"
+ "Reason: {reason}\n"
+ "Stdout: {stdout}\n"
+ "Stderr: {stderr}"
)
- empty_attr = '-'
- empty_description = 'Unexpected error while running command.'
+ empty_attr = "-"
+ empty_description = "Unexpected error while running command."
def test_pexec_error_indent_text(self):
error = subp.ProcessExecutionError()
- msg = 'abc\ndef'
- formatted = 'abc\n{0}def'.format(' ' * 4)
+ msg = "abc\ndef"
+ formatted = "abc\n{0}def".format(" " * 4)
self.assertEqual(error._indent_text(msg, indent_level=4), formatted)
self.assertEqual(
error._indent_text(msg.encode(), indent_level=4),
@@ -2041,9 +2069,9 @@ class TestProcessExecutionError(helpers.TestCase):
)
def test_pexec_error_single_line_msgs(self):
- stdout_msg = 'out out'
- stderr_msg = 'error error'
- cmd = 'test command'
+ stdout_msg = "out out"
+ stderr_msg = "error error"
+ cmd = "test command"
exit_code = 3
error = subp.ProcessExecutionError(
stdout=stdout_msg, stderr=stderr_msg, exit_code=3, cmd=cmd
@@ -2062,25 +2090,25 @@ class TestProcessExecutionError(helpers.TestCase):
def test_pexec_error_multi_line_msgs(self):
# make sure bytes is converted handled properly when formatting
- stdout_msg = 'multi\nline\noutput message'.encode()
- stderr_msg = 'multi\nline\nerror message\n\n\n'
+ stdout_msg = "multi\nline\noutput message".encode()
+ stderr_msg = "multi\nline\nerror message\n\n\n"
error = subp.ProcessExecutionError(
stdout=stdout_msg, stderr=stderr_msg
)
self.assertEqual(
str(error),
- '\n'.join(
+ "\n".join(
(
- '{description}',
- 'Command: {empty_attr}',
- 'Exit code: {empty_attr}',
- 'Reason: {empty_attr}',
- 'Stdout: multi',
- ' line',
- ' output message',
- 'Stderr: multi',
- ' line',
- ' error message',
+ "{description}",
+ "Command: {empty_attr}",
+ "Exit code: {empty_attr}",
+ "Reason: {empty_attr}",
+ "Stdout: multi",
+ " line",
+ " output message",
+ "Stderr: multi",
+ " line",
+ " error message",
)
).format(
description=self.empty_description, empty_attr=self.empty_attr
@@ -2091,31 +2119,31 @@ class TestProcessExecutionError(helpers.TestCase):
class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
def test_id_in_os_release_quoted(self):
"""os-release containing ID="ubuntu-core" is snappy."""
- orcontent = '\n'.join(['ID="ubuntu-core"', ''])
+ orcontent = "\n".join(['ID="ubuntu-core"', ""])
root_d = self.tmp_dir()
- helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ helpers.populate_dir(root_d, {"etc/os-release": orcontent})
self.reRoot(root_d)
self.assertTrue(util.system_is_snappy())
def test_id_in_os_release(self):
"""os-release containing ID=ubuntu-core is snappy."""
- orcontent = '\n'.join(['ID=ubuntu-core', ''])
+ orcontent = "\n".join(["ID=ubuntu-core", ""])
root_d = self.tmp_dir()
- helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ helpers.populate_dir(root_d, {"etc/os-release": orcontent})
self.reRoot(root_d)
self.assertTrue(util.system_is_snappy())
- @mock.patch('cloudinit.util.get_cmdline')
+ @mock.patch("cloudinit.util.get_cmdline")
def test_bad_content_in_os_release_no_effect(self, m_cmdline):
"""malformed os-release should not raise exception."""
- m_cmdline.return_value = 'root=/dev/sda'
- orcontent = '\n'.join(['IDubuntu-core', ''])
+ m_cmdline.return_value = "root=/dev/sda"
+ orcontent = "\n".join(["IDubuntu-core", ""])
root_d = self.tmp_dir()
- helpers.populate_dir(root_d, {'etc/os-release': orcontent})
+ helpers.populate_dir(root_d, {"etc/os-release": orcontent})
self.reRoot()
self.assertFalse(util.system_is_snappy())
- @mock.patch('cloudinit.util.get_cmdline')
+ @mock.patch("cloudinit.util.get_cmdline")
def test_snap_core_in_cmdline_is_snappy(self, m_cmdline):
"""The string snap_core= in kernel cmdline indicates snappy."""
cmdline = (
@@ -2128,31 +2156,31 @@ class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
self.assertTrue(util.system_is_snappy())
self.assertTrue(m_cmdline.call_count > 0)
- @mock.patch('cloudinit.util.get_cmdline')
+ @mock.patch("cloudinit.util.get_cmdline")
def test_nothing_found_is_not_snappy(self, m_cmdline):
"""If no positive identification, then not snappy."""
- m_cmdline.return_value = 'root=/dev/sda'
+ m_cmdline.return_value = "root=/dev/sda"
self.reRoot()
self.assertFalse(util.system_is_snappy())
self.assertTrue(m_cmdline.call_count > 0)
- @mock.patch('cloudinit.util.get_cmdline')
+ @mock.patch("cloudinit.util.get_cmdline")
def test_channel_ini_with_snappy_is_snappy(self, m_cmdline):
"""A Channel.ini file with 'ubuntu-core' indicates snappy."""
- m_cmdline.return_value = 'root=/dev/sda'
+ m_cmdline.return_value = "root=/dev/sda"
root_d = self.tmp_dir()
- content = '\n'.join(["[Foo]", "source = 'ubuntu-core'", ""])
- helpers.populate_dir(root_d, {'etc/system-image/channel.ini': content})
+ content = "\n".join(["[Foo]", "source = 'ubuntu-core'", ""])
+ helpers.populate_dir(root_d, {"etc/system-image/channel.ini": content})
self.reRoot(root_d)
self.assertTrue(util.system_is_snappy())
- @mock.patch('cloudinit.util.get_cmdline')
+ @mock.patch("cloudinit.util.get_cmdline")
def test_system_image_config_dir_is_snappy(self, m_cmdline):
"""Existence of /etc/system-image/config.d indicates snappy."""
- m_cmdline.return_value = 'root=/dev/sda'
+ m_cmdline.return_value = "root=/dev/sda"
root_d = self.tmp_dir()
helpers.populate_dir(
- root_d, {'etc/system-image/config.d/my.file': "_unused"}
+ root_d, {"etc/system-image/config.d/my.file": "_unused"}
)
self.reRoot(root_d)
self.assertTrue(util.system_is_snappy())
@@ -2162,16 +2190,16 @@ class TestLoadShellContent(helpers.TestCase):
def test_comments_handled_correctly(self):
"""Shell comments should be allowed in the content."""
self.assertEqual(
- {'key1': 'val1', 'key2': 'val2', 'key3': 'val3 #tricky'},
+ {"key1": "val1", "key2": "val2", "key3": "val3 #tricky"},
util.load_shell_content(
- '\n'.join(
+ "\n".join(
[
"#top of file comment",
"key1=val1 #this is a comment",
"# second comment",
'key2="val2" # inlin comment#badkey=wark',
'key3="val3 #tricky"',
- '',
+ "",
]
)
),
@@ -2181,15 +2209,15 @@ class TestLoadShellContent(helpers.TestCase):
class TestGetProcEnv(helpers.TestCase):
"""test get_proc_env."""
- null = b'\x00'
- simple1 = b'HOME=/'
- simple2 = b'PATH=/bin:/sbin'
- bootflag = b'BOOTABLE_FLAG=\x80' # from LP: #1775371
- mixed = b'MIXED=' + b'ab\xccde'
+ null = b"\x00"
+ simple1 = b"HOME=/"
+ simple2 = b"PATH=/bin:/sbin"
+ bootflag = b"BOOTABLE_FLAG=\x80" # from LP: #1775371
+ mixed = b"MIXED=" + b"ab\xccde"
- def _val_decoded(self, blob, encoding='utf-8', errors='replace'):
+ def _val_decoded(self, blob, encoding="utf-8", errors="replace"):
# return the value portion of key=val decoded.
- return blob.split(b'=', 1)[1].decode(encoding, errors)
+ return blob.split(b"=", 1)[1].decode(encoding, errors)
@mock.patch("cloudinit.util.load_file")
def test_non_utf8_in_environment(self, m_load_file):
@@ -2201,10 +2229,10 @@ class TestGetProcEnv(helpers.TestCase):
self.assertEqual(
{
- 'BOOTABLE_FLAG': self._val_decoded(self.bootflag),
- 'HOME': '/',
- 'PATH': '/bin:/sbin',
- 'MIXED': self._val_decoded(self.mixed),
+ "BOOTABLE_FLAG": self._val_decoded(self.bootflag),
+ "HOME": "/",
+ "PATH": "/bin:/sbin",
+ "MIXED": self._val_decoded(self.mixed),
},
util.get_proc_env(1),
)
@@ -2218,7 +2246,7 @@ class TestGetProcEnv(helpers.TestCase):
m_load_file.return_value = content
self.assertEqual(
- dict([t.split(b'=') for t in lines]),
+ dict([t.split(b"=") for t in lines]),
util.get_proc_env(1, encoding=None),
)
self.assertEqual(1, m_load_file.call_count)
@@ -2229,7 +2257,7 @@ class TestGetProcEnv(helpers.TestCase):
content = self.null.join((self.simple1, self.simple2))
m_load_file.return_value = content
self.assertEqual(
- {'HOME': '/', 'PATH': '/bin:/sbin'}, util.get_proc_env(1)
+ {"HOME": "/", "PATH": "/bin:/sbin"}, util.get_proc_env(1)
)
self.assertEqual(1, m_load_file.call_count)
@@ -2252,13 +2280,13 @@ class TestKernelVersion:
"""test kernel version function"""
params = [
- ('5.6.19-300.fc32.x86_64', (5, 6)),
- ('4.15.0-101-generic', (4, 15)),
- ('3.10.0-1062.12.1.vz7.131.10', (3, 10)),
- ('4.18.0-144.el8.x86_64', (4, 18)),
+ ("5.6.19-300.fc32.x86_64", (5, 6)),
+ ("4.15.0-101-generic", (4, 15)),
+ ("3.10.0-1062.12.1.vz7.131.10", (3, 10)),
+ ("4.18.0-144.el8.x86_64", (4, 18)),
]
- @mock.patch('os.uname')
+ @mock.patch("os.uname")
@pytest.mark.parametrize("uname_release,expected", params)
def test_kernel_version(self, m_uname, uname_release, expected):
m_uname.return_value.release = uname_release
@@ -2266,11 +2294,11 @@ class TestKernelVersion:
class TestFindDevs:
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.subp.subp")
def test_find_devs_with(self, m_subp):
m_subp.return_value = (
'/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"',
- '',
+ "",
)
devlist = util.find_devs_with()
assert devlist == [
@@ -2282,32 +2310,32 @@ class TestFindDevs:
'/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"'
]
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.subp.subp")
def test_find_devs_with_openbsd(self, m_subp):
- m_subp.return_value = ('cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '')
+ m_subp.return_value = ("cd0:,sd0:630d98d32b5d3759,sd1:,fd0:", "")
devlist = util.find_devs_with_openbsd()
- assert devlist == ['/dev/cd0a', '/dev/sd1i']
+ assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"]
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.subp.subp")
def test_find_devs_with_openbsd_with_criteria(self, m_subp):
- m_subp.return_value = ('cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '')
+ m_subp.return_value = ("cd0:,sd0:630d98d32b5d3759,sd1:,fd0:", "")
devlist = util.find_devs_with_openbsd(criteria="TYPE=iso9660")
- assert devlist == ['/dev/cd0a']
+ assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"]
# lp: #1841466
devlist = util.find_devs_with_openbsd(criteria="LABEL_FATBOOT=A_LABEL")
- assert devlist == ['/dev/cd0a', '/dev/sd1i']
+ assert devlist == ["/dev/cd0a", "/dev/sd1a", "/dev/sd1i"]
@pytest.mark.parametrize(
- 'criteria,expected_devlist',
+ "criteria,expected_devlist",
(
- (None, ['/dev/msdosfs/EFISYS', '/dev/iso9660/config-2']),
- ('TYPE=iso9660', ['/dev/iso9660/config-2']),
- ('TYPE=vfat', ['/dev/msdosfs/EFISYS']),
- ('LABEL_FATBOOT=A_LABEL', []), # lp: #1841466
+ (None, ["/dev/msdosfs/EFISYS", "/dev/iso9660/config-2"]),
+ ("TYPE=iso9660", ["/dev/iso9660/config-2"]),
+ ("TYPE=vfat", ["/dev/msdosfs/EFISYS"]),
+ ("LABEL_FATBOOT=A_LABEL", []), # lp: #1841466
),
)
- @mock.patch('glob.glob')
+ @mock.patch("glob.glob")
def test_find_devs_with_freebsd(self, m_glob, criteria, expected_devlist):
def fake_glob(pattern):
msdos = ["/dev/msdosfs/EFISYS"]
@@ -2324,14 +2352,14 @@ class TestFindDevs:
assert devlist == expected_devlist
@pytest.mark.parametrize(
- 'criteria,expected_devlist',
+ "criteria,expected_devlist",
(
- (None, ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0']),
- ('TYPE=iso9660', ['/dev/cd0']),
- ('TYPE=vfat', ["/dev/ld0", "/dev/dk0", "/dev/dk1"]),
+ (None, ["/dev/ld0", "/dev/dk0", "/dev/dk1", "/dev/cd0"]),
+ ("TYPE=iso9660", ["/dev/cd0"]),
+ ("TYPE=vfat", ["/dev/ld0", "/dev/dk0", "/dev/dk1"]),
(
- 'LABEL_FATBOOT=A_LABEL', # lp: #1841466
- ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0'],
+ "LABEL_FATBOOT=A_LABEL", # lp: #1841466
+ ["/dev/ld0", "/dev/dk0", "/dev/dk1", "/dev/cd0"],
),
),
)
@@ -2340,39 +2368,31 @@ class TestFindDevs:
side_effect_values = [
("ld0 dk0 dk1 cd0", ""),
(
- (
- "mscdlabel: CDIOREADTOCHEADER: "
- "Inappropriate ioctl for device\n"
- "track (ctl=4) at sector 0\n"
- "disklabel not written\n"
- ),
+ "mscdlabel: CDIOREADTOCHEADER: "
+ "Inappropriate ioctl for device\n"
+ "track (ctl=4) at sector 0\n"
+ "disklabel not written\n",
"",
),
(
- (
- "mscdlabel: CDIOREADTOCHEADER: "
- "Inappropriate ioctl for device\n"
- "track (ctl=4) at sector 0\n"
- "disklabel not written\n"
- ),
+ "mscdlabel: CDIOREADTOCHEADER: "
+ "Inappropriate ioctl for device\n"
+ "track (ctl=4) at sector 0\n"
+ "disklabel not written\n",
"",
),
(
- (
- "mscdlabel: CDIOREADTOCHEADER: "
- "Inappropriate ioctl for device\n"
- "track (ctl=4) at sector 0\n"
- "disklabel not written\n"
- ),
+ "mscdlabel: CDIOREADTOCHEADER: "
+ "Inappropriate ioctl for device\n"
+ "track (ctl=4) at sector 0\n"
+ "disklabel not written\n",
"",
),
(
- (
- "track (ctl=4) at sector 0\n"
- 'ISO filesystem, label "config-2", '
- "creation time: 2020/03/31 17:29\n"
- "adding as 'a'\n"
- ),
+ "track (ctl=4) at sector 0\n"
+ 'ISO filesystem, label "config-2", '
+ "creation time: 2020/03/31 17:29\n"
+ "adding as 'a'\n",
"",
),
]
@@ -2381,14 +2401,14 @@ class TestFindDevs:
assert devlist == expected_devlist
@pytest.mark.parametrize(
- 'criteria,expected_devlist',
+ "criteria,expected_devlist",
(
- (None, ['/dev/vbd0', '/dev/cd0', '/dev/acd0']),
- ('TYPE=iso9660', ['/dev/cd0', '/dev/acd0']),
- ('TYPE=vfat', ['/dev/vbd0']),
+ (None, ["/dev/vbd0", "/dev/cd0", "/dev/acd0"]),
+ ("TYPE=iso9660", ["/dev/cd0", "/dev/acd0"]),
+ ("TYPE=vfat", ["/dev/vbd0"]),
(
- 'LABEL_FATBOOT=A_LABEL', # lp: #1841466
- ['/dev/vbd0', '/dev/cd0', '/dev/acd0'],
+ "LABEL_FATBOOT=A_LABEL", # lp: #1841466
+ ["/dev/vbd0", "/dev/cd0", "/dev/acd0"],
),
),
)
@@ -2396,7 +2416,7 @@ class TestFindDevs:
def test_find_devs_with_dragonflybsd(
self, m_subp, criteria, expected_devlist
):
- m_subp.return_value = ('md2 md1 cd0 vbd0 acd0 vn3 vn2 vn1 vn0 md0', '')
+ m_subp.return_value = ("md2 md1 cd0 vbd0 acd0 vn3 vn2 vn1 vn0 md0", "")
devlist = util.find_devs_with_dragonflybsd(criteria=criteria)
assert devlist == expected_devlist