diff options
author | Dan Winship <danw@gnome.org> | 2014-08-29 12:27:47 -0400 |
---|---|---|
committer | Dan Winship <danw@gnome.org> | 2014-09-09 12:16:38 -0400 |
commit | c9fb96e02ec9a1c644a3482980002e6cff021a0a (patch) | |
tree | ef484a329dd9e16e15cd045682f92a2616b78462 /tools/test-networkmanager-service.py | |
parent | 79093c655b49cb136b50943bbdaaf8585b283fac (diff) | |
download | NetworkManager-c9fb96e02ec9a1c644a3482980002e6cff021a0a.tar.gz |
libnm: add test-secret-agent
Implement some basic secret agent functionality in
test-networkmanager-service.py, and add test-secret-agent to test that
NMSecretAgent works as expected.
Diffstat (limited to 'tools/test-networkmanager-service.py')
-rwxr-xr-x | tools/test-networkmanager-service.py | 92 |
1 files changed, 90 insertions, 2 deletions
diff --git a/tools/test-networkmanager-service.py b/tools/test-networkmanager-service.py index 2f80ca516c..7716dca9ac 100755 --- a/tools/test-networkmanager-service.py +++ b/tools/test-networkmanager-service.py @@ -517,6 +517,9 @@ class PermissionDeniedException(dbus.DBusException): class UnknownDeviceException(dbus.DBusException): _dbus_error_name = IFACE_NM + '.UnknownDevice' +class UnknownConnectionException(dbus.DBusException): + _dbus_error_name = IFACE_NM + '.UnknownConnection' + PM_DEVICES = 'Devices' PM_NETWORKING_ENABLED = 'NetworkingEnabled' PM_WWAN_ENABLED = 'WwanEnabled' @@ -575,6 +578,26 @@ class NetworkManager(ExportedObj): break if not device: raise UnknownDeviceException("No device found for the requested iface.") + + try: + connection = settings.get_connection(conpath) + except Exception as e: + raise UnknownConnectionException("Connection not found") + + # See if we need secrets. For the moment, we only support WPA + hash = connection.GetSettings() + if hash.has_key('802-11-wireless-security'): + s_wsec = hash['802-11-wireless-security'] + if (s_wsec['key-mgmt'] == 'wpa-psk' and not s_wsec.has_key('psk')): + secrets = agent_manager.get_secrets(hash, conpath, '802-11-wireless-security') + if secrets is None: + raise NoSecretsException("No secret agent available") + if not secrets.has_key('802-11-wireless-security'): + raise NoSecretsException("No secrets provided") + s_wsec = secrets['802-11-wireless-security'] + if not s_wsec.has_key('psk'): + raise NoSecretsException("No secrets provided") + raise PermissionDeniedException("Not yet implemented") @dbus.service.method(dbus_interface=IFACE_NM, in_signature='a{sa{sv}}oo', out_signature='oo') @@ -586,7 +609,9 @@ class NetworkManager(ExportedObj): break if not device: raise UnknownDeviceException("No device found for the requested iface.") - raise PermissionDeniedException("Not yet implemented") + + conpath = manager.AddConnection(connection) + return self.ActivateConnection(conpath, devpath, specific_object) @dbus.service.method(dbus_interface=IFACE_NM, in_signature='o', out_signature='') def DeactivateConnection(self, active_connection): @@ -822,6 +847,9 @@ class Settings(dbus.service.Object): def auto_remove_next_connection(self): self.remove_next_connection = True; + def get_connection(self, path): + return self.connections[path] + @dbus.service.method(dbus_interface=IFACE_SETTINGS, in_signature='', out_signature='ao') def ListConnections(self): return self.connections.keys() @@ -871,6 +899,65 @@ class Settings(dbus.service.Object): mainloop.quit() ################################################################### +IFACE_AGENT_MANAGER = 'org.freedesktop.NetworkManager.AgentManager' +IFACE_AGENT = 'org.freedesktop.NetworkManager.SecretAgent' + +PATH_SECRET_AGENT = '/org/freedesktop/NetworkManager/SecretAgent' + +FLAG_ALLOW_INTERACTION = 0x1 +FLAG_REQUEST_NEW = 0x2 +FLAG_USER_REQUESTED = 0x4 + +class NoSecretsException(dbus.DBusException): + _dbus_error_name = IFACE_AGENT_MANAGER + '.NoSecrets' + +class UserCanceledException(dbus.DBusException): + _dbus_error_name = IFACE_AGENT_MANAGER + '.UserCanceled' + +class AgentManager(dbus.service.Object): + def __init__(self, bus, object_path): + dbus.service.Object.__init__(self, bus, object_path) + self.agents = {} + self.bus = bus + + @dbus.service.method(dbus_interface=IFACE_AGENT_MANAGER, + in_signature='s', out_signature='', + sender_keyword='sender') + def Register(self, name, sender=None): + self.RegisterWithCapabilities(name, 0, sender) + + @dbus.service.method(dbus_interface=IFACE_AGENT_MANAGER, + in_signature='su', out_signature='', + sender_keyword='sender') + def RegisterWithCapabilities(self, name, caps, sender=None): + self.agents[sender] = self.bus.get_object(sender, PATH_SECRET_AGENT) + + @dbus.service.method(dbus_interface=IFACE_AGENT_MANAGER, + in_signature='', out_signature='', + sender_keyword='sender') + def Unregister(self, sender=None): + del self.agents[sender] + + def get_secrets(self, connection, path, setting_name): + if len(self.agents) == 0: + return None + + secrets = {} + for sender in self.agents: + agent = self.agents[sender] + try: + secrets = agent.GetSecrets(connection, path, setting_name, + dbus.Array([], 's'), + FLAG_ALLOW_INTERACTION | FLAG_USER_REQUESTED, + dbus_interface=IFACE_AGENT) + break + except dbus.DBusException as e: + if e.get_dbus_name() == IFACE_AGENT + '.UserCanceled': + raise UserCanceledException('User canceled') + continue + return secrets + +################################################################### def stdin_cb(io, condition): mainloop.quit() @@ -885,9 +972,10 @@ def main(): bus = dbus.SessionBus() - global manager, settings + global manager, settings, agent_manager manager = NetworkManager(bus, "/org/freedesktop/NetworkManager") settings = Settings(bus, "/org/freedesktop/NetworkManager/Settings") + agent_manager = AgentManager(bus, "/org/freedesktop/NetworkManager/AgentManager") if not bus.request_name("org.freedesktop.NetworkManager"): sys.exit(1) |