summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Haller <thaller@redhat.com>2023-05-11 11:26:56 +0200
committerThomas Haller <thaller@redhat.com>2023-05-15 14:59:58 +0200
commitefc14fcbec90cc057f37d8a4fca48c5b5b15b786 (patch)
treef8360c5f54430cb14ed7165cbcb85ca2014fc677
parent50f97307c5e00157fdd29d3b61ee873f9cb5886b (diff)
downloadNetworkManager-efc14fcbec90cc057f37d8a4fca48c5b5b15b786.tar.gz
test-client: drop TestNmClient base class from tests
With the unit test framework, we define special methods, like setUp() and test_*(). This is documented, but not obvious. Previously, TestNmClient was the base class for our tests classes, and it provided some functionality (and state). It was utterly confusing how pieces fit together. Instead, move the state to a new class NMTestContext(). That contains most of the code from TestNmClient. Drop TestNmClient and let the test classes directly descend from unittest.TestCase. The difference is, when you now look at a certain test (test_001()), you can easier understand which code runs when. First, the test class has a setUp() method which runs, but that method is now trivial without extra context. Second, there is the @nm_test attribute that wraps the function. But that's it. It's all at one place, and we delegate instead of inherit.
-rwxr-xr-xsrc/tests/client/test-client.py136
1 files changed, 74 insertions, 62 deletions
diff --git a/src/tests/client/test-client.py b/src/tests/client/test-client.py
index d752fd9f34..9aa018b61c 100755
--- a/src/tests/client/test-client.py
+++ b/src/tests/client/test-client.py
@@ -1050,21 +1050,25 @@ class AsyncProcess:
###############################################################################
-MAX_JOBS = 15
+class NMTestContext:
+ MAX_JOBS = 15
-
-class TestNmClient(unittest.TestCase):
- def __init__(self, *args, **kwargs):
+ def __init__(self, testMethodName):
+ self.testMethodName = testMethodName
self._calling_num = {}
self._skip_test_for_l10n_diff = []
self._async_jobs = []
- self._results = []
+ self.ctx_results = []
self.srv = None
- unittest.TestCase.__init__(self, *args, **kwargs)
+
+ def calling_num(self, calling_fcn):
+ calling_num = self._calling_num.get(calling_fcn, 0) + 1
+ self._calling_num[calling_fcn] = calling_num
+ return calling_num
def srv_start(self):
self.srv_shutdown()
- self.srv = NMStubServer(self._testMethodName)
+ self.srv = NMStubServer(self.testMethodName)
def srv_shutdown(self):
if self.srv is not None:
@@ -1077,13 +1081,13 @@ class TestNmClient(unittest.TestCase):
while True:
while True:
- for async_job in list(self._async_jobs[0:MAX_JOBS]):
+ for async_job in list(self._async_jobs[0 : self.MAX_JOBS]):
async_job.start()
# start up to MAX_JOBS jobs, but poll() and complete those
# that are already exited. Retry, until there are no more
# jobs to start, or until MAX_JOBS are running.
jobs_running = []
- for async_job in list(self._async_jobs[0:MAX_JOBS]):
+ for async_job in list(self._async_jobs[0 : self.MAX_JOBS]):
if async_job.poll() is not None:
self._async_jobs.remove(async_job)
async_job.wait_and_complete()
@@ -1091,7 +1095,7 @@ class TestNmClient(unittest.TestCase):
jobs_running.append(async_job)
if len(jobs_running) >= len(self._async_jobs):
break
- if len(jobs_running) >= MAX_JOBS:
+ if len(jobs_running) >= self.MAX_JOBS:
break
if not jobs_running:
@@ -1111,7 +1115,10 @@ class TestNmClient(unittest.TestCase):
def async_wait(self):
return self.async_start(wait_all=True)
- def _nm_test_post(self):
+ def async_append_job(self, async_job):
+ self._async_jobs.append(async_job)
+
+ def run_post(self):
self.async_wait()
@@ -1119,8 +1126,8 @@ class TestNmClient(unittest.TestCase):
self._calling_num = None
- results = self._results
- self._results = None
+ results = self.ctx_results
+ self.ctx_results = None
if len(results) == 0:
return
@@ -1128,12 +1135,10 @@ class TestNmClient(unittest.TestCase):
skip_test_for_l10n_diff = self._skip_test_for_l10n_diff
self._skip_test_for_l10n_diff = None
- test_name = self._testMethodName
-
filename = os.path.abspath(
PathConfiguration.srcdir()
+ "/test-client.check-on-disk/"
- + test_name
+ + self.testMethodName
+ ".expected"
)
@@ -1221,12 +1226,16 @@ class TestNmClient(unittest.TestCase):
% (",".join(skip_test_for_l10n_diff))
)
+
+###############################################################################
+
+
+class TestNmcli(unittest.TestCase):
def setUp(self):
Util.skip_without_dbus_session()
Util.skip_without_NM()
+ self.ctx = NMTestContext(self._testMethodName)
-
-class TestNmcli(TestNmClient):
def call_nmcli_l(
self,
args,
@@ -1328,11 +1337,10 @@ class TestNmcli(TestNmClient):
):
if sync_barrier:
- self.async_wait()
+ self.ctx.async_wait()
calling_fcn = frame.f_code.co_name
- calling_num = self._calling_num.get(calling_fcn, 0) + 1
- self._calling_num[calling_fcn] = calling_num
+ calling_num = self.ctx.calling_num(calling_fcn)
test_name = "%s-%03d" % (calling_fcn, calling_num)
@@ -1401,8 +1409,8 @@ class TestNmcli(TestNmClient):
if expected_stderr is _DEFAULT_ARG:
expected_stderr = None
- results_idx = len(self._results)
- self._results.append(None)
+ results_idx = len(self.ctx.ctx_results)
+ self.ctx.ctx_results.append(None)
def complete_cb(async_job, returncode, stdout, stderr):
@@ -1477,7 +1485,7 @@ class TestNmcli(TestNmClient):
)
content = ("size: %s\n" % (len(content))).encode("utf8") + content
- self._results[results_idx] = {
+ self.ctx.ctx_results[results_idx] = {
"test_name": test_name,
"ignore_l10n_diff": ignore_l10n_diff,
"content": content,
@@ -1486,38 +1494,38 @@ class TestNmcli(TestNmClient):
env = Util.cmd_create_env(lang, calling_num, fatal_warnings, extra_env)
async_job = AsyncProcess(args=args, env=env, complete_cb=complete_cb)
- self._async_jobs.append(async_job)
+ self.ctx.async_append_job(async_job)
- self.async_start(wait_all=sync_barrier)
+ self.ctx.async_start(wait_all=sync_barrier)
def nm_test(func):
def f(self):
- self.srv_start()
+ self.ctx.srv_start()
func(self)
- self._nm_test_post()
+ self.ctx.run_post()
return f
def nm_test_no_dbus(func):
def f(self):
func(self)
- self._nm_test_post()
+ self.ctx.run_post()
return f
def init_001(self):
- self.srv.op_AddObj("WiredDevice", iface="eth0")
- self.srv.op_AddObj("WiredDevice", iface="eth1")
- self.srv.op_AddObj("WifiDevice", iface="wlan0")
- self.srv.op_AddObj("WifiDevice", iface="wlan1")
+ self.ctx.srv.op_AddObj("WiredDevice", iface="eth0")
+ self.ctx.srv.op_AddObj("WiredDevice", iface="eth1")
+ self.ctx.srv.op_AddObj("WifiDevice", iface="wlan0")
+ self.ctx.srv.op_AddObj("WifiDevice", iface="wlan1")
# add another device with an identical ifname. The D-Bus API itself
# does not enforce the ifnames are unique.
- self.srv.op_AddObj("WifiDevice", ident="wlan1/x", iface="wlan1")
+ self.ctx.srv.op_AddObj("WifiDevice", ident="wlan1/x", iface="wlan1")
- self.srv.op_AddObj("WifiAp", device="wlan0", rsnf=0x0)
+ self.ctx.srv.op_AddObj("WifiAp", device="wlan0", rsnf=0x0)
- self.srv.op_AddObj("WifiAp", device="wlan0")
+ self.ctx.srv.op_AddObj("WifiAp", device="wlan0")
NM_AP_FLAGS = getattr(NM, "80211ApSecurityFlags")
rsnf = 0x0
@@ -1526,11 +1534,11 @@ class TestNmcli(TestNmClient):
rsnf = rsnf | NM_AP_FLAGS.GROUP_TKIP
rsnf = rsnf | NM_AP_FLAGS.GROUP_CCMP
rsnf = rsnf | NM_AP_FLAGS.KEY_MGMT_SAE
- self.srv.op_AddObj("WifiAp", device="wlan0", wpaf=0x0, rsnf=rsnf)
+ self.ctx.srv.op_AddObj("WifiAp", device="wlan0", wpaf=0x0, rsnf=rsnf)
- self.srv.op_AddObj("WifiAp", device="wlan1")
+ self.ctx.srv.op_AddObj("WifiAp", device="wlan1")
- self.srv.addConnection(
+ self.ctx.srv.addConnection(
{"connection": {"type": "802-3-ethernet", "id": "con-1"}}
)
@@ -1590,7 +1598,7 @@ class TestNmcli(TestNmClient):
replace_uuids = []
replace_uuids.append(
- self.srv.ReplaceTextConUuid(
+ self.ctx.srv.ReplaceTextConUuid(
"con-xx1", "UUID-con-xx1-REPLACED-REPLACED-REPLA"
)
)
@@ -1605,7 +1613,7 @@ class TestNmcli(TestNmClient):
for con_name, apn in con_gsm_list:
replace_uuids.append(
- self.srv.ReplaceTextConUuid(
+ self.ctx.srv.ReplaceTextConUuid(
con_name, "UUID-" + con_name + "-REPLACED-REPLACED-REPL"
)
)
@@ -1637,7 +1645,7 @@ class TestNmcli(TestNmClient):
)
replace_uuids.append(
- self.srv.ReplaceTextConUuid(
+ self.ctx.srv.ReplaceTextConUuid(
"ethernet", "UUID-ethernet-REPLACED-REPLACED-REPL"
)
)
@@ -1722,9 +1730,9 @@ class TestNmcli(TestNmClient):
["-f", "ALL", "-t", "dev", "show", "eth0"], replace_stdout=replace_uuids
)
- self.async_wait()
+ self.ctx.async_wait()
- self.srv.setProperty(
+ self.ctx.srv.setProperty(
"/org/freedesktop/NetworkManager/ActiveConnection/1",
"State",
dbus.UInt32(NM.ActiveConnectionState.DEACTIVATING),
@@ -1734,8 +1742,8 @@ class TestNmcli(TestNmClient):
for i in [0, 1]:
if i == 1:
- self.async_wait()
- self.srv.op_ConnectionSetVisible(False, con_id="ethernet")
+ self.ctx.async_wait()
+ self.ctx.srv.op_ConnectionSetVisible(False, con_id="ethernet")
for mode in Util.iter_nmcli_output_modes():
self.call_nmcli_l(
@@ -1768,7 +1776,7 @@ class TestNmcli(TestNmClient):
replace_uuids = []
replace_uuids.append(
- self.srv.ReplaceTextConUuid(
+ self.ctx.srv.ReplaceTextConUuid(
"con-xx1", "UUID-con-xx1-REPLACED-REPLACED-REPLA"
)
)
@@ -1813,10 +1821,10 @@ class TestNmcli(TestNmClient):
)
self.call_nmcli_l(["con", "s", "con-xx1"], replace_stdout=replace_uuids)
- self.async_wait()
+ self.ctx.async_wait()
replace_uuids.append(
- self.srv.ReplaceTextConUuid(
+ self.ctx.srv.ReplaceTextConUuid(
"con-vpn-1", "UUID-con-vpn-1-REPLACED-REPLACED-REP"
)
)
@@ -1849,16 +1857,16 @@ class TestNmcli(TestNmClient):
self.call_nmcli_l(["con", "s"], replace_stdout=replace_uuids)
self.call_nmcli_l(["con", "s", "con-vpn-1"], replace_stdout=replace_uuids)
- self.async_wait()
+ self.ctx.async_wait()
- self.srv.setProperty(
+ self.ctx.srv.setProperty(
"/org/freedesktop/NetworkManager/ActiveConnection/2",
"VpnState",
dbus.UInt32(NM.VpnConnectionState.ACTIVATED),
)
uuids = Util.replace_text_sort_list(
- [c[1] for c in self.srv.findConnections()], replace_uuids
+ [c[1] for c in self.ctx.srv.findConnections()], replace_uuids
)
self.call_nmcli_l([], replace_stdout=replace_uuids)
@@ -2165,10 +2173,10 @@ class TestNmcli(TestNmClient):
nmc = start_mon(self)
- self.srv.op_AddObj("WiredDevice", iface="eth0")
+ self.ctx.srv.op_AddObj("WiredDevice", iface="eth0")
nmc.pexp.expect("eth0: device created\r\n")
- self.srv.addConnection(
+ self.ctx.srv.addConnection(
{"connection": {"type": "802-3-ethernet", "id": "con-1"}}
)
nmc.pexp.expect("con-1: connection profile created\r\n")
@@ -2176,7 +2184,7 @@ class TestNmcli(TestNmClient):
end_mon(self, nmc)
nmc = start_mon(self)
- self.srv_shutdown()
+ self.ctx.srv_shutdown()
Util.pexpect_expect_all(
nmc.pexp,
"con-1: connection profile removed",
@@ -2189,7 +2197,11 @@ class TestNmcli(TestNmClient):
###############################################################################
-class TestNmCloudSetup(TestNmClient):
+class TestNmCloudSetup(unittest.TestCase):
+ def setUp(self):
+ Util.skip_without_dbus_session()
+ Util.skip_without_NM()
+ self.ctx = NMTestContext(self._testMethodName)
_mac1 = "9e:c0:3e:92:24:2d"
_mac2 = "53:e9:7e:52:8d:a8"
@@ -2240,12 +2252,12 @@ class TestNmCloudSetup(TestNmClient):
error = None
- self.srv_start()
+ self.ctx.srv_start()
try:
func(self)
except Exception as e:
error = e
- self._nm_test_post()
+ self.ctx.run_post()
self.md_conn.close()
p.stdin.close()
@@ -2259,8 +2271,8 @@ class TestNmCloudSetup(TestNmClient):
def _mock_devices(self):
# Add a device with an active connection that has IPv4 configured
- self.srv.op_AddObj("WiredDevice", iface="eth0", mac="9e:c0:3e:92:24:2d")
- self.srv.addAndActivateConnection(
+ self.ctx.srv.op_AddObj("WiredDevice", iface="eth0", mac="9e:c0:3e:92:24:2d")
+ self.ctx.srv.addAndActivateConnection(
{
"connection": {"type": "802-3-ethernet", "id": "con-eth0"},
"ipv4": {"method": "auto"},
@@ -2270,8 +2282,8 @@ class TestNmCloudSetup(TestNmClient):
)
# The second connection has no IPv4
- self.srv.op_AddObj("WiredDevice", iface="eth1", mac="53:e9:7e:52:8d:a8")
- self.srv.addAndActivateConnection(
+ self.ctx.srv.op_AddObj("WiredDevice", iface="eth1", mac="53:e9:7e:52:8d:a8")
+ self.ctx.srv.addAndActivateConnection(
{"connection": {"type": "802-3-ethernet", "id": "con-eth1"}},
"/org/freedesktop/NetworkManager/Devices/2",
"",