summaryrefslogtreecommitdiff
path: root/tests/twisted/servicetest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/twisted/servicetest.py')
-rw-r--r--tests/twisted/servicetest.py276
1 files changed, 252 insertions, 24 deletions
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index bca3b13b..5ff61a46 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -1,6 +1,23 @@
+# Copyright (C) 2009 Nokia Corporation
+# Copyright (C) 2009-2013 Collabora Ltd.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA
"""
-Infrastructure code for testing connection managers.
+Infrastructure code for testing Telepathy services.
"""
from twisted.internet import glib2reactor
@@ -8,18 +25,22 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory
glib2reactor.install()
import sys
import time
+import os
import pprint
import unittest
-import dbus.glib
+import dbus
+import dbus.lowlevel
+from dbus.mainloop.glib import DBusGMainLoop
+DBusGMainLoop(set_as_default=True)
from twisted.internet import reactor
import constants as cs
-tp_name_prefix = 'org.freedesktop.Telepathy'
-tp_path_prefix = '/org/freedesktop/Telepathy'
+tp_name_prefix = cs.PREFIX
+tp_path_prefix = cs.PATH_PREFIX
class DictionarySupersetOf (object):
"""Utility class for expecting "a dictionary with at least these keys"."""
@@ -40,16 +61,19 @@ class DictionarySupersetOf (object):
except TypeError: # other is not iterable
return False
-class Event:
+class Event(object):
def __init__(self, type, **kw):
self.__dict__.update(kw)
self.type = type
(self.subqueue, self.subtype) = type.split ("-", 1)
+ def __str__(self):
+ return '\n'.join([ str(type(self)) ] + format_event(self))
+
def format_event(event):
ret = ['- type %s' % event.type]
- for key in dir(event):
+ for key in sorted(dir(event)):
if key != 'type' and not key.startswith('_'):
ret.append('- %s: %s' % (
key, pprint.pformat(getattr(event, key))))
@@ -151,6 +175,12 @@ class BaseEventQueue:
"""
self.forbidden_events.difference_update(set(patterns))
+ def unforbid_all(self):
+ """
+ Remove all patterns from the set of forbidden events.
+ """
+ self.forbidden_events.clear()
+
def _check_forbidden(self, event):
for e in self.forbidden_events:
if e.match(event):
@@ -167,7 +197,14 @@ class BaseEventQueue:
t = time.time()
while True:
- event = self.wait([pattern.subqueue])
+ try:
+ event = self.wait([pattern.subqueue])
+ except TimeoutError:
+ self.log('timeout')
+ self.log('still expecting:')
+ self.log(' - %r' % pattern)
+ raise
+
self._check_forbidden(event)
if pattern.match(event):
@@ -279,6 +316,11 @@ class IteratingEventQueue(BaseEventQueue):
def __init__(self, timeout=None):
BaseEventQueue.__init__(self, timeout)
+ self._dbus_method_impls = []
+ self._buses = []
+ # a message filter which will claim we handled everything
+ self._dbus_dev_null = \
+ lambda bus, message: dbus.lowlevel.HANDLER_RESULT_HANDLED
def wait(self, queues=None):
stop = [False]
@@ -303,6 +345,127 @@ class IteratingEventQueue(BaseEventQueue):
else:
raise TimeoutError
+ def add_dbus_method_impl(self, cb, bus=None, **kwargs):
+ if bus is None:
+ bus = self._buses[0]
+
+ self._dbus_method_impls.append(
+ (EventPattern('dbus-method-call', **kwargs), cb))
+
+ def dbus_emit(self, path, iface, name, *a, **k):
+ bus = k.pop('bus', self._buses[0])
+ assert 'signature' in k, k
+ message = dbus.lowlevel.SignalMessage(path, iface, name)
+ message.append(*a, **k)
+ bus.send_message(message)
+
+ def dbus_return(self, in_reply_to, *a, **k):
+ bus = k.pop('bus', self._buses[0])
+ assert 'signature' in k, k
+ reply = dbus.lowlevel.MethodReturnMessage(in_reply_to)
+ reply.append(*a, **k)
+ bus.send_message(reply)
+
+ def dbus_raise(self, in_reply_to, name, message=None, bus=None):
+ if bus is None:
+ bus = self._buses[0]
+
+ reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message)
+ bus.send_message(reply)
+
+ def attach_to_bus(self, bus):
+ if not self._buses:
+ # first-time setup
+ self._dbus_filter_bound_method = self._dbus_filter
+
+ self._buses.append(bus)
+
+ # Only subscribe to messages on the first bus connection (assumed to
+ # be the shared session bus connection used by the simulated connection
+ # manager and most of the test suite), not on subsequent bus
+ # connections (assumed to represent extra clients).
+ #
+ # When we receive a method call on the other bus connections, ignore
+ # it - the eavesdropping filter installed on the first bus connection
+ # will see it too.
+ #
+ # This is highly counter-intuitive, but it means our messages are in
+ # a guaranteed order (we don't have races between messages arriving on
+ # various connections).
+ if len(self._buses) > 1:
+ bus.add_message_filter(self._dbus_dev_null)
+ return
+
+ try:
+ # for dbus > 1.5
+ bus.add_match_string("eavesdrop=true,type='signal'")
+ except dbus.DBusException:
+ bus.add_match_string("type='signal'")
+ bus.add_match_string("type='method_call'")
+ else:
+ bus.add_match_string("eavesdrop=true,type='method_call'")
+
+ bus.add_message_filter(self._dbus_filter_bound_method)
+
+ bus.add_signal_receiver(
+ lambda *args, **kw:
+ self.append(
+ Event('dbus-signal',
+ path=unwrap(kw['path']),
+ signal=kw['member'],
+ args=map(unwrap, args),
+ interface=kw['interface'])),
+ None,
+ None,
+ None,
+ path_keyword='path',
+ member_keyword='member',
+ interface_keyword='interface',
+ byte_arrays=True,
+ )
+
+ def cleanup(self):
+ if self._buses:
+ self._buses[0].remove_message_filter(self._dbus_filter_bound_method)
+ for bus in self._buses[1:]:
+ bus.remove_message_filter(self._dbus_dev_null)
+
+ self._buses = []
+ self._dbus_method_impls = []
+
+ def _dbus_filter(self, bus, message):
+ if isinstance(message, dbus.lowlevel.MethodCallMessage):
+
+ destination = message.get_destination()
+ sender = message.get_sender()
+
+ if (destination == 'org.freedesktop.DBus' or
+ sender == self._buses[0].get_unique_name()):
+ # suppress reply and don't make an Event
+ return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+ e = Event('dbus-method-call', message=message,
+ interface=message.get_interface(), path=message.get_path(),
+ raw_args=message.get_args_list(byte_arrays=True),
+ args=map(unwrap, message.get_args_list(byte_arrays=True)),
+ destination=str(destination),
+ method=message.get_member(),
+ sender=message.get_sender(),
+ handled=False)
+
+ for pair in self._dbus_method_impls:
+ pattern, cb = pair
+ if pattern.match(e):
+ cb(e)
+ e.handled = True
+ break
+
+ self.append(e)
+
+ return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+ return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED
+
class TestEventQueue(BaseEventQueue):
def __init__(self, events):
BaseEventQueue.__init__(self)
@@ -413,20 +576,23 @@ def call_async(test, proxy, method, *args, **kw):
kw.update({'reply_handler': reply_func, 'error_handler': error_func})
method_proxy(*args, **kw)
-def sync_dbus(bus, q, conn):
- # Dummy D-Bus method call
- # This won't do the right thing unless the proxy has a unique name.
- assert conn.object.bus_name.startswith(':')
- root_object = bus.get_object(conn.object.bus_name, '/')
- call_async(
- q, dbus.Interface(root_object, 'org.freedesktop.Telepathy.Tests'), 'DummySyncDBus')
+def sync_dbus(bus, q, proxy):
+ # Dummy D-Bus method call. We can't use DBus.Peer.Ping() because libdbus
+ # replies to that message immediately, rather than handing it up to
+ # dbus-glib and thence the application, which means that Ping()ing the
+ # application doesn't ensure that it's processed all D-Bus messages prior
+ # to our ping.
+ call_async(q, dbus.Interface(proxy, 'org.freedesktop.Telepathy.Tests'),
+ 'DummySyncDBus')
q.expect('dbus-error', method='DummySyncDBus')
class ProxyWrapper:
- def __init__(self, object, default, others):
+ def __init__(self, object, default, others={}):
self.object = object
self.default_interface = dbus.Interface(object, default)
self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE)
+ self.TpProperties = \
+ dbus.Interface(object, tp_name_prefix + '.Properties')
self.interfaces = dict([
(name, dbus.Interface(object, iface))
for name, iface in others.iteritems()])
@@ -440,27 +606,64 @@ class ProxyWrapper:
return getattr(self.default_interface, name)
+class ConnWrapper(ProxyWrapper):
+ def inspect_contact_sync(self, handle):
+ return self.inspect_contacts_sync([handle])[0]
+
+ def inspect_contacts_sync(self, handles):
+ h2asv = self.Contacts.GetContactAttributes(handles, [], True)
+ ret = []
+ for h in handles:
+ ret.append(h2asv[h][cs.ATTR_CONTACT_ID])
+ return ret
+
+ def get_contact_handle_sync(self, identifier):
+ return self.Contacts.GetContactByID(identifier, [])[0]
+
+ def get_contact_handles_sync(self, ids):
+ return [self.get_contact_handle_sync(i) for i in ids]
+
def wrap_connection(conn):
- return ProxyWrapper(conn, tp_name_prefix + '.Connection',
+ return ConnWrapper(conn, tp_name_prefix + '.Connection',
dict([
(name, tp_name_prefix + '.Connection.Interface.' + name)
for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
- 'Presence', 'SimplePresence', 'Requests']] +
+ 'SimplePresence', 'Requests']] +
[('Peer', 'org.freedesktop.DBus.Peer'),
('ContactCapabilities', cs.CONN_IFACE_CONTACT_CAPS),
('ContactInfo', cs.CONN_IFACE_CONTACT_INFO),
('Location', cs.CONN_IFACE_LOCATION),
- ('Future', tp_name_prefix + '.Connection.FUTURE'),
('MailNotification', cs.CONN_IFACE_MAIL_NOTIFICATION),
('ContactList', cs.CONN_IFACE_CONTACT_LIST),
('ContactGroups', cs.CONN_IFACE_CONTACT_GROUPS),
+ ('ContactBlocking', cs.CONN_IFACE_CONTACT_BLOCKING),
('PowerSaving', cs.CONN_IFACE_POWER_SAVING),
+ ('Addressing', cs.CONN_IFACE_ADDRESSING),
+ ('ClientTypes', cs.CONN_IFACE_CLIENT_TYPES),
+ ('Renaming', cs.CONN_IFACE_RENAMING),
+ ('Sidecars1', cs.CONN_IFACE_SIDECARS1),
]))
+class ChannelWrapper(ProxyWrapper):
+ def send_msg_sync(self, txt):
+ message = [
+ { 'message-type': cs.MT_NORMAL, },
+ { 'content-type': 'text/plain',
+ 'content': txt
+ }]
+ self.Messages.SendMessage(message, 0)
+
def wrap_channel(chan, type_, extra=None):
interfaces = {
type_: tp_name_prefix + '.Channel.Type.' + type_,
- 'Group': tp_name_prefix + '.Channel.Interface.Group',
+ 'Channel': cs.CHANNEL,
+ 'Group': cs.CHANNEL_IFACE_GROUP,
+ 'Hold': cs.CHANNEL_IFACE_HOLD,
+ 'Messages': cs.CHANNEL_IFACE_MESSAGES,
+ 'RoomConfig1': cs.CHANNEL_IFACE_ROOM_CONFIG,
+ 'ChatState': cs.CHANNEL_IFACE_CHAT_STATE,
+ 'Destroyable': cs.CHANNEL_IFACE_DESTROYABLE,
+ 'Password': cs.CHANNEL_IFACE_PASSWORD,
}
if extra:
@@ -468,16 +671,31 @@ def wrap_channel(chan, type_, extra=None):
(name, tp_name_prefix + '.Channel.Interface.' + name)
for name in extra]))
- return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces)
+ return ChannelWrapper(chan, tp_name_prefix + '.Channel', interfaces)
+
+
+def wrap_content(chan, extra=None):
+ interfaces = {
+ 'DTMF': cs.CALL_CONTENT_IFACE_DTMF,
+ 'Media': cs.CALL_CONTENT_IFACE_MEDIA,
+ }
+
+ if extra:
+ interfaces.update(dict([
+ (name, tp_name_prefix + '.Call1.Content.Interface.' + name)
+ for name in extra]))
+
+ return ProxyWrapper(chan, tp_name_prefix + '.Call1.Content', interfaces)
def make_connection(bus, event_func, name, proto, params):
cm = bus.get_object(
tp_name_prefix + '.ConnectionManager.%s' % name,
- tp_path_prefix + '/ConnectionManager/%s' % name)
+ tp_path_prefix + '/ConnectionManager/%s' % name,
+ introspect=False)
cm_iface = dbus.Interface(cm, tp_name_prefix + '.ConnectionManager')
connection_name, connection_path = cm_iface.RequestConnection(
- proto, params)
+ proto, dbus.Dictionary(params, signature='sv'))
conn = wrap_connection(bus.get_object(connection_name, connection_path))
return conn
@@ -624,6 +842,16 @@ def install_colourer():
sys.stdout = Colourer(sys.stdout, patterns)
return sys.stdout
-if __name__ == '__main__':
- unittest.main()
+# this is just to shut up unittest.
+class DummyStream(object):
+ def write(self, s):
+ if 'CHECK_TWISTED_VERBOSE' in os.environ:
+ print s,
+ def flush(self):
+ pass
+
+if __name__ == '__main__':
+ stream = DummyStream()
+ runner = unittest.TextTestRunner(stream=stream)
+ unittest.main(testRunner=runner)