summaryrefslogtreecommitdiff
path: root/psutil/_virt.py
blob: 87e5cc53100e62ec61c82aedb174db1cc8420862 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Routines & strategies to guess whether we're running inside a virtual
machine.

LINUX
=====

This implementation is a port of `systemd-detect-virt` C CLI tool
and tries to mimick it as much as possible:
https://github.com/systemd/systemd/blob/main/src/basic/virt.c
I attempted to do a literal C -> Python translation whenever possible,
including respecting the original order in which the various 'guess'
functions are being called.
Differences with `systemd-detect-virt`:
* systemd-detect-virt returns 'oracle', we return 'virtualbox'

In addition, it uses a couple of strategies of `virt-what` bash script:
* detects "ibm-systemz"

WINDOWS
=======

We simply interpret the string returned by __cpuid() syscall, if any.
Useful references:
* https://artemonsecurity.com/vmde.pdf
"""

import os
import re

from ._common import LINUX
from ._common import OPENBSD
from ._common import WINDOWS
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import cat
from ._common import debug
from ._common import get_procfs_path
from ._common import open_binary
from ._common import open_text
from ._compat import FileNotFoundError
from ._compat import PermissionError


# virtualization strings

VIRTUALIZATION_ACRN = "acrn"
VIRTUALIZATION_AMAZON = "amazon"
VIRTUALIZATION_BHYVE = "bhyve"
VIRTUALIZATION_BOCHS = "bochs"
VIRTUALIZATION_DOCKER = "docker"
VIRTUALIZATION_IBM_SYSTEMZ = "ibm-systemz"  # detected by virt-what
VIRTUALIZATION_KVM = "kvm"
VIRTUALIZATION_LXC = "lxc"
VIRTUALIZATION_LXC_LIBVIRT = "lxc-libvirt"
VIRTUALIZATION_MICROSOFT = "microsoft"
VIRTUALIZATION_OPENVZ = "openvz"
VIRTUALIZATION_PARALLELS = "parallels"
VIRTUALIZATION_PODMAN = "podman"
VIRTUALIZATION_POWERVM = "powervm"
VIRTUALIZATION_PROOT = "proot"
VIRTUALIZATION_QEMU = "qemu"
VIRTUALIZATION_QNX = "qnx"
VIRTUALIZATION_RKT = "rkt"
VIRTUALIZATION_SYSTEMD_NSPAWN = "systemd-nspawn"
VIRTUALIZATION_UML = "uml"
VIRTUALIZATION_VIRTUALBOX = "virtualbox"
VIRTUALIZATION_VMWARE = "vmware"
VIRTUALIZATION_WSL = "wsl"
VIRTUALIZATION_XEN = "xen"
VIRTUALIZATION_ZVM = "zvm"

VIRTUALIZATION_CONTAINER_OTHER = "container-other"
VIRTUALIZATION_VM_OTHER = "vm-other"

# https://evasions.checkpoint.com/techniques/cpu.html
# https://github.com/a0rtega/pafish/blob/master/pafish/cpu.c
CPUID_VENDORS_TABLE = {
    "ACRNACRNACRN": VIRTUALIZATION_ACRN,
    "bhyve bhyve ": VIRTUALIZATION_BHYVE,
    "KVMKVMKVM": VIRTUALIZATION_KVM,
    "Microsoft Hv": VIRTUALIZATION_MICROSOFT,
    "prl hyperv": VIRTUALIZATION_PARALLELS,  # on Windows only
    "QNXQVMBSQG": VIRTUALIZATION_QNX,
    "TCGTCGTCGTCG": VIRTUALIZATION_QEMU,
    "VBoxVBoxVBox": VIRTUALIZATION_VIRTUALBOX,  # on Windows only
    "VMwareVMware": VIRTUALIZATION_VMWARE,
    "XenVMMXenVMM": VIRTUALIZATION_XEN,
}

DMI_VENDORS_TABLE = {
    "KVM": VIRTUALIZATION_KVM,
    "Amazon EC2": VIRTUALIZATION_AMAZON,
    "QEMU": VIRTUALIZATION_QEMU,
    # https://kb.vmware.com/s/article/1009458
    "VMware": VIRTUALIZATION_VMWARE,
    "VMW": VIRTUALIZATION_VMWARE,
    "innotek GmbH": VIRTUALIZATION_VIRTUALBOX,
    "Oracle Corporation": VIRTUALIZATION_VIRTUALBOX,
    "Xen": VIRTUALIZATION_XEN,
    "Bochs": VIRTUALIZATION_BOCHS,
    "Parallels": VIRTUALIZATION_PARALLELS,
    # https://wiki.freebsd.org/bhyve
    "BHYVE": VIRTUALIZATION_BHYVE,
}

# =====================================================================
# --- Linux
# =====================================================================

if LINUX:
    from . import _psutil_linux as cext
    from ._pslinux import Process

    class _VirtualizationBase:
        __slots__ = ["procfs_path"]

        def __init__(self):
            self.procfs_path = get_procfs_path()

    class ContainerDetector(_VirtualizationBase):
        """A "container" is typically "shared kernel virtualization",
        e.g. LXC.
        """

        def _container_from_string(self, s):
            s = s.strip()
            assert s, repr(s)
            mapping = {
                "docker": VIRTUALIZATION_DOCKER,
                "lxc": VIRTUALIZATION_LXC,
                "lxc-libvirt": VIRTUALIZATION_LXC_LIBVIRT,
                "podman": VIRTUALIZATION_PODMAN,
                "rkt": VIRTUALIZATION_RKT,
                "systemd-nspawn": VIRTUALIZATION_SYSTEMD_NSPAWN,
                "wsl": VIRTUALIZATION_WSL,
            }

            if s == "oci":
                # Some images hardcode container=oci, but OCI is not
                # a specific container manager. Try to detect one
                # based on well-known files.
                try:
                    return self.look_for_known_files() or \
                        VIRTUALIZATION_CONTAINER_OTHER
                except Exception as err:
                    debug(err)
                    return VIRTUALIZATION_CONTAINER_OTHER

            for k, v in mapping.items():
                if s.lower().startswith(k):
                    return v

            return VIRTUALIZATION_CONTAINER_OTHER

        def detect_openvz(self):
            # Check for OpenVZ / Virtuozzo.
            # /proc/vz exists in container and outside of the container,
            # /proc/bc only outside of the container.
            if os.path.exists("%s/vz" % self.procfs_path):
                if not os.path.exists("%s/bc" % self.procfs_path):
                    return VIRTUALIZATION_OPENVZ

        def detect_wsl(self):
            # "Official" way of detecting WSL:
            # https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
            data = cat('%s/sys/kernel/osrelease' % self.procfs_path)
            if "Microsoft" in data or "WSL" in data:
                return VIRTUALIZATION_WSL

        def detect_proot(self):
            data = cat('%s/self/status' % self.procfs_path)
            m = re.search(r'TracerPid:\t(\d+)', data)
            if m:
                tracer_pid = int(m.group(1))
                if tracer_pid != 0:
                    pname = Process(tracer_pid).name()
                    if pname.startswith("proot"):
                        return VIRTUALIZATION_PROOT

        def ask_run_host(self):
            # The container manager might have placed this in the /run/host/
            # hierarchy for us, which is good because it doesn't require root
            # privileges.
            data = cat("/run/host/container-manager")
            return self._container_from_string(data)

        def ask_run_systemd(self):
            # ...Otherwise PID 1 might have dropped this information into a
            # file in /run. This is better than accessing /proc/1/environ,
            # since we don't need CAP_SYS_PTRACE for that.
            data = cat("/run/systemd/container")
            return self._container_from_string(data)

        def ask_pid_1(self):
            # Only works if running as root or if we have CAP_SYS_PTRACE.
            env = Process(1).environ()
            if "container" in env:
                return self._container_from_string(env["container"])

        def look_for_known_files(self):
            # Check for existence of some well-known files. We only do this
            # after checking for other specific container managers, otherwise
            # we risk mistaking another container manager for Docker: the
            # /.dockerenv file could inadvertently end up in a file system
            # image.
            if os.path.exists("/run/.containerenv"):
                # https://github.com/containers/podman/issues/6192
                # https://github.com/containers/podman/issues/3586#issuecomment-661918679
                return VIRTUALIZATION_PODMAN
            if os.path.exists("/.dockerenv"):
                # https://github.com/moby/moby/issues/18355
                return VIRTUALIZATION_DOCKER
            # Note: virt-what also checks for `/.dockerinit`.

    class VmDetector(_VirtualizationBase):
        """A "vm" means "full hardware virtualization", e.g. VirtualBox."""

        def ask_dmi(self):
            files = [
                "/sys/class/dmi/id/product_name",
                "/sys/class/dmi/id/sys_vendor",
                "/sys/class/dmi/id/board_vendor",
                "/sys/class/dmi/id/bios_vendor",
            ]
            for file in files:
                out = cat(file, fallback="").strip()
                for k, v in DMI_VENDORS_TABLE.items():
                    if out.startswith(k):
                        debug("virtualization found in file %r" % file)
                        return v

        def detect_uml(self):
            with open_binary('%s/cpuinfo' % self.procfs_path) as f:
                for line in f:
                    if line.lower().startswith(b"vendor_id"):
                        value = line.partition(b":")[2].strip()
                        if value == b"User Mode Linux":
                            return VIRTUALIZATION_UML

        def ask_cpuid(self):
            vendor = cext.linux_cpuid()
            if vendor and vendor in CPUID_VENDORS_TABLE:
                return CPUID_VENDORS_TABLE[vendor]

        def detect_xen(self):
            if os.path.exists('%s/xen' % self.procfs_path):
                return VIRTUALIZATION_XEN
            data = cat("/sys/hypervisor/type").strip()
            if data == "xen":
                return VIRTUALIZATION_XEN
            else:
                return VIRTUALIZATION_VM_OTHER

        def ask_device_tree(self):
            path = "%s/device-tree/hypervisor/compatible" % self.procfs_path
            data = cat(path).strip()
            if data == "linux,kvm":
                return VIRTUALIZATION_KVM
            elif data == "xen":
                return VIRTUALIZATION_XEN
            elif data == "vmware":
                return VIRTUALIZATION_VMWARE
            else:
                return VIRTUALIZATION_VM_OTHER

        def detect_powervm(self):
            base = "%s/device-tree" % self.procfs_path
            if os.path.exists("%s/ibm,partition-name" % base):
                if os.path.exists("%s/hmc-managed?" % base):
                    if not os.path.exists(
                            "%s/chosen/qemu,graphic-width" % base):
                        return VIRTUALIZATION_POWERVM

        def detect_qemu(self):
            if "fw-cfg" in os.listdir("%s/device-tree" % self.procfs_path):
                return VIRTUALIZATION_QEMU
            # https://unix.stackexchange.com/a/89718
            for name in os.listdir("/dev/disk/by-id/"):
                if "QEMU" in name:
                    return VIRTUALIZATION_QEMU

        def detect_zvm(self):
            with open_text("%s/sysinfo" % self.procfs_path) as f:
                for line in f:
                    if line.startswith("VM00 Control Program"):
                        if "z/VM" in line.partition("VM00 Control Program")[2]:
                            return VIRTUALIZATION_ZVM
                        else:
                            return VIRTUALIZATION_KVM

    class VmDetectorOthers(_VirtualizationBase):
        """Mostly stuff took from `virt-what` bash script."""

        def ask_proc_cpuinfo(self):
            with open_binary('%s/cpuinfo' % self.procfs_path) as f:
                for line in f:
                    if line.lower().startswith(b"vendor_id"):
                        value = line.partition(b":")[2].strip()
                        if b"PowerVM Lx86" in value:
                            return VIRTUALIZATION_POWERVM
                        elif b"IBM/S390" in value:
                            return VIRTUALIZATION_IBM_SYSTEMZ

    def get_functions():
        # There is a distinction between containers and VMs.
        # A "container" is typically "shared kernel virtualization", e.g. LXC.
        # A "vm" is "full hardware virtualization", e.g. VirtualBox.
        # If multiple virtualization solutions are used, only the innermost
        # is detected and identified. E.g. if both machine and container
        # virtualization are used in conjunction, only the latter will be
        # returned.
        container = ContainerDetector()
        vm = VmDetector()
        vmothers = VmDetectorOthers()
        # order matters (FIFO), and it's the same as `systemd-detect-virt`.
        funcs = [
            # containers
            container.detect_openvz,
            container.detect_wsl,
            container.detect_proot,
            container.ask_run_host,
            container.ask_run_systemd,
            container.ask_pid_1,
            container.look_for_known_files,  # podman / docker
            # vms
            vm.ask_dmi,
            vm.detect_uml,  # uml
            vm.ask_cpuid,
            vm.detect_xen,  # xen
            vm.ask_device_tree,
            vm.detect_powervm,
            vm.detect_qemu,
            vm.detect_zvm,  # zvm
            # vms others
            vmothers.ask_proc_cpuinfo,
        ]
        return funcs

# =====================================================================
# --- Windows
# =====================================================================

elif WINDOWS:
    import winreg

    from . import _psutil_windows as cext

    def _winreg_key_exists(key, root=winreg.HKEY_LOCAL_MACHINE):
        try:
            with winreg.OpenKeyEx(root, key):
                return True
        except FileNotFoundError:
            return False

    def _winreg_key_get(key, subkey, root=winreg.HKEY_LOCAL_MACHINE):
        try:
            with winreg.OpenKeyEx(root, key) as handle:
                return winreg.QueryValueEx(handle, subkey)[0]
        except FileNotFoundError:
            return ""

    class GenericDetector:

        @staticmethod
        def ask_cpuid():
            vendor = cext.cpuid()
            if vendor is not None:
                if vendor in CPUID_VENDORS_TABLE:
                    return CPUID_VENDORS_TABLE[vendor]
                else:
                    return VIRTUALIZATION_VM_OTHER

    class VboxDetector:
        # https://github.com/a0rtega/pafish/blob/master/pafish/vbox.c

        @staticmethod
        def from_registry_1():
            keys = [
                "SOFTWARE\\Oracle\\VirtualBox Guest Additions",
                "HARDWARE\\ACPI\\DSDT\\VBOX__",
                "HARDWARE\\ACPI\\FADT\\VBOX__",
                "HARDWARE\\ACPI\\RSDT\\VBOX__",
                "SYSTEM\\ControlSet001\\Services\\VBoxGuest",
                "SYSTEM\\ControlSet001\\Services\\VBoxMouse",
                "SYSTEM\\ControlSet001\\Services\\VBoxService",
                "SYSTEM\\ControlSet001\\Services\\VBoxSF",
                "SYSTEM\\ControlSet001\\Services\\VBoxVideo",
            ]
            for k in keys:
                if _winreg_key_exists(k):
                    return VIRTUALIZATION_VIRTUALBOX

        @staticmethod
        def from_registry_2():
            triples = [
                ("HARDWARE\\DEVICEMAP\\Scsi\\Scsi Port 0\\Scsi Bus 0\\Target Id 0\\Logical Unit Id 0", "Identifier", "VBOX"),  # NOQA
                ("HARDWARE\\Description\\System", "SystemBiosVersion", "VBOX"),
                ("HARDWARE\\Description\\System", "VideoBiosVersion", "VIRTUALBOX"),  # NOQA
                ("HARDWARE\\DESCRIPTION\\System", "SystemBiosDate", "06/23/99"),  # NOQA
            ]
            for key, subkey, value in triples:
                if _winreg_key_get(key, subkey).startswith(value):
                    return VIRTUALIZATION_VIRTUALBOX

        @staticmethod
        def from_devices():
            devs = [
                "\\\\.\\VBoxMiniRdrDN",
                "\\\\.\\pipe\\VBoxMiniRdDN",
                "\\\\.\\VBoxTrayIPC",
                "\\\\.\\pipe\\VBoxTrayIPC",
            ]
            for dev in devs:
                try:
                    with open(dev):
                        return VIRTUALIZATION_VIRTUALBOX
                except PermissionError:
                    return VIRTUALIZATION_VIRTUALBOX
                except Exception:
                    pass

    def get_functions():
        generic = GenericDetector()
        vbox = VboxDetector()
        return [
            generic.ask_cpuid,
            vbox.from_registry_1,
            vbox.from_registry_2,
            vbox.from_devices,
        ]

# =====================================================================
# --- OpenBSD
# =====================================================================

elif OPENBSD:
    from . import _psutil_bsd as cext

    def _find_dmi_or_cpuid_match(inputstr):
        # Try to match input against known CPUID / DMI strings.
        inputstr = inputstr.strip()
        for k, v in CPUID_VENDORS_TABLE.items():
            if inputstr.startswith(k):
                debug("found CPUID match")
                return v
        for k, v in DMI_VENDORS_TABLE.items():
            if inputstr.startswith(k):
                debug("found DMI match")
                return v

    def ask_cpu_vendor():
        # Vendor is determined via "sysctl hw.vendor". On VirtualBox
        # this returns "innotek GmbH", which is the same DMI value
        # returned on Linux.
        vendor = cext.cpu_vendor()
        if vendor:
            return _find_dmi_or_cpuid_match(vendor)

    def ask_cpu_chipset():
        # Chipset is determined via "sysctl hw.product". On VirtualBox
        # this returns "VirtualBox". If it's not that we try to match
        # it against known CPUID / DMI strings.
        chipset = cext.cpu_chipset()
        if chipset.startswith("VirtualBox"):
            return VIRTUALIZATION_VIRTUALBOX
        if chipset:
            return _find_dmi_or_cpuid_match(chipset)

    def get_functions():
        return [
            ask_cpu_vendor,
            ask_cpu_chipset,
        ]

# ---


def detect():
    funcs = get_functions()
    retval = None
    for func in funcs:
        if hasattr(func, "__self__"):  # it's a class method
            func_name = "%s.%s" % (
                func.__self__.__class__.__name__, func.__name__)
        else:  # it's a function
            func_name = func.__name__

        debug("trying method %r" % func_name)
        try:
            retval = func()
            if retval:
                debug("virtualization technology %r found via %r method" % (
                      retval, func_name))
                break
        except (IOError, OSError) as err:
            debug(err)
        except (AccessDenied, NoSuchProcess) as err:
            debug(err)

    return retval or ""