summaryrefslogtreecommitdiff
path: root/tests/unittests/sources/azure
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/sources/azure')
-rw-r--r--tests/unittests/sources/azure/test_errors.py7
-rw-r--r--tests/unittests/sources/azure/test_kvp.py88
2 files changed, 92 insertions, 3 deletions
diff --git a/tests/unittests/sources/azure/test_errors.py b/tests/unittests/sources/azure/test_errors.py
index 9211d472..d2213613 100644
--- a/tests/unittests/sources/azure/test_errors.py
+++ b/tests/unittests/sources/azure/test_errors.py
@@ -105,7 +105,8 @@ def test_reportable_errors(
)
data = [
- "PROVISIONING_ERROR: " + quote_csv_value(f"reason={reason}"),
+ "result=error",
+ quote_csv_value(f"reason={reason}"),
f"agent=Cloud-Init/{version.version_string()}",
]
data += [quote_csv_value(f"{k}={v}") for k, v in supporting_data.items()]
@@ -115,7 +116,7 @@ def test_reportable_errors(
"documentation_url=https://aka.ms/linuxprovisioningerror",
]
- assert error.as_description() == "|".join(data)
+ assert error.as_encoded_report() == "|".join(data)
def test_unhandled_exception():
@@ -136,4 +137,4 @@ def test_unhandled_exception():
assert trace.endswith("ValueError: my value error\n")
quoted_value = quote_csv_value(f"exception={source_error!r}")
- assert f"|{quoted_value}|" in error.as_description()
+ assert f"|{quoted_value}|" in error.as_encoded_report()
diff --git a/tests/unittests/sources/azure/test_kvp.py b/tests/unittests/sources/azure/test_kvp.py
new file mode 100644
index 00000000..f0f4a999
--- /dev/null
+++ b/tests/unittests/sources/azure/test_kvp.py
@@ -0,0 +1,88 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from datetime import datetime
+from unittest import mock
+
+import pytest
+
+from cloudinit import version
+from cloudinit.sources.azure import errors, kvp
+
+
+@pytest.fixture()
+def fake_utcnow():
+ timestamp = datetime.utcnow()
+ with mock.patch.object(kvp, "datetime", autospec=True) as m:
+ m.utcnow.return_value = timestamp
+ yield timestamp
+
+
+@pytest.fixture()
+def fake_vm_id():
+ vm_id = "fake-vm-id"
+ with mock.patch.object(kvp.identity, "query_vm_id", autospec=True) as m:
+ m.return_value = vm_id
+ yield vm_id
+
+
+@pytest.fixture
+def telemetry_reporter(tmp_path):
+ kvp_file_path = tmp_path / "kvp_pool_file"
+ kvp_file_path.write_bytes(b"")
+ reporter = kvp.handlers.HyperVKvpReportingHandler(
+ kvp_file_path=str(kvp_file_path)
+ )
+
+ kvp.instantiated_handler_registry.register_item("telemetry", reporter)
+ yield reporter
+ kvp.instantiated_handler_registry.unregister_item("telemetry")
+
+
+class TestReportFailureToHost:
+ def test_report_failure_to_host(self, caplog, telemetry_reporter):
+ error = errors.ReportableError(reason="test")
+ assert kvp.report_failure_to_host(error) is True
+ assert (
+ "KVP handler not enabled, skipping host report." not in caplog.text
+ )
+
+ report = {
+ "key": "PROVISIONING_REPORT",
+ "value": error.as_encoded_report(),
+ }
+ assert report in list(telemetry_reporter._iterate_kvps(0))
+
+ def test_report_skipped_without_telemetry(self, caplog):
+ error = errors.ReportableError(reason="test")
+
+ assert kvp.report_failure_to_host(error) is False
+ assert "KVP handler not enabled, skipping host report." in caplog.text
+
+
+class TestReportSuccessToHost:
+ def test_report_success_to_host(
+ self, caplog, fake_utcnow, fake_vm_id, telemetry_reporter
+ ):
+ assert kvp.report_success_to_host() is True
+ assert (
+ "KVP handler not enabled, skipping host report." not in caplog.text
+ )
+
+ report_value = errors.encode_report(
+ [
+ "result=success",
+ f"agent=Cloud-Init/{version.version_string()}",
+ f"timestamp={fake_utcnow.isoformat()}",
+ f"vm_id={fake_vm_id}",
+ ]
+ )
+
+ report = {
+ "key": "PROVISIONING_REPORT",
+ "value": report_value,
+ }
+ assert report in list(telemetry_reporter._iterate_kvps(0))
+
+ def test_report_skipped_without_telemetry(self, caplog):
+ assert kvp.report_success_to_host() is False
+ assert "KVP handler not enabled, skipping host report." in caplog.text