# Copyright (C) 2013-2014 Canonical Ltd. # # Author: Scott Moser # Author: Blake Rouse # # This file is part of cloud-init. See LICENSE file for license information. import abc import base64 import glob import gzip import io import logging import os import shlex from cloudinit import util from cloudinit.net import get_devicelist, read_sys_net_safe _OPEN_ISCSI_INTERFACE_FILE = "/run/initramfs/open-iscsi.interface" KERNEL_CMDLINE_NETWORK_CONFIG_DISABLED = "disabled" LOG = logging.getLogger(__name__) class InitramfsNetworkConfigSource(metaclass=abc.ABCMeta): """ABC for net config sources that read config written by initramfses""" @abc.abstractmethod def is_applicable(self) -> bool: """Is this initramfs config source applicable to the current system?""" @abc.abstractmethod def render_config(self) -> dict: """Render a v1 network config from the initramfs configuration""" class KlibcNetworkConfigSource(InitramfsNetworkConfigSource): """InitramfsNetworkConfigSource for klibc initramfs (i.e. Debian/Ubuntu) Has three parameters, but they are intended to make testing simpler, _not_ for use in production code. (This is indicated by the prepended underscores.) """ def __init__(self, _files=None, _mac_addrs=None, _cmdline=None): self._files = _files self._mac_addrs = _mac_addrs self._cmdline = _cmdline # Set defaults here, as they require computation that we don't want to # do at method definition time if self._files is None: self._files = _get_klibc_net_cfg_files() if self._cmdline is None: self._cmdline = util.get_cmdline() if self._mac_addrs is None: self._mac_addrs = {} for k in get_devicelist(): mac_addr = read_sys_net_safe(k, "address") if mac_addr: self._mac_addrs[k] = mac_addr def is_applicable(self) -> bool: """ Return whether this system has klibc initramfs network config or not Will return True if: (a) klibc files exist in /run, AND (b) either: (i) ip= or ip6= are on the kernel cmdline, OR (ii) an open-iscsi interface file is present in the system """ if self._files: for item in shlex.split(self._cmdline): if item.startswith("ip=") or item.startswith("ip6="): return True if os.path.exists(_OPEN_ISCSI_INTERFACE_FILE): # iBft can configure networking without ip= return True return False def render_config(self) -> dict: return config_from_klibc_net_cfg( files=self._files, mac_addrs=self._mac_addrs, ) _INITRAMFS_CONFIG_SOURCES = [KlibcNetworkConfigSource] def _klibc_to_config_entry(content, mac_addrs=None): """Convert a klibc written shell content file to a 'config' entry When ip= is seen on the kernel command line in debian initramfs and networking is brought up, ipconfig will populate /run/net-.cfg. The files are shell style syntax, and examples are in the tests provided here. There is no good documentation on this unfortunately. DEVICE= is expected/required and PROTO should indicate if this is 'none' (static) or 'dhcp' or 'dhcp6' (LP: #1621507). note that IPV6PROTO is also written by newer code to address the possibility of both ipv4 and ipv6 getting addresses. Full syntax is documented at: https://git.kernel.org/pub/scm/libs/klibc/klibc.git/plain/usr/kinit/ipconfig/README.ipconfig """ if mac_addrs is None: mac_addrs = {} data = util.load_shell_content(content) try: name = data["DEVICE"] if "DEVICE" in data else data["DEVICE6"] except KeyError as e: raise ValueError("no 'DEVICE' or 'DEVICE6' entry in data") from e # ipconfig on precise does not write PROTO # IPv6 config gives us IPV6PROTO, not PROTO. proto = data.get("PROTO", data.get("IPV6PROTO")) if not proto: if data.get("filename"): proto = "dhcp" else: proto = "none" if proto not in ("none", "dhcp", "dhcp6"): raise ValueError("Unexpected value for PROTO: %s" % proto) iface = { "type": "physical", "name": name, "subnets": [], } if name in mac_addrs: iface["mac_address"] = mac_addrs[name] # Handle both IPv4 and IPv6 values for pre in ("IPV4", "IPV6"): # if no IPV4ADDR or IPV6ADDR, then go on. if pre + "ADDR" not in data: continue # PROTO for ipv4, IPV6PROTO for ipv6 cur_proto = data.get(pre + "PROTO", proto) # ipconfig's 'none' is called 'static' if cur_proto == "none": cur_proto = "static" subnet = {"type": cur_proto, "control": "manual"} # only populate address for static types. While the rendered config # may have an address for dhcp, that is not really expected. if cur_proto == "static": subnet["address"] = data[pre + "ADDR"] # these fields go right on the subnet for key in ("NETMASK", "BROADCAST", "GATEWAY"): if pre + key in data: subnet[key.lower()] = data[pre + key] dns = [] # handle IPV4DNS0 or IPV6DNS0 for nskey in ("DNS0", "DNS1"): ns = data.get(pre + nskey) # verify it has something other than 0.0.0.0 (or ipv6) if ns and len(ns.strip(":.0")): dns.append(data[pre + nskey]) if dns: subnet["dns_nameservers"] = dns # add search to both ipv4 and ipv6, as it has no namespace search = data.get("DOMAINSEARCH") if search: if "," in search: subnet["dns_search"] = search.split(",") else: subnet["dns_search"] = search.split() iface["subnets"].append(subnet) return name, iface def _get_klibc_net_cfg_files(): return glob.glob("/run/net-*.conf") + glob.glob("/run/net6-*.conf") def config_from_klibc_net_cfg(files=None, mac_addrs=None): if files is None: files = _get_klibc_net_cfg_files() entries = [] names = {} for cfg_file in files: name, entry = _klibc_to_config_entry( util.load_file(cfg_file), mac_addrs=mac_addrs ) if name in names: prev = names[name]["entry"] if prev.get("mac_address") != entry.get("mac_address"): raise ValueError( "device '{name}' was defined multiple times ({files})" " but had differing mac addresses: {old} -> {new}.".format( name=name, files=" ".join(names[name]["files"]), old=prev.get("mac_address"), new=entry.get("mac_address"), ) ) prev["subnets"].extend(entry["subnets"]) names[name]["files"].append(cfg_file) else: names[name] = {"files": [cfg_file], "entry": entry} entries.append(entry) return {"config": entries, "version": 1} def read_initramfs_config(): """ Return v1 network config for initramfs-configured networking (or None) This will consider each _INITRAMFS_CONFIG_SOURCES entry in turn, and return v1 network configuration for the first one that is applicable. If none are applicable, return None. """ for src_cls in _INITRAMFS_CONFIG_SOURCES: cfg_source = src_cls() if not cfg_source.is_applicable(): continue return cfg_source.render_config() return None def _decomp_gzip(blob): # decompress blob or return original blob with io.BytesIO(blob) as iobuf: gzfp = None try: gzfp = gzip.GzipFile(mode="rb", fileobj=iobuf) return gzfp.read() except IOError: return blob finally: if gzfp: gzfp.close() def _b64dgz(data): """Decode a string base64 encoding, if gzipped, uncompress as well :return: decompressed unencoded string of the data or empty string on unencoded data. """ try: blob = base64.b64decode(data) except (TypeError, ValueError): LOG.error( "Expected base64 encoded kernel commandline parameter" " network-config. Ignoring network-config=%s.", data, ) return "" return _decomp_gzip(blob) def read_kernel_cmdline_config(cmdline=None): if cmdline is None: cmdline = util.get_cmdline() if "network-config=" in cmdline: data64 = None for tok in cmdline.split(): if tok.startswith("network-config="): data64 = tok.split("=", 1)[1] if data64: if data64 == KERNEL_CMDLINE_NETWORK_CONFIG_DISABLED: return {"config": "disabled"} return util.load_yaml(_b64dgz(data64)) return None # vi: ts=4 expandtab