summaryrefslogtreecommitdiff
path: root/ironic_python_agent/efi_utils.py
blob: 48e643d3f9951d71773609e12c80c8f3f0882262 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import os
import re
import sys
import tempfile

from ironic_lib import disk_utils
from oslo_concurrency import processutils
from oslo_log import log

from ironic_python_agent import errors
from ironic_python_agent import hardware
from ironic_python_agent import partition_utils
from ironic_python_agent import raid_utils
from ironic_python_agent import utils


# Module-level logger used by every helper in this file.
LOG = log.getLogger(__name__)


def get_partition_path_by_number(device, part_num):
    """Resolve a partition number on a device to its partition path.

    The partition's unique GUID is read from the output of ``sgdisk -i`` and
    then handed to ``partition_utils.get_partition``.

    Only works for GPT partition table.

    :param device: the device holding the partition.
    :param part_num: the number of the partition to look up.
    :return: whatever ``partition_utils.get_partition`` returns for the
             GUID found, or None if sgdisk printed no unique GUID.
    """
    sgdisk_out, _ = utils.execute('sgdisk', '-i', str(part_num), device,
                                  use_standard_locale=True)
    for raw_line in sgdisk_out.splitlines():
        if not raw_line.strip():
            continue

        # Split on the last colon only; lines without one are not
        # "field: value" pairs and are reported, then skipped.
        key, colon, val = raw_line.rpartition(':')
        if not colon:
            LOG.warning('Invalid sgdisk line: %s', raw_line)
            continue

        if 'partition unique guid' not in key.lower():
            continue

        part_uuid = val.strip().lower()
        LOG.debug('GPT partition number %s on device %s has UUID %s',
                  part_num, device, part_uuid)
        return partition_utils.get_partition(device, part_uuid)

    LOG.warning('No UUID information provided in sgdisk output for '
                'partition %s on device %s', part_num, device)
    return None


