summaryrefslogtreecommitdiff
path: root/tests/twisted/avahi/set-presence.py
blob: b6f38f1ea2bb1f449be26422b924bed804d85465 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

"""
Test requesting of muc text channels using the old and new request API.
"""

import dbus
import avahi

from saluttest import exec_test
from avahitest import AvahiListener, txt_get_key
import constants as cs

def test(q, bus, conn):
    """Check that SetPresence changes the presence announced over avahi.

    Connects, waits for our own "_presence._tcp" service to appear, resolves
    it, and after each presence change checks the 'status' and 'msg' keys of
    the resolved TXT record. Finally disconnects.

    q    -- event queue used to wait for D-Bus signals and avahi events
    bus  -- not used by this test (presumably passed by exec_test to every
            test callback -- confirm against saluttest)
    conn -- the connection under test
    """
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_CONNECTED, cs.CSR_NONE_SPECIFIED])

    # Our own service is published under the connection's SelfID.
    self_handle_name = conn.Properties.Get(cs.CONN, "SelfID")

    AvahiListener(q).listen_for_service("_presence._tcp")
    e = q.expect('service-added', name=self_handle_name,
                 protocol=avahi.PROTO_INET)
    service = e.service

    service.resolve()

    def wait_for_presence_announce():
        """Wait for the next resolve of our service; return (status, msg)."""
        e = q.expect('service-resolved', service=service)
        return txt_get_key(e.txt, 'status'), txt_get_key(e.txt, 'msg')

    def assert_announced(expected_status, expected_msg):
        """Assert the next announced TXT record carries the given values."""
        status, msg = wait_for_presence_announce()
        assert status == expected_status, status
        if expected_msg is None:
            assert msg is None, msg
        else:
            assert msg == expected_msg, msg

    # initial presence is available; the TXT record spells it 'avail'
    assert_announced('avail', None)

    statuses = conn.Properties.Get(cs.CONN_IFACE_PRESENCE, 'Statuses')
    for name in ('available', 'dnd', 'away'):
        assert name in statuses, (name, statuses)

    simple_presence = dbus.Interface(conn, cs.CONN_IFACE_PRESENCE)

    # set your status to away
    simple_presence.SetPresence('away', 'At the pub')
    assert_announced('away', 'At the pub')

    # set your status to available without msg: an empty message must show
    # up as no 'msg' value at all, not as an empty string
    simple_presence.SetPresence('available', '')
    assert_announced('avail', None)

    conn.Disconnect()
    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED])

# When run as a script, hand the test callback to exec_test (from saluttest).
if __name__ == '__main__':
    exec_test(test)