summaryrefslogtreecommitdiff
path: root/tests/twisted/servicetest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/twisted/servicetest.py')
-rw-r--r--tests/twisted/servicetest.py249
1 files changed, 24 insertions, 225 deletions
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index 8a813f50..bca3b13b 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -1,23 +1,6 @@
-# Copyright (C) 2009 Nokia Corporation
-# Copyright (C) 2009-2013 Collabora Ltd.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-# 02110-1301 USA
"""
-Infrastructure code for testing Telepathy services.
+Infrastructure code for testing connection managers.
"""
from twisted.internet import glib2reactor
@@ -25,22 +8,18 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory
glib2reactor.install()
import sys
import time
-import os
import pprint
import unittest
-import dbus
-import dbus.lowlevel
-from dbus.mainloop.glib import DBusGMainLoop
-DBusGMainLoop(set_as_default=True)
+import dbus.glib
from twisted.internet import reactor
import constants as cs
-tp_name_prefix = cs.PREFIX
-tp_path_prefix = cs.PATH_PREFIX
+tp_name_prefix = 'org.freedesktop.Telepathy'
+tp_path_prefix = '/org/freedesktop/Telepathy'
class DictionarySupersetOf (object):
"""Utility class for expecting "a dictionary with at least these keys"."""
@@ -61,19 +40,16 @@ class DictionarySupersetOf (object):
except TypeError: # other is not iterable
return False
-class Event(object):
+class Event:
def __init__(self, type, **kw):
self.__dict__.update(kw)
self.type = type
(self.subqueue, self.subtype) = type.split ("-", 1)
- def __str__(self):
- return '\n'.join([ str(type(self)) ] + format_event(self))
-
def format_event(event):
ret = ['- type %s' % event.type]
- for key in sorted(dir(event)):
+ for key in dir(event):
if key != 'type' and not key.startswith('_'):
ret.append('- %s: %s' % (
key, pprint.pformat(getattr(event, key))))
@@ -175,12 +151,6 @@ class BaseEventQueue:
"""
self.forbidden_events.difference_update(set(patterns))
- def unforbid_all(self):
- """
- Remove all patterns from the set of forbidden events.
- """
- self.forbidden_events.clear()
-
def _check_forbidden(self, event):
for e in self.forbidden_events:
if e.match(event):
@@ -309,11 +279,6 @@ class IteratingEventQueue(BaseEventQueue):
def __init__(self, timeout=None):
BaseEventQueue.__init__(self, timeout)
- self._dbus_method_impls = []
- self._buses = []
- # a message filter which will claim we handled everything
- self._dbus_dev_null = \
- lambda bus, message: dbus.lowlevel.HANDLER_RESULT_HANDLED
def wait(self, queues=None):
stop = [False]
@@ -338,127 +303,6 @@ class IteratingEventQueue(BaseEventQueue):
else:
raise TimeoutError
- def add_dbus_method_impl(self, cb, bus=None, **kwargs):
- if bus is None:
- bus = self._buses[0]
-
- self._dbus_method_impls.append(
- (EventPattern('dbus-method-call', **kwargs), cb))
-
- def dbus_emit(self, path, iface, name, *a, **k):
- bus = k.pop('bus', self._buses[0])
- assert 'signature' in k, k
- message = dbus.lowlevel.SignalMessage(path, iface, name)
- message.append(*a, **k)
- bus.send_message(message)
-
- def dbus_return(self, in_reply_to, *a, **k):
- bus = k.pop('bus', self._buses[0])
- assert 'signature' in k, k
- reply = dbus.lowlevel.MethodReturnMessage(in_reply_to)
- reply.append(*a, **k)
- bus.send_message(reply)
-
- def dbus_raise(self, in_reply_to, name, message=None, bus=None):
- if bus is None:
- bus = self._buses[0]
-
- reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message)
- bus.send_message(reply)
-
- def attach_to_bus(self, bus):
- if not self._buses:
- # first-time setup
- self._dbus_filter_bound_method = self._dbus_filter
-
- self._buses.append(bus)
-
- # Only subscribe to messages on the first bus connection (assumed to
- # be the shared session bus connection used by the simulated connection
- # manager and most of the test suite), not on subsequent bus
- # connections (assumed to represent extra clients).
- #
- # When we receive a method call on the other bus connections, ignore
- # it - the eavesdropping filter installed on the first bus connection
- # will see it too.
- #
- # This is highly counter-intuitive, but it means our messages are in
- # a guaranteed order (we don't have races between messages arriving on
- # various connections).
- if len(self._buses) > 1:
- bus.add_message_filter(self._dbus_dev_null)
- return
-
- try:
- # for dbus > 1.5
- bus.add_match_string("eavesdrop=true,type='signal'")
- except dbus.DBusException:
- bus.add_match_string("type='signal'")
- bus.add_match_string("type='method_call'")
- else:
- bus.add_match_string("eavesdrop=true,type='method_call'")
-
- bus.add_message_filter(self._dbus_filter_bound_method)
-
- bus.add_signal_receiver(
- lambda *args, **kw:
- self.append(
- Event('dbus-signal',
- path=unwrap(kw['path']),
- signal=kw['member'],
- args=map(unwrap, args),
- interface=kw['interface'])),
- None,
- None,
- None,
- path_keyword='path',
- member_keyword='member',
- interface_keyword='interface',
- byte_arrays=True,
- )
-
- def cleanup(self):
- if self._buses:
- self._buses[0].remove_message_filter(self._dbus_filter_bound_method)
- for bus in self._buses[1:]:
- bus.remove_message_filter(self._dbus_dev_null)
-
- self._buses = []
- self._dbus_method_impls = []
-
- def _dbus_filter(self, bus, message):
- if isinstance(message, dbus.lowlevel.MethodCallMessage):
-
- destination = message.get_destination()
- sender = message.get_sender()
-
- if (destination == 'org.freedesktop.DBus' or
- sender == self._buses[0].get_unique_name()):
- # suppress reply and don't make an Event
- return dbus.lowlevel.HANDLER_RESULT_HANDLED
-
- e = Event('dbus-method-call', message=message,
- interface=message.get_interface(), path=message.get_path(),
- raw_args=message.get_args_list(byte_arrays=True),
- args=map(unwrap, message.get_args_list(byte_arrays=True)),
- destination=str(destination),
- method=message.get_member(),
- sender=message.get_sender(),
- handled=False)
-
- for pair in self._dbus_method_impls:
- pattern, cb = pair
- if pattern.match(e):
- cb(e)
- e.handled = True
- break
-
- self.append(e)
-
- return dbus.lowlevel.HANDLER_RESULT_HANDLED
-
- return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED
-
class TestEventQueue(BaseEventQueue):
def __init__(self, events):
BaseEventQueue.__init__(self)
@@ -569,23 +413,20 @@ def call_async(test, proxy, method, *args, **kw):
kw.update({'reply_handler': reply_func, 'error_handler': error_func})
method_proxy(*args, **kw)
-def sync_dbus(bus, q, proxy):
- # Dummy D-Bus method call. We can't use DBus.Peer.Ping() because libdbus
- # replies to that message immediately, rather than handing it up to
- # dbus-glib and thence the application, which means that Ping()ing the
- # application doesn't ensure that it's processed all D-Bus messages prior
- # to our ping.
- call_async(q, dbus.Interface(proxy, 'org.freedesktop.Telepathy.Tests'),
- 'DummySyncDBus')
+def sync_dbus(bus, q, conn):
+ # Dummy D-Bus method call
+ # This won't do the right thing unless the proxy has a unique name.
+ assert conn.object.bus_name.startswith(':')
+ root_object = bus.get_object(conn.object.bus_name, '/')
+ call_async(
+ q, dbus.Interface(root_object, 'org.freedesktop.Telepathy.Tests'), 'DummySyncDBus')
q.expect('dbus-error', method='DummySyncDBus')
class ProxyWrapper:
- def __init__(self, object, default, others={}):
+ def __init__(self, object, default, others):
self.object = object
self.default_interface = dbus.Interface(object, default)
self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE)
- self.TpProperties = \
- dbus.Interface(object, tp_name_prefix + '.Properties')
self.interfaces = dict([
(name, dbus.Interface(object, iface))
for name, iface in others.iteritems()])
@@ -599,47 +440,27 @@ class ProxyWrapper:
return getattr(self.default_interface, name)
-class ConnWrapper(ProxyWrapper):
- def inspect_contact_sync(self, handle):
- return self.inspect_contacts_sync([handle])[0]
-
- def inspect_contacts_sync(self, handles):
- h2asv = self.Contacts.GetContactAttributes(handles, [])
- ret = []
- for h in handles:
- ret.append(h2asv[h][cs.ATTR_CONTACT_ID])
- return ret
-
- def get_contact_handle_sync(self, identifier):
- return self.Contacts.GetContactByID(identifier, [])[0]
-
- def get_contact_handles_sync(self, ids):
- return [self.get_contact_handle_sync(i) for i in ids]
-
def wrap_connection(conn):
- return ConnWrapper(conn, tp_name_prefix + '.Connection',
- dict(
+ return ProxyWrapper(conn, tp_name_prefix + '.Connection',
+ dict([
+ (name, tp_name_prefix + '.Connection.Interface.' + name)
+ for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
+ 'Presence', 'SimplePresence', 'Requests']] +
[('Peer', 'org.freedesktop.DBus.Peer'),
- ('Aliasing', cs.CONN_IFACE_ALIASING),
- ('Avatars', cs.CONN_IFACE_AVATARS),
- ('Contacts', cs.CONN_IFACE_CONTACTS),
('ContactCapabilities', cs.CONN_IFACE_CONTACT_CAPS),
('ContactInfo', cs.CONN_IFACE_CONTACT_INFO),
('Location', cs.CONN_IFACE_LOCATION),
- ('Presence', cs.CONN_IFACE_PRESENCE),
- ('Requests', cs.CONN_IFACE_REQUESTS),
('Future', tp_name_prefix + '.Connection.FUTURE'),
('MailNotification', cs.CONN_IFACE_MAIL_NOTIFICATION),
('ContactList', cs.CONN_IFACE_CONTACT_LIST),
('ContactGroups', cs.CONN_IFACE_CONTACT_GROUPS),
('PowerSaving', cs.CONN_IFACE_POWER_SAVING),
- ('Addressing', cs.CONN_IFACE_ADDRESSING),
]))
def wrap_channel(chan, type_, extra=None):
interfaces = {
type_: tp_name_prefix + '.Channel.Type.' + type_,
- 'Group': cs.CHANNEL_IFACE_GROUP,
+ 'Group': tp_name_prefix + '.Channel.Interface.Group',
}
if extra:
@@ -649,26 +470,14 @@ def wrap_channel(chan, type_, extra=None):
return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces)
-
-def wrap_content(chan, extra=None):
- interfaces = { }
-
- if extra:
- interfaces.update(dict([
- (name, tp_name_prefix + '.Call1.Content.Interface.' + name)
- for name in extra]))
-
- return ProxyWrapper(chan, tp_name_prefix + '.Call1.Content', interfaces)
-
def make_connection(bus, event_func, name, proto, params):
cm = bus.get_object(
tp_name_prefix + '.ConnectionManager.%s' % name,
- tp_path_prefix + '/ConnectionManager/%s' % name,
- introspect=False)
+ tp_path_prefix + '/ConnectionManager/%s' % name)
cm_iface = dbus.Interface(cm, tp_name_prefix + '.ConnectionManager')
connection_name, connection_path = cm_iface.RequestConnection(
- proto, dbus.Dictionary(params, signature='sv'))
+ proto, params)
conn = wrap_connection(bus.get_object(connection_name, connection_path))
return conn
@@ -815,16 +624,6 @@ def install_colourer():
sys.stdout = Colourer(sys.stdout, patterns)
return sys.stdout
-# this is just to shut up unittest.
-class DummyStream(object):
- def write(self, s):
- if 'CHECK_TWISTED_VERBOSE' in os.environ:
- print s,
-
- def flush(self):
- pass
-
if __name__ == '__main__':
- stream = DummyStream()
- runner = unittest.TextTestRunner(stream=stream)
- unittest.main(testRunner=runner)
+ unittest.main()
+