summaryrefslogtreecommitdiff
path: root/cloudinit/net/network_state.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/net/network_state.py')
-rw-r--r--cloudinit/net/network_state.py77
1 files changed, 59 insertions, 18 deletions
diff --git a/cloudinit/net/network_state.py b/cloudinit/net/network_state.py
index 80f2b108..e4f7a7fd 100644
--- a/cloudinit/net/network_state.py
+++ b/cloudinit/net/network_state.py
@@ -7,7 +7,7 @@
import copy
import functools
import logging
-from typing import Any, Dict
+from typing import TYPE_CHECKING, Any, Dict, Optional
from cloudinit import safeyaml, util
from cloudinit.net import (
@@ -22,6 +22,9 @@ from cloudinit.net import (
net_prefix_to_ipv4_mask,
)
+if TYPE_CHECKING:
+ from cloudinit.net.renderer import Renderer
+
LOG = logging.getLogger(__name__)
NETWORK_STATE_VERSION = 1
@@ -136,14 +139,16 @@ class CommandHandlerMeta(type):
class NetworkState(object):
- def __init__(self, network_state, version=NETWORK_STATE_VERSION):
+ def __init__(
+ self, network_state: dict, version: int = NETWORK_STATE_VERSION
+ ):
self._network_state = copy.deepcopy(network_state)
self._version = version
self.use_ipv6 = network_state.get("use_ipv6", False)
self._has_default_route = None
@property
- def config(self):
+ def config(self) -> dict:
return self._network_state["config"]
@property
@@ -204,6 +209,20 @@ class NetworkState(object):
route.get("prefix") == 0 and route.get("network") in default_nets
)
+ @classmethod
+ def to_passthrough(cls, network_state: dict) -> "NetworkState":
+ """Instantiates a `NetworkState` without interpreting its data.
+
+ That means only `config` and `version` are copied.
+
+ :param network_state: Network state data.
+ :return: Instance of `NetworkState`.
+ """
+ kwargs = {}
+ if "version" in network_state:
+ kwargs["version"] = network_state["version"]
+ return cls({"config": network_state}, **kwargs)
+
class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
@@ -218,16 +237,27 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
"config": None,
}
- def __init__(self, version=NETWORK_STATE_VERSION, config=None):
+ def __init__(
+ self,
+ version=NETWORK_STATE_VERSION,
+ config=None,
+ renderer=None, # type: Optional[Renderer]
+ ):
self._version = version
self._config = config
self._network_state = copy.deepcopy(self.initial_network_state)
self._network_state["config"] = config
self._parsed = False
- self._interface_dns_map = {}
+ self._interface_dns_map: dict = {}
+ self._renderer = renderer
@property
- def network_state(self):
+ def network_state(self) -> NetworkState:
+ from cloudinit.net.netplan import Renderer as NetplanRenderer
+
+ if self._version == 2 and isinstance(self._renderer, NetplanRenderer):
+ LOG.debug("Passthrough netplan v2 config")
+ return NetworkState.to_passthrough(self._config)
return NetworkState(self._network_state, version=self._version)
@property
@@ -268,10 +298,6 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
def as_dict(self):
return {"version": self._version, "config": self._config}
- def get_network_state(self):
- ns = self.network_state
- return ns
-
def parse_config(self, skip_broken=True):
if self._version == 1:
self.parse_config_v1(skip_broken=skip_broken)
@@ -316,6 +342,12 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
}
def parse_config_v2(self, skip_broken=True):
+ from cloudinit.net.netplan import Renderer as NetplanRenderer
+
+ if isinstance(self._renderer, NetplanRenderer):
+ # Nothing to parse as we are going to perform a Netplan passthrough
+ return
+
for command_type, command in self._config.items():
if command_type in ["version", "renderer"]:
continue
@@ -764,7 +796,7 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
" netplan rendering support."
)
- def _v2_common(self, cfg):
+ def _v2_common(self, cfg) -> None:
LOG.debug("v2_common: handling config:\n%s", cfg)
for iface, dev_cfg in cfg.items():
if "set-name" in dev_cfg:
@@ -781,10 +813,13 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
name_cmd.update({"address": dns})
self.handle_nameserver(name_cmd)
- mac_address = dev_cfg.get("match", {}).get("macaddress")
- real_if_name = find_interface_name_from_mac(mac_address)
- if real_if_name:
- iface = real_if_name
+ mac_address: Optional[str] = dev_cfg.get("match", {}).get(
+ "macaddress"
+ )
+ if mac_address:
+ real_if_name = find_interface_name_from_mac(mac_address)
+ if real_if_name:
+ iface = real_if_name
self._handle_individual_nameserver(name_cmd, iface)
@@ -1044,7 +1079,11 @@ def _normalize_subnets(subnets):
return [_normalize_subnet(s) for s in subnets]
-def parse_net_config_data(net_config, skip_broken=True) -> NetworkState:
+def parse_net_config_data(
+ net_config: dict,
+ skip_broken: bool = True,
+ renderer=None, # type: Optional[Renderer]
+) -> NetworkState:
"""Parses the config, returns NetworkState object
:param net_config: curtin network config dict
@@ -1058,9 +1097,11 @@ def parse_net_config_data(net_config, skip_broken=True) -> NetworkState:
config = net_config
if version and config is not None:
- nsi = NetworkStateInterpreter(version=version, config=config)
+ nsi = NetworkStateInterpreter(
+ version=version, config=config, renderer=renderer
+ )
nsi.parse_config(skip_broken=skip_broken)
- state = nsi.get_network_state()
+ state = nsi.network_state
if not state:
raise RuntimeError(