1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# -*- coding: utf-8 -*-
import bottle
import threading
import urllib
import urllib2
import sys
import time
import unittest
import wsgiref
import wsgiref.simple_server
import wsgiref.util
from StringIO import StringIO
try:
from io import BytesIO
except:
pass
import mimetypes
import uuid
def tob(data):
    """Return *data* as a UTF-8 encoded byte string.

    Text strings (``unicode`` on Python 2, ``str`` on Python 3) are encoded
    with UTF-8. Any other object, including values that are already byte
    strings, is returned unchanged.
    """
    # ``unicode`` only exists on Python 2; resolve the text type lazily so
    # the helper does not raise NameError on Python 3.
    try:
        text_type = unicode
    except NameError:
        text_type = str
    return data.encode('utf8') if isinstance(data, text_type) else data
class ServerTestBase(unittest.TestCase):
    """Base class for tests that exercise a Bottle app through WSGI.

    ``setUp`` pushes a fresh application onto ``bottle.app`` and ``tearDown``
    pops it again. Requests are made by calling the application object
    directly with a synthetic WSGI environ (see ``urlopen``).
    """

    def setUp(self):
        """Create a new Bottle app and make it the default app."""
        self.port = 8080
        self.host = 'localhost'
        self.app = bottle.app.push()

    def urlopen(self, path, method='GET', post='', env=None):
        """Call the app with a synthetic request and collect the response.

        :param path: value stored in ``PATH_INFO``.
        :param method: HTTP method; upper-cased and stripped before use.
        :param post: optional request body. When non-empty the method is
            forced to ``POST`` and the body is written to ``wsgi.input``.
        :param env: optional pre-populated WSGI environ. It is modified in
            place; missing keys are filled with wsgiref's testing defaults.
        :returns: dict with ``code`` (int), ``status`` (reason phrase),
            ``header`` (dict, names title-cased, repeated headers joined
            with ``', '``) and ``body`` (concatenated response parts).
        :raises TypeError: if the app yields a part that cannot be appended
            to the byte-string body.
        """
        result = {'code': 0, 'status': 'error', 'header': {}, 'body': tob('')}

        def start_response(status, header):
            result['code'] = int(status.split()[0])
            result['status'] = status.split(None, 1)[-1]
            for name, value in header:
                name = name.title()
                if name in result['header']:
                    # Repeated headers are folded into one comma-separated value.
                    result['header'][name] += ', ' + value
                else:
                    result['header'][name] = value

        env = env if env else {}
        wsgiref.util.setup_testing_defaults(env)
        env['REQUEST_METHOD'] = method.upper().strip()
        env['PATH_INFO'] = path
        if post:
            payload = tob(post)
            env['REQUEST_METHOD'] = 'POST'
            # WSGI environ CGI variables must be strings, not ints.
            env['CONTENT_LENGTH'] = str(len(payload))
            env['wsgi.input'].write(payload)
            env['wsgi.input'].seek(0)
        for part in self.app(env, start_response):
            try:
                result['body'] += part
            except TypeError:
                raise TypeError('WSGI app yielded non-byte object %s'
                                % type(part))
        return result

    def postmultipart(self, path, fields, files):
        """POST a ``multipart/form-data`` request built from fields and files.

        :param fields: iterable of ``(name, value)`` pairs.
        :param files: iterable of ``(name, filename, content)`` triples.
        :returns: the result dict produced by ``urlopen``.
        """
        env = multipart_environ(fields, files)
        return self.urlopen(path, method='POST', env=env)

    def tearDown(self):
        """Remove the app pushed by ``setUp`` from the default-app stack."""
        bottle.app.pop()

    def assertStatus(self, code, route, **kargs):
        """Assert that requesting *route* yields HTTP status *code*."""
        self.assertEqual(code, self.urlopen(route, **kargs)['code'])

    def assertBody(self, body, route, **kargs):
        """Assert that the response body for *route* equals *body*."""
        self.assertEqual(tob(body), self.urlopen(route, **kargs)['body'])

    def assertInBody(self, body, route, **kargs):
        """Assert that *body* occurs somewhere in the response body."""
        self.assertTrue(tob(body) in self.urlopen(route, **kargs)['body'])

    def assertHeader(self, name, value, route, **kargs):
        """Assert that response header *name* equals *value*.

        A missing header compares as ``None``.
        """
        self.assertEqual(value,
                         self.urlopen(route, **kargs)['header'].get(name))

    def assertHeaderAny(self, name, route, **kargs):
        """Assert that response header *name* is present with a truthy value."""
        self.assertTrue(self.urlopen(route, **kargs)['header'].get(name, None))
def multipart_environ(fields, files):
    """Build a WSGI environ carrying a ``multipart/form-data`` POST body.

    :param fields: iterable of ``(name, value)`` pairs for plain form fields.
    :param files: iterable of ``(name, filename, content)`` triples; the
        part's Content-Type is guessed from *filename* and falls back to
        ``application/octet-stream``.
    :returns: an environ dict filled with wsgiref's testing defaults, with
        ``REQUEST_METHOD``, ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` set and
        the encoded body written to ``wsgi.input`` (rewound to position 0).

    Text values are encoded as UTF-8; values that are already byte strings
    are embedded unchanged, so binary file content is supported.
    """
    def _as_bytes(chunk):
        # Encode text only: calling .encode() on a Python 2 byte string
        # would trigger an implicit ASCII decode and fail on binary data.
        return chunk if isinstance(chunk, bytes) else chunk.encode('utf8')

    boundary = str(uuid.uuid1())
    env = {'REQUEST_METHOD': 'POST',
           'CONTENT_TYPE': 'multipart/form-data; boundary=' + boundary}
    wsgiref.util.setup_testing_defaults(env)
    delimiter = '--' + boundary

    # NOTE: parts are separated by bare LF, as in the original helper, to
    # keep the generated wire format unchanged for existing tests.
    parts = []
    for name, value in fields:
        parts.append(delimiter + '\n')
        parts.append('Content-Disposition: form-data; name="%s"\n\n' % name)
        parts.append(value)
        parts.append('\n')
    for name, filename, content in files:
        mimetype = (mimetypes.guess_type(filename)[0]
                    or 'application/octet-stream')
        parts.append(delimiter + '\n')
        parts.append('Content-Disposition: file; name="%s"; filename="%s"\n'
                     % (name, filename))
        parts.append('Content-Type: %s\n\n' % mimetype)
        parts.append(content)
        parts.append('\n')
    parts.append(delimiter + '--\n')

    body = b''.join(_as_bytes(part) for part in parts)
    # Content-Length counts bytes, so measure the body after encoding.
    env['CONTENT_LENGTH'] = str(len(body))
    env['wsgi.input'].write(body)
    env['wsgi.input'].seek(0)
    return env
|