1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
"""
Test that presence changes made with SetPresence are announced in the Avahi
TXT record of our own _presence._tcp service.
"""
import dbus
import avahi
from saluttest import exec_test
from avahitest import AvahiListener, txt_get_key
import constants as cs
def test(q, bus, conn):
    """Verify that our presence is published in the Avahi TXT record.

    The test connects, finds the ``_presence._tcp`` service announced under
    our own SelfID, and then checks the ``status`` and ``msg`` TXT keys
    after each of the following steps:

    1. right after connecting (expects ``avail`` with no message),
    2. after ``SetPresence('away', 'At the pub')``,
    3. after ``SetPresence('available', '')`` (expects ``avail`` with no
       message again).

    Finally it disconnects and waits for the matching StatusChanged signal.

    Parameters:
        q: event queue; only its ``expect`` method is used here.
        bus: not used by this test.
        conn: connection object under test.
            NOTE(review): all three are presumably supplied by
            ``exec_test`` -- confirm against saluttest.
    """
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_CONNECTED, cs.CSR_NONE_SPECIFIED])

    # The handle value is not used below; the lookup is kept as in the
    # original sequence of D-Bus calls.
    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    own_service_name = conn.Properties.Get(cs.CONN, "SelfID")

    # Locate and resolve the service that is published under our own name.
    AvahiListener(q).listen_for_service("_presence._tcp")
    added = q.expect('service-added', name=own_service_name,
                     protocol=avahi.PROTO_INET)
    service = added.service
    service.resolve()

    def announced_presence():
        """Wait for the next resolution of our service.

        Returns the (status, msg) pair read from its TXT record.
        """
        resolved = q.expect('service-resolved', service=service)
        return (txt_get_key(resolved.txt, 'status'),
                txt_get_key(resolved.txt, 'msg'))

    def assert_announced(want_status, want_msg):
        """Assert that the next announcement carries the given presence.

        A ``want_msg`` of None means no message is expected at all.
        """
        got_status, got_msg = announced_presence()
        assert got_status == want_status, got_status
        if want_msg is None:
            assert got_msg is None, got_msg
        else:
            assert got_msg == want_msg, got_msg

    # initial presence is available
    assert_announced('avail', None)

    supported = conn.Get(cs.CONN_IFACE_PRESENCE, 'Statuses',
                         dbus_interface=dbus.PROPERTIES_IFACE)
    for required in ('available', 'dnd', 'away'):
        assert required in supported

    presence_iface = dbus.Interface(conn, cs.CONN_IFACE_PRESENCE)

    # set your status to away
    presence_iface.SetPresence('away', 'At the pub')
    assert_announced('away', 'At the pub')

    # set your status to available without msg
    presence_iface.SetPresence('available', '')
    assert_announced('avail', None)

    conn.Disconnect()
    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED])
# Script entry point: hand the test function to the saluttest runner.
if __name__ == '__main__':
    exec_test(test)
|