diff options
author | Jason R. Coombs <jaraco@jaraco.com> | 2017-11-17 21:56:59 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-17 21:56:59 -0500 |
commit | 38a92a919634c4bb99120d19ecd0119a2d4e6911 (patch) | |
tree | 57ac23260e8e936a10ef2021fcae7c10bbc24401 | |
parent | b35b149df1d9d793d31cb7f979de01080e6cef06 (diff) | |
parent | 3787d447de0d4497952b0b6f462e478c90a8b996 (diff) | |
download | cherrypy-git-38a92a919634c4bb99120d19ecd0119a2d4e6911.tar.gz |
Merge branch 'master' into feature/1625-remove-timeouts
-rw-r--r-- | CHANGES.rst | 9 | ||||
-rw-r--r-- | cherrypy/test/__init__.py | 3 | ||||
-rw-r--r-- | cherrypy/test/helper.py | 3 | ||||
-rw-r--r-- | cherrypy/test/modwsgi.py | 4 | ||||
-rw-r--r-- | cherrypy/test/test_conn.py | 4 | ||||
-rw-r--r-- | cherrypy/test/webtest.py | 622 | ||||
-rwxr-xr-x | setup.py | 2 |
7 files changed, 21 insertions, 626 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index 3d9715dd..955068be 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -26,6 +26,15 @@ v12.0.0 ticket describing the use case, and we'll help devise a solution or bring the exception back. +v11.3.0 +------- + +* Bump to cheroot 5.9.0. + +* ``cherrypy.test.webtest`` module is now merged with the + ``cheroot.test.webtest`` module. The CherryPy name is retained + for now for compatibility and will be removed eventually. + v11.2.0 ------- diff --git a/cherrypy/test/__init__.py b/cherrypy/test/__init__.py index 068382be..884fca84 100644 --- a/cherrypy/test/__init__.py +++ b/cherrypy/test/__init__.py @@ -5,6 +5,9 @@ Regression test suite for CherryPy. import os import sys +# for compatibility, expose cheroot webtest here +webtest = __import__('cheroot.test.webtest') + def newexit(): os._exit(1) diff --git a/cherrypy/test/helper.py b/cherrypy/test/helper.py index 95eba01e..bdf74f86 100644 --- a/cherrypy/test/helper.py +++ b/cherrypy/test/helper.py @@ -15,11 +15,12 @@ import portend import pytest import six +from cheroot.test import webtest + import cherrypy from cherrypy._cpcompat import text_or_bytes, HTTPSConnection, ntob from cherrypy.lib import httputil from cherrypy.lib import gctools -from cherrypy.test import webtest log = logging.getLogger(__name__) thisdir = os.path.abspath(os.path.dirname(__file__)) diff --git a/cherrypy/test/modwsgi.py b/cherrypy/test/modwsgi.py index 1a563742..f558e223 100644 --- a/cherrypy/test/modwsgi.py +++ b/cherrypy/test/modwsgi.py @@ -39,8 +39,10 @@ import time import portend +from cheroot.test import webtest + import cherrypy -from cherrypy.test import helper, webtest +from cherrypy.test import helper curdir = os.path.abspath(os.path.dirname(__file__)) diff --git a/cherrypy/test/test_conn.py b/cherrypy/test/test_conn.py index 1b691d3d..06676286 100644 --- a/cherrypy/test/test_conn.py +++ b/cherrypy/test/test_conn.py @@ -11,9 +11,11 @@ from six.moves.http_client import BadStatusLine, HTTPConnection, NotConnected import pytest +from cheroot.test import webtest + import cherrypy from cherrypy._cpcompat import HTTPSConnection, ntob, tonative -from cherrypy.test import helper, webtest +from cherrypy.test import helper timeout = 1 diff --git a/cherrypy/test/webtest.py b/cherrypy/test/webtest.py deleted file mode 100644 index bf4b3fec..00000000 --- a/cherrypy/test/webtest.py +++ /dev/null @@ -1,622 +0,0 @@ -"""Extensions to unittest for web frameworks. - -Use the WebCase.getPage method to request a page from your HTTP server. - -Framework Integration -===================== - -If you have control over your server process, you can handle errors -in the server-side of the HTTP conversation a bit better. You must run -both the client (your WebCase tests) and the server in the same process -(but in separate threads, obviously). - -When an error occurs in the framework, call server_error. It will print -the traceback to stdout, and keep any assertions you have from running -(the assumption is that, if the server errors, the page output will not -be of further significance to your tests). -""" - -import pprint -import re -import socket -import sys -import time -import traceback -import types -import os -import json -import unittest - -import six -from six.moves.http_client import HTTPConnection - -from cherrypy._cpcompat import text_or_bytes, HTTPSConnection - - -def interface(host): - """Return an IP address for a client connection given the server host. - - If the server is listening on '0.0.0.0' (INADDR_ANY) - or '::' (IN6ADDR_ANY), this will return the proper localhost.""" - if host == '0.0.0.0': - # INADDR_ANY, which should respond on localhost. - return '127.0.0.1' - if host == '::': - # IN6ADDR_ANY, which should respond on localhost. - return '::1' - return host - - -class TerseTestResult(unittest._TextTestResult): - - def printErrors(self): - # Overridden to avoid unnecessary empty line - if self.errors or self.failures: - if self.dots or self.showAll: - self.stream.writeln() - self.printErrorList('ERROR', self.errors) - self.printErrorList('FAIL', self.failures) - - -class TerseTestRunner(unittest.TextTestRunner): - - """A test runner class that displays results in textual form.""" - - def _makeResult(self): - return TerseTestResult(self.stream, self.descriptions, self.verbosity) - - def run(self, test): - 'Run the given test case or test suite.' - # Overridden to remove unnecessary empty lines and separators - result = self._makeResult() - test(result) - result.printErrors() - if not result.wasSuccessful(): - self.stream.write('FAILED (') - failed, errored = list(map(len, (result.failures, result.errors))) - if failed: - self.stream.write('failures=%d' % failed) - if errored: - if failed: - self.stream.write(', ') - self.stream.write('errors=%d' % errored) - self.stream.writeln(')') - return result - - -class ReloadingTestLoader(unittest.TestLoader): - - def loadTestsFromName(self, name, module=None): - """Return a suite of all tests cases given a string specifier. - - The name may resolve either to a module, a test case class, a - test method within a test case class, or a callable object which - returns a TestCase or TestSuite instance. - - The method optionally resolves the names relative to a given module. - """ - parts = name.split('.') - unused_parts = [] - if module is None: - if not parts: - raise ValueError('incomplete test name: %s' % name) - else: - parts_copy = parts[:] - while parts_copy: - target = '.'.join(parts_copy) - if target in sys.modules: - module = six.moves.reload_module(sys.modules[target]) - parts = unused_parts - break - else: - try: - module = __import__(target) - parts = unused_parts - break - except ImportError: - unused_parts.insert(0, parts_copy[-1]) - del parts_copy[-1] - if not parts_copy: - raise - parts = parts[1:] - obj = module - for part in parts: - obj = getattr(obj, part) - - if isinstance(obj, types.ModuleType): - return self.loadTestsFromModule(obj) - elif ( - ( - (six.PY3 and isinstance(obj, type)) or - isinstance(obj, (type, types.ClassType)) - ) and - issubclass(obj, unittest.TestCase)): - return self.loadTestsFromTestCase(obj) - elif isinstance(obj, types.UnboundMethodType): - if six.PY3: - return obj.__self__.__class__(obj.__name__) - else: - return obj.im_class(obj.__name__) - elif hasattr(obj, '__call__'): - test = obj() - if not isinstance(test, unittest.TestCase) and \ - not isinstance(test, unittest.TestSuite): - raise ValueError('calling %s returned %s, ' - 'not a test' % (obj, test)) - return test - else: - raise ValueError('do not know how to make test from: %s' % obj) - - -try: - # Jython support - if sys.platform[:4] == 'java': - def getchar(): - # Hopefully this is enough - return sys.stdin.read(1) - else: - # On Windows, msvcrt.getch reads a single char without output. - import msvcrt - - def getchar(): - return msvcrt.getch() -except ImportError: - # Unix getchr - import tty - import termios - - def getchar(): - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setraw(sys.stdin.fileno()) - ch = sys.stdin.read(1) - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - return ch - - -# from jaraco.properties -class NonDataProperty(object): - def __init__(self, fget): - assert fget is not None, 'fget cannot be none' - assert callable(fget), 'fget must be callable' - self.fget = fget - - def __get__(self, obj, objtype=None): - if obj is None: - return self - return self.fget(obj) - - -class WebCase(unittest.TestCase): - HOST = '127.0.0.1' - PORT = 8000 - HTTP_CONN = HTTPConnection - PROTOCOL = 'HTTP/1.1' - - scheme = 'http' - url = None - - status = None - headers = None - body = None - - encoding = 'utf-8' - - time = None - - def get_conn(self, auto_open=False): - """Return a connection to our HTTP server.""" - if self.scheme == 'https': - cls = HTTPSConnection - else: - cls = HTTPConnection - conn = cls(self.interface(), self.PORT) - # Automatically re-connect? - conn.auto_open = auto_open - conn.connect() - return conn - - def set_persistent(self, on=True, auto_open=False): - """Make our HTTP_CONN persistent (or not). - - If the 'on' argument is True (the default), then self.HTTP_CONN - will be set to an instance of HTTPConnection (or HTTPS - if self.scheme is "https"). This will then persist across requests. - - We only allow for a single open connection, so if you call this - and we currently have an open connection, it will be closed. - """ - try: - self.HTTP_CONN.close() - except (TypeError, AttributeError): - pass - - if on: - self.HTTP_CONN = self.get_conn(auto_open=auto_open) - else: - if self.scheme == 'https': - self.HTTP_CONN = HTTPSConnection - else: - self.HTTP_CONN = HTTPConnection - - def _get_persistent(self): - return hasattr(self.HTTP_CONN, '__class__') - - def _set_persistent(self, on): - self.set_persistent(on) - persistent = property(_get_persistent, _set_persistent) - - def interface(self): - """Return an IP address for a client connection. - - If the server is listening on '0.0.0.0' (INADDR_ANY) - or '::' (IN6ADDR_ANY), this will return the proper localhost.""" - return interface(self.HOST) - - def getPage(self, url, headers=None, method='GET', body=None, - protocol=None, raise_subcls=None): - """Open the url with debugging support. Return status, headers, body. - - `raise_subcls` must be a tuple with the exceptions classes - or a single exception class that are not going to be considered - a socket.error regardless that they were are subclass of a - socket.error and therefore not considered for a connection retry. - """ - ServerError.on = False - - if isinstance(url, six.text_type): - url = url.encode('utf-8') - if isinstance(body, six.text_type): - body = body.encode('utf-8') - - self.url = url - self.time = None - start = time.time() - result = openURL(url, headers, method, body, self.HOST, self.PORT, - self.HTTP_CONN, protocol or self.PROTOCOL, - raise_subcls) - self.time = time.time() - start - self.status, self.headers, self.body = result - - # Build a list of request cookies from the previous response cookies. - self.cookies = [('Cookie', v) for k, v in self.headers - if k.lower() == 'set-cookie'] - - if ServerError.on: - raise ServerError() - return result - - @NonDataProperty - def interactive(self): - """ - Load interactivity setting from environment, where - the value can be numeric or a string like true or - False or 1 or 0. - """ - env_str = os.environ.get('WEBTEST_INTERACTIVE', 'True') - return bool(json.loads(env_str.lower())) - - console_height = 30 - - def _handlewebError(self, msg): - print('') - print(' ERROR: %s' % msg) - - if not self.interactive: - raise self.failureException(msg) - - p = (' Show: ' - '[B]ody [H]eaders [S]tatus [U]RL; ' - '[I]gnore, [R]aise, or sys.e[X]it >> ') - sys.stdout.write(p) - sys.stdout.flush() - while True: - i = getchar().upper() - if not isinstance(i, type('')): - i = i.decode('ascii') - if i not in 'BHSUIRX': - continue - print(i.upper()) # Also prints new line - if i == 'B': - for x, line in enumerate(self.body.splitlines()): - if (x + 1) % self.console_height == 0: - # The \r and comma should make the next line overwrite - sys.stdout.write('<-- More -->\r') - m = getchar().lower() - # Erase our "More" prompt - sys.stdout.write(' \r') - if m == 'q': - break - print(line) - elif i == 'H': - pprint.pprint(self.headers) - elif i == 'S': - print(self.status) - elif i == 'U': - print(self.url) - elif i == 'I': - # return without raising the normal exception - return - elif i == 'R': - raise self.failureException(msg) - elif i == 'X': - self.exit() - sys.stdout.write(p) - sys.stdout.flush() - - def exit(self): - sys.exit() - - def assertStatus(self, status, msg=None): - """Fail if self.status != status.""" - if isinstance(status, text_or_bytes): - if not self.status == status: - if msg is None: - msg = 'Status (%r) != %r' % (self.status, status) - self._handlewebError(msg) - elif isinstance(status, int): - code = int(self.status[:3]) - if code != status: - if msg is None: - msg = 'Status (%r) != %r' % (self.status, status) - self._handlewebError(msg) - else: - # status is a tuple or list. - match = False - for s in status: - if isinstance(s, text_or_bytes): - if self.status == s: - match = True - break - elif int(self.status[:3]) == s: - match = True - break - if not match: - if msg is None: - msg = 'Status (%r) not in %r' % (self.status, status) - self._handlewebError(msg) - - def assertHeader(self, key, value=None, msg=None): - """Fail if (key, [value]) not in self.headers.""" - lowkey = key.lower() - for k, v in self.headers: - if k.lower() == lowkey: - if value is None or str(value) == v: - return v - - if msg is None: - if value is None: - msg = '%r not in headers' % key - else: - msg = '%r:%r not in headers' % (key, value) - self._handlewebError(msg) - - def assertHeaderIn(self, key, values, msg=None): - """Fail if header indicated by key doesn't have one of the values.""" - lowkey = key.lower() - for k, v in self.headers: - if k.lower() == lowkey: - matches = [value for value in values if str(value) == v] - if matches: - return matches - - if msg is None: - msg = '%(key)r not in %(values)r' % vars() - self._handlewebError(msg) - - def assertHeaderItemValue(self, key, value, msg=None): - """Fail if the header does not contain the specified value""" - actual_value = self.assertHeader(key, msg=msg) - header_values = map(str.strip, actual_value.split(',')) - if value in header_values: - return value - - if msg is None: - msg = '%r not in %r' % (value, header_values) - self._handlewebError(msg) - - def assertNoHeader(self, key, msg=None): - """Fail if key in self.headers.""" - lowkey = key.lower() - matches = [k for k, v in self.headers if k.lower() == lowkey] - if matches: - if msg is None: - msg = '%r in headers' % key - self._handlewebError(msg) - - def assertNoHeaderItemValue(self, key, value, msg=None): - """Fail if the header contains the specified value""" - lowkey = key.lower() - hdrs = self.headers - matches = [k for k, v in hdrs if k.lower() == lowkey and v == value] - if matches: - if msg is None: - msg = '%r:%r in %r' % (key, value, hdrs) - self._handlewebError(msg) - - def assertBody(self, value, msg=None): - """Fail if value != self.body.""" - if isinstance(value, six.text_type): - value = value.encode(self.encoding) - if value != self.body: - if msg is None: - msg = 'expected body:\n%r\n\nactual body:\n%r' % ( - value, self.body) - self._handlewebError(msg) - - def assertInBody(self, value, msg=None): - """Fail if value not in self.body.""" - if isinstance(value, six.text_type): - value = value.encode(self.encoding) - if value not in self.body: - if msg is None: - msg = '%r not in body: %s' % (value, self.body) - self._handlewebError(msg) - - def assertNotInBody(self, value, msg=None): - """Fail if value in self.body.""" - if isinstance(value, six.text_type): - value = value.encode(self.encoding) - if value in self.body: - if msg is None: - msg = '%r found in body' % value - self._handlewebError(msg) - - def assertMatchesBody(self, pattern, msg=None, flags=0): - """Fail if value (a regex pattern) is not in self.body.""" - if isinstance(pattern, six.text_type): - pattern = pattern.encode(self.encoding) - if re.search(pattern, self.body, flags) is None: - if msg is None: - msg = 'No match for %r in body' % pattern - self._handlewebError(msg) - - -methods_with_bodies = ('POST', 'PUT', 'PATCH') - - -def cleanHeaders(headers, method, body, host, port): - """Return request headers, with required headers added (if missing).""" - if headers is None: - headers = [] - - # Add the required Host request header if not present. - # [This specifies the host:port of the server, not the client.] - found = False - for k, v in headers: - if k.lower() == 'host': - found = True - break - if not found: - if port == 80: - headers.append(('Host', host)) - else: - headers.append(('Host', '%s:%s' % (host, port))) - - if method in methods_with_bodies: - # Stick in default type and length headers if not present - found = False - for k, v in headers: - if k.lower() == 'content-type': - found = True - break - if not found: - headers.append( - ('Content-Type', 'application/x-www-form-urlencoded')) - headers.append(('Content-Length', str(len(body or '')))) - - return headers - - -def shb(response): - """Return status, headers, body the way we like from a response.""" - if six.PY3: - h = response.getheaders() - else: - h = [] - key, value = None, None - for line in response.msg.headers: - if line: - if line[0] in ' \t': - value += line.strip() - else: - if key and value: - h.append((key, value)) - key, value = line.split(':', 1) - key = key.strip() - value = value.strip() - if key and value: - h.append((key, value)) - - return '%s %s' % (response.status, response.reason), h, response.read() - - -def openURL(url, headers=None, method='GET', body=None, - host='127.0.0.1', port=8000, http_conn=HTTPConnection, - protocol='HTTP/1.1', raise_subcls=None): - """ - Open the given HTTP resource and return status, headers, and body. - - `raise_subcls` must be a tuple with the exceptions classes - or a single exception class that are not going to be considered - a socket.error regardless that they were are subclass of a - socket.error and therefore not considered for a connection retry. - """ - headers = cleanHeaders(headers, method, body, host, port) - - # Trying 10 times is simply in case of socket errors. - # Normal case--it should run once. - for trial in range(10): - try: - # Allow http_conn to be a class or an instance - if hasattr(http_conn, 'host'): - conn = http_conn - else: - conn = http_conn(interface(host), port) - - conn._http_vsn_str = protocol - conn._http_vsn = int(''.join([x for x in protocol if x.isdigit()])) - - if six.PY3 and isinstance(url, bytes): - url = url.decode() - conn.putrequest(method.upper(), url, skip_host=True, - skip_accept_encoding=True) - - for key, value in headers: - conn.putheader(key, value.encode('Latin-1')) - conn.endheaders() - - if body is not None: - conn.send(body) - - # Handle response - response = conn.getresponse() - - s, h, b = shb(response) - - if not hasattr(http_conn, 'host'): - # We made our own conn instance. Close it. - conn.close() - - return s, h, b - except socket.error as e: - if raise_subcls is not None and isinstance(e, raise_subcls): - raise - else: - time.sleep(0.5) - if trial == 9: - raise - - -# Add any exceptions which your web framework handles -# normally (that you don't want server_error to trap). -ignored_exceptions = [] - -# You'll want set this to True when you can't guarantee -# that each response will immediately follow each request; -# for example, when handling requests via multiple threads. -ignore_all = False - - -class ServerError(Exception): - on = False - - -def server_error(exc=None): - """Server debug hook. Return True if exception handled, False if ignored. - - You probably want to wrap this, so you can still handle an error using - your framework when it's ignored. - """ - if exc is None: - exc = sys.exc_info() - - if ignore_all or exc[0] in ignored_exceptions: - return False - else: - ServerError.on = True - print('') - print(''.join(traceback.format_exception(*exc))) - return True @@ -58,7 +58,7 @@ packages = [ install_requires = [ 'six>=1.11.0', - 'cheroot>=5.8.3', + 'cheroot>=5.9', 'portend>=2.1.1', 'jaraco.classes', ] |