summaryrefslogtreecommitdiff
path: root/test/tools.py
blob: 3e318a5b6c60b7ab8fda06d9c6c45211548a13e2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# -*- coding: utf-8 -*-
import bottle
import threading
import urllib
import urllib2
import sys
import time
import unittest
import wsgiref
import wsgiref.simple_server
import wsgiref.util
from StringIO import StringIO
try:
    from io import BytesIO
except:
    pass
import mimetypes
import uuid

def tob(data):
    ''' Return `data` as a byte string.

        Text strings are encoded as UTF-8; any other object (including
        byte strings) is returned unchanged.
    '''
    # `unicode` only exists on Python 2. Falling back to `str` keeps the
    # helper usable on Python 3 without changing the Python 2 behaviour.
    try:
        text_type = unicode
    except NameError:
        text_type = str
    return data.encode('utf8') if isinstance(data, text_type) else data

class ServerTestBase(unittest.TestCase):
    ''' Base class for tests that call an application directly through its
        WSGI interface. No socket is opened; `urlopen` builds an environ
        dict, invokes `self.app` and collects the response. '''

    def setUp(self):
        ''' Push a new entry onto `bottle.app` and keep the returned object
            as `self.app`. `tearDown` pops it again. '''
        self.port = 8080
        self.host = 'localhost'
        self.app = bottle.app.push()

    def urlopen(self, path, method='GET', post='', env=None):
        ''' Call `self.app` once and return the collected response.

            :param path: value stored as PATH_INFO.
            :param method: request method; upper-cased and stripped.
            :param post: optional request body. A non-empty value forces the
                         method to POST and is written to `wsgi.input`.
            :param env: optional environ dict to start from. It is modified
                        in place; missing keys are filled with the wsgiref
                        testing defaults.
            :return: dict with `code` (int), `status` (reason phrase),
                     `header` (dict with title-cased names, repeated headers
                     joined by ', ') and `body` (concatenated parts).
            :raises TypeError: if a yielded part cannot be appended to the
                               body.
        '''
        result = {'code':0, 'status':'error', 'header':{}, 'body':tob('')}
        def start_response(status, header, exc_info=None):
            # `exc_info` is the optional third argument of the WSGI
            # start_response callable; it is accepted but not used here.
            result['code'] = int(status.split()[0])
            result['status'] = status.split(None, 1)[-1]
            for name, value in header:
                name = name.title()
                if name in result['header']:
                    result['header'][name] += ', ' + value
                else:
                    result['header'][name] = value
        env = env if env else {}
        wsgiref.util.setup_testing_defaults(env)
        env['REQUEST_METHOD'] = method.upper().strip()
        env['PATH_INFO'] = path
        if post:
            payload = tob(post)
            env['REQUEST_METHOD'] = 'POST'
            # WSGI environ values are strings, CONTENT_LENGTH included.
            env['CONTENT_LENGTH'] = str(len(payload))
            env['wsgi.input'].write(payload)
            env['wsgi.input'].seek(0)
        for part in self.app(env, start_response):
            try:
                result['body'] += part
            except TypeError:
                raise TypeError('WSGI app yielded non-byte object %s'
                                % type(part))
        return result

    def postmultipart(self, path, fields, files):
        ''' POST `fields` and `files` to `path` as multipart/form-data and
            return the same result dict as `urlopen`.

            :param fields: iterable of (name, value) pairs.
            :param files: iterable of (name, filename, content) triples.
        '''
        env = multipart_environ(fields, files)
        # The body is already in env['wsgi.input'], so no `post` is passed.
        return self.urlopen(path, method='POST', env=env)

    def tearDown(self):
        ''' Pop the entry pushed onto `bottle.app` by `setUp`. '''
        bottle.app.pop()

    def assertStatus(self, code, route, **kargs):
        ''' Assert that requesting `route` yields status code `code`.
            Extra keyword arguments are passed on to `urlopen`. '''
        self.assertEqual(code, self.urlopen(route, **kargs)['code'])

    def assertBody(self, body, route, **kargs):
        ''' Assert that the response body for `route` equals `body`
            (converted with `tob`). '''
        self.assertEqual(tob(body), self.urlopen(route, **kargs)['body'])

    def assertInBody(self, body, route, **kargs):
        ''' Assert that `body` (converted with `tob`) occurs in the response
            body for `route`. '''
        self.assertTrue(tob(body) in self.urlopen(route, **kargs)['body'])

    def assertHeader(self, name, value, route, **kargs):
        ''' Assert that response header `name` equals `value`. Header names
            are stored title-cased; a missing header compares as None. '''
        self.assertEqual(value, self.urlopen(route, **kargs)['header'].get(name))

    def assertHeaderAny(self, name, route, **kargs):
        ''' Assert that response header `name` is present with a non-empty
            value. '''
        self.assertTrue(self.urlopen(route, **kargs)['header'].get(name, None))

def multipart_environ(fields, files):
    ''' Build a WSGI environ dict describing a multipart/form-data POST.

        :param fields: iterable of (name, value) pairs for plain form fields.
        :param files: iterable of (name, filename, content) triples. The
                      part's Content-Type is guessed from the filename and
                      falls back to application/octet-stream.
        :return: environ dict filled with the wsgiref testing defaults plus
                 REQUEST_METHOD, CONTENT_TYPE and CONTENT_LENGTH; the encoded
                 body is written to `wsgi.input`, which is rewound to 0.

        Text values are encoded as UTF-8; byte strings are used as they are.
    '''
    # `unicode` only exists on Python 2; on Python 3 the text type is `str`.
    try:
        text_type = unicode
    except NameError:
        text_type = str

    def to_bytes(value):
        # Encode text only, so binary file content passes through untouched.
        return value.encode('utf8') if isinstance(value, text_type) else value

    boundary = str(uuid.uuid1())
    env = {'REQUEST_METHOD':'POST',
           'CONTENT_TYPE':  'multipart/form-data; boundary='+boundary}
    wsgiref.util.setup_testing_defaults(env)
    delimiter = '--' + boundary
    # MIME multipart bodies use CRLF as the line terminator.
    parts = []
    for name, value in fields:
        parts.append(delimiter + '\r\n')
        parts.append('Content-Disposition: form-data; name="%s"\r\n\r\n' % name)
        parts.append(value)
        parts.append('\r\n')
    for name, filename, content in files:
        mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        parts.append(delimiter + '\r\n')
        # NOTE(review): RFC 7578 specifies "form-data" as the disposition
        # type; "file" is kept here because existing callers may rely on it.
        parts.append('Content-Disposition: file; name="%s"; filename="%s"\r\n'
                     % (name, filename))
        parts.append('Content-Type: %s\r\n\r\n' % mimetype)
        parts.append(content)
        parts.append('\r\n')
    parts.append(delimiter + '--\r\n')
    # Encode piece by piece and join once; CONTENT_LENGTH must count the
    # encoded bytes, not the characters.
    body = to_bytes('').join([to_bytes(part) for part in parts])
    env['CONTENT_LENGTH'] = str(len(body))
    env['wsgi.input'].write(body)
    env['wsgi.input'].seek(0)
    return env