def manage_uefi(device, efi_system_part_uuid=None):
    """Manage the device looking for valid efi bootloaders to update the nvram.

    This method checks for valid efi bootloaders on the device; if any
    exist, it updates the nvram using efibootmgr.

    :param device: the device to be checked.
    :param efi_system_part_uuid: efi partition uuid.
    :raises: DeviceNotFound if the efi partition cannot be found or its
             partition number cannot be determined.
    :raises: CommandExecutionError if a command fails while configuring
             UEFI boot, or if unmounting the efi partition fails and no
             other failure is already being reported.
    :return: True - if any efi bootloader was found and the nvram was
             updated using efibootmgr.
             False - if no efi bootloader is found.
    """
    efi_partition_mount_point = None
    efi_mounted = False
    LOG.debug('Attempting UEFI loader autodetection and NVRAM record setup.')

    # Force UEFI to rescan the device.
    utils.rescan_device(device)

    # Trust the contents on the disk in the event of a whole disk image.
    efi_partition = disk_utils.find_efi_partition(device)
    if efi_partition:
        efi_part_num = efi_partition['number']
        efi_partition = get_partition_path_by_number(device, efi_part_num)

    if not efi_partition and efi_system_part_uuid:
        # get_partition returns <device>+<partition> and we only need the
        # partition number
        efi_partition = partition_utils.get_partition(
            device, uuid=efi_system_part_uuid)
        # FIXME(dtantsur): this procedure will not work for devicemapper
        # devices. To fix that we need a way to convert a UUID to a partition
        # number, which is surprisingly non-trivial and may involve looping
        # over existing numbers and calling `sgdisk -i` for each of them.
        # But I'm not sure we even need this logic: find_efi_partition should
        # be sufficient for both whole disk and partition images.
        try:
            efi_part_num = int(efi_partition.replace(device, ""))
        except ValueError:
            # NVMe Devices get a partitioning scheme that is different from
            # traditional block devices like SCSI/SATA
            try:
                efi_part_num = int(efi_partition.replace(device + 'p', ""))
            except ValueError as exc:
                # At least provide a reasonable error message if the device
                # does not follow this procedure.
                raise errors.DeviceNotFound(
                    "Cannot detect the partition number of the device %s: %s" %
                    (efi_partition, exc))

    if not efi_partition:
        # NOTE(dtantsur): we cannot have a valid EFI deployment without an
        # EFI partition at all. This code path is easily hit when using an
        # image that is not UEFI compatible (which sadly applies to most
        # cloud images out there, with a nice exception of Ubuntu).
        raise errors.DeviceNotFound(
            "No EFI partition could be detected on device %s and "
            "EFI partition UUID has not been recorded during deployment "
            "(which is often the case for whole disk images). "
            "Are you using a UEFI-compatible image?" % device)

    local_path = tempfile.mkdtemp()
    efi_partition_mount_point = os.path.join(local_path, "boot/efi")
    if not os.path.exists(efi_partition_mount_point):
        os.makedirs(efi_partition_mount_point)

    try:
        utils.execute('mount', efi_partition, efi_partition_mount_point)
        efi_mounted = True

        valid_efi_bootloaders = _get_efi_bootloaders(efi_partition_mount_point)
        if not valid_efi_bootloaders:
            # NOTE(dtantsur): an empty EFI partition is reported to the
            # caller through the False return value; presumably the caller
            # then tries grub-install to populate it (not visible here).
            LOG.warning('Empty EFI partition detected.')
            return False

        if not hardware.is_md_device(device):
            efi_devices = [device]
            efi_partition_numbers = [efi_part_num]
            efi_label_suffix = ''
        else:
            # umount to allow for signature removal (to avoid confusion about
            # which ESP to mount once the instance is deployed)
            utils.execute('umount', efi_partition_mount_point, attempts=3,
                          delay_on_retry=True)
            efi_mounted = False

            holders = hardware.get_holder_disks(device)
            efi_md_device = raid_utils.prepare_boot_partitions_for_softraid(
                device, holders, efi_partition, target_boot_mode='uefi'
            )
            efi_devices = hardware.get_component_devices(efi_md_device)
            efi_partition_numbers = []
            # The partition number is taken to be the trailing digits of
            # each component device name.
            _PARTITION_NUMBER = re.compile(r'(\d+)$')
            for dev in efi_devices:
                match = _PARTITION_NUMBER.search(dev)
                if match:
                    partition_number = match.group(1)
                    efi_partition_numbers.append(partition_number)
                else:
                    raise errors.DeviceNotFound(
                        "Could not extract the partition number "
                        "from %s!" % dev)
            efi_label_suffix = "(RAID, part%s)"

            # remount for _run_efibootmgr
            utils.execute('mount', efi_partition, efi_partition_mount_point)
            efi_mounted = True

        efi_dev_part = zip(efi_devices, efi_partition_numbers)
        for i, (efi_dev, efi_part) in enumerate(efi_dev_part):
            LOG.debug("Calling efibootmgr with dev %s partition number %s",
                      efi_dev, efi_part)
            if efi_label_suffix:
                # NOTE (arne_wiebalck): uniqify the labels to prevent
                # unintentional boot entry cleanup
                _run_efibootmgr(valid_efi_bootloaders, efi_dev, efi_part,
                                efi_partition_mount_point,
                                efi_label_suffix % i)
            else:
                _run_efibootmgr(valid_efi_bootloaders, efi_dev, efi_part,
                                efi_partition_mount_point)
        return True

    except processutils.ProcessExecutionError as e:
        error_msg = ('Could not configure UEFI boot on device %(dev)s: %(err)s'
                     % {'dev': device, 'err': e})
        LOG.exception(error_msg)
        raise errors.CommandExecutionError(error_msg)
    finally:
        # Record whether an exception is already propagating out of the try
        # block. This has to be sampled here: inside the ``except`` clause
        # below sys.exc_info() always reports the umount error itself, so
        # checking it there could never yield None and umount failures
        # would be silently dropped.
        primary_failure = sys.exc_info()[0] is not None
        if efi_mounted:
            try:
                utils.execute('umount', efi_partition_mount_point,
                              attempts=3, delay_on_retry=True)
            except processutils.ProcessExecutionError as e:
                error_msg = ('Umounting efi system partition failed. '
                             'Attempted 3 times. Error: %s' % e)
                LOG.error(error_msg)
                # Do not mask the actual failure, if any
                if not primary_failure:
                    raise errors.CommandExecutionError(error_msg)

            else:
                # Only flush to disk once the partition was unmounted
                # cleanly; a failing sync is logged but not fatal.
                try:
                    utils.execute('sync')
                except processutils.ProcessExecutionError as e:
                    LOG.warning('Unable to sync the local disks: %s', e)


