summaryrefslogtreecommitdiff
path: root/nova/pci/utils.py
blob: 778cdb227c418980a91b6bd870cfdba72b69fcaa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# Copyright (c) 2013 Intel, Inc.
# Copyright (c) 2012 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import glob
import os
import re
import typing as ty

from oslo_log import log as logging

from nova import exception

if ty.TYPE_CHECKING:
    # avoid circular import
    from nova.pci import stats

LOG = logging.getLogger(__name__)

PCI_VENDOR_PATTERN = "^(hex{4})$".replace("hex", r"[\da-fA-F]")
_PCI_ADDRESS_PATTERN = ("^(hex{4}):(hex{2}):(hex{2}).(oct{1})$".
                                             replace("hex", r"[\da-fA-F]").
                                             replace("oct", "[0-7]"))
_PCI_ADDRESS_REGEX = re.compile(_PCI_ADDRESS_PATTERN)
_SRIOV_TOTALVFS = "sriov_totalvfs"


def pci_device_prop_match(
    pci_dev: 'stats.Pool', specs: ty.List[ty.Dict[str, str]],
) -> bool:
    """Check if the pci_dev meet spec requirement

    Specs is a list of PCI device property requirements.
    An example of device requirement that the PCI should be either:
    a) Device with vendor_id as 0x8086 and product_id as 0x8259, or
    b) Device with vendor_id as 0x10de and product_id as 0x10d8:

    [{"vendor_id":"8086", "product_id":"8259"},
     {"vendor_id":"10de", "product_id":"10d8",
      "capabilities_network": ["rx", "tx", "tso", "gso"]}]

    """

    def _matching_devices(spec: ty.Dict[str, str]) -> bool:
        for k, v in spec.items():
            pci_dev_v = pci_dev.get(k)
            if isinstance(v, list) and isinstance(pci_dev_v, list):
                if not all(x in pci_dev.get(k) for x in v):
                    return False
            else:
                # We don't need to check case for tags in order to avoid any
                # mismatch with the tags provided by users for port
                # binding profile and the ones configured by operators
                # with pci whitelist option.
                if isinstance(v, str):
                    v = v.lower()
                if isinstance(pci_dev_v, str):
                    pci_dev_v = pci_dev_v.lower()
                if pci_dev_v != v:
                    return False
        return True

    return any(_matching_devices(spec) for spec in specs)


def parse_address(address: str) -> ty.Sequence[str]:
    """Parse a PCI address.

    Returns (domain, bus, slot, function) from PCI address that is stored in
    PciDevice DB table.
    """
    m = _PCI_ADDRESS_REGEX.match(address)
    if not m:
        raise exception.PciDeviceWrongAddressFormat(address=address)
    return m.groups()


def get_pci_address_fields(pci_addr: str) -> ty.Tuple[str, str, str, str]:
    """Parse a fully-specified PCI device address.

    Does not validate that the components are valid hex or wildcard values.

    :param pci_addr: A string of the form "<domain>:<bus>:<slot>.<function>".
    :return: A 4-tuple of strings ("<domain>", "<bus>", "<slot>", "<function>")
    """
    dbs, sep, func = pci_addr.partition('.')
    domain, bus, slot = dbs.split(':')
    return domain, bus, slot, func


def get_pci_address(domain: str, bus: str, slot: str, func: str) -> str:
    """Assembles PCI address components into a fully-specified PCI address.

    Does not validate that the components are valid hex or wildcard values.

    :param domain, bus, slot, func: Hex or wildcard strings.
    :return: A string of the form "<domain>:<bus>:<slot>.<function>".
    """
    return '%s:%s:%s.%s' % (domain, bus, slot, func)


def get_function_by_ifname(ifname: str) -> ty.Tuple[ty.Optional[str], bool]:
    """Given the device name, returns the PCI address of a device
    and returns True if the address is in a physical function.
    """
    dev_path = "/sys/class/net/%s/device" % ifname
    sriov_totalvfs = 0
    if os.path.isdir(dev_path):
        try:
            # sriov_totalvfs contains the maximum possible VFs for this PF
            with open(os.path.join(dev_path, _SRIOV_TOTALVFS)) as fd:
                sriov_totalvfs = int(fd.read())
                return (os.readlink(dev_path).strip("./"),
                        sriov_totalvfs > 0)
        except (IOError, ValueError):
            return os.readlink(dev_path).strip("./"), False
    return None, False


def is_physical_function(
    domain: str, bus: str, slot: str, function: str,
) -> bool:
    dev_path = "/sys/bus/pci/devices/%(d)s:%(b)s:%(s)s.%(f)s/" % {
        "d": domain, "b": bus, "s": slot, "f": function}
    if os.path.isdir(dev_path):
        try:
            with open(dev_path + _SRIOV_TOTALVFS) as fd:
                sriov_totalvfs = int(fd.read())
                return sriov_totalvfs > 0
        except (IOError, ValueError):
            pass
    return False


def _get_sysfs_netdev_path(pci_addr: str, pf_interface: bool) -> str:
    """Get the sysfs path based on the PCI address of the device.

    Assumes a networking device - will not check for the existence of the path.
    """
    if pf_interface:
        return "/sys/bus/pci/devices/%s/physfn/net" % pci_addr
    return "/sys/bus/pci/devices/%s/net" % pci_addr


def get_ifname_by_pci_address(
    pci_addr: str, pf_interface: bool = False,
) -> str:
    """Get the interface name based on a VF's pci address.

    The returned interface name is either the parent PF's or that of the VF
    itself based on the argument of pf_interface.
    """
    dev_path = _get_sysfs_netdev_path(pci_addr, pf_interface)
    try:
        dev_info = os.listdir(dev_path)
        return dev_info.pop()
    except Exception:
        raise exception.PciDeviceNotFoundById(id=pci_addr)


def get_mac_by_pci_address(pci_addr: str, pf_interface: bool = False) -> str:
    """Get the MAC address of the nic based on its PCI address.

    Raises PciDeviceNotFoundById in case the pci device is not a NIC
    """
    dev_path = _get_sysfs_netdev_path(pci_addr, pf_interface)
    if_name = get_ifname_by_pci_address(pci_addr, pf_interface)
    addr_file = os.path.join(dev_path, if_name, 'address')

    try:
        with open(addr_file) as f:
            mac = next(f).strip()
            return mac
    except (IOError, StopIteration) as e:
        LOG.warning("Could not find the expected sysfs file for "
                    "determining the MAC address of the PCI device "
                    "%(addr)s. May not be a NIC. Error: %(e)s",
                    {'addr': pci_addr, 'e': e})
        raise exception.PciDeviceNotFoundById(id=pci_addr)


def get_vf_num_by_pci_address(pci_addr: str) -> str:
    """Get the VF number based on a VF's pci address

    A VF is associated with an VF number, which ip link command uses to
    configure it. This number can be obtained from the PCI device filesystem.
    """
    VIRTFN_RE = re.compile(r"virtfn(\d+)")
    virtfns_path = "/sys/bus/pci/devices/%s/physfn/virtfn*" % (pci_addr)
    vf_num = None

    for vf_path in glob.iglob(virtfns_path):
        if re.search(pci_addr, os.readlink(vf_path)):
            t = VIRTFN_RE.search(vf_path)
            if t:
                vf_num = t.group(1)
                break
    else:
        raise exception.PciDeviceNotFoundById(id=pci_addr)

    return vf_num