""" Infrastructure code for testing Salut """ import os import sys import time import re from subprocess import Popen import servicetest from servicetest import call_async, EventPattern, Event, unwrap from twisted.internet import reactor import constants as cs from twisted.words.protocols.jabber.client import IQ from twisted.words.xish import domish, xpath import ns import dbus import glib # keep sync with src/capabilities.c:self_advertised_features fixed_features = [ns.SI, ns.TUBES, ns.IQ_OOB, ns.X_OOB, ns.TP_FT_METADATA] def make_result_iq(iq): result = IQ(None, "result") result["id"] = iq["id"] query = iq.firstChildElement() if query: result.addElement((query.uri, query.name)) return result def sync_stream(q, xmpp_connection): """Used to ensure that Salut has processed all stanzas sent to it on this xmpp_connection.""" iq = IQ(None, "get") iq.addElement(('http://jabber.org/protocol/disco#info', 'query')) xmpp_connection.send(iq) q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info') def make_connection(bus, event_func, params=None): default_params = { 'published-name': 'testsuite', 'first-name': 'test', 'last-name': 'suite', 'nickname': re.sub('(.*tests/twisted/|\./)', '', sys.argv[0]), } if params: default_params.update(params) return servicetest.make_connection(bus, event_func, 'salut', 'local_xmpp', default_params) def ensure_avahi_is_running(): bus = dbus.SystemBus() bus_obj = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus') if bus_obj.NameHasOwner('org.freedesktop.Avahi', dbus_interface='org.freedesktop.DBus'): return loop = glib.MainLoop() def name_owner_changed_cb(name, old_owner, new_owner): loop.quit() noc = bus.add_signal_receiver(name_owner_changed_cb, signal_name='NameOwnerChanged', dbus_interface='org.freedesktop.DBus', arg0='org.freedesktop.Avahi') # Cannot use D-Bus activation because we have no way to pass to activated # clients the address of the system bus and we cannot host the service in # this process because we are going to make blocking calls and we would # deadlock. tests_dir = os.path.dirname(__file__) avahimock_path = os.path.join(tests_dir, 'avahimock.py') Popen([avahimock_path]) loop.run() noc.remove() def exec_test_deferred (fun, params, protocol=None, timeout=None, make_conn=True): colourer = None if 'SALUT_TEST_REAL_AVAHI' not in os.environ: ensure_avahi_is_running() if sys.stdout.isatty() or 'CHECK_FORCE_COLOR' in os.environ: colourer = servicetest.install_colourer() bus = dbus.SessionBus() queue = servicetest.IteratingEventQueue(timeout) queue.verbose = ( os.environ.get('CHECK_TWISTED_VERBOSE', '') != '' or '-v' in sys.argv) if make_conn: try: conn = make_connection(bus, queue.append, params) except Exception, e: # This is normally because the connection's still kicking around # on the bus from a previous test. Let's bail out unceremoniously. print e os._exit(1) else: conn = None def signal_receiver(*args, **kw): queue.append(Event('dbus-signal', path=unwrap(kw['path']), signal=kw['member'], args=map(unwrap, args), interface=kw['interface'])) bus.add_signal_receiver( signal_receiver, None, # signal name None, # interface None, path_keyword='path', member_keyword='member', interface_keyword='interface', byte_arrays=True ) error = None try: fun(queue, bus, conn) except Exception, e: import traceback traceback.print_exc() error = e queue.verbose = False if colourer: sys.stdout = colourer.fh if bus.name_has_owner(conn.object.bus_name): # Connection hasn't already been disconnected and destroyed try: if conn.Properties.Get(cs.CONN, 'Status') == cs.CONN_STATUS_CONNECTED: # Connection is connected, properly disconnect it call_async(queue, conn, 'Disconnect') queue.expect_many(EventPattern('dbus-signal', signal='StatusChanged', args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED]), EventPattern('dbus-return', method='Disconnect')) else: # Connection is not connected, call Disconnect() to destroy it conn.Disconnect() except dbus.DBusException, e: pass try: conn.Disconnect() raise AssertionError("Connection didn't disappear; " "all subsequent tests will probably fail") except dbus.DBusException, e: pass except Exception, e: import traceback traceback.print_exc() error = e if error is None: reactor.callLater(0, reactor.crash) else: # please ignore the POSIX behind the curtain os._exit(1) if 'SALUT_TEST_REFDBG' in os.environ: # we have to wait that Salut timeouts so the process is properly # exited and refdbg can generates its report time.sleep(5.5) def exec_test(fun, params=None, protocol=None, timeout=None, make_conn=True): reactor.callWhenRunning (exec_test_deferred, fun, params, protocol, timeout, make_conn) reactor.run() def wait_for_contact_in_publish(q, bus, conn, contact_name): handle = 0 # Wait until the record shows up in publish while handle == 0: e = q.expect('dbus-signal', signal='ContactsChanged', path=conn.object_path) for h, state in e.args[0].items(): name = e.args[1][h] if name == contact_name and state[1] == cs.SUBSCRIPTION_STATE_YES: handle = h return handle def _elem_add(elem, *children): for child in children: if isinstance(child, domish.Element): elem.addChild(child) elif isinstance(child, unicode): elem.addContent(child) else: raise ValueError( 'invalid child object %r (must be element or unicode)', child) def elem(a, b=None, attrs={}, **kw): r""" >>> elem('foo')().toXml() u'' >>> elem('foo', x='1')().toXml() u"" >>> elem('foo', x='1')(u'hello').toXml() u"hello" >>> elem('foo', x='1')(u'hello', ... elem('http://foo.org', 'bar', y='2')(u'bye')).toXml() u"hellobye" >>> elem('foo', attrs={'xmlns:bar': 'urn:bar', 'bar:cake': 'yum'})( ... elem('bar:e')(u'i') ... ).toXml() u"i" """ class _elem(domish.Element): def __call__(self, *children): _elem_add(self, *children) return self if b is not None: elem = _elem((a, b)) else: elem = _elem((None, a)) # Can't just update kw into attrs, because that *modifies the parameter's # default*. Thanks python. allattrs = {} allattrs.update(kw) allattrs.update(attrs) # First, let's pull namespaces out realattrs = {} for k, v in allattrs.iteritems(): if k.startswith('xmlns:'): abbr = k[len('xmlns:'):] elem.localPrefixes[abbr] = v else: realattrs[k] = v for k, v in realattrs.iteritems(): if k == 'from_': elem['from'] = v else: elem[k] = v return elem def elem_iq(server, type, **kw): class _iq(IQ): def __call__(self, *children): _elem_add(self, *children) return self iq = _iq(server, type) for k, v in kw.iteritems(): if k == 'from_': iq['from'] = v else: iq[k] = v return iq def make_presence(_from, to, type=None, show=None, status=None, caps=None, photo=None): presence = domish.Element((None, 'presence')) presence['from'] = _from presence['to'] = to if type is not None: presence['type'] = type if show is not None: presence.addElement('show', content=show) if status is not None: presence.addElement('status', content=status) if caps is not None: cel = presence.addElement(('http://jabber.org/protocol/caps', 'c')) for key,value in caps.items(): cel[key] = value # 4a1... if photo is not None: x = presence.addElement((ns.VCARD_TEMP_UPDATE, 'x')) x.addElement('photo').addContent(photo) return presence