1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
import avahi
import urllib
import BaseHTTPServer
import SocketServer
import socket
from saluttest import exec_test
from file_transfer_helper import ReceiveFileTest
from avahitest import AvahiAnnouncer, get_host_name, AvahiListener,\
check_ipv6_enabled
from xmppstream import connect_to_stream6, setup_stream_listener6
from twisted.words.xish import domish
if not check_ipv6_enabled():
print "Skipped test as IPv6 doesn't seem to be available"
# exiting 77 causes automake to consider the test to have been skipped
raise SystemExit(77)
import constants as cs
class TestReceiveFileIPv6(ReceiveFileTest):
CONTACT_NAME = 'test-ft'
service_name = ''
metadata = {}
def announce_contact(self, name=CONTACT_NAME):
basic_txt = { "txtvers": "1", "status": "avail" }
self.contact_name = '%s@%s' % (name, get_host_name())
self.listener, port = setup_stream_listener6(self.q, self.contact_name)
self.contact_service = AvahiAnnouncer(self.contact_name, "_presence._tcp", port,
basic_txt, proto=avahi.PROTO_INET6)
def _resolve_salut_presence(self):
AvahiListener(self.q).listen_for_service("_presence._tcp")
e = self.q.expect('service-added', name = self.self_handle_name,
protocol = avahi.PROTO_INET6)
service = e.service
service.resolve()
e = self.q.expect('service-resolved', service = service,
protocol = avahi.PROTO_INET6)
return str(e.pt), e.port
def accept_file(self):
ReceiveFileTest.accept_file(self, cs.SOCKET_ADDRESS_TYPE_IPV6);
def receive_file(self):
ReceiveFileTest.receive_file(self, socket.AF_INET6);
def connect_to_salut(self):
host, port = self._resolve_salut_presence()
self.outbound = connect_to_stream6(self.q, self.contact_name,
self.self_handle_name, host, port)
e = self.q.expect('connection-result')
assert e.succeeded, e.reason
self.q.expect('stream-opened', connection = self.outbound)
def send_ft_offer_iq(self):
iq = domish.Element((None, 'iq'))
iq['to'] = self.self_handle_name
iq['from'] = self.contact_name
iq['type'] = 'set'
iq['id'] = 'gibber-file-transfer-0'
query = iq.addElement(('jabber:iq:oob', 'query'))
url = 'http://[::1]:%u/gibber-file-transfer-0/%s' % \
(self.httpd.server_port, urllib.quote(self.file.name))
url_node = query.addElement('url', content=url)
url_node['type'] = 'file'
url_node['size'] = str(self.file.size)
url_node['mimeType'] = self.file.content_type
query.addElement('desc', content=self.file.description)
self.outbound.send(iq)
def _get_http_server_class(self):
class HTTPServer6(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
address_family = getattr(socket, 'AF_INET6', None)
return HTTPServer6
if __name__ == '__main__':
test = TestReceiveFileIPv6()
exec_test(test.test)
|