# Author: Eric Benner # # This file is part of cloud-init. See LICENSE file for license information. import json from functools import lru_cache from requests import exceptions from cloudinit import dmi from cloudinit import log as log from cloudinit import net, netinfo, subp, url_helper, util from cloudinit.net.dhcp import NoDHCPLeaseError from cloudinit.net.ephemeral import EphemeralDHCPv4 # Get LOG LOG = log.getLogger(__name__) @lru_cache() def get_metadata(url, timeout, retries, sec_between, agent, tmp_dir=None): # Bring up interface (and try untill one works) exception = RuntimeError("Failed to DHCP") # Seek iface with DHCP for iface in get_interface_list(): try: with EphemeralDHCPv4( iface=iface, connectivity_url_data={"url": url} ): # Check for the metadata route, skip if not there if not check_route(url): continue # Fetch the metadata v1 = read_metadata(url, timeout, retries, sec_between, agent) metadata = json.loads(v1) refactor_metadata(metadata) return metadata except ( NoDHCPLeaseError, subp.ProcessExecutionError, RuntimeError, exceptions.RequestException, ) as exc: LOG.error("DHCP Exception: %s", exc) exception = exc raise exception # Refactor metadata into acceptable format def refactor_metadata(metadata): metadata["instance-id"] = metadata["instance-v2-id"] metadata["local-hostname"] = metadata["hostname"] region = metadata["region"]["regioncode"] if "countrycode" in metadata["region"]: region = metadata["region"]["countrycode"] metadata["region"] = region.lower() # Get interface list, sort, and clean def get_interface_list(): ifaces = [] for iface in net.find_candidate_nics(): # Skip dummy if "dummy" in iface: continue ifaces.append(iface) return ifaces # Check for /32 route that our dhcp servers inject # in order to determine if this a customer-run dhcp server def check_route(url): # Get routes, confirm entry exists routes = netinfo.route_info() # If no tools exist and empty dict is returned if "ipv4" not in routes: return False # Parse each route into a more searchable format for route in routes["ipv4"]: if route.get("destination", None) in url: return True return False # Read the system information from SMBIOS def get_sysinfo(): return { "manufacturer": dmi.read_dmi_data("system-manufacturer"), "subid": dmi.read_dmi_data("system-serial-number"), } # Assumes is Vultr is already checked def is_baremetal(): if get_sysinfo()["manufacturer"] != "Vultr": return True return False # Confirm is Vultr def is_vultr(): # VC2, VDC, and HFC use DMI sysinfo = get_sysinfo() if sysinfo["manufacturer"] == "Vultr": return True # Baremetal requires a kernel parameter if "vultr" in util.get_cmdline().split(): return True return False # Read Metadata endpoint def read_metadata(url, timeout, retries, sec_between, agent): url = "%s/v1.json" % url # Announce os details so we can handle non Vultr origin # images and provide correct vendordata generation. headers = {"Metadata-Token": "cloudinit", "User-Agent": agent} response = url_helper.readurl( url, timeout=timeout, retries=retries, headers=headers, sec_between=sec_between, ) if not response.ok(): raise RuntimeError( "Failed to connect to %s: Code: %s" % url, response.code ) return response.contents.decode() # Wrapped for caching @lru_cache() def get_interface_map(): return net.get_interfaces_by_mac() # Convert macs to nics def get_interface_name(mac): macs_to_nic = get_interface_map() if mac not in macs_to_nic: return None return macs_to_nic.get(mac) # Generate network configs def generate_network_config(interfaces): network = { "version": 1, "config": [ { "type": "nameserver", "address": ["108.61.10.10", "2001:19f0:300:1704::6"], } ], } # Prepare interface 0, public if len(interfaces) > 0: public = generate_interface(interfaces[0], primary=True) network["config"].append(public) # Prepare additional interfaces, private for i in range(1, len(interfaces)): interface = interfaces[i] # Skip interfaces set not to be configured if interface.get("unconfigured"): continue private = generate_interface(interface) network["config"].append(private) return network def generate_interface(interface, primary=False): interface_name = get_interface_name(interface["mac"]) if not interface_name: raise RuntimeError( "Interface: %s could not be found on the system" % interface["mac"] ) netcfg = { "name": interface_name, "type": "physical", "mac_address": interface["mac"], } if primary: netcfg["accept-ra"] = 1 netcfg["subnets"] = [ {"type": "dhcp", "control": "auto"}, {"type": "ipv6_slaac", "control": "auto"}, ] if not primary: netcfg["subnets"] = [ { "type": "static", "control": "auto", "address": interface["ipv4"]["address"], "netmask": interface["ipv4"]["netmask"], } ] generate_interface_routes(interface, netcfg) generate_interface_additional_addresses(interface, netcfg) # Add config to template return netcfg def generate_interface_routes(interface, netcfg): # Options that may or may not be used if "mtu" in interface: netcfg["mtu"] = interface["mtu"] if "accept-ra" in interface: netcfg["accept-ra"] = interface["accept-ra"] if "routes" in interface: netcfg["subnets"][0]["routes"] = interface["routes"] def generate_interface_additional_addresses(interface, netcfg): # Check for additional IP's additional_count = len(interface["ipv4"]["additional"]) if "ipv4" in interface and additional_count > 0: for additional in interface["ipv4"]["additional"]: add = { "type": "static", "control": "auto", "address": additional["address"], "netmask": additional["netmask"], } if "routes" in additional: add["routes"] = additional["routes"] netcfg["subnets"].append(add) # Check for additional IPv6's additional_count = len(interface["ipv6"]["additional"]) if "ipv6" in interface and additional_count > 0: for additional in interface["ipv6"]["additional"]: add = { "type": "static6", "control": "auto", "address": "%s/%s" % (additional["network"], additional["prefix"]), } if "routes" in additional: add["routes"] = additional["routes"] netcfg["subnets"].append(add) # Make required adjustments to the network configs provided def add_interface_names(netcfg): for interface in netcfg["config"]: if interface["type"] != "physical": continue interface_name = get_interface_name(interface["mac_address"]) if not interface_name: raise RuntimeError( "Interface: %s could not be found on the system" % interface["mac_address"] ) interface["name"] = interface_name # vi: ts=4 expandtab