diff options
Diffstat (limited to 'tests/twisted/servicetest.py')
-rw-r--r-- | tests/twisted/servicetest.py | 249 |
1 files changed, 24 insertions, 225 deletions
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py index 8a813f50..bca3b13b 100644 --- a/tests/twisted/servicetest.py +++ b/tests/twisted/servicetest.py @@ -1,23 +1,6 @@ -# Copyright (C) 2009 Nokia Corporation -# Copyright (C) 2009-2013 Collabora Ltd. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA -# 02110-1301 USA """ -Infrastructure code for testing Telepathy services. +Infrastructure code for testing connection managers. """ from twisted.internet import glib2reactor @@ -25,22 +8,18 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory glib2reactor.install() import sys import time -import os import pprint import unittest -import dbus -import dbus.lowlevel -from dbus.mainloop.glib import DBusGMainLoop -DBusGMainLoop(set_as_default=True) +import dbus.glib from twisted.internet import reactor import constants as cs -tp_name_prefix = cs.PREFIX -tp_path_prefix = cs.PATH_PREFIX +tp_name_prefix = 'org.freedesktop.Telepathy' +tp_path_prefix = '/org/freedesktop/Telepathy' class DictionarySupersetOf (object): """Utility class for expecting "a dictionary with at least these keys".""" @@ -61,19 +40,16 @@ class DictionarySupersetOf (object): except TypeError: # other is not iterable return False -class Event(object): +class Event: def __init__(self, type, **kw): self.__dict__.update(kw) self.type = type (self.subqueue, self.subtype) = type.split ("-", 1) - def __str__(self): - return '\n'.join([ str(type(self)) ] + format_event(self)) - def format_event(event): ret = ['- type %s' % event.type] - for key in sorted(dir(event)): + for key in dir(event): if key != 'type' and not key.startswith('_'): ret.append('- %s: %s' % ( key, pprint.pformat(getattr(event, key)))) @@ -175,12 +151,6 @@ class BaseEventQueue: """ self.forbidden_events.difference_update(set(patterns)) - def unforbid_all(self): - """ - Remove all patterns from the set of forbidden events. - """ - self.forbidden_events.clear() - def _check_forbidden(self, event): for e in self.forbidden_events: if e.match(event): @@ -309,11 +279,6 @@ class IteratingEventQueue(BaseEventQueue): def __init__(self, timeout=None): BaseEventQueue.__init__(self, timeout) - self._dbus_method_impls = [] - self._buses = [] - # a message filter which will claim we handled everything - self._dbus_dev_null = \ - lambda bus, message: dbus.lowlevel.HANDLER_RESULT_HANDLED def wait(self, queues=None): stop = [False] @@ -338,127 +303,6 @@ class IteratingEventQueue(BaseEventQueue): else: raise TimeoutError - def add_dbus_method_impl(self, cb, bus=None, **kwargs): - if bus is None: - bus = self._buses[0] - - self._dbus_method_impls.append( - (EventPattern('dbus-method-call', **kwargs), cb)) - - def dbus_emit(self, path, iface, name, *a, **k): - bus = k.pop('bus', self._buses[0]) - assert 'signature' in k, k - message = dbus.lowlevel.SignalMessage(path, iface, name) - message.append(*a, **k) - bus.send_message(message) - - def dbus_return(self, in_reply_to, *a, **k): - bus = k.pop('bus', self._buses[0]) - assert 'signature' in k, k - reply = dbus.lowlevel.MethodReturnMessage(in_reply_to) - reply.append(*a, **k) - bus.send_message(reply) - - def dbus_raise(self, in_reply_to, name, message=None, bus=None): - if bus is None: - bus = self._buses[0] - - reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message) - bus.send_message(reply) - - def attach_to_bus(self, bus): - if not self._buses: - # first-time setup - self._dbus_filter_bound_method = self._dbus_filter - - self._buses.append(bus) - - # Only subscribe to messages on the first bus connection (assumed to - # be the shared session bus connection used by the simulated connection - # manager and most of the test suite), not on subsequent bus - # connections (assumed to represent extra clients). - # - # When we receive a method call on the other bus connections, ignore - # it - the eavesdropping filter installed on the first bus connection - # will see it too. - # - # This is highly counter-intuitive, but it means our messages are in - # a guaranteed order (we don't have races between messages arriving on - # various connections). - if len(self._buses) > 1: - bus.add_message_filter(self._dbus_dev_null) - return - - try: - # for dbus > 1.5 - bus.add_match_string("eavesdrop=true,type='signal'") - except dbus.DBusException: - bus.add_match_string("type='signal'") - bus.add_match_string("type='method_call'") - else: - bus.add_match_string("eavesdrop=true,type='method_call'") - - bus.add_message_filter(self._dbus_filter_bound_method) - - bus.add_signal_receiver( - lambda *args, **kw: - self.append( - Event('dbus-signal', - path=unwrap(kw['path']), - signal=kw['member'], - args=map(unwrap, args), - interface=kw['interface'])), - None, - None, - None, - path_keyword='path', - member_keyword='member', - interface_keyword='interface', - byte_arrays=True, - ) - - def cleanup(self): - if self._buses: - self._buses[0].remove_message_filter(self._dbus_filter_bound_method) - for bus in self._buses[1:]: - bus.remove_message_filter(self._dbus_dev_null) - - self._buses = [] - self._dbus_method_impls = [] - - def _dbus_filter(self, bus, message): - if isinstance(message, dbus.lowlevel.MethodCallMessage): - - destination = message.get_destination() - sender = message.get_sender() - - if (destination == 'org.freedesktop.DBus' or - sender == self._buses[0].get_unique_name()): - # suppress reply and don't make an Event - return dbus.lowlevel.HANDLER_RESULT_HANDLED - - e = Event('dbus-method-call', message=message, - interface=message.get_interface(), path=message.get_path(), - raw_args=message.get_args_list(byte_arrays=True), - args=map(unwrap, message.get_args_list(byte_arrays=True)), - destination=str(destination), - method=message.get_member(), - sender=message.get_sender(), - handled=False) - - for pair in self._dbus_method_impls: - pattern, cb = pair - if pattern.match(e): - cb(e) - e.handled = True - break - - self.append(e) - - return dbus.lowlevel.HANDLER_RESULT_HANDLED - - return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED - class TestEventQueue(BaseEventQueue): def __init__(self, events): BaseEventQueue.__init__(self) @@ -569,23 +413,20 @@ def call_async(test, proxy, method, *args, **kw): kw.update({'reply_handler': reply_func, 'error_handler': error_func}) method_proxy(*args, **kw) -def sync_dbus(bus, q, proxy): - # Dummy D-Bus method call. We can't use DBus.Peer.Ping() because libdbus - # replies to that message immediately, rather than handing it up to - # dbus-glib and thence the application, which means that Ping()ing the - # application doesn't ensure that it's processed all D-Bus messages prior - # to our ping. - call_async(q, dbus.Interface(proxy, 'org.freedesktop.Telepathy.Tests'), - 'DummySyncDBus') +def sync_dbus(bus, q, conn): + # Dummy D-Bus method call + # This won't do the right thing unless the proxy has a unique name. + assert conn.object.bus_name.startswith(':') + root_object = bus.get_object(conn.object.bus_name, '/') + call_async( + q, dbus.Interface(root_object, 'org.freedesktop.Telepathy.Tests'), 'DummySyncDBus') q.expect('dbus-error', method='DummySyncDBus') class ProxyWrapper: - def __init__(self, object, default, others={}): + def __init__(self, object, default, others): self.object = object self.default_interface = dbus.Interface(object, default) self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE) - self.TpProperties = \ - dbus.Interface(object, tp_name_prefix + '.Properties') self.interfaces = dict([ (name, dbus.Interface(object, iface)) for name, iface in others.iteritems()]) @@ -599,47 +440,27 @@ class ProxyWrapper: return getattr(self.default_interface, name) -class ConnWrapper(ProxyWrapper): - def inspect_contact_sync(self, handle): - return self.inspect_contacts_sync([handle])[0] - - def inspect_contacts_sync(self, handles): - h2asv = self.Contacts.GetContactAttributes(handles, []) - ret = [] - for h in handles: - ret.append(h2asv[h][cs.ATTR_CONTACT_ID]) - return ret - - def get_contact_handle_sync(self, identifier): - return self.Contacts.GetContactByID(identifier, [])[0] - - def get_contact_handles_sync(self, ids): - return [self.get_contact_handle_sync(i) for i in ids] - def wrap_connection(conn): - return ConnWrapper(conn, tp_name_prefix + '.Connection', - dict( + return ProxyWrapper(conn, tp_name_prefix + '.Connection', + dict([ + (name, tp_name_prefix + '.Connection.Interface.' + name) + for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts', + 'Presence', 'SimplePresence', 'Requests']] + [('Peer', 'org.freedesktop.DBus.Peer'), - ('Aliasing', cs.CONN_IFACE_ALIASING), - ('Avatars', cs.CONN_IFACE_AVATARS), - ('Contacts', cs.CONN_IFACE_CONTACTS), ('ContactCapabilities', cs.CONN_IFACE_CONTACT_CAPS), ('ContactInfo', cs.CONN_IFACE_CONTACT_INFO), ('Location', cs.CONN_IFACE_LOCATION), - ('Presence', cs.CONN_IFACE_PRESENCE), - ('Requests', cs.CONN_IFACE_REQUESTS), ('Future', tp_name_prefix + '.Connection.FUTURE'), ('MailNotification', cs.CONN_IFACE_MAIL_NOTIFICATION), ('ContactList', cs.CONN_IFACE_CONTACT_LIST), ('ContactGroups', cs.CONN_IFACE_CONTACT_GROUPS), ('PowerSaving', cs.CONN_IFACE_POWER_SAVING), - ('Addressing', cs.CONN_IFACE_ADDRESSING), ])) def wrap_channel(chan, type_, extra=None): interfaces = { type_: tp_name_prefix + '.Channel.Type.' + type_, - 'Group': cs.CHANNEL_IFACE_GROUP, + 'Group': tp_name_prefix + '.Channel.Interface.Group', } if extra: @@ -649,26 +470,14 @@ def wrap_channel(chan, type_, extra=None): return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces) - -def wrap_content(chan, extra=None): - interfaces = { } - - if extra: - interfaces.update(dict([ - (name, tp_name_prefix + '.Call1.Content.Interface.' + name) - for name in extra])) - - return ProxyWrapper(chan, tp_name_prefix + '.Call1.Content', interfaces) - def make_connection(bus, event_func, name, proto, params): cm = bus.get_object( tp_name_prefix + '.ConnectionManager.%s' % name, - tp_path_prefix + '/ConnectionManager/%s' % name, - introspect=False) + tp_path_prefix + '/ConnectionManager/%s' % name) cm_iface = dbus.Interface(cm, tp_name_prefix + '.ConnectionManager') connection_name, connection_path = cm_iface.RequestConnection( - proto, dbus.Dictionary(params, signature='sv')) + proto, params) conn = wrap_connection(bus.get_object(connection_name, connection_path)) return conn @@ -815,16 +624,6 @@ def install_colourer(): sys.stdout = Colourer(sys.stdout, patterns) return sys.stdout -# this is just to shut up unittest. -class DummyStream(object): - def write(self, s): - if 'CHECK_TWISTED_VERBOSE' in os.environ: - print s, - - def flush(self): - pass - if __name__ == '__main__': - stream = DummyStream() - runner = unittest.TextTestRunner(stream=stream) - unittest.main(testRunner=runner) + unittest.main() + |