summaryrefslogtreecommitdiff
path: root/tests/twisted/gateways.py
blob: ebe1b2ef76a540075feabbb33b832dbbd5422e3b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
Test the gateways plugin
"""

import dbus
from twisted.words.xish import domish, xpath

from servicetest import call_async, EventPattern, assertEquals
from gabbletest import exec_test, send_error_reply, acknowledge_iq, make_presence
import constants as cs
import ns
from config import PLUGINS_ENABLED

PLUGIN_IFACE = cs.PREFIX + ".Gabble.Plugin.Gateways"

if not PLUGINS_ENABLED:
    print "NOTE: built without --enable-plugins, not testing plugins"
    raise SystemExit(77)

def test_success(q, gateways_iface, stream):
    call_async(q, gateways_iface, 'Register',
            'talkd.example.com', '1970', 's3kr1t')
    e = q.expect('stream-iq', iq_type='set', query_name='query',
            query_ns=ns.REGISTER, to='talkd.example.com')
    assertEquals('1970', xpath.queryForString('/query/username', e.query))
    assertEquals('s3kr1t', xpath.queryForString('/query/password', e.query))
    acknowledge_iq(stream, e.stanza)
    q.expect_many(
            EventPattern('dbus-return', method='Register'),
            EventPattern('stream-presence', presence_type='subscribe',
                to='talkd.example.com'),
            )
    stream.send(make_presence('talkd.example.com', type='subscribed'))

def test_conflict(q, gateways_iface, stream):
    call_async(q, gateways_iface, 'Register',
            'sip.example.com', '8675309', 'jenny')
    e = q.expect('stream-iq', iq_type='set', query_name='query',
            query_ns=ns.REGISTER, to='sip.example.com')
    assertEquals('8675309', xpath.queryForString('/query/username', e.query))
    assertEquals('jenny', xpath.queryForString('/query/password', e.query))
    error = domish.Element((None, 'error'))
    error['type'] = 'cancel'
    error['code'] = '409'
    error.addElement((ns.STANZA, 'conflict'))
    send_error_reply(stream, e.stanza, error)
    q.expect('dbus-error', method='Register', name=cs.REGISTRATION_EXISTS)

def test_not_acceptable(q, gateways_iface, stream):
    call_async(q, gateways_iface, 'Register',
            'fully-captcha-enabled.example.com', 'lalala', 'stoats')
    e = q.expect('stream-iq', iq_type='set', query_name='query',
            query_ns=ns.REGISTER, to='fully-captcha-enabled.example.com')
    assertEquals('lalala', xpath.queryForString('/query/username', e.query))
    assertEquals('stoats', xpath.queryForString('/query/password', e.query))
    error = domish.Element((None, 'error'))
    error['type'] = 'modify'
    error['code'] = '406'
    error.addElement((ns.STANZA, 'not-acceptable'))
    send_error_reply(stream, e.stanza, error)
    q.expect('dbus-error', method='Register', name=cs.NOT_AVAILABLE)

def test(q, bus, conn, stream):
    # Request a sidecar thate we support before we're connected; it should just
    # wait around until we're connected.
    call_async(q, conn.Sidecars1, 'EnsureSidecar', PLUGIN_IFACE)

    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    # Now we're connected, the call we made earlier should return.
    path, props = q.expect('dbus-return', method='EnsureSidecar').value
    # This sidecar doesn't even implement get_immutable_properties; it
    # should just get the empty dict filled in for it.
    assertEquals({}, props)

    gateways_iface = dbus.Interface(bus.get_object(conn.bus_name, path),
            PLUGIN_IFACE)

    test_success(q, gateways_iface, stream)
    test_conflict(q, gateways_iface, stream)
    test_not_acceptable(q, gateways_iface, stream)

    call_async(q, conn, 'Disconnect')

    q.expect_many(
        EventPattern('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED]),
        EventPattern('stream-closed'),
        )

    stream.sendFooter()
    q.expect('dbus-return', method='Disconnect')

if __name__ == '__main__':
    exec_test(test, do_connect=False)