path: root/examples
diff options
authorThomas Haller <>2019-01-08 11:10:25 +0100
committerThomas Haller <>2019-02-22 11:00:11 +0100
commitdebd022a6d23e372d7940231e12f17bc5578c3d4 (patch)
tree97000a1ee0c14bda7013e139505c95a565ef9aea /examples
parent395a78618b6d42c059b52aee129a7777922469eb (diff)
examples: add python example script "nm-wg-set" for modifying WireGuard profile
Use the script to test how GObject introspection with libnm's WireGuard support works. Also, since support for WireGuard peers is not yet implemented in nmcli (or other clients), this script is rather useful.
Diffstat (limited to 'examples')
1 files changed, 423 insertions, 0 deletions
diff --git a/examples/python/gi/nm-wg-set b/examples/python/gi/nm-wg-set
new file mode 100755
index 0000000000..85376eada3
--- /dev/null
+++ b/examples/python/gi/nm-wg-set
@@ -0,0 +1,423 @@
+#!/usr/bin/env python
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+# Copyright 2018 - 2019 Red Hat, Inc.
+# nm-wg-set: modify an existing WireGuard connection profile.
+# $ nm-wg-set [id|uuid|interface] ID [wg-args...]
+# The arguments to set the parameters are like the set parameters from `man 8 wg`.
+# For example:
+# $ nm-wg-set wg0 peer wN8G5HpphoXOGkiXTgBPyr9BhrRm2z9JEI6BiH6fB0g= preshared-key <(wg genpsk)
+# extra, script specific arguments:
+# - private-key-flags
+# - preshared-key-flags
+# Note that the arguments have some simliarities to `wg set` command. But this
+# script only modify the connection profile in NetworkManager. They don't (re)activate
+# the profile and thus the changes only result in the configuration of the kernel interface
+# after activating the profile. Use `nmcli connection up` for that.
+# The example script does not support creating or deleting the WireGuard profile itself. It also
+# does not support modifying other settings of the connection profile, like the IP address configuation.
+# For that also use nmcli. For example:
+# PROFILE=wg0
+# # create the WireGuard profile with nmcli
+# PRIVKEY_FILE=/tmp/wg.key
+# (umask 077; rm -f "$PRIVKEY_FILE"; wg genkey > "$PRIVKEY_FILE")
+# IFNAME=wg0
+# PUBKEY=$(wg pubkey < "$PRIVKEY_FILE")
+# IP4GW=
+# nmcli connection delete id "$PROFILE"
+# nmcli connection add \
+# type wireguard \
+# con-name "$PROFILE" \
+# ifname "$IFNAME" \
+# connection.stable-id "$PROFILE-$PUBKEY" \
+# ipv4.method manual \
+# ipv4.addresses "$IP4ADDR" \
+# ipv4.gateway "$IP4GW" \
+# ipv4.never-default yes \
+# ipv6.method link-local \
+# wireguard.listen-port 0 \
+# wireguard.fwmark 0 \
+# wireguard.private-key '' \
+# wireguard.private-key-flags 0
+# nmcli connection up \
+# id "$PROFILE" \
+# passwd-file <(echo "wireguard.private-key:$(cat "$PRIVKEY_FILE")")
+# # modify the WireGuard profile with the script
+# nm-wg-set id "$PROFILE" $WG_ARGS
+import sys
+import re
+import gi
+gi.require_version('NM', '1.0')
+from gi.repository import NM
+class MyError(Exception):
+ pass
+def pr(v):
+ import pprint
+ pprint.pprint(v, indent=4, depth=5, width=60)
+def connection_is_wireguard(conn):
+ s_con = conn.get_setting(NM.SettingConnection)
+ return s_con \
+ and s_con.get_connection_type() == NM.SETTING_WIREGUARD_SETTING_NAME \
+ and conn.get_setting(NM.SettingWireGuard)
+def connection_to_str(conn):
+ if connection_is_wireguard(conn):
+ iface = conn.get_setting(NM.SettingConnection).get_interface_name()
+ if iface:
+ extra = ', interface: "%s"' % (iface)
+ else:
+ extra = ''
+ else:
+ extra = ', type: %s' % (conn.get_setting(NM.SettingConnection).get_connection_type())
+ return '"%s" (%s%s)' % (conn.get_id(), conn.get_uuid(), extra)
+def connections_find(connections, con_spec, con_id):
+ connections = list(sorted(connections, key=connection_to_str))
+ l = []
+ if con_spec in [None, 'id']:
+ for c in connections:
+ if con_id == c.get_id():
+ if c not in l:
+ l.append(c)
+ if con_spec in [None, 'interface']:
+ for c in connections:
+ s_con = c.get_setting(NM.SettingConnection)
+ if s_con \
+ and con_id == s_con.get_interface_name():
+ if c not in l:
+ l.append(c)
+ if con_spec in [None, 'uuid']:
+ for c in connections:
+ if con_id == c.get_uuid():
+ if c not in l:
+ l.append(c)
+ return l
+def argv_get_one(argv, idx, type_ctor=None, topic=None):
+ if topic is not None:
+ try:
+ v = argv_get_one(argv, idx, type_ctor, None)
+ except MyError as e:
+ if isinstance(topic, (int, long)):
+ topic = argv[topic]
+ raise MyError('error for "%s": %s' % (topic, e.message))
+ return v
+ v = None
+ try:
+ v = argv[idx]
+ except:
+ raise MyError('missing argument')
+ if type_ctor is not None:
+ try:
+ v = type_ctor(v)
+ except Exception as e:
+ raise MyError('invalid argument "%s" (%s)' % (v, e.message))
+ return v
+def arg_parse_secret_flags(arg):
+ try:
+ f = arg.strip()
+ n = {
+ 'none': NM.SettingSecretFlags.NONE,
+ 'not-saved': NM.SettingSecretFlags.NOT_SAVED,
+ 'not-required': NM.SettingSecretFlags.NOT_REQUIRED,
+ 'agent-owned': NM.SettingSecretFlags.AGENT_OWNED,
+ }.get(f)
+ if n is not None:
+ return n
+ return NM.SettingSecretFlags(int(f))
+ except Exception as e:
+ raise MyError('invalid secret flags "%s"' % (arg))
+def _arg_parse_int(arg, vmin, vmax, key, base = 0):
+ try:
+ v = int(arg, base)
+ if v >= vmin and vmax <= 0xFFFFFFFF:
+ return v
+ except:
+ raise MyError('invalid %s "%s"' % (key, arg))
+ raise MyError("%s out of range" % (key))
+def arg_parse_listen_port(arg):
+ return _arg_parse_int(arg, 0, 0xFFFF, "listen-port")
+def arg_parse_fwmark(arg):
+ return _arg_parse_int(arg, 0, 0xFFFFFFFF, "fwmark", base = 0)
+def arg_parse_persistent_keep_alive(arg):
+ return _arg_parse_int(arg, 0, 0xFFFFFFFF, "persistent-keepalive")
+def arg_parse_allowed_ips(arg):
+ l = [s.strip() for s in arg.strip().split(',')]
+ l = [s for s in l if s != '']
+ l = list(l)
+ # use a peer to parse and validate the allowed-ips.
+ peer = NM.WireGuardPeer()
+ for aip in l:
+ if not peer.append_allowed_ip(aip, False):
+ raise MyError('invalid allowed-ip "%s"' % (aip))
+ return l
+def secret_flags_to_string(flags):
+ nick = {
+ NM.SettingSecretFlags.NONE: 'none',
+ NM.SettingSecretFlags.NOT_SAVED: 'not-saved',
+ NM.SettingSecretFlags.NOT_REQUIRED: 'not-required',
+ NM.SettingSecretFlags.AGENT_OWNED: 'agent-owned',
+ }.get(flags)
+ num = str(int(flags))
+ if nick is None:
+ return num
+ return '%s (%s)' % (num, nick)
+def wg_read_private_key(privkey_file):
+ import base64
+ try:
+ with open(privkey_file, "r") as f:
+ data =
+ bdata = base64.decodestring(data)
+ if len(bdata) != 32:
+ raise Exception("not 32 bytes base64 encoded")
+ return base64.encodestring(bdata).strip()
+ except Exception as e:
+ raise MyError('failed to read private key "%s": %s' % (privkey_file, e.message))
+def wg_peer_is_valid(peer, msg = None):
+ try:
+ peer.is_valid(True, True)
+ except gi.repository.GLib.Error as e:
+ if msg is None:
+ raise MyError('%s' % (e.message))
+ else:
+ raise MyError('%s' % (msg))
+def do_get(nm_client, connection):
+ s_con = conn.get_setting(NM.SettingConnection)
+ s_wg = conn.get_setting(NM.SettingWireGuard)
+ # Fetching secrets is not implemented. For now show them all as
+ # <hidden>.
+ print('interface: %s' % (s_con.get_interface_name()))
+ print('uuid: %s' % (conn.get_uuid()))
+ print('id: %s' % (conn.get_id()))
+ print('private-key: %s' % ('<hidden>'))
+ print('private-key-flags: %s' % (secret_flags_to_string(s_wg.get_private_key_flags())))
+ print('listen-port: %s' % (s_wg.get_listen_port()))
+ print('fwmark: 0x%x' % (s_wg.get_fwmark()))
+ for i in range(s_wg.get_peers_len()):
+ peer = s_wg.get_peer(i)
+ print('peer[%d].public-key: %s' % (i, peer.get_public_key()))
+ print('peer[%d].preshared-key: %s' % (i, '<hidden>' if peer.get_preshared_key_flags() != NM.SettingSecretFlags.NOT_REQUIRED else ''))
+ print('peer[%d].preshared-key-flags: %s' % (i, secret_flags_to_string(peer.get_preshared_key_flags())))
+ print('peer[%d].endpoint: %s' % (i, peer.get_endpoint() if peer.get_endpoint() else ''))
+ print('peer[%d].persistent-keepalive: %s' % (i, peer.get_persistent_keepalive()))
+ print('peer[%d].allowed-ips: %s' % (i, ','.join([peer.get_allowed_ip(j) for j in range(peer.get_allowed_ips_len())])))
+def do_set(nm_client, conn, argv):
+ s_wg = conn.get_setting(NM.SettingWireGuard)
+ peer = None
+ peer_remove = False
+ peer_idx = None
+ peer_secret_flags = None
+ try:
+ idx = 0
+ while True:
+ if peer \
+ and ( idx >= len(argv) \
+ or argv[idx] == 'peer'):
+ if peer_remove:
+ pp_peer, pp_idx = s_wg.get_peer_by_public_key(peer.get_public_key())
+ if pp_peer:
+ s_wg.remove_peer(pp_idx)
+ else:
+ if peer_secret_flags is not None:
+ peer.set_preshared_key_flags(peer_secret_flags)
+ wg_peer_is_valid(peer)
+ if peer_idx is None:
+ s_wg.append_peer(peer)
+ else:
+ s_wg.set_peer(peer, peer_idx)
+ peer = None
+ peer_remove = False
+ peer_idx = None
+ peer_secret_flags = None
+ if idx >= len(argv):
+ break;
+ if not peer and argv[idx] == 'private-key':
+ key = argv_get_one(argv, idx + 1, None, idx)
+ if key == '':
+ s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY, None)
+ else:
+ s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY, wg_read_private_key(key))
+ idx += 2
+ continue
+ if not peer and argv[idx] == 'private-key-flags':
+ s_wg.set_property(NM.SETTING_WIREGUARD_PRIVATE_KEY_FLAGS, argv_get_one(argv, idx + 1, arg_parse_secret_flags, idx))
+ idx += 2
+ continue
+ if not peer and argv[idx] == 'listen-port':
+ s_wg.set_property(NM.SETTING_WIREGUARD_LISTEN_PORT, argv_get_one(argv, idx + 1, arg_parse_listen_port, idx))
+ idx += 2
+ continue
+ if not peer and argv[idx] == 'fwmark':
+ s_wg.set_property(NM.SETTING_WIREGUARD_FWMARK, argv_get_one(argv, idx + 1, arg_parse_fwmark, idx))
+ idx += 2
+ continue
+ if argv[idx] == 'peer':
+ public_key = argv_get_one(argv, idx + 1, None, idx)
+ peer, peer_idx = s_wg.get_peer_by_public_key(public_key)
+ if peer:
+ peer = peer.new_clone(True)
+ else:
+ peer_idx = None
+ peer = NM.WireGuardPeer()
+ peer.set_public_key(public_key)
+ wg_peer_is_valid(peer, 'public key "%s" is invalid' % (public_key))
+ peer_remove = False
+ idx += 2
+ continue
+ if peer and argv[idx] == 'remove':
+ peer_remove = True
+ idx += 1
+ continue
+ if peer and argv[idx] == 'preshared-key':
+ psk = argv_get_one(argv, idx + 1, None, idx)
+ if psk == '':
+ peer.set_preshared_key(None)
+ if peer_secret_flags is not None:
+ peer_secret_flags = NM.SettingSecretFlags.NOT_REQUIRED
+ else:
+ peer.set_preshared_key(wg_read_private_key(psk))
+ if peer_secret_flags is not None:
+ peer_secret_flags = NM.SettingSecretFlags.NONE
+ idx += 2
+ continue
+ if peer and argv[idx] == 'preshared-key-flags':
+ peer_secret_flags = argv_get_one(argv, idx + 1, arg_parse_secret_flags, idx)
+ idx += 2
+ continue
+ if peer and argv[idx] == 'endpoint':
+ peer.set_endpoint(argv_get_one(argv, idx + 1, None, idx))
+ idx += 2
+ continue
+ if peer and argv[idx] == 'persistent-keepalive':
+ peer.set_persistent_keepalive(argv_get_one(argv, idx + 1, arg_parse_persistent_keep_alive, idx))
+ idx += 2
+ continue
+ if peer and argv[idx] == 'allowed-ips':
+ allowed_ips = list(argv_get_one(argv, idx + 1, arg_parse_allowed_ips, idx))
+ peer.clear_allowed_ips()
+ for aip in allowed_ips:
+ peer.append_allowed_ip(aip, False)
+ del allowed_ips
+ idx += 2
+ continue
+ raise MyError('invalid argument "%s"' % (argv[idx]))
+ except MyError as e:
+ print('Error: %s' % (e.message))
+ sys.exit(1)
+ try:
+ conn.commit_changes(True, None)
+ except Exception as e:
+ print('failure to commit connection: %s' % (e))
+ sys.exit(1)
+ print('Success')
+ sys.exit(0)
+if __name__ == '__main__':
+ argv = sys.argv
+ del argv[0]
+ con_spec = None
+ if len(argv) >= 1:
+ if argv[0] in [ 'id', 'uuid', 'interface' ]:
+ con_spec = argv[0]
+ del argv[0]
+ if len(argv) < 1:
+ print('Requires an existing NetworkManager connection profile as first argument')
+ print('Select it based on the connection ID, UUID, or interface-name (optionally qualify the selection with [id|uuid|interface])')
+ print('Maybe you want to create one first with')
+ print(' nmcli connection add type wireguard ifname wg0 $MORE_ARGS')
+ sys.exit(1)
+ con_id = argv[0]
+ del argv[0]
+ nm_client =
+ connections = connections_find(nm_client.get_connections(), con_spec, con_id)
+ if len(connections) == 0:
+ print('No matching connection %s\"%s\" found.' % ((con_spec+' ' if con_spec else ''), con_id))
+ print('Maybe you want to create one first with')
+ print(' nmcli connection add type wireguard ifname wg0 $MORE_ARGS')
+ sys.exit(1)
+ if len(connections) > 1:
+ print("Connection %s\"%s\" is not unique (%s)" % ((con_spec+' ' if con_spec else ''), con_id, ', '.join(['['+connection_to_str(c)+']' for c in connections])))
+ if not con_spec:
+ print('Maybe qualify the name with [id|uuid|interface]?')
+ sys.exit(1)
+ conn = connections[0]
+ if not connection_is_wireguard(conn):
+ print('Connection %s is not a WireGuard profile' % (connection_to_str(conn)))
+ print('See available profiles with `nmcli connection show`')
+ sys.exit(1)
+ if not argv:
+ do_get(nm_client, conn)
+ else:
+ do_set(nm_client, conn, argv)