# path: root/tests/twisted/avahi/aliases.py
# blob: 799bd54f301782be3ae5a25d79ead394e16a8247
"""
Test that aliases are built as expected from contacts' TXT records, and that
the details show up correctly in ContactInfo.
"""

from servicetest import assertContains, assertEquals, assertLength, call_async
from saluttest import exec_test, wait_for_contact_in_publish
from avahitest import AvahiAnnouncer
from avahitest import get_host_name
import constants as cs

import time

def wait_for_aliases_changed(q, handle):
    """Wait for an AliasesChanged signal covering `handle`; return its alias.

    The signal's first argument is indexed by handle, so signals that do not
    mention `handle` are rejected by the predicate handed to q.expect().
    """
    def mentions_handle(event):
        return handle in event.args[0]

    event = q.expect('dbus-signal', signal='AliasesChanged',
            predicate=mentions_handle)
    return event.args[0][handle]

def wait_for_contact_info_changed(q, handle):
    """Wait for a ContactInfoChanged signal for `handle`; return its info.

    Only signals whose first argument equals `handle` are accepted; the
    second argument of the matching signal is returned.
    """
    def is_for_handle(event):
        return event.args[0] == handle

    event = q.expect('dbus-signal', signal='ContactInfoChanged',
            predicate=is_for_handle)
    _changed_handle, contact_info = event.args
    return contact_info

def assertOmitsField(field_name, fields):
    """Assert that no entry of `fields` is named `field_name`.

    Each entry of `fields` is expected to be a sequence whose first element
    is the field's name.
    """
    # Build a real list rather than calling filter(): on Python 3 filter()
    # returns a lazy iterator, which has no length for assertLength to check.
    matching = [field for field in fields if field[0] == field_name]
    assertLength(0, matching)

def check_contact_info(info, txt):
    """Check that the ContactInfo fields in `info` match the TXT record `txt`.

    A TXT key that is missing or set to the empty string must produce no
    corresponding field in `info`:

    * '1st' / 'last' drive both the 'n' and the 'fn' fields;
    * 'email' drives the 'email' field (with a 'type=internet' parameter);
    * 'jid' drives the 'x-jabber' field.
    """
    given_name = txt.get('1st', '')
    family_name = txt.get('last', '')

    if given_name == '' and family_name == '':
        assertOmitsField('n', info)
        assertOmitsField('fn', info)
    else:
        assertContains(('n', [], [family_name, given_name, '', '', '']), info)

        # The formatted name is whichever name parts are non-empty, given
        # name first, joined by a single space.
        full_name = ' '.join(
            part for part in (given_name, family_name) if part != '')
        assertContains(('fn', [], [full_name]), info)

    # (TXT key, ContactInfo field name, expected field parameters)
    single_valued = (
        ('email', 'email', ['type=internet']),
        ('jid', 'x-jabber', []),
    )
    for txt_key, info_field, parameters in single_valued:
        value = txt.get(txt_key, '')
        if value == '':
            assertOmitsField(info_field, info)
        else:
            assertContains((info_field, parameters, [value]), info)

def check_all_contact_info_methods(conn, handle, keys):
    """Verify `handle`'s contact info through both retrieval paths.

    The info is fetched once as a contact attribute via
    Contacts.GetContactAttributes and once via ContactInfo.RequestContactInfo;
    each result is checked against the TXT keys in `keys`.
    """
    attributes = conn.Contacts.GetContactAttributes(
        [handle], [cs.CONN_IFACE_CONTACT_INFO])[handle]
    info_attribute = cs.CONN_IFACE_CONTACT_INFO + "/info"
    check_contact_info(attributes[info_attribute], keys)

    check_contact_info(conn.ContactInfo.RequestContactInfo(handle), keys)