# Lower-case file names that are recognised as EFI bootloaders or as CSV
# pointer files naming one.
# NOTE(TheJulia): Do not add bootia32.csv to this list. That is 32bit
# EFI booting and never really became popular.
BOOTLOADERS_EFI = [
    # Used by GRUB2 shim loader (Ubuntu, Red Hat)
    'bootx64.csv',
    # Used by rEFInd, Centos7 Grub2
    'boot.csv',
    'bootia32.efi',
    # x86_64 Default
    'bootx64.efi',
    'bootia64.efi',
    'bootarm.efi',
    # Arm64 Default
    'bootaa64.efi',
    'bootriscv32.efi',
    'bootriscv64.efi',
    'bootriscv128.efi',
    'grubaa64.efi',
    'winload.efi',
]


def _get_efi_bootloaders(location):
    """Get all valid efi bootloaders in a given location

    Walks the tree under ``location`` looking for files whose lower-cased
    name is listed in BOOTLOADERS_EFI. Executable matches are collected;
    as soon as a CSV pointer file is found, it alone is returned.

    :param location: the location where it should start looking for the
                     efi files.
    :return: a list of paths, relative to ``location``, to valid efi
             bootloaders or reference files.
    """
    # Let's find all files whose name is a known bootloader or CSV hint
    LOG.debug('Looking for all efi files on %s', location)
    valid_bootloaders = []
    for root, _dirs, files in os.walk(location):
        efi_files = [f for f in files if f.lower() in BOOTLOADERS_EFI]
        LOG.debug('efi files found in %(location)s : %(efi_files)s',
                  {'location': location, 'efi_files': str(efi_files)})
        for name in efi_files:
            efi_f = os.path.join(root, name)
            # relpath is used rather than splitting on the location string,
            # so a trailing slash on location or a repeated path fragment
            # cannot corrupt the relative path.
            v_bl = os.path.relpath(efi_f, location)
            LOG.debug('Checking if %s is executable', efi_f)
            if os.access(efi_f, os.X_OK):
                LOG.debug('%s is a valid bootloader', v_bl)
                valid_bootloaders.append(v_bl)
            # Decide on the file name only: the mount point or a directory
            # on the way may itself contain "csv".
            if name.lower().endswith('.csv'):
                LOG.debug('%s is a pointer to a bootloader', v_bl)
                # The CSV files are intended to be authoritative as
                # to the bootloader and the label to be used. Since
                # we found one, we're going to point directly to it.
                # centos7 did ship with 2, but with the same contents.
                # TODO(TheJulia): Perhaps we extend this to make a list
                # of CSVs instead and only return those?! But then the
                # question is which is right/first/preferred.
                return [v_bl]
    return valid_bootloaders


# NOTE(TheJulia): regex used to identify entries in the efibootmgr
# output on stdout.
# Group 1 captures the characters following the literal "Boot" prefix (hex
# digits; the character class as written also admits "-"). After an optional
# "*" and one whitespace character, group 2 captures the rest of the line.
_ENTRY_LABEL = re.compile(r'Boot([0-9a-f-A-F]+)\*?\s(.*).*$')


