1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
"""
Test requesting of muc tubes channels using the old and new request API.
"""
import dbus
import avahitest
from twisted.words.xish import domish
from saluttest import exec_test, wait_for_contact_list
from servicetest import call_async, EventPattern, wrap_channel, pretty
from constants import *
def test(q, bus, conn):
self_name = 'testsuite' + '@' + avahitest.get_host_name()
conn.Connect()
q.expect('dbus-signal', signal='StatusChanged', args=[0L, 0L])
# FIXME: this is a hack to be sure to have all the contact list channels
# announced so they won't interfere with the muc ones announces.
wait_for_contact_list(q, conn)
# check if we can request tube channels
properties = conn.Properties.GetAll(CONN_IFACE_REQUESTS)
assert ({CHANNEL_TYPE: CHANNEL_TYPE_STREAM_TUBE,
TARGET_HANDLE_TYPE: HT_ROOM},
[TARGET_HANDLE, TARGET_ID, STREAM_TUBE_SERVICE],
) in properties.get('RequestableChannelClasses'),\
properties['RequestableChannelClasses']
# create muc channel using new API
call_async(q, conn.Requests, 'CreateChannel',
{ CHANNEL_TYPE: CHANNEL_TYPE_STREAM_TUBE,
TARGET_HANDLE_TYPE: HT_ROOM,
TARGET_ID: 'my-second-room',
STREAM_TUBE_SERVICE: 'loldongs',
})
ret, old_sig, new_sig = q.expect_many(
EventPattern('dbus-return', method='CreateChannel'),
EventPattern('dbus-signal', signal='NewChannel'),
EventPattern('dbus-signal', signal='NewChannels'),
)
tube_path = ret.value[0]
chan = wrap_channel(bus.get_object(conn.bus_name, tube_path),
'StreamTube')
handle = conn.RequestHandles(HT_ROOM, ['my-second-room'])[0]
tube_props = ret.value[1]
assert tube_props[CHANNEL_TYPE] == CHANNEL_TYPE_STREAM_TUBE
assert tube_props[TARGET_HANDLE_TYPE] == HT_ROOM
assert tube_props[TARGET_HANDLE] == handle
assert tube_props[TARGET_ID] == 'my-second-room'
assert tube_props[REQUESTED] == True
assert tube_props[INITIATOR_HANDLE] == conn.GetSelfHandle()
assert tube_props[INITIATOR_ID] == self_name
# text and tube channels are announced
channels = new_sig.args[0]
assert len(channels) == 2
got_text, got_tube = False, False
for path, props in channels:
if props[CHANNEL_TYPE] == CHANNEL_TYPE_TEXT:
got_text = True
assert props[REQUESTED] == False
elif props[CHANNEL_TYPE] == CHANNEL_TYPE_STREAM_TUBE:
got_tube = True
assert path == tube_path
assert props == tube_props
else:
assert False
assert props[TARGET_HANDLE_TYPE] == HT_ROOM
assert props[TARGET_HANDLE] == handle
assert props[TARGET_ID] == 'my-second-room'
assert props[INITIATOR_HANDLE] == conn.GetSelfHandle()
assert props[INITIATOR_ID] == self_name
# ensure the same channel
# yours, ensured_path, ensured_props = conn.Requests.EnsureChannel(
# { CHANNEL_TYPE: CHANNEL_TYPE_STREAM_TUBE,
# TARGET_HANDLE_TYPE: HT_ROOM,
# TARGET_HANDLE: handle,
# STREAM_TUBE_SERVICE: 'loldongs',
# })
# assert not yours
# assert ensured_path == tube_path, (ensured_path, tube_path2)
conn.Disconnect()
q.expect_many(
EventPattern('dbus-signal', signal='Closed',
path=tube_path),
EventPattern('dbus-signal', signal='ChannelClosed', args=[tube_path]),
EventPattern('dbus-signal', signal='StatusChanged', args=[2, 1]),
)
if __name__ == '__main__':
exec_test(test)
|