def test(q, bus, conn):
    """Exercise alias and ContactInfo handling for an Avahi-announced contact.

    Steps, in order:

    1. Connect and check the static Aliasing / ContactInfo properties.
    2. Announce a contact with a minimal TXT record and check that its alias
       is its contact name.
    3. Republish the TXT record with various '1st' / 'last' / 'nick'
       combinations and check the resulting alias and contact info.
    4. Republish with various 'email' / 'jid' combinations and check the
       resulting contact info.
    5. Check the errors returned by RequestContactInfo for an invalid handle
       and for a contact with no data.

    `q`, `bus` and `conn` are the arguments supplied by exec_test.
    """
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
        args=[cs.CONN_STATUS_CONNECTED, cs.CSR_NONE_SPECIFIED])

    flags = conn.Properties.Get(cs.CONN_IFACE_ALIASING, "AliasFlags")
    assertEquals(flags, 0)

    assertContains(cs.CONN_IFACE_CONTACT_INFO,
        conn.Properties.Get(cs.CONN, "Interfaces"))
    ci_props = conn.Properties.GetAll(cs.CONN_IFACE_CONTACT_INFO)
    assertEquals(cs.CONTACT_INFO_FLAG_PUSH, ci_props['ContactInfoFlags'])
    assertEquals(
        [ ('n', [], cs.CONTACT_INFO_FIELD_FLAG_PARAMETERS_EXACT, 1),
          ('fn', [], cs.CONTACT_INFO_FIELD_FLAG_PARAMETERS_EXACT, 1),
          ('email', ['type=internet'],
           cs.CONTACT_INFO_FIELD_FLAG_PARAMETERS_EXACT, 1),
          ('x-jabber', [], cs.CONTACT_INFO_FIELD_FLAG_PARAMETERS_EXACT, 1),

        ],
        ci_props['SupportedFields'])

    # Just to verify that RefreshContactInfo does approximately nothing and
    # doesn't crash.
    conn.ContactInfo.RefreshContactInfo([21,42,88])

    basic_txt = { "txtvers": "1", "status": "avail" }

    contact_name = "aliastest@" + get_host_name()
    announcer = AvahiAnnouncer(contact_name, "_presence._tcp", 1234, basic_txt)

    handle = wait_for_contact_in_publish(q, bus, conn, contact_name)
    alias = wait_for_aliases_changed(q, handle)
    assertEquals(contact_name, alias)

    # Each case is (expected alias, TXT keys published on top of basic_txt,
    # whether a ContactInfoChanged signal is expected).
    for (expected_alias, published, expect_contact_info_changed) in [
      # Contact publishes just one of 1st and last
      ("last", { "last": "last" }, True),
      ("1st", { "1st": "1st"}, True),
      # Contact publishes a meaningful value for one of 1st and last, and an
      # empty value for the other one and for "nick". Empty values should be
      # treated as if missing.
      ("last", { "last": "last", "1st": "", "nick": "" }, True),
      ("1st", { "1st": "1st", "last": "", "nick": "" }, True),
      # When a contact publishes both 1st and last, we have to join them
      # together in a stupid anglo-centric way, like iChat does.
      ("1st last", { "1st": "1st", "last": "last" }, True),
      # Nickname should be preferred as the alias to 1st or last. Since we
      # don't report nicknames in ContactInfo, and nothing else has changed
      # from the last update, no ContactInfo changes should be announced.
      ("nickname", { "1st": "1st", "last": "last", "nick": "nickname" }, False),
      # If the contact stops publishing any of this stuff, we should fall back
      # to their JID as their alias.
      (contact_name, {}, True) ]:
        txt = basic_txt.copy()
        txt.update(published)

        announcer.set(txt)

        a = wait_for_aliases_changed(q, handle)
        assert a == expected_alias, (a, expected_alias, txt)

        if expect_contact_info_changed:
            info = wait_for_contact_info_changed(q, handle)
            check_contact_info(info, published)

        attrs = conn.Contacts.GetContactAttributes([handle],
            [cs.CONN_IFACE_ALIASING])[handle]
        assertEquals(expected_alias, attrs[cs.CONN_IFACE_ALIASING + "/alias"])

        check_all_contact_info_methods(conn, handle, published)

    for keys in [ # Check a few neat transitions, with no empty fields
                  { "email": "foo@bar.com" },
                  { "jid": "nyan@gmail.com", "email": "foo@bar.com" },
                  { "jid": "orly@example.com" },
                  # Check that empty fields are treated as if omitted
                  { "email": "foo@bar.com", "jid": "" },
                  { "jid": "orly@example.com", "email": "" },
                ]:
        txt = basic_txt.copy()
        txt.update(keys)

        announcer.set(txt)
        info = wait_for_contact_info_changed(q, handle)
        check_contact_info(info, keys)

        check_all_contact_info_methods(conn, handle, keys)

    # Try an invalid handle. Request should return InvalidHandle.
    # (Technically so should RefreshContactInfo but I am lazy.)
    call_async(q, conn.ContactInfo, 'RequestContactInfo', 42)
    q.expect('dbus-error', method='RequestContactInfo', name=cs.INVALID_HANDLE)

    # Try a valid handle for whom we have no data from the network.
    # Request should fail.
    h = conn.Contacts.GetContactByID('rthrtha@octopus', [])[0]
    call_async(q, conn.ContactInfo, 'RequestContactInfo', h)
    q.expect('dbus-error', method='RequestContactInfo', name=cs.NOT_AVAILABLE)

# When run as a script, hand the test function above to exec_test
# (imported from saluttest).
if __name__ == '__main__':
    exec_test(test)