def get_boot_records():
    """Run ``efibootmgr -v`` and yield the boot records it lists.

    Output lines that do not match the boot entry pattern are skipped.

    :return: an iterator yielding pairs (boot number, boot record).
    """
    efibootmgr_result = utils.execute('efibootmgr', '-v')
    stdout = efibootmgr_result[0]
    for entry in stdout.split('\n'):
        parsed = _ENTRY_LABEL.match(entry)
        if parsed is None:
            continue
        yield parsed.group(1, 2)


def add_boot_record(device, efi_partition, loader, label):
    """Create a new EFI boot record through efibootmgr.

    :param device: the device to be used
    :param efi_partition: the number of the EFI partition on the device
    :param loader: path to the EFI boot loader
    :param label: the record label
    """
    # Option reference: https://linux.die.net/man/8/efibootmgr
    command = [
        'efibootmgr', '-v',
        '-c',
        '-d', device,
        '-p', str(efi_partition),
        '-w',
        '-L', label,
        '-l', loader,
    ]
    utils.execute(*command)


def remove_boot_record(boot_num):
    """Delete one EFI boot record through efibootmgr.

    :param boot_num: the number of the boot record
    """
    command = ('efibootmgr', '-b', boot_num, '-B')
    utils.execute(*command)


def _run_efibootmgr(valid_efi_bootloaders, device, efi_partition,
                    mount_point, label_suffix=None):
    """Register the given bootloaders with efibootmgr, dropping duplicates.

    For every bootloader a label and a backslash-separated loader path are
    derived, existing boot records containing that label are removed, and a
    new record is added.

    :param valid_efi_bootloaders: the list of valid efi bootloaders
    :param device: the device to be used
    :param efi_partition: the efi partition on the device
    :param mount_point: The mountpoint for the EFI partition so we can
                        read contents of files if necessary to perform
                        proper bootloader injection operations.
    :param label_suffix: a string to be appended to the EFI label,
                         mainly used in the case of software RAID to
                         uniqify the entries for the md components.
    """

    # Snapshot the current records once, before anything is modified.
    LOG.debug("Getting information about boot order.")
    existing_records = list(get_boot_records())
    for label_id, loader in enumerate(valid_efi_bootloaders, start=1):
        if 'csv' in loader.lower():
            LOG.debug('A CSV file has been identified as a bootloader hint. '
                      'File: %s', loader)
            hint_path = mount_point + '/' + loader
            # These files are always UTF-16 encoded, sometimes have a header.
            # Positive bonus is python silently drops the FEFF header.
            try:
                with open(hint_path, 'r', encoding='utf-16') as hint:
                    contents = str(hint.read())
            except UnicodeError:
                with open(hint_path, 'r', encoding='utf-16-le') as hint:
                    contents = str(hint.read())
            fields = contents.split(',', maxsplit=3)
            # The first CSV field names the loader, which replaces the CSV
            # file name in the path; the second field is the label.
            hint_name = loader.split('/')[-1]
            relative_path = loader.replace(hint_name, str(fields[0]))
            label = fields[1]
        else:
            relative_path = loader
            label = 'ironic' + str(label_id)

        nvram_path = '\\' + relative_path.replace('/', '\\')
        if label_suffix:
            label = label + " " + str(label_suffix)

        # Any pre-existing record whose text contains the label is treated
        # as a duplicate and removed before the new one is added.
        for boot_num, boot_rec in existing_records:
            if label not in boot_rec:
                continue
            LOG.debug("Found bootnum %s matching label", boot_num)
            remove_boot_record(boot_num)

        LOG.info("Adding loader %(path)s on partition %(part)s of device "
                 " %(dev)s with label %(label)s",
                 {'path': nvram_path, 'part': efi_partition,
                  'dev': device, 'label': label})

        # Update the nvram using efibootmgr
        add_boot_record(device, efi_partition, nvram_path, label)