summaryrefslogtreecommitdiff
path: root/tests/twisted/avahi/tubes/request-muc-tubes.py
blob: 21100f15d057319d37f0f3d99b4fd2d3cb01e759 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

"""
Test requesting of muc tubes channels using the old and new request API.
"""

import dbus
import avahitest

from twisted.words.xish import domish

from saluttest import exec_test, wait_for_contact_list
from servicetest import call_async, EventPattern, wrap_channel, pretty
import constants as cs

def test(q, bus, conn):
    self_name = 'testsuite' + '@' + avahitest.get_host_name()

    conn.Connect()

    q.expect('dbus-signal', signal='StatusChanged', args=[0L, 0L])

    # FIXME: this is a hack to be sure to have all the contact list channels
    # announced so they won't interfere with the muc ones announces.
    wait_for_contact_list(q, conn)

    # check if we can request tube channels
    properties = conn.Properties.GetAll(cs.CONN_IFACE_REQUESTS)
    assert ({cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
             cs.TARGET_HANDLE_TYPE: cs.HT_ROOM},
             [cs.TARGET_HANDLE, cs.TARGET_ID, cs.STREAM_TUBE_SERVICE],
             ) in properties.get('RequestableChannelClasses'),\
                     properties['RequestableChannelClasses']

    # create muc channel using new API
    call_async(q, conn.Requests, 'CreateChannel',
            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
              cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
              cs.TARGET_ID: 'my-second-room',
              cs.STREAM_TUBE_SERVICE: 'loldongs',
              })

    ret, new_sig = q.expect_many(
        EventPattern('dbus-return', method='CreateChannel'),
        EventPattern('dbus-signal', signal='NewChannels'),
        )
    tube_path = ret.value[0]
    chan = wrap_channel(bus.get_object(conn.bus_name, tube_path),
                        'StreamTube')

    tube_props = ret.value[1]
    assert tube_props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_STREAM_TUBE
    assert tube_props[cs.TARGET_HANDLE_TYPE] == cs.HT_ROOM
    assert tube_props[cs.TARGET_ID] == 'my-second-room'
    assert tube_props[cs.REQUESTED] == True
    assert tube_props[cs.INITIATOR_HANDLE] == conn.Properties.Get(cs.CONN, "SelfHandle")
    assert tube_props[cs.INITIATOR_ID] == self_name

    # text and tube channels are announced
    channels = new_sig.args[0]
    assert len(channels) == 1

    handle = tube_props[cs.TARGET_HANDLE]

    path, props = channels[0]
    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_STREAM_TUBE
    assert path == tube_path
    assert props == tube_props
    assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_ROOM
    assert props[cs.TARGET_HANDLE] == handle
    assert props[cs.TARGET_ID] == 'my-second-room'
    assert props[cs.INITIATOR_HANDLE] == conn.Properties.Get(cs.CONN, "SelfHandle")
    assert props[cs.INITIATOR_ID] == self_name

    # ensure the same channel

# TODO: the muc channel doesn't bother to look at existing tubes
# before creating a new one. once that's fixed, uncomment this.
#    yours, ensured_path, _ = conn.Requests.EnsureChannel(
#            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
#              cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
#              cs.TARGET_HANDLE: handle,
#              cs.STREAM_TUBE_SERVICE: 'loldongs',
#              })

#    assert not yours
#    assert ensured_path == tube_path, (ensured_path, tube_path)

    conn.Disconnect()

    q.expect_many(
            EventPattern('dbus-signal', signal='Closed',
                path=tube_path),
            EventPattern('dbus-signal', signal='ChannelClosed', args=[tube_path]),
            EventPattern('dbus-signal', signal='StatusChanged', args=[2, 1]),
            )

if __name__ == '__main__':
    exec_test(test)