summaryrefslogtreecommitdiff
path: root/tests/twisted/avahi/caps-self.py
blob: e73c1fa34c2b6729a1c5062876b2cf62a7abb485 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
Basic test of SetSelfCapabilities on interface
org.freedesktop.Telepathy.Connection.Interface.ContactCapabilities
"""

from saluttest import exec_test
from avahitest import AvahiAnnouncer, AvahiListener
from avahitest import get_host_name
from avahitest import txt_get_key
import avahi

from xmppstream import setup_stream_listener, connect_to_stream
from servicetest import make_channel_proxy
from saluttest import fixed_features

from twisted.words.xish import xpath, domish
from caps_helper import compute_caps_hash, check_caps_txt
from config import PACKAGE_STRING
import ns
import constants as cs

import time
import dbus

HT_CONTACT = 1

def test(q, bus, conn):
    # last value of the "ver" key we resolved. We use it to be sure that the
    # modified caps has already be announced.
    old_ver = None

    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged', args=[0L, 0L])

    self_handle = conn.GetSelfHandle()
    self_handle_name =  conn.InspectHandles(HT_CONTACT, [self_handle])[0]

    AvahiListener(q).listen_for_service("_presence._tcp")
    e = q.expect('service-added', name = self_handle_name,
        protocol = avahi.PROTO_INET)
    service = e.service
    service.resolve()

    e = q.expect('service-resolved', service = service)

    ver = txt_get_key(e.txt, "ver")
    while ver == old_ver:
        # be sure that the announced caps actually changes
        e = q.expect('service-resolved', service=service)
        ver = txt_get_key(e.txt, "ver")
    old_ver = ver

    # We support OOB file transfer
    caps = compute_caps_hash(['client/pc//%s' % PACKAGE_STRING],
        fixed_features, {})
    check_caps_txt(e.txt, caps)

    conn_caps_iface = dbus.Interface(conn, cs.CONN_IFACE_CONTACT_CAPS)

    # Advertise nothing
    conn_caps_iface.UpdateCapabilities([])

    service.resolve()
    e = q.expect('service-resolved', service = service)

    # Announced capa didn't change
    caps = compute_caps_hash(['client/pc//%s' % PACKAGE_STRING],
        fixed_features, {})

    check_caps_txt(e.txt, caps)

if __name__ == '__main__':
    exec_test(test)