summaryrefslogtreecommitdiff
path: root/test/test_server.py
blob: dacdfc153fe32ee2610f37c238fbce493174c89a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# -*- coding: utf-8 -*-
import unittest
import time
import sys
import os
import signal
import socket
from subprocess import Popen, PIPE
from . import tools
from bottle import server_names, tob

try:
    from urllib.request import urlopen
except:
    from urllib2 import urlopen

serverscript = os.path.join(os.path.dirname(__file__), 'servertest.py')


def ping(server, port):
    ''' Check if a server accepts connections on a specific TCP port '''
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((server, port))
        return True
    except socket.error:
        return False
    finally:
        s.close()


class TestServer(unittest.TestCase):
    server = 'wsgiref'
    skip   = False

    def setUp(self):
        self.skip = self.skip or 'fast' in sys.argv
        if self.skip: return
        # Find a free port
        for port in range(8800, 8900):
            self.port = port
            # Start servertest.py in a subprocess
            cmd = [sys.executable, serverscript, self.server, str(port)]
            cmd += sys.argv[1:] # pass cmdline arguments to subprocesses
            self.p = Popen(cmd, stdout=PIPE, stderr=PIPE)
            # Wait for the socket to accept connections
            for i in range(100):
                time.sleep(0.1)
                # Accepts connections?
                if ping('127.0.0.1', port): return
                # Server died for some reason...
                if not self.p.poll() is None: break
            rv = self.p.poll()
            if rv is None:
                raise AssertionError("Server took too long to start up.")
            if rv == 128: # Import error
                if os.environ.get('CI') != 'true' or \
                        os.environ.get('TRAVIS_PYTHON_VERSION') not in ('2.7', '3.6'):
                    tools.warn("Skipping %r test (ImportError)." % self.server)
                    self.skip = True
                    return
            if rv == 3: # Port in use
                continue
            raise AssertionError("Server exited with error code %d" % rv)
        raise AssertionError("Could not find a free port to test server.")

    def tearDown(self):
        if self.skip: return

        if self.p.poll() == None:
            os.kill(self.p.pid, signal.SIGINT)
            time.sleep(0.5)
        if self.p.poll() == None:
            os.kill(self.p.pid, signal.SIGTERM)
            time.sleep(0.5)
        while self.p.poll() == None:
            tools.warn("Trying to kill server %r with pid %d." %
                       (self.server, self.p.pid))
            os.kill(self.p.pid, signal.SIGKILL)
            time.sleep(1)

        lines = [line for stream in (self.p.stdout, self.p.stderr) for line in stream]
        for line in lines:
            if tob('warning') in line.lower():
               tools.warn(line.strip().decode('utf8'))
            elif tob('error') in line.lower():
                raise AssertionError(line.strip().decode('utf8'))

    def fetch(self, url):
        try:
            return urlopen('http://127.0.0.1:%d/%s' % (self.port, url), None, 5).read()
        except Exception as E:
            return repr(E)

    def test_simple(self):
        ''' Test a simple static page with this server adapter. '''
        if self.skip: return
        self.assertEqual(tob('OK'), self.fetch('test'))


blacklist = ['cgi', 'flup', 'gae', 'wsgiref', 'fapws3']

if sys.version_info.major == 2:
    blacklist += [
        'aiohttp',
        'uvloop',
    ]
else:
    blacklist += [
        'bjoern',
        'diesel',
        'flup',
        'gevent',
    ]


for name in set(server_names) - set(blacklist):
    classname = 'TestServerAdapter_'+name
    setattr(sys.modules[__name__], classname,
            type(classname, (TestServer,), {'server': name}))