commit 85d3785e6a59b133e2fe195efe5729d08cb6b930
tree   3680f3464c412082b5b7bc2bad702fcf65adaac9
parent 51ec98635e3f193fc8b0b4fd5636dd831e4d3073
Author:    Eelco Chaudron <echaudro@redhat.com>  2021-12-22 10:19:02 +0100
Committer: Ilya Maximets <i.maximets@ovn.org>    2022-01-18 00:46:30 +0100
utilities: Add netlink flow operation USDT probes and upcall_cost script.
This patch adds a series of netlink flow operation USDT probes. These
probes are in turn used in the upcall_cost Python script, which, in
addition to some kernel tracepoints, gives insight into the time spent
on processing upcalls.

Signed-off-by: Eelco Chaudron <echaudro@redhat.com>
Acked-by: Paolo Valerio <pvalerio@redhat.com>
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
Diffstat (limited to 'utilities')
 utilities/automake.mk                 |    4 +++-
 utilities/usdt-scripts/upcall_cost.py | 1787 +++++++++++++++++++++++++++++
 2 files changed, 1790 insertions(+), 1 deletion(-)
diff --git a/utilities/automake.mk b/utilities/automake.mk
index 9da729404..eb57653a1 100644
--- a/utilities/automake.mk
+++ b/utilities/automake.mk
@@ -63,6 +63,7 @@ EXTRA_DIST += \
utilities/docker/debian/Dockerfile \
utilities/docker/debian/build-kernel-modules.sh \
utilities/usdt-scripts/bridge_loop.bt \
+ utilities/usdt-scripts/upcall_cost.py \
utilities/usdt-scripts/upcall_monitor.py
MAN_ROOTS += \
utilities/ovs-testcontroller.8.in \
@@ -132,6 +133,7 @@ FLAKE8_PYFILES += utilities/ovs-pcap.in \
utilities/ovs-check-dead-ifs.in \
utilities/ovs-tcpdump.in \
utilities/ovs-pipegen.py \
- utilities/usdt-scripts/upcall_monitor.py
+ utilities/usdt-scripts/upcall_monitor.py \
+ utilities/usdt-scripts/upcall_cost.py
include utilities/bugtool/automake.mk
diff --git a/utilities/usdt-scripts/upcall_cost.py b/utilities/usdt-scripts/upcall_cost.py
new file mode 100755
index 000000000..765669585
--- /dev/null
+++ b/utilities/usdt-scripts/upcall_cost.py
@@ -0,0 +1,1787 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2021 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Script information:
+# -------------------
+# upcall_cost.py uses various user space and kernel space probes to determine
+# the costs (in time) for handling the first packet in user space. It
+# calculates the following costs:
+#
+# - Time it takes from the kernel sending the upcall till it's received by the
+# ovs-vswitchd process.
+# - Time it takes from ovs-vswitchd sending the execute actions command till
+# the kernel receives it.
+# - The total time it takes from the kernel sending the upcall until it
+# receives the packet execute command.
+# - The total time of the above, minus the time it takes for the actual lookup.
+#
+# In addition, it reports the number of packets batched, as OVS will first
+# try to read UPCALL_MAX_BATCH (64) packets from kernel space before doing
+# the flow lookups and executions. So the smaller the batch size, the more
+# realistic the cost estimates are.
+#
+# The script does not need any options to attach to a running instance of
+# ovs-vswitchd. However, it's recommended to always run the script with the
+# --write-events option. This way, if something does go wrong, the collected
+# data is saved. Use the --help option to see all the available options.
+#
+# Note: In addition to the bcc tools for your specific setup, you need the
+# following Python packages:
+# pip install alive-progress halo psutil scapy strenum text_histogram3
+#
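+# Example invocation (the event file name is only illustrative), capturing
+# events to a file first and analyzing them offline afterwards:
+#
+#   $ ./upcall_cost.py --write-events upcall.events
+#   $ ./upcall_cost.py --read-events upcall.events
+#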
+
+try:
+ from bcc import BPF, USDT, USDTException
+except ModuleNotFoundError:
+ print("WARNING: Can't find the BPF Compiler Collection (BCC) tools!")
+    print("         This is NOT a problem if you are analyzing previously"
+          " collected data.\n")
+from alive_progress import alive_bar
+from collections import namedtuple
+from halo import Halo
+from scapy.all import TCP, UDP
+from scapy.layers.l2 import Ether
+from strenum import StrEnum
+from text_histogram3 import histogram
+from time import process_time
+
+import argparse
+import ast
+import psutil
+import re
+import struct
+import subprocess
+import sys
+import time
+
+#
+# Global definitions
+#
+DP_TUNNEL_PORT = -1
+
+
+#
+# Actual eBPF source code
+#
+ebpf_source = """
+#include <linux/sched.h>
+#include <linux/skbuff.h>
+#include <uapi/linux/bpf.h>
+
+#define MAX_PACKET <MAX_PACKET_VAL>
+#define MAX_KEY <MAX_KEY_VAL>
+
+enum {
+ EVENT_RECV_UPCALL = 0,
+ EVENT_DP_UPCALL,
+ EVENT_OP_FLOW_PUT,
+ EVENT_OP_FLOW_EXECUTE,
+ EVENT_OVS_PKT_EXEC,
+ _EVENT_MAX_EVENT
+};
+
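+/* Optimization barrier: the empty asm statement forces the compiler to
+ * assume 'var' is read and written, so its value can not be tracked (or
+ * optimized) across the barrier. */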
+#define barrier_var(var) asm volatile("" : "=r"(var) : "0"(var))
+
+struct event_t {
+ u32 event;
+ u32 cpu;
+ u32 pid;
+ u32 upcall_type;
+ u64 ts;
+ u32 pkt_frag_size;
+ u32 pkt_size;
+ u64 key_size;
+ char comm[TASK_COMM_LEN];
+ char dpif_name[32];
+ char dev_name[16];
+ unsigned char pkt[MAX_PACKET];
+ unsigned char key[MAX_KEY];
+};
+
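+/* The <MAX_PACKET_VAL>, <MAX_KEY_VAL>, and <BUFFER_PAGE_CNT> placeholders
+ * in this source are replaced by the Python code at load time, based on
+ * the command-line options. */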
+BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_CNT>);
+BPF_TABLE("percpu_array", uint32_t, uint64_t, dropcnt, _EVENT_MAX_EVENT);
+
+static struct event_t *init_event(u32 type)
+{
+ struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t));
+
+ if (!event) {
+ uint64_t *value = dropcnt.lookup(&type);
+ if (value)
+ __sync_fetch_and_add(value, 1);
+
+ return NULL;
+ }
+
+ event->event = type;
+ event->ts = bpf_ktime_get_ns();
+ event->cpu = bpf_get_smp_processor_id();
+ event->pid = bpf_get_current_pid_tgid();
+ bpf_get_current_comm(&event->comm, sizeof(event->comm));
+
+ return event;
+}
+
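+/* USDT probe handler for dpif_recv:recv_upcall. Based on the readarg
+ * calls below: arg 1 = dpif name, 2 = upcall type, 3 = packet pointer,
+ * 4 = packet size, 5 = flow key pointer, 6 = flow key size. */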
+int trace__recv_upcall(struct pt_regs *ctx) {
+ uint32_t upcall_type;
+ uint64_t addr;
+ uint64_t size;
+
+ bpf_usdt_readarg(2, ctx, &upcall_type);
+ if (upcall_type != 0)
+ return 0;
+
+ struct event_t *event = init_event(EVENT_RECV_UPCALL);
+ if (!event)
+ return 1;
+
+ bpf_usdt_readarg(1, ctx, &addr);
+ bpf_probe_read_str(&event->dpif_name, sizeof(event->dpif_name),
+ (void *)addr);
+
+ event->upcall_type = upcall_type;
+ bpf_usdt_readarg(4, ctx, &event->pkt_size);
+ bpf_usdt_readarg(6, ctx, &event->key_size);
+
+ if (event->pkt_size > MAX_PACKET)
+ size = MAX_PACKET;
+ else
+ size = event->pkt_size;
+ bpf_usdt_readarg(3, ctx, &addr);
+ bpf_probe_read(&event->pkt, size, (void *)addr);
+
+ if (event->key_size > MAX_KEY)
+ size = MAX_KEY;
+ else
+ size = event->key_size;
+ bpf_usdt_readarg(5, ctx, &addr);
+ bpf_probe_read(&event->key, size, (void *)addr);
+
+ events.ringbuf_submit(event, 0);
+ return 0;
+};
+
+
+int trace__op_flow_put(struct pt_regs *ctx) {
+ uint64_t addr;
+ uint64_t size;
+
+ struct event_t *event = init_event(EVENT_OP_FLOW_PUT);
+ if (!event) {
+ return 1;
+ }
+
+ events.ringbuf_submit(event, 0);
+ return 0;
+};
+
+
+int trace__op_flow_execute(struct pt_regs *ctx) {
+ uint64_t addr;
+ uint64_t size;
+
+ struct event_t *event = init_event(EVENT_OP_FLOW_EXECUTE);
+ if (!event) {
+ return 1;
+ }
+
+ bpf_usdt_readarg(4, ctx, &event->pkt_size);
+
+ if (event->pkt_size > MAX_PACKET)
+ size = MAX_PACKET;
+ else
+ size = event->pkt_size;
+ bpf_usdt_readarg(3, ctx, &addr);
+ bpf_probe_read(&event->pkt, size, (void *)addr);
+
+ events.ringbuf_submit(event, 0);
+ return 0;
+};
+
+
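+/* Handler for the kernel openvswitch:ovs_dp_upcall tracepoint; only miss
+ * upcalls (upcall_cmd == 1, i.e. OVS_PACKET_CMD_MISS) are recorded. */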
+TRACEPOINT_PROBE(openvswitch, ovs_dp_upcall) {
+ uint64_t size;
+ struct sk_buff *skb = args->skbaddr;
+
+ if (args->upcall_cmd != 1 || skb == NULL || skb->data == NULL)
+ return 0;
+
+ struct event_t *event = init_event(EVENT_DP_UPCALL);
+ if (!event) {
+ return 1;
+ }
+
+ event->upcall_type = args->upcall_cmd;
+ event->pkt_size = args->len;
+
+ TP_DATA_LOC_READ_CONST(&event->dpif_name, dp_name,
+ sizeof(event->dpif_name));
+ TP_DATA_LOC_READ_CONST(&event->dev_name, dev_name,
+ sizeof(event->dev_name));
+
+ if (skb->data_len != 0) {
+ event->pkt_frag_size = (skb->len - skb->data_len) & 0xfffffff;
+ size = event->pkt_frag_size;
+ } else {
+ event->pkt_frag_size = 0;
+ size = event->pkt_size;
+ }
+
+ /* Prevent clang from using register mirroring (or any optimization) on
+ * the 'size' variable. */
+ barrier_var(size);
+
+ if (size > MAX_PACKET)
+ size = MAX_PACKET;
+ bpf_probe_read_kernel(event->pkt, size, skb->data);
+
+ events.ringbuf_submit(event, 0);
+ return 0;
+}
+
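+/* kprobe on the kernel's ovs_packet_cmd_execute(), which handles the
+ * OVS_PACKET_CMD_EXECUTE netlink request sent by ovs-vswitchd. */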
+int kprobe__ovs_packet_cmd_execute(struct pt_regs *ctx, struct sk_buff *skb)
+{
+ uint64_t size;
+
+ if (skb == NULL || skb->data == NULL)
+ return 0;
+
+ struct event_t *event = init_event(EVENT_OVS_PKT_EXEC);
+ if (!event) {
+ return 1;
+ }
+
+ events.ringbuf_submit(event, 0);
+ return 0;
+}
+"""
+
+
+#
+# Event types
+#
+class EventType(StrEnum):
+ RECV_UPCALL = 'dpif_recv__recv_upcall'
+ DP_UPCALL = 'openvswitch__dp_upcall'
+ OP_FLOW_PUT = 'dpif_netlink_operate__op_flow_put'
+ OP_FLOW_EXECUTE = 'dpif_netlink_operate__op_flow_execute'
+ OVS_PKT_EXEC = 'ktrace__ovs_packet_cmd_execute'
+
+ def short_name(name, length=22):
+ if len(name) < length:
+ return name
+
+ return '..' + name[-(length - 2):]
+
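+    # Map the event enum value from the eBPF source (EVENT_RECV_UPCALL
+    # through EVENT_OVS_PKT_EXEC) to the matching EventType.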
+ def from_trace(trace_event):
+ if trace_event == 0:
+ return EventType.RECV_UPCALL
+ elif trace_event == 1:
+ return EventType.DP_UPCALL
+ elif trace_event == 2:
+ return EventType.OP_FLOW_PUT
+ elif trace_event == 3:
+ return EventType.OP_FLOW_EXECUTE
+ elif trace_event == 4:
+ return EventType.OVS_PKT_EXEC
+
+ raise ValueError
+
+
+#
+# Simple event class
+#
+class Event(object):
+ def __init__(self, ts, pid, comm, cpu, event_type):
+ self.ts = ts
+ self.pid = pid
+ self.comm = comm
+ self.cpu = cpu
+ self.event_type = event_type
+
+ def __str__(self):
+ return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}".format(
+ EventType.short_name(self.event_type),
+ self.comm,
+ self.pid,
+ self.cpu,
+ self.ts / 1000000000)
+
+ def __repr__(self):
+ more = ""
+ if self.__class__.__name__ != "Event":
+ more = ", ..."
+
+ return "{}({}, {}, {}, {}, {}{})".format(self.__class__.__name__,
+ self.ts, self.pid,
+ self.comm, self.cpu,
+ self.event_type, more)
+
+ def handle_event(event):
+ event = Event(event.ts, event.pid, event.comm.decode("utf-8"),
+ event.cpu, EventType.from_trace(event.event))
+
+ if not options.quiet:
+ print(event)
+
+ return event
+
+ def get_event_header_str():
+ return "{:<24} {:<16} {:>8} {:<3} {:<18} {}".format(
+ "EVENT", "COMM", "PID", "CPU", "TIME",
+ "EVENT DATA[dpif_name/dp_port/pkt_len/pkt_frag_len]")
+
+
+#
+# dp_upcall event class
+#
+class DpUpcall(Event):
+ def __init__(self, ts, pid, comm, cpu, dpif_name, port, pkt, pkt_len,
+ pkt_frag_len):
+ super(DpUpcall, self).__init__(ts, pid, comm, cpu, EventType.DP_UPCALL)
+ self.dpif_name = dpif_name
+ self.dp_port = get_dp_mapping(dpif_name, port)
+ if self.dp_port is None:
+ #
+ # As we only identify interfaces at startup, new interfaces could
+ # have been added, causing the lookup to fail. Just something to
+ # keep in mind when running this in a dynamic environment.
+ #
+ raise LookupError("Can't find datapath port mapping!")
+ self.pkt = pkt
+ self.pkt_len = pkt_len
+ self.pkt_frag_len = pkt_frag_len
+
+ def __str__(self):
+ return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: " \
+ "{:<17} {:4} {:4} {:4}".format(self.event_type,
+ self.comm,
+ self.pid,
+ self.cpu,
+ self.ts / 1000000000,
+ self.dpif_name,
+ self.dp_port,
+ self.pkt_len,
+ self.pkt_frag_len)
+
+ def handle_event(event):
+ if event.pkt_size < options.packet_size:
+ pkt_len = event.pkt_size
+ else:
+ pkt_len = options.packet_size
+
+ pkt_data = bytes(event.pkt)[:pkt_len]
+
+ if len(pkt_data) <= 0 or event.pkt_size == 0:
+ return
+
+ try:
+ event = DpUpcall(event.ts, event.pid, event.comm.decode("utf-8"),
+ event.cpu, event.dpif_name.decode("utf-8"),
+ event.dev_name.decode("utf-8"),
+ pkt_data,
+ event.pkt_size,
+ event.pkt_frag_size)
+ except LookupError:
+ #
+ # If we can't do the port lookup, ignore this event.
+ #
+ return None
+
+ if not options.quiet:
+ print(event)
+
+ return event
+
+
+#
+# recv_upcall event class
+#
+class RecvUpcall(Event):
+ def __init__(self, ts, pid, comm, cpu, dpif_name, key, pkt, pkt_len):
+ super(RecvUpcall, self).__init__(ts, pid, comm, cpu,
+ EventType.RECV_UPCALL)
+
+ if dpif_name.startswith("system@"):
+ dpif_name = dpif_name[len("system@"):]
+ self.dpif_name = dpif_name
+
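+        #
+        # Get the datapath port from the netlink flow key. If the key only
+        # carries tunnel information, the real input port can not be
+        # determined, so the port is marked as DP_TUNNEL_PORT.
+        #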
+ nla = RecvUpcall.decode_nlm(key, dump=False)
+ if "OVS_KEY_ATTR_IN_PORT" in nla:
+ self.dp_port = struct.unpack('=L', nla["OVS_KEY_ATTR_IN_PORT"])[0]
+ elif "OVS_KEY_ATTR_TUNNEL" in nla:
+ self.dp_port = DP_TUNNEL_PORT
+ else:
+ self.dp_port = RecvUpcall.get_system_dp_port(self.dpif_name)
+
+ if self.dp_port is None:
+ raise LookupError("Can't find RecvUpcall dp port mapping!")
+
+ self.pkt = pkt
+ self.pkt_len = pkt_len
+
+ def __str__(self):
+ return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: {:<17} {:4} {:4}". \
+ format(
+ self.event_type,
+ self.comm,
+ self.pid,
+ self.cpu,
+ self.ts / 1000000000,
+ self.dpif_name,
+ self.dp_port,
+ self.pkt_len)
+
+ def get_system_dp_port(dpif_name):
+ dp_map = get_dp_mapping(dpif_name, "ovs-system", return_map=True)
+ if dpif_name not in dp_map:
+ return None
+ try:
+ return dp_map[dpif_name]["ovs-system"]
+ except KeyError:
+ return None
+
+ def decode_nlm(msg, indent=4, dump=True):
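+        #
+        # Each netlink attribute starts with a 4-byte header, a u16 nla_len
+        # followed by a u16 nla_type (both in host byte order); the payload
+        # follows and is padded to a 4-byte boundary.
+        #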
+ bytes_left = len(msg)
+ result = {}
+
+ while bytes_left:
+ if bytes_left < 4:
+ if dump:
+ print("{}WARN: decode truncated; can't read header".format(
+ ' ' * indent))
+ break
+
+ nla_len, nla_type = struct.unpack("=HH", msg[:4])
+
+ if nla_len < 4:
+ if dump:
+ print("{}WARN: decode truncated; nla_len < 4".format(
+ ' ' * indent))
+ break
+
+ nla_data = msg[4:nla_len]
+ trunc = ""
+
+ if nla_len > bytes_left:
+ trunc = "..."
+ nla_data = nla_data[:(bytes_left - 4)]
+ if RecvUpcall.get_ovs_key_attr_str(nla_type) == \
+ "OVS_KEY_ATTR_TUNNEL":
+ #
+ # If we have truncated tunnel information, we still would
+ # like to know. This is due to the special tunnel handling
+ # needed for port matching.
+ #
+ result[RecvUpcall.get_ovs_key_attr_str(nla_type)] = bytes()
+ else:
+ result[RecvUpcall.get_ovs_key_attr_str(nla_type)] = nla_data
+
+ if dump:
+ print("{}nla_len {}, nla_type {}[{}], data: {}{}".format(
+ ' ' * indent, nla_len,
+ RecvUpcall.get_ovs_key_attr_str(nla_type),
+ nla_type,
+ "".join("{:02x} ".format(b) for b in nla_data), trunc))
+
+ if trunc != "":
+ if dump:
+ print("{}WARN: decode truncated; nla_len > msg_len[{}] ".
+ format(' ' * indent, bytes_left))
+ break
+
+ # Update next offset, but make sure it's aligned correctly.
+ next_offset = (nla_len + 3) & ~(3)
+ msg = msg[next_offset:]
+ bytes_left -= next_offset
+
+ return result
+
+ def get_ovs_key_attr_str(attr):
+ ovs_key_attr = ["OVS_KEY_ATTR_UNSPEC",
+ "OVS_KEY_ATTR_ENCAP",
+ "OVS_KEY_ATTR_PRIORITY",
+ "OVS_KEY_ATTR_IN_PORT",
+ "OVS_KEY_ATTR_ETHERNET",
+ "OVS_KEY_ATTR_VLAN",
+ "OVS_KEY_ATTR_ETHERTYPE",
+ "OVS_KEY_ATTR_IPV4",
+ "OVS_KEY_ATTR_IPV6",
+ "OVS_KEY_ATTR_TCP",
+ "OVS_KEY_ATTR_UDP",
+ "OVS_KEY_ATTR_ICMP",
+ "OVS_KEY_ATTR_ICMPV6",
+ "OVS_KEY_ATTR_ARP",
+ "OVS_KEY_ATTR_ND",
+ "OVS_KEY_ATTR_SKB_MARK",
+ "OVS_KEY_ATTR_TUNNEL",
+ "OVS_KEY_ATTR_SCTP",
+ "OVS_KEY_ATTR_TCP_FLAGS",
+ "OVS_KEY_ATTR_DP_HASH",
+ "OVS_KEY_ATTR_RECIRC_ID",
+ "OVS_KEY_ATTR_MPLS",
+ "OVS_KEY_ATTR_CT_STATE",
+ "OVS_KEY_ATTR_CT_ZONE",
+ "OVS_KEY_ATTR_CT_MARK",
+ "OVS_KEY_ATTR_CT_LABELS",
+ "OVS_KEY_ATTR_CT_ORIG_TUPLE_IPV4",
+ "OVS_KEY_ATTR_CT_ORIG_TUPLE_IPV6",
+ "OVS_KEY_ATTR_NSH"]
+
+        if attr < 0 or attr >= len(ovs_key_attr):
+ return "<UNKNOWN>"
+
+ return ovs_key_attr[attr]
+
+ def handle_event(event):
+ #
+ # For us, only upcalls with a packet, flow_key, and upcall_type
+ # DPIF_UC_MISS are of interest.
+ #
+ if event.pkt_size <= 0 or event.key_size <= 0 or \
+ event.upcall_type != 0:
+ return
+
+ if event.key_size < options.flow_key_size:
+ key_len = event.key_size
+ else:
+ key_len = options.flow_key_size
+
+ if event.pkt_size < options.packet_size:
+ pkt_len = event.pkt_size
+ else:
+ pkt_len = options.packet_size
+
+ try:
+ event = RecvUpcall(event.ts, event.pid, event.comm.decode("utf-8"),
+ event.cpu, event.dpif_name.decode("utf-8"),
+ bytes(event.key)[:key_len],
+ bytes(event.pkt)[:pkt_len],
+ event.pkt_size)
+ except LookupError:
+ return None
+
+ if not options.quiet:
+ print(event)
+
+ return event
+
+
+#
+# op_flow_execute event class
+#
+class OpFlowExecute(Event):
+ def __init__(self, ts, pid, comm, cpu, pkt, pkt_len):
+ super(OpFlowExecute, self).__init__(ts, pid, comm, cpu,
+ EventType.OP_FLOW_EXECUTE)
+ self.pkt = pkt
+ self.pkt_len = pkt_len
+
+ def __str__(self):
+ return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: " \
+ "{:<17} {:4} {:4}".format(EventType.short_name(self.event_type),
+ self.comm,
+ self.pid,
+ self.cpu,
+ self.ts / 1000000000,
+ "",
+ "",
+ self.pkt_len)
+
+ def handle_event(event):
+ if event.pkt_size < options.packet_size:
+ pkt_len = event.pkt_size
+ else:
+ pkt_len = options.packet_size
+
+ pkt_data = bytes(event.pkt)[:pkt_len]
+
+ if len(pkt_data) <= 0 or event.pkt_size == 0:
+ return
+
+ event = OpFlowExecute(event.ts, event.pid, event.comm.decode("utf-8"),
+ event.cpu, pkt_data, event.pkt_size)
+
+ if not options.quiet:
+ print(event)
+
+ return event
+
+
+#
+# get_dp_mapping()
+#
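+# Translate a datapath name plus port name into the datapath port number,
+# using (cached) output of 'ovs-appctl dpctl/show'.
+#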
+def get_dp_mapping(dp, port, return_map=False, dp_map=None):
+ if options.unit_test:
+ return port
+
+ if dp_map is not None:
+ get_dp_mapping.dp_port_map_cache = dp_map
+
+ #
+    # Build a cache, so we do not have to execute the ovs command each time.
+ #
+ if not hasattr(get_dp_mapping, "dp_port_map_cache"):
+ try:
+ output = subprocess.check_output(['ovs-appctl', 'dpctl/show'],
+ encoding='utf8').split("\n")
+ except subprocess.CalledProcessError:
+ output = ""
+ pass
+
+ current_dp = None
+ get_dp_mapping.dp_port_map_cache = {}
+
+ for line in output:
+ match = re.match("^system@(.*):$", line)
+ if match is not None:
+ current_dp = match.group(1)
+
+ match = re.match("^ port ([0-9]+): ([^ /]*)", line)
+ if match is not None and current_dp is not None:
+ try:
+ get_dp_mapping.dp_port_map_cache[
+ current_dp][match.group(2)] = int(match.group(1))
+ except KeyError:
+ get_dp_mapping.dp_port_map_cache[current_dp] = \
+ {match.group(2): int(match.group(1))}
+
+ if return_map:
+ return get_dp_mapping.dp_port_map_cache
+
+ if dp not in get_dp_mapping.dp_port_map_cache or \
+ port not in get_dp_mapping.dp_port_map_cache[dp]:
+ return None
+
+ return get_dp_mapping.dp_port_map_cache[dp][port]
+
+
+#
+# event_to_dict()
+#
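+# Convert a ctypes event from BCC into a plain dictionary, so it can be
+# written to the --write-events file and read back with ast.literal_eval().
+#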
+def event_to_dict(event):
+ event_dict = {}
+
+ for field, _ in event._fields_:
+ if isinstance(getattr(event, field), (int, bytes)):
+ event_dict[field] = getattr(event, field)
+ else:
+ if (field == "key" and event.key_size == 0) or \
+ (field == "pkt" and event.pkt_size == 0):
+ data = bytes()
+ else:
+ data = bytes(getattr(event, field))
+
+ event_dict[field] = data
+
+ return event_dict
+
+
+#
+# receive_event_bcc()
+#
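+# Callback invoked by BCC's ring_buffer_poll() for each event read from the
+# BPF ring buffer; it also mirrors the raw event to the export file when
+# --write-events is used.
+#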
+def receive_event_bcc(ctx, data, size):
+ global events_received
+ events_received += 1
+
+ event = b['events'].event(data)
+
+ if export_file is not None:
+ export_file.write("event = {}\n".format(event_to_dict(event)))
+
+ receive_event(event)
+
+
+#
+# receive_event()
+#
+def receive_event(event):
+ global event_count
+
+ if event.event == 0:
+ trace_event = RecvUpcall.handle_event(event)
+ elif event.event == 1:
+ trace_event = DpUpcall.handle_event(event)
+ elif event.event == 2:
+ trace_event = Event.handle_event(event)
+ elif event.event == 3:
+ trace_event = OpFlowExecute.handle_event(event)
+ elif event.event == 4:
+ trace_event = Event.handle_event(event)
+
+ try:
+ event_count['total'][EventType.from_trace(event.event)] += 1
+ except KeyError:
+ event_count['total'][EventType.from_trace(event.event)] = 1
+ event_count['valid'][EventType.from_trace(event.event)] = 0
+
+ if trace_event is not None:
+ event_count['valid'][EventType.from_trace(event.event)] += 1
+ trace_data.append(trace_event)
+
+
+#
+# collect_event_sets()
+#
+def collect_event_sets(events, collect_stats=False, profile=False,
+ spinner=False):
+ t1_time = 0
+
+ def t1_start():
+ nonlocal t1_time
+ t1_time = process_time()
+
+ def t1_stop(description):
+ print("* PROFILING: {:<50}: {:.06f} seconds".format(
+ description, process_time() - t1_time))
+
+    warn_partial_match = False
+ warn_frag = False
+
+ if profile:
+ t1_start()
+ #
+ # First let's create a dict of per handler thread events.
+ #
+ threads = {}
+ threads_result = {}
+ for idx, event in enumerate(events):
+ if event.event_type == EventType.DP_UPCALL:
+ continue
+ if event.pid not in threads:
+ threads[event.pid] = []
+ threads[event.pid].append([idx, event])
+
+ if profile:
+ t1_stop("Creating per thread dictionary")
+ t1_start()
+ #
+    # Now split them into per-upcall sets, but remember that
+    # RecvUpcall events can be batched.
+ #
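+    # Each batch entry has the form [event_index, RecvUpcall, [events]],
+    # where the trailing list collects the flow put, flow execute, and
+    # packet execute events that belong to that specific upcall.
+    #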
+ batch_stats = []
+ for thread, items in threads.items():
+ thread_set = []
+ batch = []
+ ovs_pkt_exec_set = []
+ batching = True
+ collecting = 0
+ has_flow_put = False
+ has_flow_exec = False
+
+ def next_batch():
+ nonlocal batching, batch, collecting, has_flow_put, has_flow_exec
+ nonlocal ovs_pkt_exec_set, thread_set
+
+ if len(batch) > 0:
+ #
+ # If we are done with the batch, see if we need to match up
+ # any batched OVS_PKT_EXEC events.
+ #
+ for event in batch:
+ if len(ovs_pkt_exec_set) <= 0:
+ break
+ if any(isinstance(item,
+ OpFlowExecute) for item in event[2]):
+ event[2].append(ovs_pkt_exec_set.pop(0))
+ #
+ # Append the batch to the thread-specific set.
+ #
+ thread_set = thread_set + batch
+ if collect_stats:
+ batch_stats.append(len(batch))
+ batching = True
+ batch = []
+ ovs_pkt_exec_set = []
+ has_flow_put = False
+ has_flow_exec = False
+ collecting = 0
+
+ def next_batch_set():
+ nonlocal has_flow_put, has_flow_exec, collecting
+ has_flow_put = False
+ has_flow_exec = False
+ collecting += 1
+
+ for item in items:
+ idx, event = item
+
+ if batching:
+ if event.event_type == EventType.RECV_UPCALL:
+ batch.append(item + [[]])
+ elif len(batch) > 0:
+ batching = False
+ collecting = 0
+ else:
+ continue
+
+ if not batching:
+ if event.event_type == EventType.RECV_UPCALL:
+ next_batch()
+ batch.append(item + [[]])
+ else:
+ if event.event_type == EventType.OP_FLOW_PUT:
+ if has_flow_put:
+ next_batch_set()
+ if collecting >= len(batch):
+ next_batch()
+ continue
+
+ batch[collecting][2].append(item[1])
+ has_flow_put = True
+
+ elif event.event_type == EventType.OP_FLOW_EXECUTE:
+ if has_flow_exec:
+ next_batch_set()
+ if collecting >= len(batch):
+ next_batch()
+ continue
+
+ if (event.pkt_len == batch[collecting][1].pkt_len
+ and event.pkt == batch[collecting][1].pkt):
+ batch[collecting][2].append(item[1])
+ has_flow_put = True
+ has_flow_exec = True
+ else:
+ #
+ # If we end up here it could be that an upcall in a
+ # batch did not generate an EXECUTE and we are out
+ # of sync. Try to match it to the next batch entry.
+ #
+ next_idx = collecting + 1
+ while True:
+ if next_idx >= len(batch):
+ next_batch()
+ break
+ if (event.pkt_len == batch[next_idx][1].pkt_len
+ and event.pkt == batch[next_idx][1].pkt):
+
+ batch[next_idx][2] = batch[collecting][2]
+ batch[collecting][2] = []
+ collecting = next_idx
+ batch[collecting][2].append(item[1])
+ has_flow_put = True
+ has_flow_exec = True
+ break
+
+ next_idx += 1
+
+ elif event.event_type == EventType.OVS_PKT_EXEC:
+ #
+ # The OVS_PKT_EXEC might also be batched, so we keep
+ # them in a separate list and assign them to the
+ # correct set when completing the set.
+ #
+ ovs_pkt_exec_set.append(item[1])
+ continue
+
+ if collecting >= len(batch):
+ next_batch()
+ next_batch()
+ threads_result[thread] = thread_set
+
+ if profile:
+ t1_stop("Creating upcall sets")
+ t1_start()
+
+ #
+ # Move thread results from list to dictionary
+ #
+ thread_stats = {}
+ for thread, sets in threads_result.items():
+ if len(sets) > 0:
+ thread_stats[sets[0][1].comm] = len(sets)
+
+ threads_result[thread] = {}
+ for upcall in sets:
+ threads_result[thread][upcall[0]] = [upcall[1]] + upcall[2]
+
+ if profile:
+ t1_stop("Moving upcall list to dictionary")
+ t1_start()
+
+ if options.debug & 0x4000000 != 0:
+ print()
+ for thread, sets in threads_result.items():
+ for idx, idx_set in sets.items():
+ print("DBG: {}".format(idx_set))
+
+ #
+    # Create two lists, one with DP_UPCALLs and one with RECV_UPCALLs
+ #
+ dp_upcall_list = []
+ recv_upcall_list = []
+ for idx, event in enumerate(events):
+ if event.event_type == EventType.DP_UPCALL:
+ dp_upcall_list.append([idx, event])
+ elif event.event_type == EventType.RECV_UPCALL:
+ recv_upcall_list.append([idx, event])
+
+ if profile:
+ t1_stop("Creating DP_UPCALL and RECV_UPCALL lists")
+ t1_start()
+
+ if options.debug & 0x4000000 != 0:
+ print()
+ for dp_upcall in dp_upcall_list:
+ print("DBG: {}".format(dp_upcall))
+ print()
+ for recv_upcall in recv_upcall_list:
+ print("DBG: {}".format(recv_upcall))
+
+ #
+ # Now find the matching DP_UPCALL and RECV_UPCALL events
+ #
+ event_sets = []
+ if spinner:
+ print()
+ with alive_bar(len(dp_upcall_list),
+ title="- Matching DP_UPCALLs to RECV_UPCALLs",
+ spinner=None, disable=not spinner) as bar:
+
+ for (idx, event) in dp_upcall_list:
+ remove_indexes = []
+ this_set = None
+ #
+ # TODO: This part needs some optimization, as it's slow in the
+            #       PVP test scenario. This is because a lot of DP_UPCALLs
+            #       will not have a matching RECV_UPCALL, leading to walking
+            #       the entire recv_upcall_list.
+            #
+            #       A dictionary would probably help, but in the PVP scenario
+            #       packets come from a limited set of ports and all have the
+            #       same length. So the key would need to be recv.dp_port +
+            #       len(recv.pkt) + recv.pkt; however, the recv.pkt compare
+            #       needs to happen on min(len(event.pkt), len(recv.pkt)).
+ #
+ for idx_in_list, (idx_recv, recv) in enumerate(recv_upcall_list):
+ match = False
+
+ if idx_recv < idx:
+ remove_indexes.append(idx_in_list)
+ continue
+ #
+ # If the RecvUpcall is a tunnel port, we can not map it to
+ # the correct tunnel. For now, we assume the first matching
+ # packet is the correct one. For more details see the OVS
+ # ukey_to_flow_netdev() function.
+ #
+ if (event.dp_port == recv.dp_port or
+ recv.dp_port == DP_TUNNEL_PORT) \
+ and event.pkt_len == recv.pkt_len:
+
+ compare_len = min(len(event.pkt), len(recv.pkt))
+
+ if len(event.pkt) != len(recv.pkt) \
+ and event.pkt_frag_len == 0:
+                        warn_partial_match = True
+ elif event.pkt_frag_len != 0:
+ warn_frag = True
+ compare_len = min(compare_len, event.pkt_frag_len)
+
+ if event.pkt[:compare_len] == recv.pkt[:compare_len]:
+ match = True
+ else:
+ #
+                        # There are still some corner cases due to the fact
+                        # that the kernel dp_upcall tracepoint is hit before
+                        # the packet is prepared/modified for the upcall.
+                        # Example cases are packet checksum updates, VLAN
+                        # insertion, etc. For now, we try to handle the
+                        # checksum part, but we might need to add more
+                        # exceptions in the future.
+ #
+ diff_bytes = sum(i != j for i, j in zip(
+ event.pkt[:compare_len], recv.pkt[:compare_len]))
+
+ if diff_bytes <= 2 and compare_len > 56:
+ # This could be a TCP or UDP checksum
+ event_pkt = Ether(bytes(event.pkt)[:compare_len])
+ recv_pkt = Ether(bytes(recv.pkt)[:compare_len])
+ if (event_pkt.haslayer(TCP) and
+ recv_pkt.haslayer(TCP)) or (
+ event_pkt.haslayer(UDP) and
+ recv_pkt.haslayer(UDP)):
+
+ if event_pkt.haslayer(TCP):
+ event_chksum = event_pkt[TCP].chksum
+ recv_chksum = recv_pkt[TCP].chksum
+ else:
+ event_chksum = event_pkt[UDP].chksum
+ recv_chksum = recv_pkt[UDP].chksum
+
+ if event_chksum & 0xff != recv_chksum & 0xff:
+ diff_bytes -= 1
+ if event_chksum & 0xff00 != \
+ recv_chksum & 0xff00:
+ diff_bytes -= 1
+
+ if diff_bytes == 0:
+ match = True
+
+ if match:
+ this_set = {event.event_type: event}
+ for sevent in threads_result[recv.pid][idx_recv]:
+ this_set[sevent.event_type] = sevent
+ event_sets.append(this_set)
+ remove_indexes.append(idx_in_list)
+
+ if options.debug & 0x4000000 != 0:
+ print("DBG: Matched DpUpcall({:6}) => "
+ "RecvUpcall({:6})".format(idx, idx_recv))
+
+ break
+
+ elif options.debug & 0x8000000 != 0:
+ print("DBG: COMPARE DpUpcall({:6}) != "
+ "RecvUpcall({:6})".format(idx, idx_recv))
+ event_pkt = Ether(bytes(event.pkt)[:compare_len])
+ recv_pkt = Ether(bytes(recv.pkt)[:compare_len])
+ print(re.sub('^', 'DBG:' + ' ' * 4,
+ event_pkt.show(dump=True),
+ flags=re.MULTILINE))
+ print(re.sub('^', 'DBG:' + ' ' * 4,
+ recv_pkt.show(dump=True),
+ flags=re.MULTILINE))
+
+ elif options.debug & 0x8000000 != 0:
+                print("DBG: COMPARE DpUpcall({:6}) != "
+ "RecvUpcall({:6}) -> port {}, {} -> "
+ "len = {}, {}".format(idx, idx_recv,
+ event.dp_port,
+ recv.dp_port,
+ event.pkt_len,
+ recv.pkt_len))
+
+ bar()
+ for remove_idx in sorted(remove_indexes, reverse=True):
+ del recv_upcall_list[remove_idx]
+
+ if profile:
+ t1_stop("Matching DP_UPCALLs to a set")
+
+    if warn_partial_match:
+ print("WARNING: Packets not fully captured for matching!\n "
+ "Increase the packet buffer with the '--packet-size' option.")
+ if warn_frag:
+ print("WARNING: SKB from kernel had fragments, we could only copy/"
+ "compare the first part!")
+
+ if collect_stats:
+ return event_sets, batch_stats, thread_stats
+
+ return event_sets
+
+
+#
+# unit_test()
+#
+def unit_test():
+ pkt1 = b'\x01\x02\x03\x04\x05'
+ pkt2 = b'\x01\x02\x03\x04\x06'
+ pkt3 = b'\x01\x02\x03\x04\x07'
+ key = b'\x08\x00\x03\x00\x01\x00\x00\x00' # Port 1
+ #
+ # Basic test with all events in line
+ #
+ t1_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 RecvUpcall(2, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+ Event(3, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
+ Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+ t1_result = [{EventType.DP_UPCALL: t1_events[0],
+ EventType.RECV_UPCALL: t1_events[1],
+ EventType.OP_FLOW_PUT: t1_events[2],
+ EventType.OP_FLOW_EXECUTE: t1_events[3],
+ EventType.OVS_PKT_EXEC: t1_events[4]}]
+ #
+ # Basic test with missing flow put
+ #
+ t2_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 RecvUpcall(2, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+ OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
+ Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+ t2_result = [{EventType.DP_UPCALL: t2_events[0],
+ EventType.RECV_UPCALL: t2_events[1],
+ EventType.OP_FLOW_EXECUTE: t2_events[2],
+ EventType.OVS_PKT_EXEC: t2_events[3]}]
+ #
+ # Test with RecvUpcall's being batched
+ #
+ t3_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+ DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
+ DpUpcall(3, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
+                 RecvUpcall(4, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(5, 1, "hndl", 1, "system", key, pkt3, len(pkt3)),
+                 RecvUpcall(6, 1, "hndl", 1, "system", key, pkt2, len(pkt2)),
+ Event(7, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
+ Event(9, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+ Event(10, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(11, 1, "hndl", 1, pkt3, len(pkt3)),
+ Event(12, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+ Event(13, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(14, 1, "hndl", 1, pkt2, len(pkt2)),
+ Event(15, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+ t3_result = [{EventType.DP_UPCALL: t3_events[0],
+ EventType.RECV_UPCALL: t3_events[3],
+ EventType.OP_FLOW_PUT: t3_events[6],
+ EventType.OP_FLOW_EXECUTE: t3_events[7],
+ EventType.OVS_PKT_EXEC: t3_events[8]},
+ {EventType.DP_UPCALL: t3_events[1],
+ EventType.RECV_UPCALL: t3_events[5],
+ EventType.OP_FLOW_PUT: t3_events[12],
+ EventType.OP_FLOW_EXECUTE: t3_events[13],
+ EventType.OVS_PKT_EXEC: t3_events[14]},
+ {EventType.DP_UPCALL: t3_events[2],
+ EventType.RECV_UPCALL: t3_events[4],
+ EventType.OP_FLOW_PUT: t3_events[9],
+ EventType.OP_FLOW_EXECUTE: t3_events[10],
+ EventType.OVS_PKT_EXEC: t3_events[11]}]
+ #
+ # Test with RecvUpcall's single + batch
+ #
+ t4_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 RecvUpcall(2, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+ Event(3, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
+ Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+ DpUpcall(6, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+ DpUpcall(7, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
+ DpUpcall(8, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
+                 RecvUpcall(9, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(10, 1, "hndl", 1, "system", key, pkt3, len(pkt3)),
+                 RecvUpcall(11, 1, "hndl", 1, "system", key, pkt2, len(pkt2)),
+ Event(12, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(13, 1, "hndl", 1, pkt1, len(pkt1)),
+ Event(14, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+ Event(15, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(16, 1, "hndl", 1, pkt3, len(pkt3)),
+ Event(17, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+ Event(18, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(19, 1, "hndl", 1, pkt2, len(pkt2)),
+                 Event(20, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+ t4_result = [{EventType.DP_UPCALL: t4_events[0],
+ EventType.RECV_UPCALL: t4_events[1],
+ EventType.OP_FLOW_PUT: t4_events[2],
+ EventType.OP_FLOW_EXECUTE: t4_events[3],
+ EventType.OVS_PKT_EXEC: t4_events[4]},
+ {EventType.DP_UPCALL: t4_events[5],
+ EventType.RECV_UPCALL: t4_events[8],
+ EventType.OP_FLOW_PUT: t4_events[11],
+ EventType.OP_FLOW_EXECUTE: t4_events[12],
+ EventType.OVS_PKT_EXEC: t4_events[13]},
+ {EventType.DP_UPCALL: t4_events[6],
+ EventType.RECV_UPCALL: t4_events[10],
+ EventType.OP_FLOW_PUT: t4_events[17],
+ EventType.OP_FLOW_EXECUTE: t4_events[18],
+ EventType.OVS_PKT_EXEC: t4_events[19]},
+ {EventType.DP_UPCALL: t4_events[7],
+ EventType.RECV_UPCALL: t4_events[9],
+ EventType.OP_FLOW_PUT: t4_events[14],
+ EventType.OP_FLOW_EXECUTE: t4_events[15],
+ EventType.OVS_PKT_EXEC: t4_events[16]}]
+ #
+ # Test with two threads interleaved
+ #
+ t5_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+ DpUpcall(2, 100, "ping", 1, "system", 1, pkt2, len(pkt2), 0),
+                 RecvUpcall(3, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(4, 2, "hndl", 2, "system", key, pkt2, len(pkt2)),
+ Event(5, 2, "hndl", 2, EventType.OP_FLOW_PUT),
+ Event(6, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(7, 2, "hndl", 1, pkt2, len(pkt2)),
+ OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
+ Event(9, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+ Event(10, 2, "hndl", 1, EventType.OVS_PKT_EXEC)]
+ t5_result = [{EventType.DP_UPCALL: t5_events[0],
+ EventType.RECV_UPCALL: t5_events[2],
+ EventType.OP_FLOW_PUT: t5_events[5],
+ EventType.OP_FLOW_EXECUTE: t5_events[7],
+ EventType.OVS_PKT_EXEC: t5_events[8]},
+ {EventType.DP_UPCALL: t5_events[1],
+ EventType.RECV_UPCALL: t5_events[3],
+ EventType.OP_FLOW_PUT: t5_events[4],
+ EventType.OP_FLOW_EXECUTE: t5_events[6],
+ EventType.OVS_PKT_EXEC: t5_events[9]}]
+ #
+ # Test batch with missing events
+ #
+ t6_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+ DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
+                 RecvUpcall(3, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(4, 1, "hndl", 1, "system", key, pkt2, len(pkt2)),
+ Event(5, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(6, 1, "hndl", 1, pkt2, len(pkt2)),
+ Event(7, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+ t6_result = [{EventType.DP_UPCALL: t6_events[0],
+ EventType.RECV_UPCALL: t6_events[2]},
+ {EventType.DP_UPCALL: t6_events[1],
+ EventType.RECV_UPCALL: t6_events[3],
+ EventType.OP_FLOW_PUT: t6_events[4],
+ EventType.OP_FLOW_EXECUTE: t6_events[5],
+ EventType.OVS_PKT_EXEC: t6_events[6]}]
+ #
+ # Test with RecvUpcall's and OVS_PKT_EXEC being batched
+ #
+ t7_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+ DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
+ DpUpcall(3, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
+                 RecvUpcall(4, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(5, 1, "hndl", 1, "system", key, pkt2, len(pkt2)),
+                 RecvUpcall(6, 1, "hndl", 1, "system", key, pkt3, len(pkt3)),
+ Event(7, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
+ Event(9, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(10, 1, "hndl", 1, pkt2, len(pkt2)),
+ Event(11, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+ OpFlowExecute(12, 1, "hndl", 1, pkt3, len(pkt3)),
+ Event(13, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+ Event(14, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+ Event(15, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+ t7_result = [{EventType.DP_UPCALL: t7_events[0],
+ EventType.RECV_UPCALL: t7_events[3],
+ EventType.OP_FLOW_PUT: t7_events[6],
+ EventType.OP_FLOW_EXECUTE: t7_events[7],
+ EventType.OVS_PKT_EXEC: t7_events[12]},
+ {EventType.DP_UPCALL: t7_events[1],
+ EventType.RECV_UPCALL: t7_events[4],
+ EventType.OP_FLOW_PUT: t7_events[8],
+ EventType.OP_FLOW_EXECUTE: t7_events[9],
+ EventType.OVS_PKT_EXEC: t7_events[13]},
+ {EventType.DP_UPCALL: t7_events[2],
+ EventType.RECV_UPCALL: t7_events[5],
+ EventType.OP_FLOW_PUT: t7_events[10],
+ EventType.OP_FLOW_EXECUTE: t7_events[11],
+ EventType.OVS_PKT_EXEC: t7_events[14]}]
+ #
+ # Actual test sets
+ #
+ test_set = [["Simple single event", t1_events, t1_result],
+ ["Single event, missing flow_put", t2_events, t2_result],
+ ["Batched events", t3_events, t3_result],
+ ["Single + batched events", t4_events, t4_result],
+ ["Two sets, different threads", t5_events, t5_result],
+ ["Batch with missing exec", t6_events, t6_result],
+ ["Batched events including exec", t7_events, t7_result]]
+
+ print("Running some simple unit tests:")
+
+ for test in test_set:
+ print("- {:<32} ".format(test[0]), end="")
+ result = collect_event_sets(test[1][:])
+ if result == test[2]:
+ print("PASS")
+ else:
+ print("FAIL")
+ print(" OUTPUT :")
+ for event_set in result:
+ hdr = " - "
+ for event_type, event in event_set.items():
+ print("{} {:<16}: {}".format(hdr, event_type.name, event))
+ hdr = " "
+ print(" EXPECTED:")
+ for event_set in test[2]:
+ hdr = " - "
+ for event_type, event in event_set.items():
+ print("{} {:<16}: {}".format(hdr, event_type.name, event))
+ hdr = " "
+
+
+#
+# show_key_value()
+#
+def show_key_value(data_set, description=None):
+ if description is not None:
+ print("\n=> {}:".format(description))
+
+ for k, v in data_set.items():
+ print(" {:36}: {:>10}".format(str(k), str(v)))
+
+
+#
+# show_batch_histogram()
+#
+def show_batch_histogram(data_set, description=None):
+ nr_of_buckets = 64
+
+ if description is not None:
+ print("\n=> {}:".format(description))
+
+ if len(data_set) == 0:
+ print("# NumSamples = 0")
+ return
+
+ min_val = nr_of_buckets
+ max_val = 0
+ entries = 0
+ high_buckets = 0
+ buckets = [0] * nr_of_buckets
+
+ for entry in data_set:
+ min_val = min(min_val, entry)
+ max_val = max(max_val, entry)
+ if entry == 0:
+ continue
+ elif entry > nr_of_buckets:
+ high_buckets += 1
+ else:
+ buckets[entry - 1] += 1
+
+ entries += 1
+
+ if max(buckets + [high_buckets]) > 4:
+ scale = int(max(buckets + [high_buckets]) / 4)
+ else:
+ scale = 1
+
+ print("# NumSamples = {}; Min = {}; Max = {}".format(entries, min_val,
+ max_val))
+ print("# each ∎ represents a count of {}".format(scale))
+
+ for idx in range(int(nr_of_buckets / 2)):
+ idx_2nd = idx + int(nr_of_buckets / 2)
+ print("{:5} [{:8}]: {:22} {:5} [{:8}]: {:22}".format(
+ idx + 1, buckets[idx], "∎" * int(buckets[idx] / scale),
+ idx_2nd + 1, buckets[idx_2nd],
+ "∎" * int(buckets[idx_2nd] / scale)))
+
+ if high_buckets > 0:
+ print("{:>5} [{:8}]: {:22}".format(">" + str(nr_of_buckets),
+ high_buckets,
+ "∎" * int(high_buckets / scale)))
+
+
+#
+# show_histogram()
+#
+def show_histogram(data_set, description=None, options=None,
+ minimum=None, maximum=None, buckets=None, custbuckets=None):
+ if description is not None:
+ print("\n=> {}:".format(description))
+
+ if options is not None:
+ if buckets is None:
+ buckets = options.histogram_buckets
+ if options is not None and options.sets:
+ print(data_set)
+
+ if len(data_set) == 0:
+ print("# NumSamples = 0")
+ elif len(data_set) == 1:
+ print("# NumSamples = 1; Min = {0:.4f}; Max = {0:.4f}".
+ format(data_set[0]))
+ elif len(set(data_set)) == 1 and maximum is None and minimum is None and \
+ custbuckets is None:
+ histogram(data_set, buckets=buckets, minimum=list(set(data_set))[0],
+ maximum=list(set(data_set))[0] + 1)
+ else:
+ histogram(data_set, buckets=buckets,
+ minimum=minimum, maximum=maximum, custbuckets=custbuckets)
+
+
+#
+# buffer_size_type()
+#
+def buffer_size_type(astr, min=64, max=2048):
+ value = int(astr)
+ if min <= value <= max:
+ return value
+ else:
+ raise argparse.ArgumentTypeError(
+ 'value not in range {}-{}'.format(min, max))
+
+
+#
+# next_power_of_two()
+#
+def next_power_of_two(val):
+ np = 1
+ while np < val:
+ np *= 2
+ return np
+
+
+#
+# main()
+#
+def main():
+ #
+ # Don't like these globals, but ctx passing does not seem to work with the
+ # existing open_ring_buffer() API :(
+ #
+ global b
+ global options
+ global trace_data
+ global events_received
+ global event_count
+ global export_file
+
+ #
+ # Argument parsing
+ #
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("-b", "--histogram-buckets",
+ help="Number of buckets per histogram, default 20",
+ type=int, default=20, metavar="BUCKETS")
+ parser.add_argument("--buffer-page-count",
+ help="Number of BPF ring buffer pages, default 1024",
+ type=int, default=1024, metavar="NUMBER")
+ parser.add_argument("-D", "--debug",
+ help="Enable eBPF debugging",
+ type=lambda x: int(x, 0), const=0x3f, default=0,
+ nargs='?')
+ parser.add_argument("-f", "--flow-key-size",
+ help="Set maximum flow key size to capture, "
+ "default 64", type=buffer_size_type, default=64,
+ metavar="[64-2048]")
+ parser.add_argument("--handler-filter",
+ help="Post processing handler thread filter",
+ type=str, default=None, metavar="HANDLERS")
+ parser.add_argument("-P", "--packet-size",
+ help="Set maximum packet size to capture, "
+ "default 256", type=buffer_size_type, default=256,
+ metavar="[64-2048]")
+ parser.add_argument("-p", "--pid", metavar="VSWITCHD_PID",
+ help="ovs-vswitch's PID",
+ type=int, default=None)
+ parser.add_argument("-q", "--quiet", action="store_true",
+ help="Do not show individual events")
+ parser.add_argument("-r", "--read-events",
+ help="Read events from FILE instead of installing "
+ "tracepoints", type=str, default=None, metavar="FILE")
+ parser.add_argument("--sets", action="store_true",
+ help="Dump content of data sets")
+ parser.add_argument("-s", "--stop",
+ help="Stop after receiving EVENTS number of trace "
+ "events",
+ type=int, default=0, metavar="EVENTS")
+ parser.add_argument("--unit-test", action="store_true",
+ help=argparse.SUPPRESS)
+ parser.add_argument("-w", "--write-events",
+ help="Write events to FILE",
+ type=str, default=None, metavar="FILE")
+
+ options = parser.parse_args()
+
+ if options.unit_test:
+ unit_test()
+ sys.exit(0)
+
+ #
+ # Find the PID of the ovs-vswitchd daemon if not specified.
+ #
+ if options.pid is None and options.read_events is None:
+ for proc in psutil.process_iter():
+ if 'ovs-vswitchd' in proc.name():
+ if options.pid is not None:
+ print("ERROR: Multiple ovs-vswitchd daemons running, "
+ "use the -p option!")
+ sys.exit(-1)
+
+ options.pid = proc.pid
+
+ #
+ # Error checking on input parameters.
+ #
+ if options.pid is None and options.read_events is None:
+ print("ERROR: Failed to find ovs-vswitchd's PID!")
+ sys.exit(-1)
+
+ if options.read_events is not None and options.write_events is not None:
+ print("ERROR: Either supply the read or write events option, "
+ "not both!")
+ sys.exit(-1)
+
+ if options.handler_filter is not None and options.read_events is None:
+ print("ERROR: The --handler-filter option is only valid with the "
+ "--read-events option!")
+ sys.exit(-1)
+
+ options.buffer_page_count = next_power_of_two(options.buffer_page_count)
+
+ #
+ # Open write handle if needed.
+ #
+ if options.write_events is not None:
+ try:
+ export_file = open(options.write_events, "w")
+ except (FileNotFoundError, IOError, PermissionError) as e:
+ print("ERROR: Can't create export file \"{}\": {}".format(
+ options.write_events, e.strerror))
+ sys.exit(-1)
+ else:
+ export_file = None
+
+ trace_data = []
+ event_count = {'total': {}, 'valid': {}, 'miss': {}}
+ if options.read_events is None:
+ #
+ # Call get_dp_mapping() to prepare the cache
+ #
+ dp_port_map = get_dp_mapping("ovs-system", "eth0", return_map=True)
+ if export_file is not None:
+ export_file.write("dp_port_map = {}\n".format(dp_port_map))
+
+ #
+        # Attach the USDT probes.
+ #
+ u = USDT(pid=int(options.pid))
+ try:
+ u.enable_probe(probe="recv_upcall", fn_name="trace__recv_upcall")
+ u.enable_probe(probe="op_flow_put", fn_name="trace__op_flow_put")
+ u.enable_probe(probe="op_flow_execute",
+ fn_name="trace__op_flow_execute")
+ except USDTException as e:
+            print("ERROR: {}".format(
+                  (re.sub('^', ' ' * 7, str(e),
+                          flags=re.MULTILINE)).strip().
+                  replace("--with-dtrace or --enable-dtrace",
+                          "--enable-usdt-probes")))
+ sys.exit(-1)
+
+ #
+ # Uncomment to see how arguments are decoded.
+ # print(u.get_text())
+ #
+ print("- Compiling eBPF programs...")
+
+ #
+ # Attach probes to the running process
+ #
+ source = ebpf_source.replace("<MAX_PACKET_VAL>",
+ str(options.packet_size))
+ source = source.replace("<MAX_KEY_VAL>", str(options.flow_key_size))
+ source = source.replace("<BUFFER_PAGE_CNT>",
+ str(options.buffer_page_count))
+
+ b = BPF(text=source, usdt_contexts=[u], debug=options.debug & 0xffffff)
+
+ #
+ # Dump out all events
+ #
+ print("- Capturing events [Press ^C to stop]...")
+ events_received = 0
+
+ if not options.quiet:
+ print("\n" + Event.get_event_header_str())
+
+ b['events'].open_ring_buffer(receive_event_bcc)
+ while 1:
+ try:
+ b.ring_buffer_poll()
+ if options.stop != 0 and events_received >= options.stop:
+ break
+ time.sleep(0.5)
+ except KeyboardInterrupt:
+ break
+
+ dropcnt = b.get_table("dropcnt")
+ export_misses = {}
+ for k in dropcnt.keys():
+ event = EventType.from_trace(k.value)
+ count = dropcnt.sum(k).value
+ if count > 0:
+ if event not in event_count['total']:
+ event_count['total'][event] = 0
+ event_count['valid'][event] = 0
+ event_count['miss'][event] = count
+ export_misses[k.value] = count
+
+ if options.write_events is not None:
+ if sum(event_count['miss'].values()) > 0:
+ export_file.write("event_miss = {}\n".format(export_misses))
+
+ export_file.close()
+
+ print()
+ else:
+ #
+        # Here we are requested to read events from an event export file.
+ #
+ thread_filter = None
+ if options.handler_filter is not None:
+ thread_filter = options.handler_filter.split(',')
+
+ try:
+ dp_port_mapping_valid = False
+ with open(options.read_events, 'r') as fd:
+ events_received = 0
+
+ if options.quiet:
+ spinner = Halo(spinner="dots", color="cyan",
+ text="Reading events from \"{}\"...".format(
+ options.read_events))
+ spinner.start()
+ else:
+ print("- Reading events from \"{}\"...".format(
+ options.read_events))
+
+ if not options.quiet:
+ print("\n" + Event.get_event_header_str())
+
+ for entry in fd:
+ if options.stop != 0 and events_received >= options.stop:
+ break
+
+                entry = entry.rstrip()
+ if entry.startswith('dp_port_map = {'):
+ if not dp_port_mapping_valid:
+ dp_port_mapping_valid = True
+ get_dp_mapping("", "",
+ dp_map=ast.literal_eval(entry[14:]))
+ elif (entry.startswith('event = {') and
+ dp_port_mapping_valid):
+ event = ast.literal_eval(entry[8:])
+ event = namedtuple("EventObject",
+ event.keys())(*event.values())
+
+ if thread_filter is not None \
+ and EventType.from_trace(event.event) != \
+ EventType.DP_UPCALL \
+ and event.comm.decode("utf-8") not in thread_filter:
+                        # Skip threads that are not in the filter.
+ continue
+
+ if len(event.pkt) > 0:
+ options.packet_size = len(event.pkt)
+ if len(event.key) > 0:
+ options.flow_key_size = len(event.key)
+ receive_event(event)
+ events_received += 1
+ elif entry.startswith('event_miss = {'):
+ misses = ast.literal_eval(entry[13:])
+ for e, count in misses.items():
+ event = EventType.from_trace(e)
+ if count > 0:
+ if event not in event_count['total']:
+ event_count['total'][event] = 0
+ event_count['valid'][event] = 0
+ event_count['miss'][event] = count
+
+ if options.quiet:
+ spinner.stop()
+ print("- Reading events from \"{}\"...".format(
+ options.read_events))
+
+ except (FileNotFoundError, PermissionError):
+ print("ERROR: Can't open file \"{}\" for reading!".format(
+ options.read_events))
+ sys.exit(-1)
+
+ #
+ # Start analyzing the data
+ #
+ print("- Analyzing results ({} events)...".format(len(trace_data)))
+
+ if events_received > 0:
+ if sum(event_count['miss'].values()) > 0:
+ print("\nWARNING: Not all events were captured!\n "
+ "Increase the BPF ring buffer size with the "
+ "--buffer-page-count option.")
+
+ print("\n=> Events received per type (usable/total) [missed events]:")
+ for event, total in sorted(event_count['total'].items()):
+ miss = event_count['miss'][event] if event in event_count['miss'] \
+ else 0
+ print(" {:36}: {:10}/{:10} [{:10}]".format(
+ event, event_count['valid'][event], total, miss))
+
+ collection, batch_stats, thread_stats = collect_event_sets(
+ trace_data, collect_stats=True, spinner=True)
+
+ if len(collection) <= 0:
+        print("No upcall data sets were found!")
+ sys.exit(0)
+
+ print("\n- Analyzing {} event sets...".format(len(collection)))
+
+ if options.debug & 0x1000000 != 0:
+ for upcall in collection:
+ print("DBG: {}{}{}{}{}".format(
+ "U" if EventType.DP_UPCALL in upcall else "-",
+ "u" if EventType.RECV_UPCALL in upcall else "-",
+ "p" if EventType.OP_FLOW_PUT in upcall else "-",
+ "e" if EventType.OP_FLOW_EXECUTE in upcall else "-",
+ "E" if EventType.OVS_PKT_EXEC in upcall else "-"))
+ if options.debug & 0x2000000 != 0:
+ try:
+ print("DBG: - {}".format(upcall[EventType.DP_UPCALL]))
+ print("DBG: - {}".format(upcall[EventType.RECV_UPCALL]))
+ print("DBG: - {}".format(upcall[EventType.OP_FLOW_PUT]))
+ print("DBG: - {}".format(
+ upcall[EventType.OP_FLOW_EXECUTE]))
+ print("DBG: - {}".format(upcall[EventType.OVS_PKT_EXEC]))
+ except LookupError:
+ continue
+
+ show_key_value(thread_stats, description="Upcalls handled per thread")
+ show_batch_histogram(batch_stats,
+ description="Histogram of upcalls per batch")
+
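+    #
+    # Timings gathered per upcall set (all converted to microseconds):
+    #  - kernel_to_vswitchd:    RECV_UPCALL.ts - DP_UPCALL.ts
+    #  - vswitchd_to_kernel:    OVS_PKT_EXEC.ts - OP_FLOW_EXECUTE.ts
+    #  - kernel_to_kernel_exec: OVS_PKT_EXEC.ts - DP_UPCALL.ts
+    #  - time_minus_lookup:     the total kernel-to-kernel time minus the
+    #                           flow lookup time (OP_FLOW_PUT.ts -
+    #                           RECV_UPCALL.ts)
+    #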
+ kernel_to_vswitchd = []
+ kernel_to_kernel_exec = []
+ vswitchd_to_kernel = []
+ time_minus_lookup = []
+ for upcall in collection:
+ kernel_to_vswitchd.append((upcall[EventType.RECV_UPCALL].ts -
+ upcall[EventType.DP_UPCALL].ts) /
+ 1000)
+
+ if EventType.OP_FLOW_PUT in upcall and \
+ EventType.OVS_PKT_EXEC in upcall:
+ time_minus_lookup.append(
+ ((upcall[EventType.OVS_PKT_EXEC].ts -
+ upcall[EventType.DP_UPCALL].ts) -
+ (upcall[EventType.OP_FLOW_PUT].ts -
+ upcall[EventType.RECV_UPCALL].ts)) / 1000)
+
+ if EventType.OP_FLOW_EXECUTE in upcall and \
+ EventType.OVS_PKT_EXEC in upcall:
+ vswitchd_to_kernel.append((upcall[EventType.OVS_PKT_EXEC].ts
+ - upcall[EventType.OP_FLOW_EXECUTE].ts)
+ / 1000)
+
+ if EventType.OVS_PKT_EXEC in upcall:
+ kernel_to_kernel_exec.append((upcall[EventType.OVS_PKT_EXEC].ts -
+ upcall[EventType.DP_UPCALL].ts) /
+ 1000)
+
+ show_histogram(kernel_to_vswitchd,
+ description="Kernel upcall action to vswitchd receive "
+ "(microseconds)",
+ options=options)
+ show_histogram(vswitchd_to_kernel,
+ description="vswitchd execute to kernel receive "
+ "(microseconds)",
+ options=options)
+ show_histogram(time_minus_lookup,
+ description="Upcall overhead (total time minus lookup) "
+ "(microseconds)",
+ options=options)
+ show_histogram(kernel_to_kernel_exec,
+ description="Kernel upcall to kernel packet execute "
+ "(microseconds)",
+ options=options)
+
+
+#
+# Start main() as the default entry point...
+#
+if __name__ == '__main__':
+ main()