diff options
author | Joseph Tate <none@none> | 2010-02-24 15:47:23 +0000 |
---|---|---|
committer | Joseph Tate <none@none> | 2010-02-24 15:47:23 +0000 |
commit | 87cc005cbc16b53eee368f0a303bc8e91a41236f (patch) | |
tree | 810ba59c323a8a3bdb61aec08c5d6ed764d2d8a9 /cherrypy | |
parent | c2fbf4231ca5170b292b0b54baddb8a6293ff120 (diff) | |
download | cherrypy-git-87cc005cbc16b53eee368f0a303bc8e91a41236f.tar.gz |
Convert the tests to use nose instead of our own runner. This strips out much coverage and profiling (handled by nose) and lets you focus on writing tests.
The biggest changes that have to be done in the tests classes is you have to put the "setup_server" method on the class(es) that need them when running. If you need it for multiple classes, you can use staticmethod() to attach it to multiple classes without using inheritance.
Diffstat (limited to 'cherrypy')
41 files changed, 2484 insertions, 2851 deletions
diff --git a/cherrypy/__init__.py b/cherrypy/__init__.py index 9f2ba5d7..d10b86a9 100644 --- a/cherrypy/__init__.py +++ b/cherrypy/__init__.py @@ -67,40 +67,40 @@ class _AttributeDocstrings(type): """Metaclass for declaring docstrings for class attributes.""" # The full docstring for this type is down in the __init__ method so # that it doesn't show up in help() for every consumer class. - + def __init__(cls, name, bases, dct): '''Metaclass for declaring docstrings for class attributes. - + Base Python doesn't provide any syntax for setting docstrings on 'data attributes' (non-callables). This metaclass allows class definitions to follow the declaration of a data attribute with a docstring for that attribute; the attribute docstring will be popped from the class dict and folded into the class docstring. - + The naming convention for attribute docstrings is: <attrname> + "__doc". For example: - + class Thing(object): """A thing and its properties.""" - + __metaclass__ = cherrypy._AttributeDocstrings - + height = 50 height__doc = """The height of the Thing in inches.""" - + In which case, help(Thing) starts like this: - + >>> help(mod.Thing) Help on class Thing in module pkg.mod: - + class Thing(__builtin__.object) | A thing and its properties. - | + | | height [= 50]: | The height of the Thing in inches. - | - + | + The benefits of this approach over hand-edited class docstrings: 1. Places the docstring nearer to the attribute declaration. 2. Makes attribute docs more uniform ("name (default): doc"). @@ -108,25 +108,25 @@ class _AttributeDocstrings(type): the declaration and the documentation. 4. Reduces mismatches of attribute default _values_ between the declaration and the documentation. - + The benefits of a metaclass approach over other approaches: 1. Simpler ("less magic") than interface-based solutions. 2. __metaclass__ can be specified at the module global level for classic classes. - + For various formatting reasons, you should write multiline docs with a leading newline and not a trailing one: - + response__doc = """ The response object for the current thread. In the main thread, and any threads which are not HTTP requests, this is None.""" - + The type of the attribute is intentionally not included, because that's not How Python Works. Quack. ''' - + newdoc = [cls.__doc__ or ""] - + dctkeys = dct.keys() dctkeys.sort() for name in dctkeys: @@ -134,21 +134,21 @@ class _AttributeDocstrings(type): # Remove the magic doc attribute. if hasattr(cls, name): delattr(cls, name) - + # Make a uniformly-indented docstring from it. val = '\n'.join([' ' + line.strip() for line in dct[name].split('\n')]) - + # Get the default value. attrname = name[:-5] try: attrval = getattr(cls, attrname) except AttributeError: attrval = "missing" - + # Add the complete attribute docstring to our list. newdoc.append("%s [= %r]:\n%s" % (attrname, attrval, val)) - + # Add our list of new docstrings to the class docstring. cls.__doc__ = "\n\n".join(newdoc) @@ -182,20 +182,20 @@ except ImportError: # Timeout monitor class _TimeoutMonitor(process.plugins.Monitor): - + def __init__(self, bus): self.servings = [] process.plugins.Monitor.__init__(self, bus, self.run) - + def acquire(self): self.servings.append((serving.request, serving.response)) - + def release(self): try: self.servings.remove((serving.request, serving.response)) except ValueError: pass - + def run(self): """Check timeout on all responses. (Internal)""" for req, resp in self.servings: @@ -219,7 +219,7 @@ server.subscribe() def quickstart(root=None, script_name="", config=None): """Mount the given root, start the builtin server (and engine), then block. - + root: an instance of a "controller class" (a collection of page handler methods) which represents the root of the application. script_name: a string containing the "mount point" of the application. @@ -227,7 +227,7 @@ def quickstart(root=None, script_name="", config=None): at which to mount the given root. For example, if root.index() will handle requests to "http://www.example.com:8080/dept/app1/", then the script_name argument would be "/dept/app1". - + It MUST NOT end in a slash. If the script_name refers to the root of the URI, it MUST be an empty string (not "/"). config: a file or dict containing application config. If this contains @@ -236,14 +236,14 @@ def quickstart(root=None, script_name="", config=None): """ if config: _global_conf_alias.update(config) - + tree.mount(root, script_name, config) - + if hasattr(engine, "signal_handler"): engine.signal_handler.subscribe() if hasattr(engine, "console_control_handler"): engine.console_control_handler.subscribe() - + engine.start() engine.block() @@ -255,7 +255,7 @@ except ImportError: class _Serving(_local): """An interface for registering request and response objects. - + Rather than have a separate "thread local" object for the request and the response, this class works as a single threadlocal container for both objects (and any others which developers wish to define). In this @@ -263,24 +263,24 @@ class _Serving(_local): conversation, yet still refer to them as module-level globals in a thread-safe way. """ - + __metaclass__ = _AttributeDocstrings - + request = _cprequest.Request(_httputil.Host("127.0.0.1", 80), _httputil.Host("127.0.0.1", 1111)) request__doc = """ The request object for the current thread. In the main thread, and any threads which are not receiving HTTP requests, this is None.""" - + response = _cprequest.Response() response__doc = """ The response object for the current thread. In the main thread, and any threads which are not receiving HTTP requests, this is None.""" - + def load(self, request, response): self.request = request self.response = response - + def clear(self): """Remove all attributes of self.""" self.__dict__.clear() @@ -289,54 +289,54 @@ serving = _Serving() class _ThreadLocalProxy(object): - + __slots__ = ['__attrname__', '__dict__'] - + def __init__(self, attrname): self.__attrname__ = attrname - + def __getattr__(self, name): child = getattr(serving, self.__attrname__) return getattr(child, name) - + def __setattr__(self, name, value): if name in ("__attrname__", ): object.__setattr__(self, name, value) else: child = getattr(serving, self.__attrname__) setattr(child, name, value) - + def __delattr__(self, name): child = getattr(serving, self.__attrname__) delattr(child, name) - + def _get_dict(self): child = getattr(serving, self.__attrname__) d = child.__class__.__dict__.copy() d.update(child.__dict__) return d __dict__ = property(_get_dict) - + def __getitem__(self, key): child = getattr(serving, self.__attrname__) return child[key] - + def __setitem__(self, key, value): child = getattr(serving, self.__attrname__) child[key] = value - + def __delitem__(self, key): child = getattr(serving, self.__attrname__) del child[key] - + def __contains__(self, key): child = getattr(serving, self.__attrname__) return key in child - + def __len__(self): child = getattr(serving, self.__attrname__) return len(child) - + def __nonzero__(self): child = getattr(serving, self.__attrname__) return bool(child) @@ -375,7 +375,7 @@ except ImportError: from cherrypy import _cplogging class _GlobalLogManager(_cplogging.LogManager): - + def __call__(self, *args, **kwargs): # Do NOT use try/except here. See http://www.cherrypy.org/ticket/945 if hasattr(request, 'app') and hasattr(request.app, 'log'): @@ -383,7 +383,7 @@ class _GlobalLogManager(_cplogging.LogManager): else: log = self return log.error(*args, **kwargs) - + def access(self): try: return request.app.log.access() @@ -416,7 +416,7 @@ def expose(func=None, alias=None): for a in alias: parents[a.replace(".", "_")] = func return func - + import sys, types if isinstance(func, (types.FunctionType, types.MethodType)): if alias is None: @@ -447,23 +447,23 @@ def expose(func=None, alias=None): def url(path="", qs="", script_name=None, base=None, relative=None): """Create an absolute URL for the given path. - + If 'path' starts with a slash ('/'), this will return (base + script_name + path + qs). If it does not start with a slash, this returns (base + script_name [+ request.path_info] + path + qs). - + If script_name is None, cherrypy.request will be used to find a script_name, if available. - + If base is None, cherrypy.request.base will be used (if available). Note that you can use cherrypy.tools.proxy to change this. - + Finally, note that this function can be used to obtain an absolute URL for the current request path (minus the querystring) by passing no args. If you call url(qs=cherrypy.request.query_string), you should get the original browser URL (assuming no internal redirections). - + If relative is None or not provided, request.app.relative_urls will be used (if available, else False). If False, the output will be an absolute URL (including the scheme, host, vhost, and script_name). @@ -476,7 +476,7 @@ def url(path="", qs="", script_name=None, base=None, relative=None): qs = _urlencode(qs) if qs: qs = '?' + qs - + if request.app: if not path.startswith("/"): # Append/remove trailing slash from path_info as needed @@ -489,17 +489,17 @@ def url(path="", qs="", script_name=None, base=None, relative=None): elif request.is_index is False: if pi.endswith('/') and pi != '/': pi = pi[:-1] - + if path == "": path = pi else: path = _urljoin(pi, path) - + if script_name is None: script_name = request.script_name if base is None: base = request.base - + newurl = base + script_name + path + qs else: # No request.app (we're being called outside a request). @@ -508,10 +508,10 @@ def url(path="", qs="", script_name=None, base=None, relative=None): # if you're using vhosts or tools.proxy. if base is None: base = server.base() - + path = (script_name or "") + path newurl = base + path + qs - + if './' in newurl: # Normalize the URL by removing ./ and ../ atoms = [] @@ -523,12 +523,12 @@ def url(path="", qs="", script_name=None, base=None, relative=None): else: atoms.append(atom) newurl = '/'.join(atoms) - + # At this point, we should have a fully-qualified absolute URL. - + if relative is None: relative = getattr(request.app, "relative_urls", False) - + # See http://www.ietf.org/rfc/rfc2396.txt if relative == 'server': # "A relative reference beginning with a single slash character is @@ -548,7 +548,7 @@ def url(path="", qs="", script_name=None, base=None, relative=None): new.pop(0) new = (['..'] * len(old)) + new newurl = '/'.join(new) - + return newurl diff --git a/cherrypy/test/__init__.py b/cherrypy/test/__init__.py index eef14d44..7d27c148 100644 --- a/cherrypy/test/__init__.py +++ b/cherrypy/test/__init__.py @@ -3,3 +3,18 @@ Run test.py to exercise all tests. """ +import sys +def newexit(): + raise SystemExit('Exit called') + +def setup(): + # We want to monkey patch sys.exit so that we can get some + # information about where exit is being called. + newexit._old = sys.exit + sys.exit = newexit + +def teardown(): + try: + sys.exit = sys.exit._old + except AttributeError: + sys.exit = sys._exit diff --git a/cherrypy/test/test_states_demo.py b/cherrypy/test/_test_states_demo.py index 3f8f196c..3f8f196c 100644 --- a/cherrypy/test/test_states_demo.py +++ b/cherrypy/test/_test_states_demo.py diff --git a/cherrypy/test/helper.py b/cherrypy/test/helper.py index cdafd6fc..8b639086 100644 --- a/cherrypy/test/helper.py +++ b/cherrypy/test/helper.py @@ -28,48 +28,134 @@ import warnings import cherrypy from cherrypy.lib import httputil, profiler -from cherrypy.test import webtest +from cherrypy.test import test, webtest +import logging +log = logging.getLogger(__name__) class CPWebCase(webtest.WebCase): - script_name = "" scheme = "http" - + + available_servers = {'wsgi': test.LocalWSGISupervisor, + 'wsgi_u': test.get_wsgi_u_supervisor, + 'native': test.NativeServerSupervisor, + 'cpmodpy': test.get_cpmodpy_supervisor, + 'modpygw': test.get_modpygw_supervisor, + 'modwsgi': test.get_modwsgi_supervisor, + 'modfcgid': test.get_modfcgid_supervisor, + } + default_server = "wsgi" + supervisor_factory = None + + @classmethod + def _setup_server(cls, supervisor, conf): + v = sys.version.split()[0] + log.info("Python version used to run this test script: %s" % v) + log.info("CherryPy version: %s" % cherrypy.__version__) + if supervisor.scheme == "https": + ssl = " (ssl)" + else: + ssl = "" + log.info("HTTP server version: %s%s" % (supervisor.protocol, ssl)) + log.info("PID: %s" % os.getpid()) + + cherrypy.server.using_apache = supervisor.using_apache + cherrypy.server.using_wsgi = supervisor.using_wsgi + + if isinstance(conf, basestring): + parser = cherrypy.lib.reprconf.Parser() + conf = parser.dict_from_file(conf).get('global', {}) + else: + conf = conf or {} + baseconf = conf.copy() + baseconf.update({'server.socket_host': supervisor.host, + 'server.socket_port': supervisor.port, + 'server.protocol_version': supervisor.protocol, + 'environment': "test_suite", + }) + if supervisor.scheme == "https": + baseconf['server.ssl_certificate'] = serverpem + baseconf['server.ssl_private_key'] = serverpem + + # helper must be imported lazily so the coverage tool + # can run against module-level statements within cherrypy. + # Also, we have to do "from cherrypy.test import helper", + # exactly like each test module does, because a relative import + # would stick a second instance of webtest in sys.modules, + # and we wouldn't be able to globally override the port anymore. + if supervisor.scheme == "https": + webtest.WebCase.HTTP_CONN = HTTPSConnection + return baseconf + + @classmethod + def setup_class(cls): + '' + #Creates a server + conf = test.get_tst_config() + if not cls.supervisor_factory: + cls.supervisor_factory = cls.available_servers.get(conf.get('server', 'wsgi')) + if cls.supervisor_factory is None: + raise RuntimeError('Unknown server in config: %s' % conf['server']) + supervisor = cls.supervisor_factory(**conf) + + #Copied from "run_test_suite" + cherrypy.config.reset() + baseconf = cls._setup_server(supervisor, conf) + cherrypy.config.update(baseconf) + setup_client() + + if hasattr(cls, 'setup_server'): + # Clear the cherrypy tree and clear the wsgi server so that + # it can be updated with the new root + cherrypy.tree = cherrypy._cptree.Tree() + cherrypy.server.httpserver = None + cls.setup_server() + supervisor.start(cls.__module__) + + cls.supervisor = supervisor + + + @classmethod + def teardown_class(cls): + '' + if hasattr(cls, 'setup_server'): + cls.supervisor.stop() + def prefix(self): return self.script_name.rstrip("/") - + def base(self): if ((self.scheme == "http" and self.PORT == 80) or (self.scheme == "https" and self.PORT == 443)): port = "" else: port = ":%s" % self.PORT - + return "%s://%s%s%s" % (self.scheme, self.HOST, port, self.script_name.rstrip("/")) - + def exit(self): sys.exit() - + def getPage(self, url, headers=None, method="GET", body=None, protocol=None): """Open the url. Return status, headers, body.""" if self.script_name: url = httputil.urljoin(self.script_name, url) return webtest.WebCase.getPage(self, url, headers, method, body, protocol) - + def skip(self, msg='skipped '): sys.stderr.write(msg) - + def assertErrorPage(self, status, message=None, pattern=''): """Compare the response body with a built in error page. - + The function will optionally look for the regexp pattern, within the exception embedded in the error page.""" - + # This will never contain a traceback page = cherrypy._cperror.get_error_page(status, message=message) - + # First, test the response body without checking the traceback. # Stick a match-all group (.*) in to grab the traceback. esc = re.escape @@ -80,7 +166,7 @@ class CPWebCase(webtest.WebCase): if not m: self._handlewebError('Error page does not match; expected:\n' + page) return - + # Now test the pattern against the traceback if pattern is None: # Special-case None to mean that there should be *no* traceback. @@ -92,14 +178,14 @@ class CPWebCase(webtest.WebCase): m.group(1))): msg = 'Error page does not contain %s in traceback' self._handlewebError(msg % repr(pattern)) - + date_tolerance = 2 - + def assertEqualDates(self, dt1, dt2, seconds=None): """Assert abs(dt1 - dt2) is within Y seconds.""" if seconds is None: seconds = self.date_tolerance - + if dt1 > dt2: diff = dt1 - dt2 else: @@ -109,76 +195,18 @@ class CPWebCase(webtest.WebCase): (dt1, dt2, seconds)) -CPTestLoader = webtest.ReloadingTestLoader() -CPTestRunner = webtest.TerseTestRunner(verbosity=2) - - -def run_test_suite(moduleNames, conf, supervisor): - """Run the given test modules using the given supervisor and [global] conf. - - The 'supervisor' arg should be an object with 'start' and 'stop' methods. - See test/test.py. - """ - # The Pybots automatic testing system needs the suite to exit - # with a non-zero value if there were any problems. - test_success = True - - for testmod in moduleNames: - cherrypy.config.reset() - cherrypy.config.update(conf) - setup_client(supervisor) - - if '.' in testmod: - package, test_name = testmod.rsplit('.', 1) - p = __import__(package, globals(), locals(), fromlist=[test_name]) - m = getattr(p, test_name) - else: - m = __import__(testmod, globals(), locals()) - suite = CPTestLoader.loadTestsFromName(testmod) - - setup = getattr(m, "setup_server", None) - if setup: supervisor.start(testmod) - try: - result = CPTestRunner.run(suite) - test_success &= result.wasSuccessful() - finally: - if setup: supervisor.stop() - - if test_success: - return 0 - else: - return 1 - -def setup_client(supervisor): +def setup_client(): """Set up the WebCase classes to match the server's socket settings.""" webtest.WebCase.PORT = cherrypy.server.socket_port webtest.WebCase.HOST = cherrypy.server.socket_host if cherrypy.server.ssl_certificate: CPWebCase.scheme = 'https' -def testmain(conf=None): - """Run __main__ as a test module, with webtest debugging.""" - # Comment me out to see ENGINE messages etc. when running a test standalone. - cherrypy.config.update({'environment': "test_suite"}) - cherrypy.server.socket_host = '127.0.0.1' - - from cherrypy.test.test import LocalWSGISupervisor - supervisor = LocalWSGISupervisor(host=cherrypy.server.socket_host, - port=cherrypy.server.socket_port) - setup_client(supervisor) - supervisor.start('__main__') - try: - return webtest.main() - finally: - supervisor.stop() - - - # --------------------------- Spawning helpers --------------------------- # class CPProcess(object): - + pid_file = os.path.join(thisdir, 'test.pid') config_file = os.path.join(thisdir, 'test.conf') config_template = """[global] @@ -193,14 +221,14 @@ log.access_file: r'%(access_log)s' """ error_log = os.path.join(thisdir, 'test.error.log') access_log = os.path.join(thisdir, 'test.access.log') - + def __init__(self, wait=False, daemonize=False, ssl=False, socket_host=None, socket_port=None): self.wait = wait self.daemonize = daemonize self.ssl = ssl self.host = socket_host or cherrypy.server.socket_host self.port = socket_port or cherrypy.server.socket_port - + def write_conf(self, extra=""): if self.ssl: serverpem = os.path.join(thisdir, 'test.pem') @@ -210,7 +238,7 @@ server.ssl_private_key: r'%s' """ % (serverpem, serverpem) else: ssl = "" - + conf = self.config_template % { 'host': self.host, 'port': self.port, @@ -222,39 +250,39 @@ server.ssl_private_key: r'%s' f = open(self.config_file, 'wb') f.write(conf) f.close() - + def start(self, imports=None): """Start cherryd in a subprocess.""" cherrypy._cpserver.wait_for_free_port(self.host, self.port) - + args = [sys.executable, os.path.join(thisdir, '..', 'cherryd'), '-c', self.config_file, '-p', self.pid_file] - + if not isinstance(imports, (list, tuple)): imports = [imports] for i in imports: if i: args.append('-i') args.append(i) - + if self.daemonize: args.append('-d') - + if self.wait: self.exit_code = os.spawnl(os.P_WAIT, sys.executable, *args) else: os.spawnl(os.P_NOWAIT, sys.executable, *args) cherrypy._cpserver.wait_for_occupied_port(self.host, self.port) - + # Give the engine a wee bit more time to finish STARTING if self.daemonize: time.sleep(2) else: time.sleep(1) - + def get_pid(self): return int(open(self.pid_file, 'rb').read()) - + def join(self): """Wait for the process to exit.""" try: diff --git a/cherrypy/test/native-server.ini b/cherrypy/test/native-server.ini new file mode 100644 index 00000000..b32d98dd --- /dev/null +++ b/cherrypy/test/native-server.ini @@ -0,0 +1,9 @@ +[supervisor] +scheme="http" +protocol="HTTP/1.1" +port= 8080 +host= "127.0.0.1" +profile= False +validate= False +conquer= False +server="wsgi" diff --git a/cherrypy/test/test.py b/cherrypy/test/test.py index 25484626..9adf6be4 100644 --- a/cherrypy/test/test.py +++ b/cherrypy/test/test.py @@ -17,175 +17,122 @@ localDir = os.path.dirname(__file__) serverpem = os.path.join(os.getcwd(), localDir, 'test.pem') import sys import warnings +import cherrypy +from cherrypy.test import webtest +from cherrypy.lib.reprconf import unrepr + +_testconfig = None + +def get_tst_config(overconf = {}): + global _testconfig + if _testconfig is None: + conf = { + 'scheme': 'http', + 'protocol': "HTTP/1.1", + 'port': 8080, + 'host': '127.0.0.1', + 'validate': False, + 'conquer': False, + } + try: + import testconfig + conf.update(testconfig.config.get('supervisor', None)) + except ImportError: + pass + _testconfig = conf + conf = _testconfig.copy() + conf.update(overconf) -from cherrypy.lib import profiler - - -class TestHarness(object): - """A test harness for the CherryPy framework and CherryPy applications.""" - - def __init__(self, supervisor, tests, interactive=True): - """Constructor to populate the TestHarness instance. - - tests should be a list of module names (strings). - """ - self.supervisor = supervisor - self.tests = tests - self.interactive = interactive - - def run(self, conf=None): - """Run the test harness (using the given [global] conf).""" - import cherrypy - v = sys.version.split()[0] - print("Python version used to run this test script: %s" % v) - print("CherryPy version: %s" % cherrypy.__version__) - if self.supervisor.scheme == "https": - ssl = " (ssl)" - else: - ssl = "" - print("HTTP server version: %s%s" % (self.supervisor.protocol, ssl)) - print("PID: %s" % os.getpid()) - print("") - - cherrypy.server.using_apache = self.supervisor.using_apache - cherrypy.server.using_wsgi = self.supervisor.using_wsgi - - if isinstance(conf, basestring): - parser = cherrypy.lib.reprconf.Parser() - conf = parser.dict_from_file(conf).get('global', {}) - else: - conf = conf or {} - baseconf = conf.copy() - baseconf.update({'server.socket_host': self.supervisor.host, - 'server.socket_port': self.supervisor.port, - 'server.protocol_version': self.supervisor.protocol, - 'environment': "test_suite", - }) - if self.supervisor.scheme == "https": - baseconf['server.ssl_certificate'] = serverpem - baseconf['server.ssl_private_key'] = serverpem - - # helper must be imported lazily so the coverage tool - # can run against module-level statements within cherrypy. - # Also, we have to do "from cherrypy.test import helper", - # exactly like each test module does, because a relative import - # would stick a second instance of webtest in sys.modules, - # and we wouldn't be able to globally override the port anymore. - from cherrypy.test import helper, webtest - webtest.WebCase.interactive = self.interactive - if self.supervisor.scheme == "https": - webtest.WebCase.HTTP_CONN = HTTPSConnection - print("") - print("Running tests: %s" % self.supervisor) - - return helper.run_test_suite(self.tests, baseconf, self.supervisor) - + for k, v in conf.iteritems(): + if isinstance(v, basestring): + conf[k] = unrepr(v) + return conf class Supervisor(object): """Base class for modeling and controlling servers during testing.""" - + def __init__(self, **kwargs): for k, v in kwargs.iteritems(): + if k == 'port': + setattr(self, k, int(v)) setattr(self, k, v) class LocalSupervisor(Supervisor): """Base class for modeling/controlling servers which run in the same process. - + When the server side runs in a different process, start/stop can dump all state between each test module easily. When the server side runs in the same process as the client, however, we have to do a bit more work to ensure config and mounted apps are reset between tests. """ - + using_apache = False using_wsgi = False - + def __init__(self, **kwargs): for k, v in kwargs.iteritems(): setattr(self, k, v) - - import cherrypy + cherrypy.server.httpserver = self.httpserver_class - + engine = cherrypy.engine if hasattr(engine, "signal_handler"): engine.signal_handler.subscribe() if hasattr(engine, "console_control_handler"): engine.console_control_handler.subscribe() #engine.subscribe('log', lambda msg, level: sys.stderr.write(msg + os.linesep)) - + def start(self, modulename=None): """Load and start the HTTP server.""" - import cherrypy - if modulename: - # Replace the Tree wholesale. - cherrypy.tree = cherrypy._cptree.Tree() - # Unhook httpserver so cherrypy.server.start() creates a new # one (with config from setup_server, if declared). cherrypy.server.httpserver = None - cherrypy.tree = cherrypy._cptree.Tree() - - if '.' in modulename: - package, test_name = modulename.rsplit('.', 1) - p = __import__(package, globals(), locals(), fromlist=[test_name]) - m = getattr(p, test_name) - else: - m = __import__(modulename, globals(), locals()) - setup = getattr(m, "setup_server", None) - if setup: - setup() - self.teardown = getattr(m, "teardown_server", None) - + cherrypy.engine.start() - + self.sync_apps() - + def sync_apps(self): """Tell the server about any apps which the setup functions mounted.""" pass - + def stop(self): - if self.teardown: - self.teardown() - import cherrypy + td = getattr(self, 'teardown', None) + if td: + td() cherrypy.engine.exit() class NativeServerSupervisor(LocalSupervisor): """Server supervisor for the builtin HTTP server.""" - + httpserver_class = "cherrypy._cpnative_server.CPHTTPServer" using_apache = False using_wsgi = False - + def __str__(self): return "Builtin HTTP Server on %s:%s" % (self.host, self.port) class LocalWSGISupervisor(LocalSupervisor): """Server supervisor for the builtin WSGI server.""" - + httpserver_class = "cherrypy._cpwsgi_server.CPWSGIServer" using_apache = False using_wsgi = True - + def __str__(self): return "Builtin WSGI Server on %s:%s" % (self.host, self.port) - + def sync_apps(self): """Hook a new WSGI app into the origin server.""" - import cherrypy cherrypy.server.httpserver.wsgi_app = self.get_app() - + def get_app(self): """Obtain a new (decorated) WSGI app to hook into the origin server.""" - import cherrypy app = cherrypy.tree - if self.profile: - app = profiler.make_app(app, aggregate=False) if self.conquer: try: import wsgiconq @@ -199,8 +146,9 @@ class LocalWSGISupervisor(LocalSupervisor): except ImportError: warnings.warn("Error importing wsgiref. The validator will not run.") else: + #wraps the app in the validator app = validate.validator(app) - + return app @@ -226,330 +174,5 @@ def get_modfcgid_supervisor(**options): return modfcgid.ModFCGISupervisor(**options) def get_wsgi_u_supervisor(**options): - import cherrypy cherrypy.server.wsgi_version = ('u', 0) return LocalWSGISupervisor(**options) - - -class CommandLineParser(object): - available_servers = {'wsgi': LocalWSGISupervisor, - 'wsgi_u': get_wsgi_u_supervisor, - 'native': NativeServerSupervisor, - 'cpmodpy': get_cpmodpy_supervisor, - 'modpygw': get_modpygw_supervisor, - 'modwsgi': get_modwsgi_supervisor, - 'modfcgid': get_modfcgid_supervisor, - } - default_server = "wsgi" - - supervisor_factory = None - supervisor_options = { - 'scheme': 'http', - 'protocol': "HTTP/1.1", - 'port': 8080, - 'host': '127.0.0.1', - 'profile': False, - 'validate': False, - 'conquer': False, - } - - cover = False - basedir = None - interactive = True - - shortopts = [] - longopts = ['cover', 'profile', 'validate', 'conquer', 'dumb', '1.0', - 'ssl', 'help', 'basedir=', 'port=', 'server=', 'host='] - - def __init__(self, available_tests, args=sys.argv[1:]): - """Constructor to populate the TestHarness instance. - - available_tests should be a list of module names (strings). - - args defaults to sys.argv[1:], but you can provide a different - set of args if you like. - """ - self.available_tests = available_tests - self.supervisor_options = self.supervisor_options.copy() - - longopts = self.longopts[:] - longopts.extend(self.available_tests) - try: - opts, args = getopt.getopt(args, self.shortopts, longopts) - except getopt.GetoptError: - # print help information and exit - self.help() - sys.exit(2) - - self.tests = [] - - for o, a in opts: - if o == '--help': - self.help() - sys.exit() - elif o == "--cover": - self.cover = True - elif o == "--profile": - self.supervisor_options['profile'] = True - elif o == "--validate": - self.supervisor_options['validate'] = True - elif o == "--conquer": - self.supervisor_options['conquer'] = True - elif o == "--dumb": - self.interactive = False - elif o == "--1.0": - self.supervisor_options['protocol'] = "HTTP/1.0" - elif o == "--ssl": - self.supervisor_options['scheme'] = "https" - elif o == "--basedir": - self.basedir = a - elif o == "--port": - self.supervisor_options['port'] = int(a) - elif o == "--host": - self.supervisor_options['host'] = a - elif o == "--server": - if a not in self.available_servers: - print('Error: The --server argument must be one of %s.' % - '|'.join(self.available_servers.keys())) - sys.exit(2) - self.supervisor_factory = self.available_servers[a] - else: - o = o[2:] - if o in self.available_tests and o not in self.tests: - self.tests.append(o) - - import cherrypy - if self.cover and self.supervisor_options['profile']: - # Print error message and exit - print('Error: you cannot run the profiler and the ' - 'coverage tool at the same time.') - sys.exit(2) - - if not self.supervisor_factory: - self.supervisor_factory = self.available_servers[self.default_server] - - if not self.tests: - self.tests = self.available_tests[:] - - def help(self): - """Print help for test.py command-line options.""" - - import cherrypy - print("""CherryPy Test Program - Usage: - test.py --help --server=* --host=%s --port=%s --1.0 --ssl --cover - --basedir=path --profile --validate --conquer --dumb --tests** - """ % (self.__class__.supervisor_options['host'], - self.__class__.supervisor_options['port'])) - print(' * servers:') - for name in self.available_servers: - if name == self.default_server: - print(' --server=%s (default)' % name) - else: - print(' --server=%s' % name) - - print(""" - --host=<name or IP addr>: use a host other than the default (%s). - Not yet available with mod_python servers. - --port=<int>: use a port other than the default (%s). - --1.0: use HTTP/1.0 servers instead of default HTTP/1.1. - - --cover: turn on code-coverage tool. - --basedir=path: display coverage stats for some path other than cherrypy. - - --profile: turn on WSGI profiling tool. - --validate: use wsgiref.validate (builtin in Python 2.5). - --conquer: use wsgiconq (which uses pyconquer) to trace calls. - --dumb: turn off the interactive output features. - """ % (self.__class__.supervisor_options['host'], - self.__class__.supervisor_options['port'])) - - print(' ** tests:') - for name in self.available_tests: - print(' --' + name) - - def start_coverage(self): - """Start the coverage tool. - - To use this feature, you need to download 'coverage.py', - either Gareth Rees' original implementation: - http://www.garethrees.org/2001/12/04/python-coverage/ - - or Ned Batchelder's enhanced version: - http://www.nedbatchelder.com/code/modules/coverage.html - - If neither module is found in PYTHONPATH, - coverage is silently(!) disabled. - """ - try: - from coverage import coverage - if self.basedir: - c = os.path.join(self.basedir, 'coverage.cache') - else: - c = os.path.join(os.path.dirname(__file__), "../lib/coverage.cache") - cov = coverage(data_file=c) - #start coverage before importing cherrypy - cov.erase() - cov.start() - import cherrypy - from cherrypy.lib import covercp - covercp.the_coverage = cov - cherrypy.engine.subscribe('start', covercp.start) - except ImportError: - cov = None - self.coverage = cov - - def stop_coverage(self): - """Stop the coverage tool, save results, and report.""" - import cherrypy - from cherrypy.lib import covercp - cherrypy.engine.unsubscribe('start', covercp.start) - if self.coverage: - self.coverage.save() - self.report_coverage() - print("run cherrypy/lib/covercp.py as a script to serve " - "coverage results on port 8080") - - def report_coverage(self): - """Print a summary from the code coverage tool.""" - - import cherrypy - basedir = self.basedir - if basedir is None: - # Assume we want to cover everything in "../../cherrypy/" - basedir = os.path.normpath(os.path.join(os.getcwd(), localDir, '../')) - else: - if not os.path.isabs(basedir): - basedir = os.path.normpath(os.path.join(os.getcwd(), basedir)) - basedir = basedir.lower() - - self.coverage.load() - morfs = [x for x in self.coverage.data.executed_files() - if x.lower().startswith(basedir)] - - total_statements = 0 - total_executed = 0 - - print("") - sys.stdout.write("CODE COVERAGE (this might take a while)") - for morf in morfs: - sys.stdout.write(".") - sys.stdout.flush() -## name = os.path.split(morf)[1] - if morf.find('test') != -1: - continue - try: - _, statements, _, missing, readable = self.coverage.analysis2(morf) - n = len(statements) - m = n - len(missing) - total_statements = total_statements + n - total_executed = total_executed + m - except KeyboardInterrupt: - raise - except: - # No, really! We truly want to ignore any other errors. - pass - - pc = 100.0 - if total_statements > 0: - pc = 100.0 * total_executed / total_statements - - print("\nTotal: %s Covered: %s Percent: %2d%%" - % (total_statements, total_executed, pc)) - - def run(self, conf=None): - """Run the test harness (using the given [global] conf).""" - conf = conf or {} - - # Start the coverage tool before importing cherrypy, - # so module-level global statements are covered. - if self.cover: - self.start_coverage() - - supervisor = self.supervisor_factory(**self.supervisor_options) - - if supervisor.using_apache and 'test_conn' in self.tests: - self.tests.remove('test_conn') - - h = TestHarness(supervisor, self.tests, self.interactive) - success = h.run(conf) - - if self.supervisor_options['profile']: - print("") - print("run /cherrypy/lib/profiler.py as a script to serve " - "profiling results on port 8080") - - if self.cover: - self.stop_coverage() - - return success - - -def prefer_parent_path(): - # Place this __file__'s grandparent (../../) at the start of sys.path, - # so that all cherrypy/* imports are from this __file__'s package. - curpath = os.path.normpath(os.path.join(os.getcwd(), localDir)) - grandparent = os.path.normpath(os.path.join(curpath, '../../')) - if grandparent not in sys.path: - sys.path.insert(0, grandparent) - -def run(): - - prefer_parent_path() - - testList = [ - 'test_auth_basic', - 'test_auth_digest', - 'test_bus', - 'test_proxy', - 'test_caching', - 'test_config', - 'test_conn', - 'test_core', - 'test_tools', - 'test_encoding', - 'test_etags', - 'test_http', - 'test_httpauth', - 'test_httplib', - 'test_json', - 'test_logging', - 'test_objectmapping', - 'test_dynamicobjectmapping', - 'test_misc_tools', - 'test_request_obj', - 'test_static', - 'test_tutorials', - 'test_virtualhost', - 'test_mime', - 'test_session', - 'test_sessionauthenticate', - 'test_states', - 'test_config_server', - 'test_xmlrpc', - 'test_wsgiapps', - 'test_wsgi_ns', - 'test_wsgi_vhost', - - # Run refleak test as late as possible to - # catch refleaks from all exercised tests. - 'test_refleaks', - ] - - try: - import routes - testList.append('test_routes') - except ImportError: - pass - - clp = CommandLineParser(testList) - success = clp.run() - import cherrypy - if clp.interactive: - print("") - raw_input('hit enter') - sys.exit(success) - - -if __name__ == '__main__': - run() diff --git a/cherrypy/test/test_auth_basic.py b/cherrypy/test/test_auth_basic.py index abf29bae..4eb8cb54 100644 --- a/cherrypy/test/test_auth_basic.py +++ b/cherrypy/test/test_auth_basic.py @@ -2,9 +2,6 @@ # -*- coding: utf-8 -*- # vim:ts=4:sw=4:expandtab:fileencoding=utf-8 -from cherrypy.test import test -test.prefer_parent_path() - try: from hashlib import md5 except ImportError: @@ -14,45 +11,46 @@ except ImportError: import cherrypy from cherrypy.lib import auth_basic -def setup_server(): - class Root: - def index(self): - return "This is public." - index.exposed = True - - class BasicProtected: - def index(self): - return "Hello %s, you've been authorized." % cherrypy.request.login - index.exposed = True - - class BasicProtected2: - def index(self): - return "Hello %s, you've been authorized." % cherrypy.request.login - index.exposed = True - - userpassdict = {'xuser' : 'xpassword'} - userhashdict = {'xuser' : md5('xpassword').hexdigest()} - - def checkpasshash(realm, user, password): - p = userhashdict.get(user) - return p and p == md5(password).hexdigest() or False - - conf = {'/basic': {'tools.auth_basic.on': True, - 'tools.auth_basic.realm': 'wonderland', - 'tools.auth_basic.checkpassword': auth_basic.checkpassword_dict(userpassdict)}, - '/basic2': {'tools.auth_basic.on': True, - 'tools.auth_basic.realm': 'wonderland', - 'tools.auth_basic.checkpassword': checkpasshash}, - } - - root = Root() - root.basic = BasicProtected() - root.basic2 = BasicProtected2() - cherrypy.tree.mount(root, config=conf) from cherrypy.test import helper class BasicAuthTest(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + def index(self): + return "This is public." + index.exposed = True + + class BasicProtected: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + class BasicProtected2: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + userpassdict = {'xuser' : 'xpassword'} + userhashdict = {'xuser' : md5('xpassword').hexdigest()} + + def checkpasshash(realm, user, password): + p = userhashdict.get(user) + return p and p == md5(password).hexdigest() or False + + conf = {'/basic': {'tools.auth_basic.on': True, + 'tools.auth_basic.realm': 'wonderland', + 'tools.auth_basic.checkpassword': auth_basic.checkpassword_dict(userpassdict)}, + '/basic2': {'tools.auth_basic.on': True, + 'tools.auth_basic.realm': 'wonderland', + 'tools.auth_basic.checkpassword': checkpasshash}, + } + + root = Root() + root.basic = BasicProtected() + root.basic2 = BasicProtected2() + cherrypy.tree.mount(root, config=conf) def testPublic(self): self.getPage("/") diff --git a/cherrypy/test/test_auth_digest.py b/cherrypy/test/test_auth_digest.py index c5dbdcbb..ed1b44d7 100644 --- a/cherrypy/test/test_auth_digest.py +++ b/cherrypy/test/test_auth_digest.py @@ -3,47 +3,48 @@ # vim:ts=4:sw=4:expandtab:fileencoding=utf-8 from cherrypy.test import test -test.prefer_parent_path() + import cherrypy from cherrypy.lib import auth_digest -def setup_server(): - class Root: - def index(self): - return "This is public." - index.exposed = True - - class DigestProtected: - def index(self): - return "Hello %s, you've been authorized." % cherrypy.request.login - index.exposed = True - - def fetch_users(): - return {'test': 'test'} - - - get_ha1 = cherrypy.lib.auth_digest.get_ha1_dict_plain(fetch_users()) - conf = {'/digest': {'tools.auth_digest.on': True, - 'tools.auth_digest.realm': 'localhost', - 'tools.auth_digest.get_ha1': get_ha1, - 'tools.auth_digest.key': 'a565c27146791cfb', - 'tools.auth_digest.debug': 'True'}} - - root = Root() - root.digest = DigestProtected() - cherrypy.tree.mount(root, config=conf) - from cherrypy.test import helper class DigestAuthTest(helper.CPWebCase): - def testPublic(self): - self.getPage("/") - self.assertStatus('200 OK') - self.assertHeader('Content-Type', 'text/html;charset=utf-8') - self.assertBody('This is public.') + @staticmethod + def setup_server(): + class Root: + def index(self): + return "This is public." + index.exposed = True + + class DigestProtected: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + def fetch_users(): + return {'test': 'test'} + + + get_ha1 = cherrypy.lib.auth_digest.get_ha1_dict_plain(fetch_users()) + conf = {'/digest': {'tools.auth_digest.on': True, + 'tools.auth_digest.realm': 'localhost', + 'tools.auth_digest.get_ha1': get_ha1, + 'tools.auth_digest.key': 'a565c27146791cfb', + 'tools.auth_digest.debug': 'True'}} + + root = Root() + root.digest = DigestProtected() + cherrypy.tree.mount(root, config=conf) + + def testPublic(self): + self.getPage("/") + self.assertStatus('200 OK') + self.assertHeader('Content-Type', 'text/html;charset=utf-8') + self.assertBody('This is public.') def testDigest(self): self.getPage("/digest/") diff --git a/cherrypy/test/test_bus.py b/cherrypy/test/test_bus.py index aab17819..7705830d 100644 --- a/cherrypy/test/test_bus.py +++ b/cherrypy/test/test_bus.py @@ -1,5 +1,4 @@ from cherrypy.test import test -test.prefer_parent_path() try: set @@ -22,85 +21,85 @@ class PublishSubscribeTests(unittest.TestCase): def listener(arg=None): self.responses.append(msg % (index, channel, arg)) return listener - + def test_builtin_channels(self): b = wspbus.Bus() - + self.responses, expected = [], [] - + for channel in b.listeners: for index, priority in enumerate([100, 50, 0, 51]): b.subscribe(channel, self.get_listener(channel, index), priority) - + for channel in b.listeners: b.publish(channel) expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)]) b.publish(channel, arg=79347) expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)]) - + self.assertEqual(self.responses, expected) - + def test_custom_channels(self): b = wspbus.Bus() - + self.responses, expected = [], [] - + custom_listeners = ('hugh', 'louis', 'dewey') for channel in custom_listeners: for index, priority in enumerate([None, 10, 60, 40]): b.subscribe(channel, self.get_listener(channel, index), priority) - + for channel in custom_listeners: b.publish(channel, 'ah so') expected.extend([msg % (i, channel, 'ah so') for i in (1, 3, 0, 2)]) b.publish(channel) expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)]) - + self.assertEqual(self.responses, expected) - + def test_listener_errors(self): b = wspbus.Bus() - + self.responses, expected = [], [] channels = [c for c in b.listeners if c != 'log'] - + for channel in channels: b.subscribe(channel, self.get_listener(channel, 1)) # This will break since the lambda takes no args. b.subscribe(channel, lambda: None, priority=20) - + for channel in channels: self.assertRaises(wspbus.ChannelFailures, b.publish, channel, 123) expected.append(msg % (1, channel, 123)) - + self.assertEqual(self.responses, expected) class BusMethodTests(unittest.TestCase): - + def log(self, bus): self._log_entries = [] def logit(msg, level): self._log_entries.append(msg) bus.subscribe('log', logit) - + def assertLog(self, entries): self.assertEqual(self._log_entries, entries) - + def get_listener(self, channel, index): def listener(arg=None): self.responses.append(msg % (index, channel, arg)) return listener - + def test_start(self): b = wspbus.Bus() self.log(b) - + self.responses = [] num = 3 for index in range(num): b.subscribe('start', self.get_listener('start', index)) - + b.start() try: # The start method MUST call all 'start' listeners. @@ -114,18 +113,18 @@ class BusMethodTests(unittest.TestCase): finally: # Exit so the atexit handler doesn't complain. b.exit() - + def test_stop(self): b = wspbus.Bus() self.log(b) - + self.responses = [] num = 3 for index in range(num): b.subscribe('stop', self.get_listener('stop', index)) - + b.stop() - + # The stop method MUST call all 'stop' listeners. self.assertEqual(set(self.responses), set([msg % (i, 'stop', None) for i in range(num)])) @@ -133,36 +132,36 @@ class BusMethodTests(unittest.TestCase): self.assertEqual(b.state, b.states.STOPPED) # The stop method MUST log its states. self.assertLog(['Bus STOPPING', 'Bus STOPPED']) - + def test_graceful(self): b = wspbus.Bus() self.log(b) - + self.responses = [] num = 3 for index in range(num): b.subscribe('graceful', self.get_listener('graceful', index)) - + b.graceful() - + # The graceful method MUST call all 'graceful' listeners. self.assertEqual(set(self.responses), set([msg % (i, 'graceful', None) for i in range(num)])) # The graceful method MUST log its states. self.assertLog(['Bus graceful']) - + def test_exit(self): b = wspbus.Bus() self.log(b) - + self.responses = [] num = 3 for index in range(num): b.subscribe('stop', self.get_listener('stop', index)) b.subscribe('exit', self.get_listener('exit', index)) - + b.exit() - + # The exit method MUST call all 'stop' listeners, # and then all 'exit' listeners. self.assertEqual(set(self.responses), @@ -172,14 +171,14 @@ class BusMethodTests(unittest.TestCase): self.assertEqual(b.state, b.states.EXITING) # The exit method MUST log its states. self.assertLog(['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED']) - + def test_wait(self): b = wspbus.Bus() - + def f(method): time.sleep(0.2) getattr(b, method)() - + for method, states in [('start', [b.states.STARTED]), ('stop', [b.states.STOPPED]), ('start', [b.states.STARTING, b.states.STARTED]), @@ -187,15 +186,15 @@ class BusMethodTests(unittest.TestCase): ]: threading.Thread(target=f, args=(method,)).start() b.wait(states) - + # The wait method MUST wait for the given state(s). if b.state not in states: self.fail("State %r not in %r" % (b.state, states)) - + def test_block(self): b = wspbus.Bus() self.log(b) - + def f(): time.sleep(0.2) b.exit() @@ -204,9 +203,9 @@ class BusMethodTests(unittest.TestCase): threading.Thread(target=f).start() threading.Thread(target=g).start() self.assertEqual(len(threading.enumerate()), 3) - + b.block() - + # The block method MUST wait for the EXITING state. self.assertEqual(b.state, b.states.EXITING) # The block method MUST wait for ALL non-main threads to finish. @@ -214,7 +213,7 @@ class BusMethodTests(unittest.TestCase): self.assertLog(['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED', 'Waiting for child threads to terminate...']) - + def test_start_with_callback(self): b = wspbus.Bus() self.log(b) @@ -228,26 +227,26 @@ class BusMethodTests(unittest.TestCase): b.start_with_callback(f, (1, 3, 5), {"foo": "bar"}) # Give wait() time to run f() time.sleep(0.2) - + # The callback method MUST wait for the STARTED state. self.assertEqual(b.state, b.states.STARTED) # The callback method MUST run after all start methods. self.assertEqual(events, ["g", ("f", (1, 3, 5), {"foo": "bar"})]) finally: b.exit() - + def test_log(self): b = wspbus.Bus() self.log(b) self.assertLog([]) - + # Try a normal message. expected = [] for msg in ["O mah darlin'"] * 3 + ["Clementiiiiiiiine"]: b.log(msg) expected.append(msg) self.assertLog(expected) - + # Try an error message try: foo diff --git a/cherrypy/test/test_caching.py b/cherrypy/test/test_caching.py index 68dbbdeb..89a759b2 100644 --- a/cherrypy/test/test_caching.py +++ b/cherrypy/test/test_caching.py @@ -1,5 +1,5 @@ from cherrypy.test import test -test.prefer_parent_path() + import datetime import gzip @@ -19,110 +19,112 @@ gif_bytes = ('GIF89a\x01\x00\x01\x00\x82\x00\x01\x99"\x1e\x00\x00\x00\x00\x00' '\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x02\x03\x02\x08\t\x00;') -def setup_server(): - - class Root: - - _cp_config = {'tools.caching.on': True} - - def __init__(self): - self.counter = 0 - self.control_counter = 0 - self.longlock = threading.Lock() - - def index(self): - self.counter += 1 - msg = "visit #%s" % self.counter - return msg - index.exposed = True - - def control(self): - self.control_counter += 1 - return "visit #%s" % self.control_counter - control.exposed = True - - def a_gif(self): - cherrypy.response.headers['Last-Modified'] = httputil.HTTPDate() - return gif_bytes - a_gif.exposed = True - - def long_process(self, seconds='1'): - try: - self.longlock.acquire() - time.sleep(float(seconds)) - finally: - self.longlock.release() - return 'success!' - long_process.exposed = True - - def clear_cache(self, path): - cherrypy._cache.store[cherrypy.request.base + path].clear() - clear_cache.exposed = True - - class VaryHeaderCachingServer(object): - - _cp_config = {'tools.caching.on': True, - 'tools.response_headers.on': True, - 'tools.response_headers.headers': [('Vary', 'Our-Varying-Header')], - } - - def __init__(self): - self.counter = count(1) - - def index(self): - return "visit #%s" % self.counter.next() - index.exposed = True - - class UnCached(object): - _cp_config = {'tools.expires.on': True, - 'tools.expires.secs': 60, - 'tools.staticdir.on': True, - 'tools.staticdir.dir': 'static', - 'tools.staticdir.root': curdir, - } - def force(self): - cherrypy.response.headers['Etag'] = 'bibbitybobbityboo' - self._cp_config['tools.expires.force'] = True - self._cp_config['tools.expires.secs'] = 0 - return "being forceful" - force.exposed = True - force._cp_config = {'tools.expires.secs': 0} +from cherrypy.test import helper + +class CacheTest(helper.CPWebCase): - def dynamic(self): - cherrypy.response.headers['Etag'] = 'bibbitybobbityboo' - cherrypy.response.headers['Cache-Control'] = 'private' - return "D-d-d-dynamic!" - dynamic.exposed = True + @staticmethod + def setup_server(): + + class Root: + + _cp_config = {'tools.caching.on': True} + + def __init__(self): + self.counter = 0 + self.control_counter = 0 + self.longlock = threading.Lock() + + def index(self): + self.counter += 1 + msg = "visit #%s" % self.counter + return msg + index.exposed = True + + def control(self): + self.control_counter += 1 + return "visit #%s" % self.control_counter + control.exposed = True + + def a_gif(self): + cherrypy.response.headers['Last-Modified'] = httputil.HTTPDate() + return gif_bytes + a_gif.exposed = True + + def long_process(self, seconds='1'): + try: + self.longlock.acquire() + time.sleep(float(seconds)) + finally: + self.longlock.release() + return 'success!' + long_process.exposed = True + + def clear_cache(self, path): + cherrypy._cache.store[cherrypy.request.base + path].clear() + clear_cache.exposed = True + + class VaryHeaderCachingServer(object): + + _cp_config = {'tools.caching.on': True, + 'tools.response_headers.on': True, + 'tools.response_headers.headers': [('Vary', 'Our-Varying-Header')], + } + + def __init__(self): + self.counter = count(1) + + def index(self): + return "visit #%s" % self.counter.next() + index.exposed = True + + class UnCached(object): + _cp_config = {'tools.expires.on': True, + 'tools.expires.secs': 60, + 'tools.staticdir.on': True, + 'tools.staticdir.dir': 'static', + 'tools.staticdir.root': curdir, + } - def cacheable(self): - cherrypy.response.headers['Etag'] = 'bibbitybobbityboo' - return "Hi, I'm cacheable." - cacheable.exposed = True + def force(self): + cherrypy.response.headers['Etag'] = 'bibbitybobbityboo' + self._cp_config['tools.expires.force'] = True + self._cp_config['tools.expires.secs'] = 0 + return "being forceful" + force.exposed = True + force._cp_config = {'tools.expires.secs': 0} - def specific(self): - cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable' - return "I am being specific" - specific.exposed = True - specific._cp_config = {'tools.expires.secs': 86400} + def dynamic(self): + cherrypy.response.headers['Etag'] = 'bibbitybobbityboo' + cherrypy.response.headers['Cache-Control'] = 'private' + return "D-d-d-dynamic!" + dynamic.exposed = True - class Foo(object):pass - - def wrongtype(self): - cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable' - return "Woops" - wrongtype.exposed = True - wrongtype._cp_config = {'tools.expires.secs': Foo()} - - cherrypy.tree.mount(Root()) - cherrypy.tree.mount(UnCached(), "/expires") - cherrypy.tree.mount(VaryHeaderCachingServer(), "/varying_headers") - cherrypy.config.update({'tools.gzip.on': True}) + def cacheable(self): + cherrypy.response.headers['Etag'] = 'bibbitybobbityboo' + return "Hi, I'm cacheable." + cacheable.exposed = True + def specific(self): + cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable' + return "I am being specific" + specific.exposed = True + specific._cp_config = {'tools.expires.secs': 86400} -from cherrypy.test import helper + class Foo(object):pass + + def wrongtype(self): + cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable' + return "Woops" + wrongtype.exposed = True + wrongtype._cp_config = {'tools.expires.secs': Foo()} + + cherrypy.tree.mount(Root()) + cherrypy.tree.mount(UnCached(), "/expires") + cherrypy.tree.mount(VaryHeaderCachingServer(), "/varying_headers") + cherrypy.config.update({'tools.gzip.on': True}) -class CacheTest(helper.CPWebCase): def testCaching(self): elapsed = 0.0 diff --git a/cherrypy/test/test_config.py b/cherrypy/test/test_config.py index 7e5a47d0..a28170ae 100644 --- a/cherrypy/test/test_config.py +++ b/cherrypy/test/test_config.py @@ -1,7 +1,6 @@ """Tests for the CherryPy configuration system.""" from cherrypy.test import test -test.prefer_parent_path() import os, sys localDir = os.path.join(os.getcwd(), os.path.dirname(__file__)) @@ -15,67 +14,67 @@ import unittest import cherrypy def setup_server(): - + class Root: - + _cp_config = {'foo': 'this', 'bar': 'that'} - + def __init__(self): cherrypy.config.namespaces['db'] = self.db_namespace - + def db_namespace(self, k, v): if k == "scheme": self.db = v - + # @cherrypy.expose(alias=('global_', 'xyz')) def index(self, key): return cherrypy.request.config.get(key, "None") index = cherrypy.expose(index, alias=('global_', 'xyz')) - + def repr(self, key): return repr(cherrypy.request.config.get(key, None)) repr.exposed = True - + def dbscheme(self): return self.db dbscheme.exposed = True - + def plain(self, x): return x plain.exposed = True plain._cp_config = {'request.body.attempt_charsets': ['utf-16']} - + favicon_ico = cherrypy.tools.staticfile.handler( filename=os.path.join(localDir, '../favicon.ico')) - + class Foo: - + _cp_config = {'foo': 'this2', 'baz': 'that2'} - + def index(self, key): return cherrypy.request.config.get(key, "None") index.exposed = True nex = index - + def silly(self): return 'Hello world' silly.exposed = True silly._cp_config = {'response.headers.X-silly': 'sillyval'} - + def bar(self, key): return repr(cherrypy.request.config.get(key, None)) bar.exposed = True bar._cp_config = {'foo': 'this3', 'bax': 'this4'} - + class Another: - + def index(self, key): return str(cherrypy.request.config.get(key, "None")) index.exposed = True - - + + def raw_namespace(key, value): if key == 'input.map': handler = cherrypy.request.handler @@ -94,16 +93,16 @@ def setup_server(): # 'value' is a type (like int or str). return value(handler()) cherrypy.request.handler = wrapper - + class Raw: - + _cp_config = {'raw.output': repr} - + def incr(self, num): return num + 1 incr.exposed = True incr._cp_config = {'raw.input.map': {'num': int}} - + ioconf = StringIO(""" [/] neg: -1234 @@ -118,13 +117,13 @@ stradd: %%(ones)s + %%(twos)s + "33" [/favicon.ico] tools.staticfile.filename = %r """ % os.path.join(localDir, 'static/dirback.jpg')) - + root = Root() root.foo = Foo() root.raw = Raw() app = cherrypy.tree.mount(root, config=ioconf) app.request_class.namespaces['raw'] = raw_namespace - + cherrypy.tree.mount(Another(), "/another") cherrypy.config.update({'luxuryyacht': 'throatwobblermangrove', 'db.scheme': r"sqlite///memory", @@ -136,7 +135,8 @@ tools.staticfile.filename = %r from cherrypy.test import helper class ConfigTests(helper.CPWebCase): - + setup_server = staticmethod(setup_server) + def testConfig(self): tests = [ ('/', 'nex', 'None'), @@ -154,7 +154,7 @@ class ConfigTests(helper.CPWebCase): for path, key, expected in tests: self.getPage(path + "?key=" + key) self.assertBody(expected) - + expectedconf = { # From CP defaults 'tools.log_headers.on': False, @@ -176,27 +176,27 @@ class ConfigTests(helper.CPWebCase): for key, expected in expectedconf.items(): self.getPage("/foo/bar?key=" + key) self.assertBody(repr(expected)) - + def testUnrepr(self): self.getPage("/repr?key=neg") self.assertBody("-1234") - + self.getPage("/repr?key=filename") self.assertBody(repr(os.path.join(sys.prefix, "hello.py"))) - + self.getPage("/repr?key=thing1") self.assertBody(repr(cherrypy.lib.httputil.response_codes[404])) - + if not getattr(cherrypy.server, "using_apache", False): # The object ID's won't match up when using Apache, since the # server and client are running in different processes. self.getPage("/repr?key=thing2") from cherrypy.tutorial import thing2 self.assertBody(repr(thing2)) - + self.getPage("/repr?key=complex") self.assertBody("(3+2j)") - + self.getPage("/repr?key=stradd") self.assertBody(repr("112233")) @@ -204,14 +204,14 @@ class ConfigTests(helper.CPWebCase): self.getPage("/foo/silly") self.assertHeader('X-silly', 'sillyval') self.assertBody('Hello world') - + def testCustomNamespaces(self): self.getPage("/raw/incr?num=12") self.assertBody("13") - + self.getPage("/dbscheme") self.assertBody(r"sqlite///memory") - + def testHandlerToolConfigOverride(self): # Assert that config overrides tool constructor args. Above, we set # the favicon in the page handler to be '../favicon.ico', @@ -219,7 +219,7 @@ class ConfigTests(helper.CPWebCase): self.getPage("/favicon.ico") self.assertBody(open(os.path.join(localDir, "static/dirback.jpg"), "rb").read()) - + def test_request_body_namespace(self): self.getPage("/plain", method='POST', headers=[ ('Content-Type', 'application/x-www-form-urlencoded'), @@ -229,10 +229,11 @@ class ConfigTests(helper.CPWebCase): class VariableSubstitutionTests(unittest.TestCase): - + setup_server = staticmethod(setup_server) + def test_config(self): from textwrap import dedent - + # variable substitution with [DEFAULT] conf = dedent(""" [DEFAULT] diff --git a/cherrypy/test/test_config_server.py b/cherrypy/test/test_config_server.py index 0afa19e6..e5cc7164 100644 --- a/cherrypy/test/test_config_server.py +++ b/cherrypy/test/test_config_server.py @@ -1,7 +1,7 @@ """Tests for the CherryPy configuration system.""" from cherrypy.test import test -test.prefer_parent_path() + import os, sys localDir = os.path.join(os.getcwd(), os.path.dirname(__file__)) @@ -10,47 +10,47 @@ import time import cherrypy -def setup_server(): - - class Root: - def index(self): - return cherrypy.request.wsgi_environ['SERVER_PORT'] - index.exposed = True - - def upload(self, file): - return "Size: %s" % len(file.file.read()) - upload.exposed = True - - def tinyupload(self): - return cherrypy.request.body.read() - tinyupload.exposed = True - tinyupload._cp_config = {'request.body.maxbytes': 100} - - cherrypy.tree.mount(Root()) - - cherrypy.config.update({ - 'server.socket_host': '0.0.0.0', - 'server.socket_port': 9876, - 'server.max_request_body_size': 200, - 'server.max_request_header_size': 500, - 'server.socket_timeout': 0.5, - - # Test explicit server.instance - 'server.2.instance': 'cherrypy._cpwsgi_server.CPWSGIServer', - 'server.2.socket_port': 9877, - - # Test non-numeric <servername> - # Also test default server.instance = builtin server - 'server.yetanother.socket_port': 9878, - }) - # Client-side code # from cherrypy.test import helper class ServerConfigTests(helper.CPWebCase): - + @staticmethod + def setup_server(): + + class Root: + def index(self): + return cherrypy.request.wsgi_environ['SERVER_PORT'] + index.exposed = True + + def upload(self, file): + return "Size: %s" % len(file.file.read()) + upload.exposed = True + + def tinyupload(self): + return cherrypy.request.body.read() + tinyupload.exposed = True + tinyupload._cp_config = {'request.body.maxbytes': 100} + + cherrypy.tree.mount(Root()) + + cherrypy.config.update({ + 'server.socket_host': '0.0.0.0', + 'server.socket_port': 9876, + 'server.max_request_body_size': 200, + 'server.max_request_header_size': 500, + 'server.socket_timeout': 0.5, + + # Test explicit server.instance + 'server.2.instance': 'cherrypy._cpwsgi_server.CPWSGIServer', + 'server.2.socket_port': 9877, + + # Test non-numeric <servername> + # Also test default server.instance = builtin server + 'server.yetanother.socket_port': 9878, + }) + PORT = 9876 def testBasicConfig(self): diff --git a/cherrypy/test/test_conn.py b/cherrypy/test/test_conn.py index e2774bb5..92c71470 100644 --- a/cherrypy/test/test_conn.py +++ b/cherrypy/test/test_conn.py @@ -1,7 +1,6 @@ """Tests for TCP connection handling, including proper and timely close.""" from cherrypy.test import test -test.prefer_parent_path() from httplib import HTTPConnection, HTTPSConnection, NotConnected, BadStatusLine import urllib @@ -19,55 +18,55 @@ from cherrypy import _cperror pov = 'pPeErRsSiIsStTeEnNcCeE oOfF vViIsSiIoOnN' def setup_server(): - + def raise500(): raise cherrypy.HTTPError(500) - + class Root: - + def index(self): return pov index.exposed = True page1 = index page2 = index page3 = index - + def hello(self): return "Hello, world!" hello.exposed = True - + def timeout(self, t): return str(cherrypy.server.httpserver.timeout) timeout.exposed = True - + def stream(self, set_cl=False): if set_cl: cherrypy.response.headers['Content-Length'] = 10 - + def content(): for x in range(10): yield str(x) - + return content() stream.exposed = True stream._cp_config = {'response.stream': True} - + def error(self, code=500): raise cherrypy.HTTPError(code) error.exposed = True - + def upload(self): if not cherrypy.request.method == 'POST': raise AssertionError("'POST' != request.method %r" % cherrypy.request.method) return "thanks for '%s'" % cherrypy.request.body.read() upload.exposed = True - + def custom(self, response_code): cherrypy.response.status = response_code return "Code = %s" % response_code custom.exposed = True - + def err_before_read(self): return "ok" err_before_read.exposed = True @@ -76,7 +75,7 @@ def setup_server(): def one_megabyte_of_a(self): return ["a" * 1024] * 1024 one_megabyte_of_a.exposed = True - + cherrypy.tree.mount(Root()) cherrypy.config.update({ 'server.max_request_body_size': 1001, @@ -87,54 +86,55 @@ def setup_server(): from cherrypy.test import helper class ConnectionCloseTests(helper.CPWebCase): - + setup_server = staticmethod(setup_server) + def test_HTTP11(self): if cherrypy.server.protocol_version != "HTTP/1.1": return self.skip() - + self.PROTOCOL = "HTTP/1.1" - + self.persistent = True - + # Make the first request and assert there's no "Connection: close". self.getPage("/") self.assertStatus('200 OK') self.assertBody(pov) self.assertNoHeader("Connection") - + # Make another request on the same connection. self.getPage("/page1") self.assertStatus('200 OK') self.assertBody(pov) self.assertNoHeader("Connection") - + # Test client-side close. self.getPage("/page2", headers=[("Connection", "close")]) self.assertStatus('200 OK') self.assertBody(pov) self.assertHeader("Connection", "close") - + # Make another request on the same connection, which should error. self.assertRaises(NotConnected, self.getPage, "/") - + def test_Streaming_no_len(self): self._streaming(set_cl=False) - + def test_Streaming_with_len(self): self._streaming(set_cl=True) - + def _streaming(self, set_cl): if cherrypy.server.protocol_version == "HTTP/1.1": self.PROTOCOL = "HTTP/1.1" - + self.persistent = True - + # Make the first request and assert there's no "Connection: close". self.getPage("/") self.assertStatus('200 OK') self.assertBody(pov) self.assertNoHeader("Connection") - + # Make another, streamed request on the same connection. if set_cl: # When a Content-Length is provided, the content should stream @@ -143,7 +143,7 @@ class ConnectionCloseTests(helper.CPWebCase): self.assertHeader("Content-Length") self.assertNoHeader("Connection", "close") self.assertNoHeader("Transfer-Encoding") - + self.assertStatus('200 OK') self.assertBody('0123456789') else: @@ -154,21 +154,21 @@ class ConnectionCloseTests(helper.CPWebCase): self.assertNoHeader("Content-Length") self.assertStatus('200 OK') self.assertBody('0123456789') - + chunked_response = False for k, v in self.headers: if k.lower() == "transfer-encoding": if str(v) == "chunked": chunked_response = True - + if chunked_response: self.assertNoHeader("Connection", "close") else: self.assertHeader("Connection", "close") - + # Make another request on the same connection, which should error. self.assertRaises(NotConnected, self.getPage, "/") - + # Try HEAD. See http://www.cherrypy.org/ticket/864. self.getPage("/stream", method='HEAD') self.assertStatus('200 OK') @@ -176,15 +176,15 @@ class ConnectionCloseTests(helper.CPWebCase): self.assertNoHeader("Transfer-Encoding") else: self.PROTOCOL = "HTTP/1.0" - + self.persistent = True - + # Make the first request and assert Keep-Alive. self.getPage("/", headers=[("Connection", "Keep-Alive")]) self.assertStatus('200 OK') self.assertBody(pov) self.assertHeader("Connection", "Keep-Alive") - + # Make another, streamed request on the same connection. if set_cl: # When a Content-Length is provided, the content should @@ -202,36 +202,36 @@ class ConnectionCloseTests(helper.CPWebCase): self.getPage("/stream", headers=[("Connection", "Keep-Alive")]) self.assertStatus('200 OK') self.assertBody('0123456789') - + self.assertNoHeader("Content-Length") self.assertNoHeader("Connection", "Keep-Alive") self.assertNoHeader("Transfer-Encoding") - + # Make another request on the same connection, which should error. self.assertRaises(NotConnected, self.getPage, "/") - + def test_HTTP10_KeepAlive(self): self.PROTOCOL = "HTTP/1.0" if self.scheme == "https": self.HTTP_CONN = HTTPSConnection else: self.HTTP_CONN = HTTPConnection - + # Test a normal HTTP/1.0 request. self.getPage("/page2") self.assertStatus('200 OK') self.assertBody(pov) # Apache, for example, may emit a Connection header even for HTTP/1.0 ## self.assertNoHeader("Connection") - + # Test a keep-alive HTTP/1.0 request. self.persistent = True - + self.getPage("/page3", headers=[("Connection", "Keep-Alive")]) self.assertStatus('200 OK') self.assertBody(pov) self.assertHeader("Connection", "Keep-Alive") - + # Remove the keep-alive header again. self.getPage("/page3") self.assertStatus('200 OK') @@ -241,30 +241,31 @@ class ConnectionCloseTests(helper.CPWebCase): class PipelineTests(helper.CPWebCase): - + setup_server = staticmethod(setup_server) + def test_HTTP11_Timeout(self): # If we timeout without sending any data, # the server will close the conn with a 408. if cherrypy.server.protocol_version != "HTTP/1.1": return self.skip() - + self.PROTOCOL = "HTTP/1.1" - + # Connect but send nothing. self.persistent = True conn = self.HTTP_CONN conn.auto_open = False conn.connect() - + # Wait for our socket timeout time.sleep(timeout * 2) - + # The request should have returned 408 already. response = conn.response_class(conn.sock, method="GET") response.begin() self.assertEqual(response.status, 408) conn.close() - + # Connect but send half the headers only. self.persistent = True conn = self.HTTP_CONN @@ -272,24 +273,24 @@ class PipelineTests(helper.CPWebCase): conn.connect() conn.send('GET /hello HTTP/1.1') conn.send(("Host: %s" % self.HOST).encode('ascii')) - + # Wait for our socket timeout time.sleep(timeout * 2) - + # The conn should have already sent 408. response = conn.response_class(conn.sock, method="GET") response.begin() self.assertEqual(response.status, 408) conn.close() - + def test_HTTP11_Timeout_after_request(self): # If we timeout after at least one request has succeeded, # the server will close the conn without 408. if cherrypy.server.protocol_version != "HTTP/1.1": return self.skip() - + self.PROTOCOL = "HTTP/1.1" - + # Make an initial request self.persistent = True conn = self.HTTP_CONN @@ -301,7 +302,7 @@ class PipelineTests(helper.CPWebCase): self.assertEqual(response.status, 200) self.body = response.read() self.assertBody(str(timeout)) - + # Make a second request on the same socket conn._output('GET /hello HTTP/1.1') conn._output("Host: %s" % self.HOST) @@ -311,10 +312,10 @@ class PipelineTests(helper.CPWebCase): self.assertEqual(response.status, 200) self.body = response.read() self.assertBody("Hello, world!") - + # Wait for our socket timeout time.sleep(timeout * 2) - + # Make another request on the same socket, which should error conn._output('GET /hello HTTP/1.1') conn._output("Host: %s" % self.HOST) @@ -332,9 +333,9 @@ class PipelineTests(helper.CPWebCase): self.fail("Writing to timed out socket didn't fail" " as it should have: %s" % response.read()) - + conn.close() - + # Make another request on a new socket, which should work self.persistent = True conn = self.HTTP_CONN @@ -347,7 +348,7 @@ class PipelineTests(helper.CPWebCase): self.body = response.read() self.assertBody(pov) - + # Make another request on the same socket, # but timeout on the headers conn.send('GET /hello HTTP/1.1') @@ -365,9 +366,9 @@ class PipelineTests(helper.CPWebCase): self.fail("Writing to timed out socket didn't fail" " as it should have: %s" % response.read()) - + conn.close() - + # Retry the request on a new connection, which should work self.persistent = True conn = self.HTTP_CONN @@ -380,53 +381,53 @@ class PipelineTests(helper.CPWebCase): self.body = response.read() self.assertBody(pov) conn.close() - + def test_HTTP11_pipelining(self): if cherrypy.server.protocol_version != "HTTP/1.1": return self.skip() - + self.PROTOCOL = "HTTP/1.1" - + # Test pipelining. httplib doesn't support this directly. self.persistent = True conn = self.HTTP_CONN - + # Put request 1 conn.putrequest("GET", "/hello", skip_host=True) conn.putheader("Host", self.HOST) conn.endheaders() - + for trial in range(5): # Put next request conn._output('GET /hello HTTP/1.1') conn._output("Host: %s" % self.HOST) conn._send_output() - + # Retrieve previous response response = conn.response_class(conn.sock, method="GET") response.begin() body = response.read() self.assertEqual(response.status, 200) self.assertEqual(body, "Hello, world!") - + # Retrieve final response response = conn.response_class(conn.sock, method="GET") response.begin() body = response.read() self.assertEqual(response.status, 200) self.assertEqual(body, "Hello, world!") - + conn.close() - + def test_100_Continue(self): if cherrypy.server.protocol_version != "HTTP/1.1": return self.skip() - + self.PROTOCOL = "HTTP/1.1" - + self.persistent = True conn = self.HTTP_CONN - + # Try a page without an Expect request header first. # Note that httplib's response.begin automatically ignores # 100 Continue responses, so we must manually check for it. @@ -440,7 +441,7 @@ class PipelineTests(helper.CPWebCase): version, status, reason = response._read_status() self.assertNotEqual(status, 100) conn.close() - + # Now try a page with an Expect header... conn.connect() conn.putrequest("POST", "/upload", skip_host=True) @@ -450,7 +451,7 @@ class PipelineTests(helper.CPWebCase): conn.putheader("Expect", "100-continue") conn.endheaders() response = conn.response_class(conn.sock, method="POST") - + # ...assert and then skip the 100 response version, status, reason = response._read_status() self.assertEqual(status, 100) @@ -460,10 +461,10 @@ class PipelineTests(helper.CPWebCase): self.fail("100 Continue should not output any headers. Got %r" % line) else: break - + # ...send the body conn.send("I am a small file") - + # ...get the final response response.begin() self.status, self.headers, self.body = webtest.shb(response) @@ -473,26 +474,27 @@ class PipelineTests(helper.CPWebCase): class ConnectionTests(helper.CPWebCase): - + setup_server = staticmethod(setup_server) + def test_readall_or_close(self): if cherrypy.server.protocol_version != "HTTP/1.1": return self.skip() - + self.PROTOCOL = "HTTP/1.1" - + if self.scheme == "https": self.HTTP_CONN = HTTPSConnection else: self.HTTP_CONN = HTTPConnection - + # Test a max of 0 (the default) and then reset to what it was above. old_max = cherrypy.server.max_request_body_size for new_max in (0, old_max): cherrypy.server.max_request_body_size = new_max - + self.persistent = True conn = self.HTTP_CONN - + # Get a POST page with an error conn.putrequest("POST", "/err_before_read", skip_host=True) conn.putheader("Host", self.HOST) @@ -501,7 +503,7 @@ class ConnectionTests(helper.CPWebCase): conn.putheader("Expect", "100-continue") conn.endheaders() response = conn.response_class(conn.sock, method="POST") - + # ...assert and then skip the 100 response version, status, reason = response._read_status() self.assertEqual(status, 100) @@ -509,15 +511,15 @@ class ConnectionTests(helper.CPWebCase): skip = response.fp.readline().strip() if not skip: break - + # ...send the body conn.send("x" * 1000) - + # ...get the final response response.begin() self.status, self.headers, self.body = webtest.shb(response) self.assertStatus(500) - + # Now try a working page with an Expect header... conn._output('POST /upload HTTP/1.1') conn._output("Host: %s" % self.HOST) @@ -526,7 +528,7 @@ class ConnectionTests(helper.CPWebCase): conn._output("Expect: 100-continue") conn._send_output() response = conn.response_class(conn.sock, method="POST") - + # ...assert and then skip the 100 response version, status, reason = response._read_status() self.assertEqual(status, 100) @@ -534,61 +536,61 @@ class ConnectionTests(helper.CPWebCase): skip = response.fp.readline().strip() if not skip: break - + # ...send the body conn.send("I am a small file") - + # ...get the final response response.begin() self.status, self.headers, self.body = webtest.shb(response) self.assertStatus(200) self.assertBody("thanks for 'I am a small file'") conn.close() - + def test_No_Message_Body(self): if cherrypy.server.protocol_version != "HTTP/1.1": return self.skip() - + self.PROTOCOL = "HTTP/1.1" - + # Set our HTTP_CONN to an instance so it persists between requests. self.persistent = True - + # Make the first request and assert there's no "Connection: close". self.getPage("/") self.assertStatus('200 OK') self.assertBody(pov) self.assertNoHeader("Connection") - + # Make a 204 request on the same connection. self.getPage("/custom/204") self.assertStatus(204) self.assertNoHeader("Content-Length") self.assertBody("") self.assertNoHeader("Connection") - + # Make a 304 request on the same connection. self.getPage("/custom/304") self.assertStatus(304) self.assertNoHeader("Content-Length") self.assertBody("") self.assertNoHeader("Connection") - + def test_Chunked_Encoding(self): if cherrypy.server.protocol_version != "HTTP/1.1": return self.skip() - + if (hasattr(self, 'harness') and "modpython" in self.harness.__class__.__name__.lower()): # mod_python forbids chunked encoding return self.skip() - + self.PROTOCOL = "HTTP/1.1" - + # Set our HTTP_CONN to an instance so it persists between requests. self.persistent = True conn = self.HTTP_CONN - + # Try a normal chunked request (with extensions) body = ("8;key=value\r\nxx\r\nxxxx\r\n5\r\nyyyyy\r\n0\r\n" "Content-Type: application/json\r\n" @@ -607,7 +609,7 @@ class ConnectionTests(helper.CPWebCase): self.status, self.headers, self.body = webtest.shb(response) self.assertStatus('200 OK') self.assertBody("thanks for 'xx\r\nxxxxyyyyy'") - + # Try a chunked request that exceeds server.max_request_body_size. # Note that the delimiters and trailer are included. body = "3e3\r\n" + ("x" * 995) + "\r\n0\r\n\r\n" @@ -623,7 +625,7 @@ class ConnectionTests(helper.CPWebCase): self.status, self.headers, self.body = webtest.shb(response) self.assertStatus(413) conn.close() - + def test_Content_Length(self): # Try a non-chunked request where Content-Length exceeds # server.max_request_body_size. Assert error before body send. @@ -639,7 +641,7 @@ class ConnectionTests(helper.CPWebCase): self.assertStatus(413) self.assertBody("") conn.close() - + def test_598(self): remote_data_conn = urllib.urlopen('%s://%s:%s/one_megabyte_of_a/' % (self.scheme, self.HOST, self.PORT,)) @@ -653,7 +655,7 @@ class ConnectionTests(helper.CPWebCase): else: buf += data remaining -= len(data) - + self.assertEqual(len(buf), 1024 * 1024) self.assertEqual(buf, "a" * 1024 * 1024) self.assertEqual(remaining, 0) @@ -661,10 +663,11 @@ class ConnectionTests(helper.CPWebCase): class BadRequestTests(helper.CPWebCase): - + setup_server = staticmethod(setup_server) + def test_No_CRLF(self): self.persistent = True - + conn = self.HTTP_CONN conn.send('GET /hello HTTP/1.1\n\n') response = conn.response_class(conn.sock, method="GET") @@ -672,7 +675,7 @@ class BadRequestTests(helper.CPWebCase): self.body = response.read() self.assertBody("HTTP requires CRLF terminators") conn.close() - + conn.connect() conn.send('GET /hello HTTP/1.1\r\n\n') response = conn.response_class(conn.sock, method="GET") diff --git a/cherrypy/test/test_core.py b/cherrypy/test/test_core.py index 2aa9278c..b37eb339 100644 --- a/cherrypy/test/test_core.py +++ b/cherrypy/test/test_core.py @@ -1,7 +1,7 @@ """Basic tests for the CherryPy core: request handling.""" from cherrypy.test import test -test.prefer_parent_path() + import os localDir = os.path.dirname(__file__) @@ -16,236 +16,236 @@ from cherrypy.lib import httputil, static favicon_path = os.path.join(os.getcwd(), localDir, "../favicon.ico") -def setup_server(): - class Root: - - def index(self): - return "hello" - index.exposed = True - - favicon_ico = tools.staticfile.handler(filename=favicon_path) - - def defct(self, newct): - newct = "text/%s" % newct - cherrypy.config.update({'tools.response_headers.on': True, - 'tools.response_headers.headers': - [('Content-Type', newct)]}) - defct.exposed = True - - def baseurl(self, path_info, relative=None): - return cherrypy.url(path_info, relative=bool(relative)) - baseurl.exposed = True - - root = Root() - - - class TestType(type): - """Metaclass which automatically exposes all functions in each subclass, - and adds an instance of the subclass as an attribute of root. - """ - def __init__(cls, name, bases, dct): - type.__init__(cls, name, bases, dct) - for value in dct.itervalues(): - if isinstance(value, types.FunctionType): - value.exposed = True - setattr(root, name.lower(), cls()) - class Test(object): - __metaclass__ = TestType - - - class URL(Test): - - _cp_config = {'tools.trailing_slash.on': False} - - def index(self, path_info, relative=None): - if relative != 'server': - relative = bool(relative) - return cherrypy.url(path_info, relative=relative) - - def leaf(self, path_info, relative=None): - if relative != 'server': - relative = bool(relative) - return cherrypy.url(path_info, relative=relative) +# Client-side code # +from cherrypy.test import helper - class Status(Test): - - def index(self): - return "normal" +class CoreRequestHandlingTest(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + + def index(self): + return "hello" + index.exposed = True + + favicon_ico = tools.staticfile.handler(filename=favicon_path) + + def defct(self, newct): + newct = "text/%s" % newct + cherrypy.config.update({'tools.response_headers.on': True, + 'tools.response_headers.headers': + [('Content-Type', newct)]}) + defct.exposed = True + + def baseurl(self, path_info, relative=None): + return cherrypy.url(path_info, relative=bool(relative)) + baseurl.exposed = True - def blank(self): - cherrypy.response.status = "" + root = Root() - # According to RFC 2616, new status codes are OK as long as they - # are between 100 and 599. - # Here is an illegal code... - def illegal(self): - cherrypy.response.status = 781 - return "oops" + class TestType(type): + """Metaclass which automatically exposes all functions in each subclass, + and adds an instance of the subclass as an attribute of root. + """ + def __init__(cls, name, bases, dct): + type.__init__(cls, name, bases, dct) + for value in dct.itervalues(): + if isinstance(value, types.FunctionType): + value.exposed = True + setattr(root, name.lower(), cls()) + class Test(object): + __metaclass__ = TestType - # ...and here is an unknown but legal code. - def unknown(self): - cherrypy.response.status = "431 My custom error" - return "funky" - # Non-numeric code - def bad(self): - cherrypy.response.status = "error" - return "bad news" + class URL(Test): + + _cp_config = {'tools.trailing_slash.on': False} + + def index(self, path_info, relative=None): + if relative != 'server': + relative = bool(relative) + return cherrypy.url(path_info, relative=relative) + + def leaf(self, path_info, relative=None): + if relative != 'server': + relative = bool(relative) + return cherrypy.url(path_info, relative=relative) - class Redirect(Test): - - class Error: - _cp_config = {"tools.err_redirect.on": True, - "tools.err_redirect.url": "/errpage", - "tools.err_redirect.internal": False, - } + class Status(Test): def index(self): - raise NameError("redirect_test") - index.exposed = True - error = Error() - - def index(self): - return "child" - - def by_code(self, code): - raise cherrypy.HTTPRedirect("somewhere else", code) - by_code._cp_config = {'tools.trailing_slash.extra': True} - - def nomodify(self): - raise cherrypy.HTTPRedirect("", 304) - - def proxy(self): - raise cherrypy.HTTPRedirect("proxy", 305) - - def stringify(self): - return str(cherrypy.HTTPRedirect("/")) - - def fragment(self, frag): - raise cherrypy.HTTPRedirect("/some/url#%s" % frag) - - def login_redir(): - if not getattr(cherrypy.request, "login", None): - raise cherrypy.InternalRedirect("/internalredirect/login") - tools.login_redir = _cptools.Tool('before_handler', login_redir) - - def redir_custom(): - raise cherrypy.InternalRedirect("/internalredirect/custom_err") - - class InternalRedirect(Test): - - def index(self): - raise cherrypy.InternalRedirect("/") - - def choke(self): - return 3 / 0 - choke.exposed = True - choke._cp_config = {'hooks.before_error_response': redir_custom} - - def relative(self, a, b): - raise cherrypy.InternalRedirect("cousin?t=6") - - def cousin(self, t): - assert cherrypy.request.prev.closed - return cherrypy.request.prev.query_string - - def petshop(self, user_id): - if user_id == "parrot": - # Trade it for a slug when redirecting - raise cherrypy.InternalRedirect('/image/getImagesByUser?user_id=slug') - elif user_id == "terrier": - # Trade it for a fish when redirecting - raise cherrypy.InternalRedirect('/image/getImagesByUser?user_id=fish') - else: - # This should pass the user_id through to getImagesByUser - raise cherrypy.InternalRedirect( - '/image/getImagesByUser?user_id=%s' % str(user_id)) - - # We support Python 2.3, but the @-deco syntax would look like this: - # @tools.login_redir() - def secure(self): - return "Welcome!" - secure = tools.login_redir()(secure) - # Since calling the tool returns the same function you pass in, - # you could skip binding the return value, and just write: - # tools.login_redir()(secure) - - def login(self): - return "Please log in" - - def custom_err(self): - return "Something went horribly wrong." - - def early_ir(self, arg): - return "whatever" - early_ir._cp_config = {'hooks.before_request_body': redir_custom} - - - class Image(Test): - - def getImagesByUser(self, user_id): - return "0 images for %s" % user_id + return "normal" + + def blank(self): + cherrypy.response.status = "" + + # According to RFC 2616, new status codes are OK as long as they + # are between 100 and 599. + + # Here is an illegal code... + def illegal(self): + cherrypy.response.status = 781 + return "oops" + + # ...and here is an unknown but legal code. + def unknown(self): + cherrypy.response.status = "431 My custom error" + return "funky" + + # Non-numeric code + def bad(self): + cherrypy.response.status = "error" + return "bad news" - class Flatten(Test): - - def as_string(self): - return "content" - - def as_list(self): - return ["con", "tent"] + class Redirect(Test): + + class Error: + _cp_config = {"tools.err_redirect.on": True, + "tools.err_redirect.url": "/errpage", + "tools.err_redirect.internal": False, + } + + def index(self): + raise NameError("redirect_test") + index.exposed = True + error = Error() + + def index(self): + return "child" + + def by_code(self, code): + raise cherrypy.HTTPRedirect("somewhere else", code) + by_code._cp_config = {'tools.trailing_slash.extra': True} + + def nomodify(self): + raise cherrypy.HTTPRedirect("", 304) + + def proxy(self): + raise cherrypy.HTTPRedirect("proxy", 305) + + def stringify(self): + return str(cherrypy.HTTPRedirect("/")) + + def fragment(self, frag): + raise cherrypy.HTTPRedirect("/some/url#%s" % frag) - def as_yield(self): - yield "content" + def login_redir(): + if not getattr(cherrypy.request, "login", None): + raise cherrypy.InternalRedirect("/internalredirect/login") + tools.login_redir = _cptools.Tool('before_handler', login_redir) - def as_dblyield(self): - yield self.as_yield() - as_dblyield._cp_config = {'tools.flatten.on': True} + def redir_custom(): + raise cherrypy.InternalRedirect("/internalredirect/custom_err") - def as_refyield(self): - for chunk in self.as_yield(): - yield chunk - - - class Ranges(Test): + class InternalRedirect(Test): + + def index(self): + raise cherrypy.InternalRedirect("/") + + def choke(self): + return 3 / 0 + choke.exposed = True + choke._cp_config = {'hooks.before_error_response': redir_custom} + + def relative(self, a, b): + raise cherrypy.InternalRedirect("cousin?t=6") + + def cousin(self, t): + assert cherrypy.request.prev.closed + return cherrypy.request.prev.query_string + + def petshop(self, user_id): + if user_id == "parrot": + # Trade it for a slug when redirecting + raise cherrypy.InternalRedirect('/image/getImagesByUser?user_id=slug') + elif user_id == "terrier": + # Trade it for a fish when redirecting + raise cherrypy.InternalRedirect('/image/getImagesByUser?user_id=fish') + else: + # This should pass the user_id through to getImagesByUser + raise cherrypy.InternalRedirect( + '/image/getImagesByUser?user_id=%s' % str(user_id)) + + # We support Python 2.3, but the @-deco syntax would look like this: + # @tools.login_redir() + def secure(self): + return "Welcome!" + secure = tools.login_redir()(secure) + # Since calling the tool returns the same function you pass in, + # you could skip binding the return value, and just write: + # tools.login_redir()(secure) + + def login(self): + return "Please log in" + + def custom_err(self): + return "Something went horribly wrong." + + def early_ir(self, arg): + return "whatever" + early_ir._cp_config = {'hooks.before_request_body': redir_custom} - def get_ranges(self, bytes): - return repr(httputil.get_ranges('bytes=%s' % bytes, 8)) - def slice_file(self): - path = os.path.join(os.getcwd(), os.path.dirname(__file__)) - return static.serve_file(os.path.join(path, "static/index.html")) + class Image(Test): + + def getImagesByUser(self, user_id): + return "0 images for %s" % user_id - class Cookies(Test): + class Flatten(Test): + + def as_string(self): + return "content" + + def as_list(self): + return ["con", "tent"] + + def as_yield(self): + yield "content" + + def as_dblyield(self): + yield self.as_yield() + as_dblyield._cp_config = {'tools.flatten.on': True} + + def as_refyield(self): + for chunk in self.as_yield(): + yield chunk - def single(self, name): - cookie = cherrypy.request.cookie[name] - # Python2's SimpleCookie.__setitem__ won't take unicode keys. - cherrypy.response.cookie[str(name)] = cookie.value - def multiple(self, names): - for name in names: + class Ranges(Test): + + def get_ranges(self, bytes): + return repr(httputil.get_ranges('bytes=%s' % bytes, 8)) + + def slice_file(self): + path = os.path.join(os.getcwd(), os.path.dirname(__file__)) + return static.serve_file(os.path.join(path, "static/index.html")) + + + class Cookies(Test): + + def single(self, name): cookie = cherrypy.request.cookie[name] # Python2's SimpleCookie.__setitem__ won't take unicode keys. cherrypy.response.cookie[str(name)] = cookie.value + + def multiple(self, names): + for name in names: + cookie = cherrypy.request.cookie[name] + # Python2's SimpleCookie.__setitem__ won't take unicode keys. + cherrypy.response.cookie[str(name)] = cookie.value - if sys.version_info >= (2, 5): - from cherrypy.test import py25 - Root.expose_dec = py25.ExposeExamples() - - cherrypy.tree.mount(root) - - -# Client-side code # - -from cherrypy.test import helper + if sys.version_info >= (2, 5): + from cherrypy.test import py25 + Root.expose_dec = py25.ExposeExamples() + + cherrypy.tree.mount(root) -class CoreRequestHandlingTest(helper.CPWebCase): def testStatus(self): self.getPage("/status/") diff --git a/cherrypy/test/test_dynamicobjectmapping.py b/cherrypy/test/test_dynamicobjectmapping.py index 64b64658..0f4f18f0 100644 --- a/cherrypy/test/test_dynamicobjectmapping.py +++ b/cherrypy/test/test_dynamicobjectmapping.py @@ -1,176 +1,176 @@ from cherrypy.test import test from cherrypy._cptree import Application -test.prefer_parent_path() import cherrypy script_names = ["", "/foo", "/users/fred/blog", "/corp/blog"] -def setup_server(): - class SubSubRoot: - def index(self): - return "SubSubRoot index" - index.exposed = True - - def default(self, *args): - return "SubSubRoot default" - default.exposed = True - - def handler(self): - return "SubSubRoot handler" - handler.exposed = True - - def dispatch(self): - return "SubSubRoot dispatch" - dispatch.exposed = True - - subsubnodes = { - '1': SubSubRoot(), - '2': SubSubRoot(), - } - - class SubRoot: - def index(self): - return "SubRoot index" - index.exposed = True - - def default(self, *args): - return "SubRoot %s" % (args,) - default.exposed = True - - def handler(self): - return "SubRoot handler" - handler.exposed = True - - def _cp_dispatch(self, vpath): - return subsubnodes.get(vpath[0], None) - - subnodes = { - '1': SubRoot(), - '2': SubRoot(), - } - class Root: - def index(self): - return "index" - index.exposed = True - - def default(self, *args): - return "default %s" % (args,) - default.exposed = True - - def handler(self): - return "handler" - handler.exposed = True - - def _cp_dispatch(self, vpath): - return subnodes.get(vpath[0]) - - #-------------------------------------------------------------------------- - # DynamicNodeAndMethodDispatcher example. - # This example exposes a fairly naive HTTP api - class User(object): - def __init__(self, id, name): - self.id = id - self.name = name - - def __unicode__(self): - return unicode(self.name) - - user_lookup = { - 1: User(1, 'foo'), - 2: User(2, 'bar'), - } - - def make_user(name, id=None): - if not id: - id = max(*user_lookup.keys()) + 1 - user_lookup[id] = User(id, name) - return id - - class UserContainerNode(object): - exposed = True - - def POST(self, name): - """ - Allow the creation of a new Object - """ - return "POST %d" % make_user(name) - - def GET(self): - keys = user_lookup.keys() - keys.sort() - return unicode(keys) - - def dynamic_dispatch(self, vpath): - try: - id = int(vpath[0]) - except ValueError: - return None - return UserInstanceNode(id) - - class UserInstanceNode(object): - exposed = True - def __init__(self, id): - self.id = id - self.user = user_lookup.get(id, None) - - # For all but PUT methods there MUST be a valid user identified - # by self.id - if not self.user and cherrypy.request.method != 'PUT': - raise cherrypy.HTTPError(404) - - def GET(self, *args, **kwargs): - """ - Return the appropriate representation of the instance. - """ - return unicode(self.user) - - def POST(self, name): - """ - Update the fields of the user instance. - """ - self.user.name = name - return "POST %d" % self.user.id - - def PUT(self, name): - """ - Create a new user with the specified id, or edit it if it already exists - """ - if self.user: - # Edit the current user - self.user.name = name - return "PUT %d" % self.user.id - else: - # Make a new user with said attributes. - return "PUT %d" % make_user(name, self.id) - - def DELETE(self): - """ - Delete the user specified at the id. - """ - id = self.user.id - del user_lookup[self.user.id] - del self.user - return "DELETE %d" % id - - - Root.users = UserContainerNode() - - md = cherrypy.dispatch.MethodDispatcher('dynamic_dispatch') - for url in script_names: - conf = {'/': { - 'user': (url or "/").split("/")[-2], - }, - '/users': { - 'request.dispatch': md - }, - } - cherrypy.tree.mount(Root(), url, conf) - from cherrypy.test import helper class DynamicObjectMappingTest(helper.CPWebCase): + @staticmethod + def setup_server(): + class SubSubRoot: + def index(self): + return "SubSubRoot index" + index.exposed = True + + def default(self, *args): + return "SubSubRoot default" + default.exposed = True + + def handler(self): + return "SubSubRoot handler" + handler.exposed = True + + def dispatch(self): + return "SubSubRoot dispatch" + dispatch.exposed = True + + subsubnodes = { + '1': SubSubRoot(), + '2': SubSubRoot(), + } + + class SubRoot: + def index(self): + return "SubRoot index" + index.exposed = True + + def default(self, *args): + return "SubRoot %s" % (args,) + default.exposed = True + + def handler(self): + return "SubRoot handler" + handler.exposed = True + + def _cp_dispatch(self, vpath): + return subsubnodes.get(vpath[0], None) + + subnodes = { + '1': SubRoot(), + '2': SubRoot(), + } + class Root: + def index(self): + return "index" + index.exposed = True + + def default(self, *args): + return "default %s" % (args,) + default.exposed = True + + def handler(self): + return "handler" + handler.exposed = True + + def _cp_dispatch(self, vpath): + return subnodes.get(vpath[0]) + + #-------------------------------------------------------------------------- + # DynamicNodeAndMethodDispatcher example. + # This example exposes a fairly naive HTTP api + class User(object): + def __init__(self, id, name): + self.id = id + self.name = name + + def __unicode__(self): + return unicode(self.name) + + user_lookup = { + 1: User(1, 'foo'), + 2: User(2, 'bar'), + } + + def make_user(name, id=None): + if not id: + id = max(*user_lookup.keys()) + 1 + user_lookup[id] = User(id, name) + return id + + class UserContainerNode(object): + exposed = True + + def POST(self, name): + """ + Allow the creation of a new Object + """ + return "POST %d" % make_user(name) + + def GET(self): + keys = user_lookup.keys() + keys.sort() + return unicode(keys) + + def dynamic_dispatch(self, vpath): + try: + id = int(vpath[0]) + except ValueError: + return None + return UserInstanceNode(id) + + class UserInstanceNode(object): + exposed = True + def __init__(self, id): + self.id = id + self.user = user_lookup.get(id, None) + + # For all but PUT methods there MUST be a valid user identified + # by self.id + if not self.user and cherrypy.request.method != 'PUT': + raise cherrypy.HTTPError(404) + + def GET(self, *args, **kwargs): + """ + Return the appropriate representation of the instance. + """ + return unicode(self.user) + + def POST(self, name): + """ + Update the fields of the user instance. + """ + self.user.name = name + return "POST %d" % self.user.id + + def PUT(self, name): + """ + Create a new user with the specified id, or edit it if it already exists + """ + if self.user: + # Edit the current user + self.user.name = name + return "PUT %d" % self.user.id + else: + # Make a new user with said attributes. + return "PUT %d" % make_user(name, self.id) + + def DELETE(self): + """ + Delete the user specified at the id. + """ + id = self.user.id + del user_lookup[self.user.id] + del self.user + return "DELETE %d" % id + + + Root.users = UserContainerNode() + + md = cherrypy.dispatch.MethodDispatcher('dynamic_dispatch') + for url in script_names: + conf = {'/': { + 'user': (url or "/").split("/")[-2], + }, + '/users': { + 'request.dispatch': md + }, + } + cherrypy.tree.mount(Root(), url, conf) + def testObjectMapping(self): for url in script_names: diff --git a/cherrypy/test/test_encoding.py b/cherrypy/test/test_encoding.py index 8a68ef5a..b55d3cdf 100644 --- a/cherrypy/test/test_encoding.py +++ b/cherrypy/test/test_encoding.py @@ -1,5 +1,5 @@ from cherrypy.test import test -test.prefer_parent_path() + import gzip try: @@ -17,79 +17,77 @@ sing8 = sing.encode('utf-8') sing16 = sing.encode('utf-16') -def setup_server(): - class Root: - def index(self, param): - assert param == europoundUnicode, "%r != %r" % (param, europoundUnicode) - yield europoundUnicode - index.exposed = True - - def mao_zedong(self): - return sing - mao_zedong.exposed = True - - def utf8(self): - return sing8 - utf8.exposed = True - utf8._cp_config = {'tools.encode.encoding': 'utf-8'} - - def reqparams(self, *args, **kwargs): - return ', '.join([": ".join((k, v)).encode('utf8') - for k, v in cherrypy.request.params.items()]) - reqparams.exposed = True - - class GZIP: - def index(self): - yield "Hello, world" - index.exposed = True - - def noshow(self): - # Test for ticket #147, where yield showed no exceptions (content- - # encoding was still gzip even though traceback wasn't zipped). - raise IndexError() - yield "Here be dragons" - noshow.exposed = True - # Turn encoding off so the gzip tool is the one doing the collapse. - noshow._cp_config = {'tools.encode.on': False} - - def noshow_stream(self): - # Test for ticket #147, where yield showed no exceptions (content- - # encoding was still gzip even though traceback wasn't zipped). - raise IndexError() - yield "Here be dragons" - noshow_stream.exposed = True - noshow_stream._cp_config = {'response.stream': True} - - class Decode: - def extra_charset(self, *args, **kwargs): - return ', '.join([": ".join((k, v)).encode('utf8') - for k, v in cherrypy.request.params.items()]) - extra_charset.exposed = True - extra_charset._cp_config = { - 'tools.decode.on': True, - 'tools.decode.default_encoding': [u'utf-16'], - } - - def force_charset(self, *args, **kwargs): - return ', '.join([": ".join((k, v)).encode('utf8') - for k, v in cherrypy.request.params.items()]) - force_charset.exposed = True - force_charset._cp_config = { - 'tools.decode.on': True, - 'tools.decode.encoding': u'utf-16', - } - - root = Root() - root.gzip = GZIP() - root.decode = Decode() - cherrypy.tree.mount(root, config={'/gzip': {'tools.gzip.on': True}}) - - - from cherrypy.test import helper class EncodingTests(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + def index(self, param): + assert param == europoundUnicode, "%r != %r" % (param, europoundUnicode) + yield europoundUnicode + index.exposed = True + + def mao_zedong(self): + return sing + mao_zedong.exposed = True + + def utf8(self): + return sing8 + utf8.exposed = True + utf8._cp_config = {'tools.encode.encoding': 'utf-8'} + + def reqparams(self, *args, **kwargs): + return ', '.join([": ".join((k, v)).encode('utf8') + for k, v in cherrypy.request.params.items()]) + reqparams.exposed = True + + class GZIP: + def index(self): + yield "Hello, world" + index.exposed = True + + def noshow(self): + # Test for ticket #147, where yield showed no exceptions (content- + # encoding was still gzip even though traceback wasn't zipped). + raise IndexError() + yield "Here be dragons" + noshow.exposed = True + # Turn encoding off so the gzip tool is the one doing the collapse. + noshow._cp_config = {'tools.encode.on': False} + + def noshow_stream(self): + # Test for ticket #147, where yield showed no exceptions (content- + # encoding was still gzip even though traceback wasn't zipped). + raise IndexError() + yield "Here be dragons" + noshow_stream.exposed = True + noshow_stream._cp_config = {'response.stream': True} + + class Decode: + def extra_charset(self, *args, **kwargs): + return ', '.join([": ".join((k, v)).encode('utf8') + for k, v in cherrypy.request.params.items()]) + extra_charset.exposed = True + extra_charset._cp_config = { + 'tools.decode.on': True, + 'tools.decode.default_encoding': [u'utf-16'], + } + + def force_charset(self, *args, **kwargs): + return ', '.join([": ".join((k, v)).encode('utf8') + for k, v in cherrypy.request.params.items()]) + force_charset.exposed = True + force_charset._cp_config = { + 'tools.decode.on': True, + 'tools.decode.encoding': u'utf-16', + } + + root = Root() + root.gzip = GZIP() + root.decode = Decode() + cherrypy.tree.mount(root, config={'/gzip': {'tools.gzip.on': True}}) def test_query_string_decoding(self): europoundUtf8 = europoundUnicode.encode('utf-8') diff --git a/cherrypy/test/test_etags.py b/cherrypy/test/test_etags.py index 4bf6ad17..cafd569d 100644 --- a/cherrypy/test/test_etags.py +++ b/cherrypy/test/test_etags.py @@ -1,36 +1,37 @@ from cherrypy.test import test -test.prefer_parent_path() + import cherrypy -def setup_server(): - class Root: - def resource(self): - return "Oh wah ta goo Siam." - resource.exposed = True - - def fail(self, code): - code = int(code) - if 300 <= code <= 399: - raise cherrypy.HTTPRedirect([], code) - else: - raise cherrypy.HTTPError(code) - fail.exposed = True - - def unicoded(self): - return u'I am a \u1ee4nicode string.' - unicoded.exposed = True - unicoded._cp_config = {'tools.encode.on': True} - - conf = {'/': {'tools.etags.on': True, - 'tools.etags.autotags': True, - }} - cherrypy.tree.mount(Root(), config=conf) - from cherrypy.test import helper class ETagTest(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + def resource(self): + return "Oh wah ta goo Siam." + resource.exposed = True + + def fail(self, code): + code = int(code) + if 300 <= code <= 399: + raise cherrypy.HTTPRedirect([], code) + else: + raise cherrypy.HTTPError(code) + fail.exposed = True + + def unicoded(self): + return u'I am a \u1ee4nicode string.' + unicoded.exposed = True + unicoded._cp_config = {'tools.encode.on': True} + + conf = {'/': {'tools.etags.on': True, + 'tools.etags.autotags': True, + }} + cherrypy.tree.mount(Root(), config=conf) + def test_etags(self): self.getPage("/resource") diff --git a/cherrypy/test/test_http.py b/cherrypy/test/test_http.py index 1c92ee22..125a6a3a 100644 --- a/cherrypy/test/test_http.py +++ b/cherrypy/test/test_http.py @@ -1,7 +1,7 @@ """Tests for managing HTTP issues (malformed requests, etc).""" from cherrypy.test import test -test.prefer_parent_path() + from httplib import HTTPConnection, HTTPSConnection import cherrypy @@ -30,44 +30,44 @@ def encode_multipart_formdata(files): return content_type, body -def setup_server(): - - class Root: - def index(self, *args, **kwargs): - return "Hello world!" - index.exposed = True - - def no_body(self, *args, **kwargs): - return "Hello world!" - no_body.exposed = True - no_body._cp_config = {'request.process_request_body': False} - - def post_multipart(self, file): - """Return a summary ("a * 65536\nb * 65536") of the uploaded file.""" - contents = file.file.read() - summary = [] - curchar = "" - count = 0 - for c in contents: - if c == curchar: - count += 1 - else: - if count: - summary.append("%s * %d" % (curchar, count)) - count = 1 - curchar = c - if count: - summary.append("%s * %d" % (curchar, count)) - return ", ".join(summary) - post_multipart.exposed = True - - cherrypy.tree.mount(Root()) - cherrypy.config.update({'server.max_request_body_size': 30000000}) from cherrypy.test import helper class HTTPTests(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + def index(self, *args, **kwargs): + return "Hello world!" + index.exposed = True + + def no_body(self, *args, **kwargs): + return "Hello world!" + no_body.exposed = True + no_body._cp_config = {'request.process_request_body': False} + + def post_multipart(self, file): + """Return a summary ("a * 65536\nb * 65536") of the uploaded file.""" + contents = file.file.read() + summary = [] + curchar = "" + count = 0 + for c in contents: + if c == curchar: + count += 1 + else: + if count: + summary.append("%s * %d" % (curchar, count)) + count = 1 + curchar = c + if count: + summary.append("%s * %d" % (curchar, count)) + return ", ".join(summary) + post_multipart.exposed = True + + cherrypy.tree.mount(Root()) + cherrypy.config.update({'server.max_request_body_size': 30000000}) def test_no_content_length(self): # "The presence of a message-body in a request is signaled by the diff --git a/cherrypy/test/test_httpauth.py b/cherrypy/test/test_httpauth.py index 0709d5a7..60c5b406 100644 --- a/cherrypy/test/test_httpauth.py +++ b/cherrypy/test/test_httpauth.py @@ -1,5 +1,5 @@ from cherrypy.test import test -test.prefer_parent_path() + try: # Python 2.5+ @@ -11,56 +11,57 @@ except ImportError: import cherrypy from cherrypy.lib import httpauth -def setup_server(): - class Root: - def index(self): - return "This is public." - index.exposed = True - - class DigestProtected: - def index(self): - return "Hello %s, you've been authorized." % cherrypy.request.login - index.exposed = True - - class BasicProtected: - def index(self): - return "Hello %s, you've been authorized." % cherrypy.request.login - index.exposed = True - - class BasicProtected2: - def index(self): - return "Hello %s, you've been authorized." % cherrypy.request.login - index.exposed = True - - def fetch_users(): - return {'test': 'test'} - - def sha_password_encrypter(password): - return sha(password).hexdigest() - - def fetch_password(username): - return sha('test').hexdigest() - - conf = {'/digest': {'tools.digest_auth.on': True, - 'tools.digest_auth.realm': 'localhost', - 'tools.digest_auth.users': fetch_users}, - '/basic': {'tools.basic_auth.on': True, - 'tools.basic_auth.realm': 'localhost', - 'tools.basic_auth.users': {'test': md5('test').hexdigest()}}, - '/basic2': {'tools.basic_auth.on': True, - 'tools.basic_auth.realm': 'localhost', - 'tools.basic_auth.users': fetch_password, - 'tools.basic_auth.encrypt': sha_password_encrypter}} - - root = Root() - root.digest = DigestProtected() - root.basic = BasicProtected() - root.basic2 = BasicProtected2() - cherrypy.tree.mount(root, config=conf) - from cherrypy.test import helper class HTTPAuthTest(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + def index(self): + return "This is public." + index.exposed = True + + class DigestProtected: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + class BasicProtected: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + class BasicProtected2: + def index(self): + return "Hello %s, you've been authorized." % cherrypy.request.login + index.exposed = True + + def fetch_users(): + return {'test': 'test'} + + def sha_password_encrypter(password): + return sha(password).hexdigest() + + def fetch_password(username): + return sha('test').hexdigest() + + conf = {'/digest': {'tools.digest_auth.on': True, + 'tools.digest_auth.realm': 'localhost', + 'tools.digest_auth.users': fetch_users}, + '/basic': {'tools.basic_auth.on': True, + 'tools.basic_auth.realm': 'localhost', + 'tools.basic_auth.users': {'test': md5('test').hexdigest()}}, + '/basic2': {'tools.basic_auth.on': True, + 'tools.basic_auth.realm': 'localhost', + 'tools.basic_auth.users': fetch_password, + 'tools.basic_auth.encrypt': sha_password_encrypter}} + + root = Root() + root.digest = DigestProtected() + root.basic = BasicProtected() + root.basic2 = BasicProtected2() + cherrypy.tree.mount(root, config=conf) + def testPublic(self): self.getPage("/") diff --git a/cherrypy/test/test_httplib.py b/cherrypy/test/test_httplib.py index 7789e57d..09e34f24 100644 --- a/cherrypy/test/test_httplib.py +++ b/cherrypy/test/test_httplib.py @@ -1,7 +1,7 @@ """Tests for cherrypy/lib/httputil.py.""" from cherrypy.test import test -test.prefer_parent_path() + import unittest from cherrypy.lib import httputil diff --git a/cherrypy/test/test_json.py b/cherrypy/test/test_json.py index 514e63ed..b6136ea0 100644 --- a/cherrypy/test/test_json.py +++ b/cherrypy/test/test_json.py @@ -1,5 +1,5 @@ from cherrypy.test import test, helper -test.prefer_parent_path() + import cherrypy @@ -7,39 +7,40 @@ from cherrypy.lib.jsontools import json if json is None: print "skipped (simplejson not found) " else: - def setup_server(): - class Root(object): - def plain(self): - return 'hello' - plain.exposed = True + class JsonTest(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root(object): + def plain(self): + return 'hello' + plain.exposed = True - def json_string(self): - return 'hello' - json_string.exposed = True - json_string._cp_config = {'tools.json_out.on': True} + def json_string(self): + return 'hello' + json_string.exposed = True + json_string._cp_config = {'tools.json_out.on': True} - def json_list(self): - return ['a', 'b', 42] - json_list.exposed = True - json_list._cp_config = {'tools.json_out.on': True} + def json_list(self): + return ['a', 'b', 42] + json_list.exposed = True + json_list._cp_config = {'tools.json_out.on': True} - def json_dict(self): - return {'answer': 42} - json_dict.exposed = True - json_dict._cp_config = {'tools.json_out.on': True} + def json_dict(self): + return {'answer': 42} + json_dict.exposed = True + json_dict._cp_config = {'tools.json_out.on': True} - def json_post(self): - if cherrypy.request.json == [13, 'c']: - return 'ok' - else: - return 'nok' - json_post.exposed = True - json_post._cp_config = {'tools.json_in.on': True} + def json_post(self): + if cherrypy.request.json == [13, 'c']: + return 'ok' + else: + return 'nok' + json_post.exposed = True + json_post._cp_config = {'tools.json_in.on': True} - root = Root() - cherrypy.tree.mount(root) + root = Root() + cherrypy.tree.mount(root) - class JsonTest(helper.CPWebCase): def test_json_output(self): self.getPage("/plain") self.assertBody("hello") diff --git a/cherrypy/test/test_logging.py b/cherrypy/test/test_logging.py index b90d77d4..0870c36f 100644 --- a/cherrypy/test/test_logging.py +++ b/cherrypy/test/test_logging.py @@ -1,7 +1,7 @@ """Basic tests for the CherryPy core: request handling.""" from cherrypy.test import test -test.prefer_parent_path() + import os localDir = os.path.dirname(__file__) @@ -67,6 +67,7 @@ def setup_server(): from cherrypy.test import helper, logtest class AccessLogTests(helper.CPWebCase, logtest.LogCase): + setup_server = staticmethod(setup_server) logfile = access_log @@ -132,6 +133,7 @@ class AccessLogTests(helper.CPWebCase, logtest.LogCase): class ErrorLogTests(helper.CPWebCase, logtest.LogCase): + setup_server = staticmethod(setup_server) logfile = error_log diff --git a/cherrypy/test/test_mime.py b/cherrypy/test/test_mime.py index 05ffa7e9..3e3444b6 100644 --- a/cherrypy/test/test_mime.py +++ b/cherrypy/test/test_mime.py @@ -1,7 +1,7 @@ """Tests for various MIME issues, including the safe_multipart Tool.""" from cherrypy.test import test -test.prefer_parent_path() + import cherrypy @@ -28,6 +28,7 @@ def setup_server(): from cherrypy.test import helper class MultipartTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def test_multipart(self): text_part = u"This is the text version" @@ -62,7 +63,8 @@ This is the <strong>HTML</strong> version class SafeMultipartHandlingTest(helper.CPWebCase): - + setup_server = staticmethod(setup_server) + def test_Flash_Upload(self): headers = [ ('Accept', 'text/*'), diff --git a/cherrypy/test/test_misc_tools.py b/cherrypy/test/test_misc_tools.py index 5ff3e184..05f0a322 100644 --- a/cherrypy/test/test_misc_tools.py +++ b/cherrypy/test/test_misc_tools.py @@ -1,5 +1,5 @@ from cherrypy.test import test -test.prefer_parent_path() + import os localDir = os.path.dirname(__file__) @@ -95,6 +95,7 @@ def setup_server(): from cherrypy.test import helper class ResponseHeadersTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def testResponseHeadersDecorator(self): self.getPage('/') @@ -108,6 +109,7 @@ class ResponseHeadersTest(helper.CPWebCase): class RefererTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def testReferer(self): self.getPage('/referer/accept') @@ -129,6 +131,7 @@ class RefererTest(helper.CPWebCase): class AcceptTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def test_Accept_Tool(self): # Test with no header provided @@ -193,6 +196,7 @@ class AcceptTest(helper.CPWebCase): class AutoVaryTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def testAutoVary(self): self.getPage('/autovary/') diff --git a/cherrypy/test/test_objectmapping.py b/cherrypy/test/test_objectmapping.py index 638ccd91..38ee76c6 100644 --- a/cherrypy/test/test_objectmapping.py +++ b/cherrypy/test/test_objectmapping.py @@ -1,171 +1,172 @@ from cherrypy.test import test from cherrypy._cptree import Application -test.prefer_parent_path() + import cherrypy script_names = ["", "/foo", "/users/fred/blog", "/corp/blog"] -def setup_server(): - class Root: - def index(self, name="world"): - return name - index.exposed = True - - def foobar(self): - return "bar" - foobar.exposed = True - - def default(self, *params, **kwargs): - return "default:" + repr(params) - default.exposed = True - - def other(self): - return "other" - other.exposed = True - - def extra(self, *p): - return repr(p) - extra.exposed = True - - def redirect(self): - raise cherrypy.HTTPRedirect('dir1/', 302) - redirect.exposed = True - - def notExposed(self): - return "not exposed" - - def confvalue(self): - return cherrypy.request.config.get("user") - confvalue.exposed = True - - def redirect_via_url(self, path): - raise cherrypy.HTTPRedirect(cherrypy.url(path)) - redirect_via_url.exposed = True - - def mapped_func(self, ID=None): - return "ID is %s" % ID - mapped_func.exposed = True - setattr(Root, "Von B\xfclow", mapped_func) - - - class Exposing: - def base(self): - return "expose works!" - cherrypy.expose(base) - cherrypy.expose(base, "1") - cherrypy.expose(base, "2") - - class ExposingNewStyle(object): - def base(self): - return "expose works!" - cherrypy.expose(base) - cherrypy.expose(base, "1") - cherrypy.expose(base, "2") - - - class Dir1: - def index(self): - return "index for dir1" - index.exposed = True - - def myMethod(self): - return "myMethod from dir1, path_info is:" + repr(cherrypy.request.path_info) - myMethod.exposed = True - myMethod._cp_config = {'tools.trailing_slash.extra': True} - - def default(self, *params): - return "default for dir1, param is:" + repr(params) - default.exposed = True +from cherrypy.test import helper - class Dir2: - def index(self): - return "index for dir2, path is:" + cherrypy.request.path_info - index.exposed = True - - def script_name(self): - return cherrypy.tree.script_name() - script_name.exposed = True - - def cherrypy_url(self): - return cherrypy.url("/extra") - cherrypy_url.exposed = True - - def posparam(self, *vpath): - return "/".join(vpath) - posparam.exposed = True - - - class Dir3: - def default(self): - return "default for dir3, not exposed" - - class Dir4: - def index(self): - return "index for dir4, not exposed" - - class DefNoIndex: - def default(self, *args): - raise cherrypy.HTTPRedirect("contact") - default.exposed = True - - # MethodDispatcher code - class ByMethod: - exposed = True +class ObjectMappingTest(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + def index(self, name="world"): + return name + index.exposed = True + + def foobar(self): + return "bar" + foobar.exposed = True + + def default(self, *params, **kwargs): + return "default:" + repr(params) + default.exposed = True + + def other(self): + return "other" + other.exposed = True + + def extra(self, *p): + return repr(p) + extra.exposed = True + + def redirect(self): + raise cherrypy.HTTPRedirect('dir1/', 302) + redirect.exposed = True + + def notExposed(self): + return "not exposed" + + def confvalue(self): + return cherrypy.request.config.get("user") + confvalue.exposed = True + + def redirect_via_url(self, path): + raise cherrypy.HTTPRedirect(cherrypy.url(path)) + redirect_via_url.exposed = True + + def mapped_func(self, ID=None): + return "ID is %s" % ID + mapped_func.exposed = True + setattr(Root, "Von B\xfclow", mapped_func) + + + class Exposing: + def base(self): + return "expose works!" + cherrypy.expose(base) + cherrypy.expose(base, "1") + cherrypy.expose(base, "2") + + class ExposingNewStyle(object): + def base(self): + return "expose works!" + cherrypy.expose(base) + cherrypy.expose(base, "1") + cherrypy.expose(base, "2") + + + class Dir1: + def index(self): + return "index for dir1" + index.exposed = True + + def myMethod(self): + return "myMethod from dir1, path_info is:" + repr(cherrypy.request.path_info) + myMethod.exposed = True + myMethod._cp_config = {'tools.trailing_slash.extra': True} + + def default(self, *params): + return "default for dir1, param is:" + repr(params) + default.exposed = True + + + class Dir2: + def index(self): + return "index for dir2, path is:" + cherrypy.request.path_info + index.exposed = True + + def script_name(self): + return cherrypy.tree.script_name() + script_name.exposed = True + + def cherrypy_url(self): + return cherrypy.url("/extra") + cherrypy_url.exposed = True + + def posparam(self, *vpath): + return "/".join(vpath) + posparam.exposed = True - def __init__(self, *things): - self.things = list(things) - def GET(self): - return repr(self.things) + class Dir3: + def default(self): + return "default for dir3, not exposed" - def POST(self, thing): - self.things.append(thing) - - class Collection: - default = ByMethod('a', 'bit') - - Root.exposing = Exposing() - Root.exposingnew = ExposingNewStyle() - Root.dir1 = Dir1() - Root.dir1.dir2 = Dir2() - Root.dir1.dir2.dir3 = Dir3() - Root.dir1.dir2.dir3.dir4 = Dir4() - Root.defnoindex = DefNoIndex() - Root.bymethod = ByMethod('another') - Root.collection = Collection() - - d = cherrypy.dispatch.MethodDispatcher() - for url in script_names: - conf = {'/': {'user': (url or "/").split("/")[-2]}, - '/bymethod': {'request.dispatch': d}, - '/collection': {'request.dispatch': d}, - } - cherrypy.tree.mount(Root(), url, conf) - - - class Isolated: - def index(self): - return "made it!" - index.exposed = True - - cherrypy.tree.mount(Isolated(), "/isolated") - - class AnotherApp: + class Dir4: + def index(self): + return "index for dir4, not exposed" - exposed = True + class DefNoIndex: + def default(self, *args): + raise cherrypy.HTTPRedirect("contact") + default.exposed = True - def GET(self): - return "milk" - - cherrypy.tree.mount(AnotherApp(), "/app", {'/': {'request.dispatch': d}}) - - -from cherrypy.test import helper + # MethodDispatcher code + class ByMethod: + exposed = True + + def __init__(self, *things): + self.things = list(things) + + def GET(self): + return repr(self.things) + + def POST(self, thing): + self.things.append(thing) + + class Collection: + default = ByMethod('a', 'bit') + + Root.exposing = Exposing() + Root.exposingnew = ExposingNewStyle() + Root.dir1 = Dir1() + Root.dir1.dir2 = Dir2() + Root.dir1.dir2.dir3 = Dir3() + Root.dir1.dir2.dir3.dir4 = Dir4() + Root.defnoindex = DefNoIndex() + Root.bymethod = ByMethod('another') + Root.collection = Collection() + + d = cherrypy.dispatch.MethodDispatcher() + for url in script_names: + conf = {'/': {'user': (url or "/").split("/")[-2]}, + '/bymethod': {'request.dispatch': d}, + '/collection': {'request.dispatch': d}, + } + cherrypy.tree.mount(Root(), url, conf) + + + class Isolated: + def index(self): + return "made it!" + index.exposed = True + + cherrypy.tree.mount(Isolated(), "/isolated") + + class AnotherApp: + + exposed = True + + def GET(self): + return "milk" + + cherrypy.tree.mount(AnotherApp(), "/app", {'/': {'request.dispatch': d}}) -class ObjectMappingTest(helper.CPWebCase): def testObjectMapping(self): for url in script_names: diff --git a/cherrypy/test/test_proxy.py b/cherrypy/test/test_proxy.py index f2d15cf9..0a2b9bc8 100644 --- a/cherrypy/test/test_proxy.py +++ b/cherrypy/test/test_proxy.py @@ -1,66 +1,65 @@ from cherrypy.test import test -test.prefer_parent_path() + import cherrypy script_names = ["", "/path/to/myapp"] -def setup_server(): - - # Set up site - cherrypy.config.update({ - 'tools.proxy.on': True, - 'tools.proxy.base': 'www.mydomain.test', - }) - - # Set up application - - class Root: - - def __init__(self, sn): - # Calculate a URL outside of any requests. - self.thisnewpage = cherrypy.url("/this/new/page", script_name=sn) - - def pageurl(self): - return self.thisnewpage - pageurl.exposed = True - - def index(self): - raise cherrypy.HTTPRedirect('dummy') - index.exposed = True - - def remoteip(self): - return cherrypy.request.remote.ip - remoteip.exposed = True +from cherrypy.test import helper + +class ProxyTest(helper.CPWebCase): + @staticmethod + def setup_server(): - def xhost(self): - raise cherrypy.HTTPRedirect('blah') - xhost.exposed = True - xhost._cp_config = {'tools.proxy.local': 'X-Host', - 'tools.trailing_slash.extra': True, - } + # Set up site + cherrypy.config.update({ + 'tools.proxy.on': True, + 'tools.proxy.base': 'www.mydomain.test', + }) - def base(self): - return cherrypy.request.base - base.exposed = True + # Set up application - def ssl(self): - return cherrypy.request.base - ssl.exposed = True - ssl._cp_config = {'tools.proxy.scheme': 'X-Forwarded-Ssl'} + class Root: + + def __init__(self, sn): + # Calculate a URL outside of any requests. + self.thisnewpage = cherrypy.url("/this/new/page", script_name=sn) + + def pageurl(self): + return self.thisnewpage + pageurl.exposed = True + + def index(self): + raise cherrypy.HTTPRedirect('dummy') + index.exposed = True + + def remoteip(self): + return cherrypy.request.remote.ip + remoteip.exposed = True + + def xhost(self): + raise cherrypy.HTTPRedirect('blah') + xhost.exposed = True + xhost._cp_config = {'tools.proxy.local': 'X-Host', + 'tools.trailing_slash.extra': True, + } + + def base(self): + return cherrypy.request.base + base.exposed = True + + def ssl(self): + return cherrypy.request.base + ssl.exposed = True + ssl._cp_config = {'tools.proxy.scheme': 'X-Forwarded-Ssl'} + + def newurl(self): + return ("Browse to <a href='%s'>this page</a>." + % cherrypy.url("/this/new/page")) + newurl.exposed = True - def newurl(self): - return ("Browse to <a href='%s'>this page</a>." - % cherrypy.url("/this/new/page")) - newurl.exposed = True - - for sn in script_names: - cherrypy.tree.mount(Root(sn), sn) - - -from cherrypy.test import helper - -class ProxyTest(helper.CPWebCase): + for sn in script_names: + cherrypy.tree.mount(Root(sn), sn) def testProxy(self): self.getPage("/") diff --git a/cherrypy/test/test_refleaks.py b/cherrypy/test/test_refleaks.py index 5fe12782..d7c04c07 100644 --- a/cherrypy/test/test_refleaks.py +++ b/cherrypy/test/test_refleaks.py @@ -1,7 +1,7 @@ """Tests for refleaks.""" from cherrypy.test import test -test.prefer_parent_path() + import gc from httplib import HTTPConnection, HTTPSConnection @@ -16,72 +16,73 @@ data = object() def get_instances(cls): return [x for x in gc.get_objects() if isinstance(x, cls)] -def setup_server(): - - class Root: - def index(self, *args, **kwargs): - cherrypy.request.thing = data - return "Hello world!" - index.exposed = True - - def gc_stats(self): - output = ["Statistics:"] - - # Uncollectable garbage - - # gc_collect isn't perfectly synchronous, because it may - # break reference cycles that then take time to fully - # finalize. Call it twice and hope for the best. - gc.collect() - unreachable = gc.collect() - if unreachable: - output.append("\n%s unreachable objects:" % unreachable) - trash = {} - for x in gc.garbage: - trash[type(x)] = trash.get(type(x), 0) + 1 - trash = [(v, k) for k, v in trash.iteritems()] - trash.sort() - for pair in trash: - output.append(" " + repr(pair)) - - # Request references - reqs = get_instances(_cprequest.Request) - lenreqs = len(reqs) - if lenreqs < 2: - output.append("\nMissing Request reference. Should be 1 in " - "this request thread and 1 in the main thread.") - elif lenreqs > 2: - output.append("\nToo many Request references (%r)." % lenreqs) - for req in reqs: - output.append("Referrers for %s:" % repr(req)) - for ref in gc.get_referrers(req): - if ref is not reqs: - output.append(" %s" % repr(ref)) - - # Response references - resps = get_instances(_cprequest.Response) - lenresps = len(resps) - if lenresps < 2: - output.append("\nMissing Response reference. Should be 1 in " - "this request thread and 1 in the main thread.") - elif lenresps > 2: - output.append("\nToo many Response references (%r)." % lenresps) - for resp in resps: - output.append("Referrers for %s:" % repr(resp)) - for ref in gc.get_referrers(resp): - if ref is not resps: - output.append(" %s" % repr(ref)) - - return "\n".join(output) - gc_stats.exposed = True - - cherrypy.tree.mount(Root()) - from cherrypy.test import helper class ReferenceTests(helper.CPWebCase): + @staticmethod + def setup_server(): + + class Root: + def index(self, *args, **kwargs): + cherrypy.request.thing = data + return "Hello world!" + index.exposed = True + + def gc_stats(self): + output = ["Statistics:"] + + # Uncollectable garbage + + # gc_collect isn't perfectly synchronous, because it may + # break reference cycles that then take time to fully + # finalize. Call it twice and hope for the best. + gc.collect() + unreachable = gc.collect() + if unreachable: + output.append("\n%s unreachable objects:" % unreachable) + trash = {} + for x in gc.garbage: + trash[type(x)] = trash.get(type(x), 0) + 1 + trash = [(v, k) for k, v in trash.iteritems()] + trash.sort() + for pair in trash: + output.append(" " + repr(pair)) + + # Request references + reqs = get_instances(_cprequest.Request) + lenreqs = len(reqs) + if lenreqs < 2: + output.append("\nMissing Request reference. Should be 1 in " + "this request thread and 1 in the main thread.") + elif lenreqs > 2: + output.append("\nToo many Request references (%r)." % lenreqs) + for req in reqs: + output.append("Referrers for %s:" % repr(req)) + for ref in gc.get_referrers(req): + if ref is not reqs: + output.append(" %s" % repr(ref)) + + # Response references + resps = get_instances(_cprequest.Response) + lenresps = len(resps) + if lenresps < 2: + output.append("\nMissing Response reference. Should be 1 in " + "this request thread and 1 in the main thread.") + elif lenresps > 2: + output.append("\nToo many Response references (%r)." % lenresps) + for resp in resps: + output.append("Referrers for %s:" % repr(resp)) + for ref in gc.get_referrers(resp): + if ref is not resps: + output.append(" %s" % repr(ref)) + + return "\n".join(output) + gc_stats.exposed = True + + cherrypy.tree.mount(Root()) + def test_threadlocal_garbage(self): success = [] diff --git a/cherrypy/test/test_request_obj.py b/cherrypy/test/test_request_obj.py index 68af9c81..c26c90ae 100644 --- a/cherrypy/test/test_request_obj.py +++ b/cherrypy/test/test_request_obj.py @@ -1,7 +1,7 @@ """Basic tests for the cherrypy.Request object.""" from cherrypy.test import test -test.prefer_parent_path() + import os localDir = os.path.dirname(__file__) @@ -17,266 +17,267 @@ defined_http_methods = ("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "PROPFIND") -def setup_server(): - class Root: + +# Client-side code # + +from cherrypy.test import helper + +class RequestObjectTests(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + + def index(self): + return "hello" + index.exposed = True + + def scheme(self): + return cherrypy.request.scheme + scheme.exposed = True - def index(self): - return "hello" - index.exposed = True + root = Root() - def scheme(self): - return cherrypy.request.scheme - scheme.exposed = True - - root = Root() - - - class TestType(type): - """Metaclass which automatically exposes all functions in each subclass, - and adds an instance of the subclass as an attribute of root. - """ - def __init__(cls, name, bases, dct): - type.__init__(cls, name, bases, dct) - for value in dct.itervalues(): - if isinstance(value, types.FunctionType): - value.exposed = True - setattr(root, name.lower(), cls()) - class Test(object): - __metaclass__ = TestType - - - class Params(Test): - def index(self, thing): - return repr(thing) + class TestType(type): + """Metaclass which automatically exposes all functions in each subclass, + and adds an instance of the subclass as an attribute of root. + """ + def __init__(cls, name, bases, dct): + type.__init__(cls, name, bases, dct) + for value in dct.itervalues(): + if isinstance(value, types.FunctionType): + value.exposed = True + setattr(root, name.lower(), cls()) + class Test(object): + __metaclass__ = TestType - def ismap(self, x, y): - return "Coordinates: %s, %s" % (x, y) - def default(self, *args, **kwargs): - return "args: %s kwargs: %s" % (args, kwargs) - default._cp_config = {'request.query_string_encoding': 'latin1'} + class Params(Test): + + def index(self, thing): + return repr(thing) + + def ismap(self, x, y): + return "Coordinates: %s, %s" % (x, y) + + def default(self, *args, **kwargs): + return "args: %s kwargs: %s" % (args, kwargs) + default._cp_config = {'request.query_string_encoding': 'latin1'} - class ParamErrorsCallable(object): - exposed = True - def __call__(self): - return "data" + class ParamErrorsCallable(object): + exposed = True + def __call__(self): + return "data" - class ParamErrors(Test): + class ParamErrors(Test): - def one_positional(self, param1): - return "data" - one_positional.exposed = True + def one_positional(self, param1): + return "data" + one_positional.exposed = True - def one_positional_args(self, param1, *args): - return "data" - one_positional_args.exposed = True + def one_positional_args(self, param1, *args): + return "data" + one_positional_args.exposed = True - def one_positional_args_kwargs(self, param1, *args, **kwargs): - return "data" - one_positional_args_kwargs.exposed = True + def one_positional_args_kwargs(self, param1, *args, **kwargs): + return "data" + one_positional_args_kwargs.exposed = True - def one_positional_kwargs(self, param1, **kwargs): - return "data" - one_positional_kwargs.exposed = True + def one_positional_kwargs(self, param1, **kwargs): + return "data" + one_positional_kwargs.exposed = True - def no_positional(self): - return "data" - no_positional.exposed = True + def no_positional(self): + return "data" + no_positional.exposed = True - def no_positional_args(self, *args): - return "data" - no_positional_args.exposed = True + def no_positional_args(self, *args): + return "data" + no_positional_args.exposed = True - def no_positional_args_kwargs(self, *args, **kwargs): - return "data" - no_positional_args_kwargs.exposed = True + def no_positional_args_kwargs(self, *args, **kwargs): + return "data" + no_positional_args_kwargs.exposed = True - def no_positional_kwargs(self, **kwargs): - return "data" - no_positional_kwargs.exposed = True + def no_positional_kwargs(self, **kwargs): + return "data" + no_positional_kwargs.exposed = True - callable_object = ParamErrorsCallable() + callable_object = ParamErrorsCallable() - def raise_type_error(self, **kwargs): - raise TypeError("Client Error") - raise_type_error.exposed = True + def raise_type_error(self, **kwargs): + raise TypeError("Client Error") + raise_type_error.exposed = True - def raise_type_error_with_default_param(self, x, y=None): - return '%d' % 'a' # throw an exception - raise_type_error_with_default_param.exposed = True + def raise_type_error_with_default_param(self, x, y=None): + return '%d' % 'a' # throw an exception + raise_type_error_with_default_param.exposed = True - def callable_error_page(status, **kwargs): - return "Error %s - Well, I'm very sorry but you haven't paid!" % status - - - class Error(Test): - - _cp_config = {'tools.log_tracebacks.on': True, - } - - def reason_phrase(self): - raise cherrypy.HTTPError("410 Gone fishin'") - - def custom(self, err='404'): - raise cherrypy.HTTPError(int(err), "No, <b>really</b>, not found!") - custom._cp_config = {'error_page.404': os.path.join(localDir, "static/index.html"), - 'error_page.401': callable_error_page, - } - - def custom_default(self): - return 1 + 'a' # raise an unexpected error - custom_default._cp_config = {'error_page.default': callable_error_page} - - def noexist(self): - raise cherrypy.HTTPError(404, "No, <b>really</b>, not found!") - noexist._cp_config = {'error_page.404': "nonexistent.html"} - - def page_method(self): - raise ValueError() - - def page_yield(self): - yield "howdy" - raise ValueError() - - def page_streamed(self): - yield "word up" - raise ValueError() - yield "very oops" - page_streamed._cp_config = {"response.stream": True} - - def cause_err_in_finalize(self): - # Since status must start with an int, this should error. - cherrypy.response.status = "ZOO OK" - cause_err_in_finalize._cp_config = {'request.show_tracebacks': False} - - def rethrow(self): - """Test that an error raised here will be thrown out to the server.""" - raise ValueError() - rethrow._cp_config = {'request.throw_errors': True} - - - class Expect(Test): - - def expectation_failed(self): - expect = cherrypy.request.headers.elements("Expect") - if expect and expect[0].value != '100-continue': - raise cherrypy.HTTPError(400) - raise cherrypy.HTTPError(417, 'Expectation Failed') - - class Headers(Test): - - def default(self, headername): - """Spit back out the value for the requested header.""" - return cherrypy.request.headers[headername] - - def doubledheaders(self): - # From http://www.cherrypy.org/ticket/165: - # "header field names should not be case sensitive sayes the rfc. - # if i set a headerfield in complete lowercase i end up with two - # header fields, one in lowercase, the other in mixed-case." - - # Set the most common headers - hMap = cherrypy.response.headers - hMap['content-type'] = "text/html" - hMap['content-length'] = 18 - hMap['server'] = 'CherryPy headertest' - hMap['location'] = ('%s://%s:%s/headers/' - % (cherrypy.request.local.ip, - cherrypy.request.local.port, - cherrypy.request.scheme)) - - # Set a rare header for fun - hMap['Expires'] = 'Thu, 01 Dec 2194 16:00:00 GMT' - - return "double header test" - - def ifmatch(self): - val = cherrypy.request.headers['If-Match'] - assert isinstance(val, unicode) - cherrypy.response.headers['ETag'] = val - return val - - - class HeaderElements(Test): + def callable_error_page(status, **kwargs): + return "Error %s - Well, I'm very sorry but you haven't paid!" % status - def get_elements(self, headername): - e = cherrypy.request.headers.elements(headername) - return "\n".join([unicode(x) for x in e]) - - - class Method(Test): - def index(self): - m = cherrypy.request.method - if m in defined_http_methods or m == "CONNECT": - return m + class Error(Test): - if m == "LINK": - raise cherrypy.HTTPError(405) - else: - raise cherrypy.HTTPError(501) - - def parameterized(self, data): - return data + _cp_config = {'tools.log_tracebacks.on': True, + } + + def reason_phrase(self): + raise cherrypy.HTTPError("410 Gone fishin'") + + def custom(self, err='404'): + raise cherrypy.HTTPError(int(err), "No, <b>really</b>, not found!") + custom._cp_config = {'error_page.404': os.path.join(localDir, "static/index.html"), + 'error_page.401': callable_error_page, + } + + def custom_default(self): + return 1 + 'a' # raise an unexpected error + custom_default._cp_config = {'error_page.default': callable_error_page} + + def noexist(self): + raise cherrypy.HTTPError(404, "No, <b>really</b>, not found!") + noexist._cp_config = {'error_page.404': "nonexistent.html"} + + def page_method(self): + raise ValueError() + + def page_yield(self): + yield "howdy" + raise ValueError() + + def page_streamed(self): + yield "word up" + raise ValueError() + yield "very oops" + page_streamed._cp_config = {"response.stream": True} + + def cause_err_in_finalize(self): + # Since status must start with an int, this should error. + cherrypy.response.status = "ZOO OK" + cause_err_in_finalize._cp_config = {'request.show_tracebacks': False} + + def rethrow(self): + """Test that an error raised here will be thrown out to the server.""" + raise ValueError() + rethrow._cp_config = {'request.throw_errors': True} - def request_body(self): - # This should be a file object (temp file), - # which CP will just pipe back out if we tell it to. - return cherrypy.request.body - def reachable(self): - return "success" + class Expect(Test): + + def expectation_failed(self): + expect = cherrypy.request.headers.elements("Expect") + if expect and expect[0].value != '100-continue': + raise cherrypy.HTTPError(400) + raise cherrypy.HTTPError(417, 'Expectation Failed') - class Divorce: - """HTTP Method handlers shouldn't collide with normal method names. - For example, a GET-handler shouldn't collide with a method named 'get'. + class Headers(Test): + + def default(self, headername): + """Spit back out the value for the requested header.""" + return cherrypy.request.headers[headername] + + def doubledheaders(self): + # From http://www.cherrypy.org/ticket/165: + # "header field names should not be case sensitive sayes the rfc. + # if i set a headerfield in complete lowercase i end up with two + # header fields, one in lowercase, the other in mixed-case." + + # Set the most common headers + hMap = cherrypy.response.headers + hMap['content-type'] = "text/html" + hMap['content-length'] = 18 + hMap['server'] = 'CherryPy headertest' + hMap['location'] = ('%s://%s:%s/headers/' + % (cherrypy.request.local.ip, + cherrypy.request.local.port, + cherrypy.request.scheme)) + + # Set a rare header for fun + hMap['Expires'] = 'Thu, 01 Dec 2194 16:00:00 GMT' + + return "double header test" + + def ifmatch(self): + val = cherrypy.request.headers['If-Match'] + assert isinstance(val, unicode) + cherrypy.response.headers['ETag'] = val + return val - If you build HTTP method dispatching into CherryPy, rewrite this class - to use your new dispatch mechanism and make sure that: - "GET /divorce HTTP/1.1" maps to divorce.index() and - "GET /divorce/get?ID=13 HTTP/1.1" maps to divorce.get() - """ - documents = {} + class HeaderElements(Test): + + def get_elements(self, headername): + e = cherrypy.request.headers.elements(headername) + return "\n".join([unicode(x) for x in e]) - def index(self): - yield "<h1>Choose your document</h1>\n" - yield "<ul>\n" - for id, contents in self.documents.items(): - yield (" <li><a href='/divorce/get?ID=%s'>%s</a>: %s</li>\n" - % (id, id, contents)) - yield "</ul>" - index.exposed = True - def get(self, ID): - return ("Divorce document %s: %s" % - (ID, self.documents.get(ID, "empty"))) - get.exposed = True - - root.divorce = Divorce() - + class Method(Test): + + def index(self): + m = cherrypy.request.method + if m in defined_http_methods or m == "CONNECT": + return m + + if m == "LINK": + raise cherrypy.HTTPError(405) + else: + raise cherrypy.HTTPError(501) + + def parameterized(self, data): + return data + + def request_body(self): + # This should be a file object (temp file), + # which CP will just pipe back out if we tell it to. + return cherrypy.request.body + + def reachable(self): + return "success" - class ThreadLocal(Test): - - def index(self): - existing = repr(getattr(cherrypy.request, "asdf", None)) - cherrypy.request.asdf = "rassfrassin" - return existing - - appconf = { - '/method': {'request.methods_with_bodies': ("POST", "PUT", "PROPFIND")}, - } - cherrypy.tree.mount(root, config=appconf) + class Divorce: + """HTTP Method handlers shouldn't collide with normal method names. + For example, a GET-handler shouldn't collide with a method named 'get'. + + If you build HTTP method dispatching into CherryPy, rewrite this class + to use your new dispatch mechanism and make sure that: + "GET /divorce HTTP/1.1" maps to divorce.index() and + "GET /divorce/get?ID=13 HTTP/1.1" maps to divorce.get() + """ + + documents = {} + + def index(self): + yield "<h1>Choose your document</h1>\n" + yield "<ul>\n" + for id, contents in self.documents.items(): + yield (" <li><a href='/divorce/get?ID=%s'>%s</a>: %s</li>\n" + % (id, id, contents)) + yield "</ul>" + index.exposed = True + + def get(self, ID): + return ("Divorce document %s: %s" % + (ID, self.documents.get(ID, "empty"))) + get.exposed = True + root.divorce = Divorce() -# Client-side code # -from cherrypy.test import helper + class ThreadLocal(Test): + + def index(self): + existing = repr(getattr(cherrypy.request, "asdf", None)) + cherrypy.request.asdf = "rassfrassin" + return existing + + appconf = { + '/method': {'request.methods_with_bodies': ("POST", "PUT", "PROPFIND")}, + } + cherrypy.tree.mount(root, config=appconf) -class RequestObjectTests(helper.CPWebCase): - + def test_scheme(self): self.getPage("/scheme") self.assertBody(self.scheme) diff --git a/cherrypy/test/test_routes.py b/cherrypy/test/test_routes.py index 67c62dd6..4b245b09 100644 --- a/cherrypy/test/test_routes.py +++ b/cherrypy/test/test_routes.py @@ -1,48 +1,52 @@ from cherrypy.test import test -test.prefer_parent_path() + import os curdir = os.path.join(os.getcwd(), os.path.dirname(__file__)) import cherrypy +from cherrypy.test import helper +import nose + +class RoutesDispatchTest(helper.CPWebCase): + @staticmethod + def setup_server(): -def setup_server(): + try: + import routes + except ImportError: + raise nose.SkipTest('Install routes to test RoutesDispatcher code') - class Dummy: - def index(self): - return "I said good day!" - - class City: - - def __init__(self, name): - self.name = name - self.population = 10000 - - def index(self, **kwargs): - return "Welcome to %s, pop. %s" % (self.name, self.population) - index._cp_config = {'tools.response_headers.on': True, - 'tools.response_headers.headers': [('Content-Language', 'en-GB')]} + class Dummy: + def index(self): + return "I said good day!" - def update(self, **kwargs): - self.population = kwargs['pop'] - return "OK" + class City: + + def __init__(self, name): + self.name = name + self.population = 10000 + + def index(self, **kwargs): + return "Welcome to %s, pop. %s" % (self.name, self.population) + index._cp_config = {'tools.response_headers.on': True, + 'tools.response_headers.headers': [('Content-Language', 'en-GB')]} + + def update(self, **kwargs): + self.population = kwargs['pop'] + return "OK" + + d = cherrypy.dispatch.RoutesDispatcher() + d.connect(name='hounslow', route='hounslow', controller=City('Hounslow')) + d.connect(name='surbiton', route='surbiton', controller=City('Surbiton'), + action='index', conditions=dict(method=['GET'])) + d.mapper.connect('surbiton', controller='surbiton', + action='update', conditions=dict(method=['POST'])) + d.connect('main', ':action', controller=Dummy()) - d = cherrypy.dispatch.RoutesDispatcher() - d.connect(name='hounslow', route='hounslow', controller=City('Hounslow')) - d.connect(name='surbiton', route='surbiton', controller=City('Surbiton'), - action='index', conditions=dict(method=['GET'])) - d.mapper.connect('surbiton', controller='surbiton', - action='update', conditions=dict(method=['POST'])) - d.connect('main', ':action', controller=Dummy()) - - conf = {'/': {'request.dispatch': d}} - cherrypy.tree.mount(root=None, config=conf) - - -from cherrypy.test import helper - -class RoutesDispatchTest(helper.CPWebCase): + conf = {'/': {'request.dispatch': d}} + cherrypy.tree.mount(root=None, config=conf) def test_Routes_Dispatch(self): self.getPage("/hounslow") diff --git a/cherrypy/test/test_session.py b/cherrypy/test/test_session.py index 5259f0fd..c9cb57a2 100755 --- a/cherrypy/test/test_session.py +++ b/cherrypy/test/test_session.py @@ -1,5 +1,5 @@ from cherrypy.test import test -test.prefer_parent_path() + from httplib import HTTPConnection, HTTPSConnection import os @@ -126,6 +126,7 @@ def setup_server(): from cherrypy.test import helper class SessionTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def tearDown(self): # Clean up sessions. @@ -373,11 +374,13 @@ try: break except (ImportError, socket.error): class MemcachedSessionTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def test(self): return self.skip("memcached not reachable ") else: class MemcachedSessionTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def test_0_Session(self): self.getPage('/setsessiontype/memcached') diff --git a/cherrypy/test/test_sessionauthenticate.py b/cherrypy/test/test_sessionauthenticate.py index 17572767..d4078142 100644 --- a/cherrypy/test/test_sessionauthenticate.py +++ b/cherrypy/test/test_sessionauthenticate.py @@ -1,43 +1,44 @@ from cherrypy.test import test -test.prefer_parent_path() + import cherrypy -def setup_server(): - - def check(username, password): - # Dummy check_username_and_password function - if username != 'test' or password != 'password': - return u'Wrong login/password' - - def augment_params(): - # A simple tool to add some things to request.params - # This is to check to make sure that session_auth can handle request - # params (ticket #780) - cherrypy.request.params["test"] = "test" - cherrypy.tools.augment_params = cherrypy.Tool('before_handler', - augment_params, None, priority=30) +from cherrypy.test import helper + - class Test: +class SessionAuthenticateTest(helper.CPWebCase): + @staticmethod + def setup_server(): - _cp_config = {'tools.sessions.on': True, - 'tools.session_auth.on': True, - 'tools.session_auth.check_username_and_password': check, - 'tools.augment_params.on': True, - } + def check(username, password): + # Dummy check_username_and_password function + if username != 'test' or password != 'password': + return u'Wrong login/password' - def index(self, **kwargs): - return "Hi %s, you are logged in" % cherrypy.request.login - index.exposed = True - - cherrypy.tree.mount(Test()) - + def augment_params(): + # A simple tool to add some things to request.params + # This is to check to make sure that session_auth can handle request + # params (ticket #780) + cherrypy.request.params["test"] = "test" -from cherrypy.test import helper + cherrypy.tools.augment_params = cherrypy.Tool('before_handler', + augment_params, None, priority=30) + class Test: + + _cp_config = {'tools.sessions.on': True, + 'tools.session_auth.on': True, + 'tools.session_auth.check_username_and_password': check, + 'tools.augment_params.on': True, + } + + def index(self, **kwargs): + return "Hi %s, you are logged in" % cherrypy.request.login + index.exposed = True + + cherrypy.tree.mount(Test()) -class SessionAuthenticateTest(helper.CPWebCase): def testSessionAuthenticate(self): # request a page and check for login form diff --git a/cherrypy/test/test_states.py b/cherrypy/test/test_states.py index b8a4bea1..00c47d09 100644 --- a/cherrypy/test/test_states.py +++ b/cherrypy/test/test_states.py @@ -6,7 +6,7 @@ import threading import time from cherrypy.test import test -test.prefer_parent_path() + import cherrypy engine = cherrypy.engine @@ -14,34 +14,34 @@ thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__)) class Dependency: - + def __init__(self, bus): self.bus = bus self.running = False self.startcount = 0 self.gracecount = 0 self.threads = {} - + def subscribe(self): self.bus.subscribe('start', self.start) self.bus.subscribe('stop', self.stop) self.bus.subscribe('graceful', self.graceful) self.bus.subscribe('start_thread', self.startthread) self.bus.subscribe('stop_thread', self.stopthread) - + def start(self): self.running = True self.startcount += 1 - + def stop(self): self.running = False - + def graceful(self): self.gracecount += 1 - + def startthread(self, thread_id): self.threads[thread_id] = None - + def stopthread(self, thread_id): del self.threads[thread_id] @@ -52,16 +52,16 @@ def setup_server(): def index(self): return "Hello World" index.exposed = True - + def ctrlc(self): raise KeyboardInterrupt() ctrlc.exposed = True - + def graceful(self): engine.graceful() return "app was (gracefully) restarted succesfully" graceful.exposed = True - + def block_explicit(self): while True: if cherrypy.response.timed_out: @@ -69,7 +69,7 @@ def setup_server(): return "broken!" time.sleep(0.01) block_explicit.exposed = True - + def block_implicit(self): time.sleep(0.5) return "response.timeout = %s" % cherrypy.response.timeout @@ -91,42 +91,43 @@ def setup_server(): from cherrypy.test import helper class ServerStateTests(helper.CPWebCase): - + setup_server = staticmethod(setup_server) + def setUp(self): cherrypy.server.socket_timeout = 0.1 - + def test_0_NormalStateFlow(self): engine.stop() # Our db_connection should not be running self.assertEqual(db_connection.running, False) self.assertEqual(db_connection.startcount, 1) self.assertEqual(len(db_connection.threads), 0) - + # Test server start engine.start() self.assertEqual(engine.state, engine.states.STARTED) - + host = cherrypy.server.socket_host port = cherrypy.server.socket_port self.assertRaises(IOError, cherrypy._cpserver.check_port, host, port) - + # The db_connection should be running now self.assertEqual(db_connection.running, True) self.assertEqual(db_connection.startcount, 2) self.assertEqual(len(db_connection.threads), 0) - + self.getPage("/") self.assertBody("Hello World") self.assertEqual(len(db_connection.threads), 1) - + # Test engine stop. This will also stop the HTTP server. engine.stop() self.assertEqual(engine.state, engine.states.STOPPED) - + # Verify that our custom stop function was called self.assertEqual(db_connection.running, False) self.assertEqual(len(db_connection.threads), 0) - + # Block the main thread now and verify that exit() works. def exittest(): self.getPage("/") @@ -136,19 +137,19 @@ class ServerStateTests(helper.CPWebCase): engine.start_with_callback(exittest) engine.block() self.assertEqual(engine.state, engine.states.EXITING) - + def test_1_Restart(self): cherrypy.server.start() engine.start() - + # The db_connection should be running now self.assertEqual(db_connection.running, True) grace = db_connection.gracecount - + self.getPage("/") self.assertBody("Hello World") self.assertEqual(len(db_connection.threads), 1) - + # Test server restart from this thread engine.graceful() self.assertEqual(engine.state, engine.states.STARTED) @@ -157,7 +158,7 @@ class ServerStateTests(helper.CPWebCase): self.assertEqual(db_connection.running, True) self.assertEqual(db_connection.gracecount, grace + 1) self.assertEqual(len(db_connection.threads), 1) - + # Test server restart from inside a page handler self.getPage("/graceful") self.assertEqual(engine.state, engine.states.STARTED) @@ -167,18 +168,18 @@ class ServerStateTests(helper.CPWebCase): # Since we are requesting synchronously, is only one thread used? # Note that the "/graceful" request has been flushed. self.assertEqual(len(db_connection.threads), 0) - + engine.stop() self.assertEqual(engine.state, engine.states.STOPPED) self.assertEqual(db_connection.running, False) self.assertEqual(len(db_connection.threads), 0) - + def test_2_KeyboardInterrupt(self): # Raise a keyboard interrupt in the HTTP server's main thread. # We must start the server in this, the main thread engine.start() cherrypy.server.start() - + self.persistent = True try: # Make the first request and assert there's no "Connection: close". @@ -186,23 +187,23 @@ class ServerStateTests(helper.CPWebCase): self.assertStatus('200 OK') self.assertBody("Hello World") self.assertNoHeader("Connection") - + cherrypy.server.httpserver.interrupt = KeyboardInterrupt engine.block() - + self.assertEqual(db_connection.running, False) self.assertEqual(len(db_connection.threads), 0) self.assertEqual(engine.state, engine.states.EXITING) finally: self.persistent = False - + # Raise a keyboard interrupt in a page handler; on multithreaded # servers, this should occur in one of the worker threads. # This should raise a BadStatusLine error, since the worker # thread will just die without writing a response. engine.start() cherrypy.server.start() - + try: self.getPage("/ctrlc") except BadStatusLine: @@ -210,19 +211,19 @@ class ServerStateTests(helper.CPWebCase): else: print(self.body) self.fail("AssertionError: BadStatusLine not raised") - + engine.block() self.assertEqual(db_connection.running, False) self.assertEqual(len(db_connection.threads), 0) - + def test_3_Deadlocks(self): cherrypy.config.update({'response.timeout': 0.2}) - + engine.start() cherrypy.server.start() try: self.assertNotEqual(engine.timeout_monitor.thread, None) - + # Request a "normal" page. self.assertEqual(engine.timeout_monitor.servings, []) self.getPage("/") @@ -231,12 +232,12 @@ class ServerStateTests(helper.CPWebCase): while engine.timeout_monitor.servings: print ".", time.sleep(0.01) - + # Request a page that explicitly checks itself for deadlock. # The deadlock_timeout should be 2 secs. self.getPage("/block_explicit") self.assertBody("broken!") - + # Request a page that implicitly breaks deadlock. # If we deadlock, we want to touch as little code as possible, # so we won't even call handle_error, just bail ASAP. @@ -245,36 +246,36 @@ class ServerStateTests(helper.CPWebCase): self.assertInBody("raise cherrypy.TimeoutError()") finally: engine.exit() - + def test_4_Autoreload(self): # Start the demo script in a new process p = helper.CPProcess(ssl=(self.scheme.lower()=='https')) p.write_conf( extra='test_case_name: "test_4_Autoreload"') - p.start(imports='cherrypy.test.test_states_demo') + p.start(imports='cherrypy.test._test_states_demo') try: self.getPage("/start") start = float(self.body) - + # Give the autoreloader time to cache the file time. time.sleep(2) - + # Touch the file - os.utime(os.path.join(thisdir, "test_states_demo.py"), None) - + os.utime(os.path.join(thisdir, "_test_states_demo.py"), None) + # Give the autoreloader time to re-exec the process time.sleep(2) host = cherrypy.server.socket_host port = cherrypy.server.socket_port cherrypy._cpserver.wait_for_occupied_port(host, port) - + self.getPage("/start") self.assert_(float(self.body) > start) finally: # Shut down the spawned process self.getPage("/exit") p.join() - + def test_5_Start_Error(self): # If a process errors during start, it should stop the engine # and exit with a non-zero exit code. @@ -285,15 +286,14 @@ class ServerStateTests(helper.CPWebCase): test_case_name: "test_5_Start_Error" """ ) - p.start(imports='cherrypy.test.test_states_demo') + p.start(imports='cherrypy.test._test_states_demo') if p.exit_code == 0: self.fail("Process failed to return nonzero exit code.") class PluginTests(helper.CPWebCase): - def test_daemonize(self): - if os.name not in ['posix']: + if os.name not in ['posix']: return self.skip("skipped (not on posix) ") self.HOST = '127.0.0.1' self.PORT = 8081 @@ -306,7 +306,7 @@ class PluginTests(helper.CPWebCase): socket_port=8081) p.write_conf( extra='test_case_name: "test_daemonize"') - p.start(imports='cherrypy.test.test_states_demo') + p.start(imports='cherrypy.test._test_states_demo') try: # Just get the pid of the daemonization process. self.getPage("/pid") @@ -317,7 +317,7 @@ class PluginTests(helper.CPWebCase): # Shut down the spawned process self.getPage("/exit") p.join() - + # Wait until here to test the exit code because we want to ensure # that we wait for the daemon to finish running before we fail. if p.exit_code != 0: @@ -325,34 +325,33 @@ class PluginTests(helper.CPWebCase): class SignalHandlingTests(helper.CPWebCase): - def test_SIGHUP_tty(self): # When not daemonized, SIGHUP should shut down the server. try: from signal import SIGHUP except ImportError: return self.skip("skipped (no SIGHUP) ") - + # Spawn the process. p = helper.CPProcess(ssl=(self.scheme.lower()=='https')) p.write_conf( extra='test_case_name: "test_SIGHUP_tty"') - p.start(imports='cherrypy.test.test_states_demo') + p.start(imports='cherrypy.test._test_states_demo') # Send a SIGHUP os.kill(p.get_pid(), SIGHUP) # This might hang if things aren't working right, but meh. p.join() - + def test_SIGHUP_daemonized(self): # When daemonized, SIGHUP should restart the server. try: from signal import SIGHUP except ImportError: return self.skip("skipped (no SIGHUP) ") - - if os.name not in ['posix']: + + if os.name not in ['posix']: return self.skip("skipped (not on posix) ") - + # Spawn the process and wait, when this returns, the original process # is finished. If it daemonized properly, we should still be able # to access pages. @@ -360,8 +359,8 @@ class SignalHandlingTests(helper.CPWebCase): wait=True, daemonize=True) p.write_conf( extra='test_case_name: "test_SIGHUP_daemonized"') - p.start(imports='cherrypy.test.test_states_demo') - + p.start(imports='cherrypy.test._test_states_demo') + pid = p.get_pid() try: # Send a SIGHUP @@ -376,64 +375,64 @@ class SignalHandlingTests(helper.CPWebCase): # Shut down the spawned process self.getPage("/exit") p.join() - + def test_SIGTERM(self): # SIGTERM should shut down the server whether daemonized or not. try: from signal import SIGTERM except ImportError: return self.skip("skipped (no SIGTERM) ") - + try: from os import kill except ImportError: return self.skip("skipped (no os.kill) ") - + # Spawn a normal, undaemonized process. p = helper.CPProcess(ssl=(self.scheme.lower()=='https')) p.write_conf( extra='test_case_name: "test_SIGTERM"') - p.start(imports='cherrypy.test.test_states_demo') + p.start(imports='cherrypy.test._test_states_demo') # Send a SIGTERM os.kill(p.get_pid(), SIGTERM) # This might hang if things aren't working right, but meh. p.join() - - if os.name in ['posix']: + + if os.name in ['posix']: # Spawn a daemonized process and test again. p = helper.CPProcess(ssl=(self.scheme.lower()=='https'), wait=True, daemonize=True) p.write_conf( extra='test_case_name: "test_SIGTERM_2"') - p.start(imports='cherrypy.test.test_states_demo') + p.start(imports='cherrypy.test._test_states_demo') # Send a SIGTERM os.kill(p.get_pid(), SIGTERM) # This might hang if things aren't working right, but meh. p.join() - + def test_signal_handler_unsubscribe(self): try: from signal import SIGTERM except ImportError: return self.skip("skipped (no SIGTERM) ") - + try: from os import kill except ImportError: return self.skip("skipped (no os.kill) ") - + # Spawn a normal, undaemonized process. p = helper.CPProcess(ssl=(self.scheme.lower()=='https')) p.write_conf( extra="""unsubsig: True test_case_name: "test_signal_handler_unsubscribe" """) - p.start(imports='cherrypy.test.test_states_demo') + p.start(imports='cherrypy.test._test_states_demo') # Send a SIGTERM os.kill(p.get_pid(), SIGTERM) # This might hang if things aren't working right, but meh. p.join() - + # Assert the old handler ran. target_line = open(p.error_log, 'rb').readlines()[-10] if not "I am an old SIGTERM handler." in target_line: diff --git a/cherrypy/test/test_static.py b/cherrypy/test/test_static.py index 524db795..fedc9280 100644 --- a/cherrypy/test/test_static.py +++ b/cherrypy/test/test_static.py @@ -1,5 +1,5 @@ from cherrypy.test import test -test.prefer_parent_path() + from httplib import HTTPConnection, HTTPSConnection try: @@ -17,103 +17,105 @@ import threading import cherrypy from cherrypy.lib import static -def setup_server(): - if not os.path.exists(has_space_filepath): - open(has_space_filepath, 'wb').write('Hello, world\r\n') - if not os.path.exists(bigfile_filepath): - open(bigfile_filepath, 'wb').write("x" * BIGFILE_SIZE) - - class Root: - - def bigfile(self): - from cherrypy.lib import static - self.f = static.serve_file(bigfile_filepath) - return self.f - bigfile.exposed = True - bigfile._cp_config = {'response.stream': True} + + +from cherrypy.test import helper + +class StaticTest(helper.CPWebCase): + @staticmethod + def setup_server(): + if not os.path.exists(has_space_filepath): + open(has_space_filepath, 'wb').write('Hello, world\r\n') + if not os.path.exists(bigfile_filepath): + open(bigfile_filepath, 'wb').write("x" * BIGFILE_SIZE) - def tell(self): - if self.f.input.closed: - return '' - return repr(self.f.input.tell()).rstrip('L') - tell.exposed = True + class Root: + + def bigfile(self): + from cherrypy.lib import static + self.f = static.serve_file(bigfile_filepath) + return self.f + bigfile.exposed = True + bigfile._cp_config = {'response.stream': True} + + def tell(self): + if self.f.input.closed: + return '' + return repr(self.f.input.tell()).rstrip('L') + tell.exposed = True + + def fileobj(self): + f = open(os.path.join(curdir, 'style.css'), 'rb') + return static.serve_fileobj(f, content_type='text/css') + fileobj.exposed = True + + def stringio(self): + f = StringIO.StringIO('Fee\nfie\nfo\nfum') + return static.serve_fileobj(f, content_type='text/plain') + stringio.exposed = True - def fileobj(self): - f = open(os.path.join(curdir, 'style.css'), 'rb') - return static.serve_fileobj(f, content_type='text/css') - fileobj.exposed = True + class Static: + + def index(self): + return 'You want the Baron? You can have the Baron!' + index.exposed = True + + def dynamic(self): + return "This is a DYNAMIC page" + dynamic.exposed = True - def stringio(self): - f = StringIO.StringIO('Fee\nfie\nfo\nfum') - return static.serve_fileobj(f, content_type='text/plain') - stringio.exposed = True - - class Static: - def index(self): - return 'You want the Baron? You can have the Baron!' - index.exposed = True + root = Root() + root.static = Static() - def dynamic(self): - return "This is a DYNAMIC page" - dynamic.exposed = True - - - root = Root() - root.static = Static() - - rootconf = { - '/static': { - 'tools.staticdir.on': True, - 'tools.staticdir.dir': 'static', - 'tools.staticdir.root': curdir, - }, - '/style.css': { - 'tools.staticfile.on': True, - 'tools.staticfile.filename': os.path.join(curdir, 'style.css'), - }, - '/docroot': { - 'tools.staticdir.on': True, - 'tools.staticdir.root': curdir, - 'tools.staticdir.dir': 'static', - 'tools.staticdir.index': 'index.html', - }, - '/error': { - 'tools.staticdir.on': True, - 'request.show_tracebacks': True, - }, - } - rootApp = cherrypy.Application(root) - rootApp.merge(rootconf) - - test_app_conf = { - '/test': { - 'tools.staticdir.index': 'index.html', - 'tools.staticdir.on': True, - 'tools.staticdir.root': curdir, - 'tools.staticdir.dir': 'static', + rootconf = { + '/static': { + 'tools.staticdir.on': True, + 'tools.staticdir.dir': 'static', + 'tools.staticdir.root': curdir, }, - } - testApp = cherrypy.Application(Static()) - testApp.merge(test_app_conf) - - vhost = cherrypy._cpwsgi.VirtualHost(rootApp, {'virt.net': testApp}) - cherrypy.tree.graft(vhost) - - -def teardown_server(): - for f in (has_space_filepath, bigfile_filepath): - if os.path.exists(f): - try: - os.unlink(f) - except: - pass - + '/style.css': { + 'tools.staticfile.on': True, + 'tools.staticfile.filename': os.path.join(curdir, 'style.css'), + }, + '/docroot': { + 'tools.staticdir.on': True, + 'tools.staticdir.root': curdir, + 'tools.staticdir.dir': 'static', + 'tools.staticdir.index': 'index.html', + }, + '/error': { + 'tools.staticdir.on': True, + 'request.show_tracebacks': True, + }, + } + rootApp = cherrypy.Application(root) + rootApp.merge(rootconf) + + test_app_conf = { + '/test': { + 'tools.staticdir.index': 'index.html', + 'tools.staticdir.on': True, + 'tools.staticdir.root': curdir, + 'tools.staticdir.dir': 'static', + }, + } + testApp = cherrypy.Application(Static()) + testApp.merge(test_app_conf) + + vhost = cherrypy._cpwsgi.VirtualHost(rootApp, {'virt.net': testApp}) + cherrypy.tree.graft(vhost) -from cherrypy.test import helper + @staticmethod + def teardown_server(): + for f in (has_space_filepath, bigfile_filepath): + if os.path.exists(f): + try: + os.unlink(f) + except: + pass -class StaticTest(helper.CPWebCase): def testStatic(self): self.getPage("/static/index.html") diff --git a/cherrypy/test/test_tools.py b/cherrypy/test/test_tools.py index acdd3bd1..dd9701d1 100644 --- a/cherrypy/test/test_tools.py +++ b/cherrypy/test/test_tools.py @@ -12,7 +12,7 @@ timeout = 0.2 import types from cherrypy.test import test -test.prefer_parent_path() + import cherrypy from cherrypy import tools @@ -20,232 +20,233 @@ from cherrypy import tools europoundUnicode = u'\x80\xa3' -def setup_server(): - - # Put check_access in a custom toolbox with its own namespace - myauthtools = cherrypy._cptools.Toolbox("myauth") - - def check_access(default=False): - if not getattr(cherrypy.request, "userid", default): - raise cherrypy.HTTPError(401) - myauthtools.check_access = cherrypy.Tool('before_request_body', check_access) - - def numerify(): - def number_it(body): - for chunk in body: - for k, v in cherrypy.request.numerify_map: - chunk = chunk.replace(k, v) - yield chunk - cherrypy.response.body = number_it(cherrypy.response.body) - - class NumTool(cherrypy.Tool): - def _setup(self): - def makemap(): - m = self._merged_args().get("map", {}) - cherrypy.request.numerify_map = m.items() - cherrypy.request.hooks.attach('on_start_resource', makemap) - - def critical(): - cherrypy.request.error_response = cherrypy.HTTPError(502).set_response - critical.failsafe = True - - cherrypy.request.hooks.attach('on_start_resource', critical) - cherrypy.request.hooks.attach(self._point, self.callable) - - tools.numerify = NumTool('before_finalize', numerify) - - # It's not mandatory to inherit from cherrypy.Tool. - class NadsatTool: + +# Client-side code # + +from cherrypy.test import helper + + +class ToolTests(helper.CPWebCase): + @staticmethod + def setup_server(): + + # Put check_access in a custom toolbox with its own namespace + myauthtools = cherrypy._cptools.Toolbox("myauth") - def __init__(self): - self.ended = {} - self._name = "nadsat" + def check_access(default=False): + if not getattr(cherrypy.request, "userid", default): + raise cherrypy.HTTPError(401) + myauthtools.check_access = cherrypy.Tool('before_request_body', check_access) - def nadsat(self): - def nadsat_it_up(body): + def numerify(): + def number_it(body): for chunk in body: - chunk = chunk.replace("good", "horrorshow") - chunk = chunk.replace("piece", "lomtick") + for k, v in cherrypy.request.numerify_map: + chunk = chunk.replace(k, v) yield chunk - cherrypy.response.body = nadsat_it_up(cherrypy.response.body) - nadsat.priority = 0 + cherrypy.response.body = number_it(cherrypy.response.body) - def cleanup(self): - # This runs after the request has been completely written out. - cherrypy.response.body = "razdrez" - id = cherrypy.request.params.get("id") - if id: - self.ended[id] = True - cleanup.failsafe = True + class NumTool(cherrypy.Tool): + def _setup(self): + def makemap(): + m = self._merged_args().get("map", {}) + cherrypy.request.numerify_map = m.items() + cherrypy.request.hooks.attach('on_start_resource', makemap) + + def critical(): + cherrypy.request.error_response = cherrypy.HTTPError(502).set_response + critical.failsafe = True + + cherrypy.request.hooks.attach('on_start_resource', critical) + cherrypy.request.hooks.attach(self._point, self.callable) - def _setup(self): - cherrypy.request.hooks.attach('before_finalize', self.nadsat) - cherrypy.request.hooks.attach('on_end_request', self.cleanup) - tools.nadsat = NadsatTool() - - def pipe_body(): - cherrypy.request.process_request_body = False - clen = int(cherrypy.request.headers['Content-Length']) - cherrypy.request.body = cherrypy.request.rfile.read(clen) - - # Assert that we can use a callable object instead of a function. - class Rotator(object): - def __call__(self, scale): - r = cherrypy.response - r.collapse_body() - r.body = [chr((ord(x) + scale) % 256) for x in r.body[0]] - cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator()) - - def stream_handler(next_handler, *args, **kwargs): - cherrypy.response.output = o = StringIO() - try: - response = next_handler(*args, **kwargs) - # Ignore the response and return our accumulated output instead. - return o.getvalue() - finally: - o.close() - cherrypy.tools.streamer = cherrypy._cptools.HandlerWrapperTool(stream_handler) - - class Root: - def index(self): - return "Howdy earth!" - index.exposed = True + tools.numerify = NumTool('before_finalize', numerify) - def tarfile(self): - cherrypy.response.output.write('I am ') - cherrypy.response.output.write('a tarfile') - tarfile.exposed = True - tarfile._cp_config = {'tools.streamer.on': True} + # It's not mandatory to inherit from cherrypy.Tool. + class NadsatTool: + + def __init__(self): + self.ended = {} + self._name = "nadsat" + + def nadsat(self): + def nadsat_it_up(body): + for chunk in body: + chunk = chunk.replace("good", "horrorshow") + chunk = chunk.replace("piece", "lomtick") + yield chunk + cherrypy.response.body = nadsat_it_up(cherrypy.response.body) + nadsat.priority = 0 + + def cleanup(self): + # This runs after the request has been completely written out. + cherrypy.response.body = "razdrez" + id = cherrypy.request.params.get("id") + if id: + self.ended[id] = True + cleanup.failsafe = True + + def _setup(self): + cherrypy.request.hooks.attach('before_finalize', self.nadsat) + cherrypy.request.hooks.attach('on_end_request', self.cleanup) + tools.nadsat = NadsatTool() - def euro(self): - hooks = list(cherrypy.request.hooks['before_finalize']) - hooks.sort() - cbnames = [x.callback.__name__ for x in hooks] - assert cbnames == ['gzip'], cbnames - priorities = [x.priority for x in hooks] - assert priorities == [80], priorities - yield u"Hello," - yield u"world" - yield europoundUnicode - euro.exposed = True + def pipe_body(): + cherrypy.request.process_request_body = False + clen = int(cherrypy.request.headers['Content-Length']) + cherrypy.request.body = cherrypy.request.rfile.read(clen) - # Bare hooks - def pipe(self): - return cherrypy.request.body - pipe.exposed = True - pipe._cp_config = {'hooks.before_request_body': pipe_body} + # Assert that we can use a callable object instead of a function. + class Rotator(object): + def __call__(self, scale): + r = cherrypy.response + r.collapse_body() + r.body = [chr((ord(x) + scale) % 256) for x in r.body[0]] + cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator()) - # Multiple decorators; include kwargs just for fun. - # Note that rotator must run before gzip. - def decorated_euro(self, *vpath): - yield u"Hello," - yield u"world" - yield europoundUnicode - decorated_euro.exposed = True - decorated_euro = tools.gzip(compress_level=6)(decorated_euro) - decorated_euro = tools.rotator(scale=3)(decorated_euro) - - root = Root() - - - class TestType(type): - """Metaclass which automatically exposes all functions in each subclass, - and adds an instance of the subclass as an attribute of root. - """ - def __init__(cls, name, bases, dct): - type.__init__(cls, name, bases, dct) - for value in dct.itervalues(): - if isinstance(value, types.FunctionType): - value.exposed = True - setattr(root, name.lower(), cls()) - class Test(object): - __metaclass__ = TestType - - - # METHOD ONE: - # Declare Tools in _cp_config - class Demo(Test): + def stream_handler(next_handler, *args, **kwargs): + cherrypy.response.output = o = StringIO() + try: + response = next_handler(*args, **kwargs) + # Ignore the response and return our accumulated output instead. + return o.getvalue() + finally: + o.close() + cherrypy.tools.streamer = cherrypy._cptools.HandlerWrapperTool(stream_handler) + + class Root: + def index(self): + return "Howdy earth!" + index.exposed = True + + def tarfile(self): + cherrypy.response.output.write('I am ') + cherrypy.response.output.write('a tarfile') + tarfile.exposed = True + tarfile._cp_config = {'tools.streamer.on': True} + + def euro(self): + hooks = list(cherrypy.request.hooks['before_finalize']) + hooks.sort() + cbnames = [x.callback.__name__ for x in hooks] + assert cbnames == ['gzip'], cbnames + priorities = [x.priority for x in hooks] + assert priorities == [80], priorities + yield u"Hello," + yield u"world" + yield europoundUnicode + euro.exposed = True + + # Bare hooks + def pipe(self): + return cherrypy.request.body + pipe.exposed = True + pipe._cp_config = {'hooks.before_request_body': pipe_body} + + # Multiple decorators; include kwargs just for fun. + # Note that rotator must run before gzip. + def decorated_euro(self, *vpath): + yield u"Hello," + yield u"world" + yield europoundUnicode + decorated_euro.exposed = True + decorated_euro = tools.gzip(compress_level=6)(decorated_euro) + decorated_euro = tools.rotator(scale=3)(decorated_euro) - _cp_config = {"tools.nadsat.on": True} + root = Root() - def index(self, id=None): - return "A good piece of cherry pie" - def ended(self, id): - return repr(tools.nadsat.ended[id]) + class TestType(type): + """Metaclass which automatically exposes all functions in each subclass, + and adds an instance of the subclass as an attribute of root. + """ + def __init__(cls, name, bases, dct): + type.__init__(cls, name, bases, dct) + for value in dct.itervalues(): + if isinstance(value, types.FunctionType): + value.exposed = True + setattr(root, name.lower(), cls()) + class Test(object): + __metaclass__ = TestType - def err(self, id=None): - raise ValueError() - def errinstream(self, id=None): - yield "nonconfidential" - raise ValueError() - yield "confidential" + # METHOD ONE: + # Declare Tools in _cp_config + class Demo(Test): + + _cp_config = {"tools.nadsat.on": True} + + def index(self, id=None): + return "A good piece of cherry pie" + + def ended(self, id): + return repr(tools.nadsat.ended[id]) + + def err(self, id=None): + raise ValueError() + + def errinstream(self, id=None): + yield "nonconfidential" + raise ValueError() + yield "confidential" + + # METHOD TWO: decorator using Tool() + # We support Python 2.3, but the @-deco syntax would look like this: + # @tools.check_access() + def restricted(self): + return "Welcome!" + restricted = myauthtools.check_access()(restricted) + userid = restricted + + def err_in_onstart(self): + return "success!" + + def stream(self, id=None): + for x in xrange(100000000): + yield str(x) + stream._cp_config = {'response.stream': True} - # METHOD TWO: decorator using Tool() - # We support Python 2.3, but the @-deco syntax would look like this: - # @tools.check_access() - def restricted(self): - return "Welcome!" - restricted = myauthtools.check_access()(restricted) - userid = restricted - def err_in_onstart(self): - return "success!" + conf = { + # METHOD THREE: + # Declare Tools in detached config + '/demo': { + 'tools.numerify.on': True, + 'tools.numerify.map': {"pie": "3.14159"}, + }, + '/demo/restricted': { + 'request.show_tracebacks': False, + }, + '/demo/userid': { + 'request.show_tracebacks': False, + 'myauth.check_access.default': True, + }, + '/demo/errinstream': { + 'response.stream': True, + }, + '/demo/err_in_onstart': { + # Because this isn't a dict, on_start_resource will error. + 'tools.numerify.map': "pie->3.14159" + }, + # Combined tools + '/euro': { + 'tools.gzip.on': True, + 'tools.encode.on': True, + }, + # Priority specified in config + '/decorated_euro/subpath': { + 'tools.gzip.priority': 10, + }, + # Handler wrappers + '/tarfile': {'tools.streamer.on': True} + } + app = cherrypy.tree.mount(root, config=conf) + app.request_class.namespaces['myauth'] = myauthtools - def stream(self, id=None): - for x in xrange(100000000): - yield str(x) - stream._cp_config = {'response.stream': True} - - - conf = { - # METHOD THREE: - # Declare Tools in detached config - '/demo': { - 'tools.numerify.on': True, - 'tools.numerify.map': {"pie": "3.14159"}, - }, - '/demo/restricted': { - 'request.show_tracebacks': False, - }, - '/demo/userid': { - 'request.show_tracebacks': False, - 'myauth.check_access.default': True, - }, - '/demo/errinstream': { - 'response.stream': True, - }, - '/demo/err_in_onstart': { - # Because this isn't a dict, on_start_resource will error. - 'tools.numerify.map': "pie->3.14159" - }, - # Combined tools - '/euro': { - 'tools.gzip.on': True, - 'tools.encode.on': True, - }, - # Priority specified in config - '/decorated_euro/subpath': { - 'tools.gzip.priority': 10, - }, - # Handler wrappers - '/tarfile': {'tools.streamer.on': True} - } - app = cherrypy.tree.mount(root, config=conf) - app.request_class.namespaces['myauth'] = myauthtools - - if sys.version_info >= (2, 5): - from cherrypy.test import py25 - root.tooldecs = py25.ToolExamples() + if sys.version_info >= (2, 5): + from cherrypy.test import py25 + root.tooldecs = py25.ToolExamples() - -# Client-side code # - -from cherrypy.test import helper - - -class ToolTests(helper.CPWebCase): def testHookErrors(self): self.getPage("/demo/?id=1") diff --git a/cherrypy/test/test_tutorials.py b/cherrypy/test/test_tutorials.py index 1c3157bb..989a1506 100644 --- a/cherrypy/test/test_tutorials.py +++ b/cherrypy/test/test_tutorials.py @@ -1,5 +1,5 @@ from cherrypy.test import test -test.prefer_parent_path() + import sys @@ -7,46 +7,47 @@ import cherrypy from cherrypy.test import helper -def setup_server(): - - conf = cherrypy.config.copy() - - def load_tut_module(name): - """Import or reload tutorial module as needed.""" - cherrypy.config.reset() - cherrypy.config.update(conf) - - target = "cherrypy.tutorial." + name - if target in sys.modules: - module = reload(sys.modules[target]) - else: - module = __import__(target, globals(), locals(), ['']) - # The above import will probably mount a new app at "". - app = cherrypy.tree.apps[""] - - app.root.load_tut_module = load_tut_module - app.root.sessions = sessions - app.root.traceback_setting = traceback_setting - - test.sync_apps() - load_tut_module.exposed = True - - def sessions(): - cherrypy.config.update({"tools.sessions.on": True}) - sessions.exposed = True - - def traceback_setting(): - return repr(cherrypy.request.show_tracebacks) - traceback_setting.exposed = True - - class Dummy: - pass - root = Dummy() - root.load_tut_module = load_tut_module - cherrypy.tree.mount(root) - class TutorialTest(helper.CPWebCase): + @staticmethod + def setup_server(): + + conf = cherrypy.config.copy() + + def load_tut_module(name): + """Import or reload tutorial module as needed.""" + cherrypy.config.reset() + cherrypy.config.update(conf) + + target = "cherrypy.tutorial." + name + if target in sys.modules: + module = reload(sys.modules[target]) + else: + module = __import__(target, globals(), locals(), ['']) + # The above import will probably mount a new app at "". + app = cherrypy.tree.apps[""] + + app.root.load_tut_module = load_tut_module + app.root.sessions = sessions + app.root.traceback_setting = traceback_setting + + test.sync_apps() + load_tut_module.exposed = True + + def sessions(): + cherrypy.config.update({"tools.sessions.on": True}) + sessions.exposed = True + + def traceback_setting(): + return repr(cherrypy.request.show_tracebacks) + traceback_setting.exposed = True + + class Dummy: + pass + root = Dummy() + root.load_tut_module = load_tut_module + cherrypy.tree.mount(root) + def test01HelloWorld(self): self.getPage("/load_tut_module/tut01_helloworld") diff --git a/cherrypy/test/test_virtualhost.py b/cherrypy/test/test_virtualhost.py index d963b54f..01be0b68 100644 --- a/cherrypy/test/test_virtualhost.py +++ b/cherrypy/test/test_virtualhost.py @@ -1,65 +1,66 @@ from cherrypy.test import test -test.prefer_parent_path() + import os curdir = os.path.join(os.getcwd(), os.path.dirname(__file__)) import cherrypy -def setup_server(): - class Root: - def index(self): - return "Hello, world" - index.exposed = True - - def dom4(self): - return "Under construction" - dom4.exposed = True - - def method(self, value): - return "You sent %s" % repr(value) - method.exposed = True - - class VHost: - def __init__(self, sitename): - self.sitename = sitename - - def index(self): - return "Welcome to %s" % self.sitename - index.exposed = True - - def vmethod(self, value): - return "You sent %s" % repr(value) - vmethod.exposed = True - - def url(self): - return cherrypy.url("nextpage") - url.exposed = True - - # Test static as a handler (section must NOT include vhost prefix) - static = cherrypy.tools.staticdir.handler(section='/static', dir=curdir) - - root = Root() - root.mydom2 = VHost("Domain 2") - root.mydom3 = VHost("Domain 3") - hostmap = {'www.mydom2.com': '/mydom2', - 'www.mydom3.com': '/mydom3', - 'www.mydom4.com': '/dom4', - } - cherrypy.tree.mount(root, config={ - '/': {'request.dispatch': cherrypy.dispatch.VirtualHost(**hostmap)}, - # Test static in config (section must include vhost prefix) - '/mydom2/static2': {'tools.staticdir.on': True, - 'tools.staticdir.root': curdir, - 'tools.staticdir.dir': 'static', - 'tools.staticdir.index': 'index.html', - }, - }) from cherrypy.test import helper class VirtualHostTest(helper.CPWebCase): + @staticmethod + def setup_server(): + class Root: + def index(self): + return "Hello, world" + index.exposed = True + + def dom4(self): + return "Under construction" + dom4.exposed = True + + def method(self, value): + return "You sent %s" % repr(value) + method.exposed = True + + class VHost: + def __init__(self, sitename): + self.sitename = sitename + + def index(self): + return "Welcome to %s" % self.sitename + index.exposed = True + + def vmethod(self, value): + return "You sent %s" % repr(value) + vmethod.exposed = True + + def url(self): + return cherrypy.url("nextpage") + url.exposed = True + + # Test static as a handler (section must NOT include vhost prefix) + static = cherrypy.tools.staticdir.handler(section='/static', dir=curdir) + + root = Root() + root.mydom2 = VHost("Domain 2") + root.mydom3 = VHost("Domain 3") + hostmap = {'www.mydom2.com': '/mydom2', + 'www.mydom3.com': '/mydom3', + 'www.mydom4.com': '/dom4', + } + cherrypy.tree.mount(root, config={ + '/': {'request.dispatch': cherrypy.dispatch.VirtualHost(**hostmap)}, + # Test static in config (section must include vhost prefix) + '/mydom2/static2': {'tools.staticdir.on': True, + 'tools.staticdir.root': curdir, + 'tools.staticdir.dir': 'static', + 'tools.staticdir.index': 'index.html', + }, + }) def testVirtualHost(self): self.getPage("/", [('Host', 'www.mydom1.com')]) diff --git a/cherrypy/test/test_wsgi_ns.py b/cherrypy/test/test_wsgi_ns.py index 9aa36b85..78b50eb0 100644 --- a/cherrypy/test/test_wsgi_ns.py +++ b/cherrypy/test/test_wsgi_ns.py @@ -1,78 +1,79 @@ from cherrypy.test import test -test.prefer_parent_path() + import cherrypy -def setup_server(): - - class WSGIResponse(object): - - def __init__(self, appresults): - self.appresults = appresults - self.iter = iter(appresults) + +from cherrypy.test import helper + + +class WSGI_Namespace_Test(helper.CPWebCase): + @staticmethod + def setup_server(): - def __iter__(self): - return self + class WSGIResponse(object): + + def __init__(self, appresults): + self.appresults = appresults + self.iter = iter(appresults) + + def __iter__(self): + return self + + def next(self): + return self.iter.next() + + def close(self): + if hasattr(self.appresults, "close"): + self.appresults.close() - def next(self): - return self.iter.next() - def close(self): - if hasattr(self.appresults, "close"): - self.appresults.close() - - - class ChangeCase(object): + class ChangeCase(object): + + def __init__(self, app, to=None): + self.app = app + self.to = to + + def __call__(self, environ, start_response): + res = self.app(environ, start_response) + class CaseResults(WSGIResponse): + def next(this): + return getattr(this.iter.next(), self.to)() + return CaseResults(res) - def __init__(self, app, to=None): - self.app = app - self.to = to + class Replacer(object): + + def __init__(self, app, map={}): + self.app = app + self.map = map + + def __call__(self, environ, start_response): + res = self.app(environ, start_response) + class ReplaceResults(WSGIResponse): + def next(this): + line = this.iter.next() + for k, v in self.map.iteritems(): + line = line.replace(k, v) + return line + return ReplaceResults(res) - def __call__(self, environ, start_response): - res = self.app(environ, start_response) - class CaseResults(WSGIResponse): - def next(this): - return getattr(this.iter.next(), self.to)() - return CaseResults(res) - - class Replacer(object): + class Root(object): + + def index(self): + return "HellO WoRlD!" + index.exposed = True - def __init__(self, app, map={}): - self.app = app - self.map = map - def __call__(self, environ, start_response): - res = self.app(environ, start_response) - class ReplaceResults(WSGIResponse): - def next(this): - line = this.iter.next() - for k, v in self.map.iteritems(): - line = line.replace(k, v) - return line - return ReplaceResults(res) - - class Root(object): + root_conf = {'wsgi.pipeline': [('replace', Replacer)], + 'wsgi.replace.map': {'L': 'X', 'l': 'r'}, + } - def index(self): - return "HellO WoRlD!" - index.exposed = True - - - root_conf = {'wsgi.pipeline': [('replace', Replacer)], - 'wsgi.replace.map': {'L': 'X', 'l': 'r'}, - } - - app = cherrypy.Application(Root()) - app.wsgiapp.pipeline.append(('changecase', ChangeCase)) - app.wsgiapp.config['changecase'] = {'to': 'upper'} - cherrypy.tree.mount(app, config={'/': root_conf}) + app = cherrypy.Application(Root()) + app.wsgiapp.pipeline.append(('changecase', ChangeCase)) + app.wsgiapp.config['changecase'] = {'to': 'upper'} + cherrypy.tree.mount(app, config={'/': root_conf}) - -from cherrypy.test import helper - - -class WSGI_Namespace_Test(helper.CPWebCase): def test_pipeline(self): if not cherrypy.server.httpserver: diff --git a/cherrypy/test/test_wsgi_vhost.py b/cherrypy/test/test_wsgi_vhost.py index c3bf7810..e969b4a9 100644 --- a/cherrypy/test/test_wsgi_vhost.py +++ b/cherrypy/test/test_wsgi_vhost.py @@ -1,35 +1,36 @@ from cherrypy.test import test -test.prefer_parent_path() + import cherrypy -def setup_server(): - - class ClassOfRoot(object): - - def __init__(self, name): - self.name = name - - def index(self): - return "Welcome to the %s website!" % self.name - index.exposed = True - - - default = cherrypy.Application(None) - - domains = {} - for year in range(1997, 2008): - app = cherrypy.Application(ClassOfRoot('Class of %s' % year)) - domains['www.classof%s.example' % year] = app - - cherrypy.tree.graft(cherrypy._cpwsgi.VirtualHost(default, domains)) from cherrypy.test import helper class WSGI_VirtualHost_Test(helper.CPWebCase): + @staticmethod + def setup_server(): + + class ClassOfRoot(object): + + def __init__(self, name): + self.name = name + + def index(self): + return "Welcome to the %s website!" % self.name + index.exposed = True + + + default = cherrypy.Application(None) + + domains = {} + for year in range(1997, 2008): + app = cherrypy.Application(ClassOfRoot('Class of %s' % year)) + domains['www.classof%s.example' % year] = app + + cherrypy.tree.graft(cherrypy._cpwsgi.VirtualHost(default, domains)) def test_welcome(self): if not cherrypy.server.using_wsgi: diff --git a/cherrypy/test/test_wsgiapps.py b/cherrypy/test/test_wsgiapps.py index 5854560a..1b1552b4 100644 --- a/cherrypy/test/test_wsgiapps.py +++ b/cherrypy/test/test_wsgiapps.py @@ -1,84 +1,85 @@ from cherrypy.test import test -test.prefer_parent_path() -def setup_server(): - import os - curdir = os.path.join(os.getcwd(), os.path.dirname(__file__)) - - import cherrypy - - def test_app(environ, start_response): - status = '200 OK' - response_headers = [('Content-type', 'text/plain')] - start_response(status, response_headers) - output = ['Hello, world!\n', - 'This is a wsgi app running within CherryPy!\n\n'] - keys = list(environ.keys()) - keys.sort() - for k in keys: - output.append('%s: %s\n' % (k,environ[k])) - return output - - def test_empty_string_app(environ, start_response): - status = '200 OK' - response_headers = [('Content-type', 'text/plain')] - start_response(status, response_headers) - return ['Hello', '', ' ', '', 'world'] - - - class WSGIResponse(object): + + +from cherrypy.test import helper + + +class WSGIGraftTests(helper.CPWebCase): + @staticmethod + def setup_server(): + import os + curdir = os.path.join(os.getcwd(), os.path.dirname(__file__)) - def __init__(self, appresults): - self.appresults = appresults - self.iter = iter(appresults) + import cherrypy - def __iter__(self): - return self + def test_app(environ, start_response): + status = '200 OK' + response_headers = [('Content-type', 'text/plain')] + start_response(status, response_headers) + output = ['Hello, world!\n', + 'This is a wsgi app running within CherryPy!\n\n'] + keys = list(environ.keys()) + keys.sort() + for k in keys: + output.append('%s: %s\n' % (k,environ[k])) + return output - def next(self): - return self.iter.next() + def test_empty_string_app(environ, start_response): + status = '200 OK' + response_headers = [('Content-type', 'text/plain')] + start_response(status, response_headers) + return ['Hello', '', ' ', '', 'world'] - def close(self): - if hasattr(self.appresults, "close"): - self.appresults.close() - - - class ReversingMiddleware(object): - def __init__(self, app): - self.app = app + class WSGIResponse(object): + + def __init__(self, appresults): + self.appresults = appresults + self.iter = iter(appresults) + + def __iter__(self): + return self + + def next(self): + return self.iter.next() + + def close(self): + if hasattr(self.appresults, "close"): + self.appresults.close() - def __call__(self, environ, start_response): - results = app(environ, start_response) - class Reverser(WSGIResponse): - def next(this): - line = list(this.iter.next()) - line.reverse() - return "".join(line) - return Reverser(results) - - class Root: - def index(self): - return "I'm a regular CherryPy page handler!" - index.exposed = True - - - cherrypy.tree.mount(Root()) - - cherrypy.tree.graft(test_app, '/hosted/app1') - cherrypy.tree.graft(test_empty_string_app, '/hosted/app3') - - # Set script_name explicitly to None to signal CP that it should - # be pulled from the WSGI environ each time. - app = cherrypy.Application(Root(), script_name=None) - cherrypy.tree.graft(ReversingMiddleware(app), '/hosted/app2') - -from cherrypy.test import helper - + + class ReversingMiddleware(object): + + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + results = app(environ, start_response) + class Reverser(WSGIResponse): + def next(this): + line = list(this.iter.next()) + line.reverse() + return "".join(line) + return Reverser(results) + + class Root: + def index(self): + return "I'm a regular CherryPy page handler!" + index.exposed = True + + + cherrypy.tree.mount(Root()) + + cherrypy.tree.graft(test_app, '/hosted/app1') + cherrypy.tree.graft(test_empty_string_app, '/hosted/app3') + + # Set script_name explicitly to None to signal CP that it should + # be pulled from the WSGI environ each time. + app = cherrypy.Application(Root(), script_name=None) + cherrypy.tree.graft(ReversingMiddleware(app), '/hosted/app2') -class WSGIGraftTests(helper.CPWebCase): - wsgi_output = '''Hello, world! This is a wsgi app running within CherryPy!''' diff --git a/cherrypy/test/test_xmlrpc.py b/cherrypy/test/test_xmlrpc.py index 85e5dda5..27240df4 100644 --- a/cherrypy/test/test_xmlrpc.py +++ b/cherrypy/test/test_xmlrpc.py @@ -1,5 +1,5 @@ from cherrypy.test import test -test.prefer_parent_path() + import xmlrpclib import cherrypy @@ -73,6 +73,7 @@ def setup_server(): class HTTPSTransport(xmlrpclib.SafeTransport): + setup_server = staticmethod(setup_server) """Subclass of SafeTransport to fix sock.recv errors (by using file).""" def request(self, host, handler, request_body, verbose=0): @@ -106,6 +107,7 @@ class HTTPSTransport(xmlrpclib.SafeTransport): from cherrypy.test import helper class XmlRpcTest(helper.CPWebCase): + setup_server = staticmethod(setup_server) def testXmlRpc(self): # load the appropriate xmlrpc proxy diff --git a/cherrypy/test/webtest.py b/cherrypy/test/webtest.py index c83c41fd..dfbc8352 100644 --- a/cherrypy/test/webtest.py +++ b/cherrypy/test/webtest.py @@ -33,7 +33,7 @@ from unittest import _TextTestResult def interface(host): """Return an IP address for a client connection given the server host. - + If the server is listening on '0.0.0.0' (INADDR_ANY) or '::' (IN6ADDR_ANY), this will return the proper localhost.""" if host == '0.0.0.0': @@ -46,7 +46,7 @@ def interface(host): class TerseTestResult(_TextTestResult): - + def printErrors(self): # Overridden to avoid unnecessary empty line if self.errors or self.failures: @@ -58,10 +58,10 @@ class TerseTestResult(_TextTestResult): class TerseTestRunner(TextTestRunner): """A test runner class that displays results in textual form.""" - + def _makeResult(self): return TerseTestResult(self.stream, self.descriptions, self.verbosity) - + def run(self, test): "Run the given test case or test suite." # Overridden to remove unnecessary empty lines and separators @@ -81,7 +81,7 @@ class TerseTestRunner(TextTestRunner): class ReloadingTestLoader(TestLoader): - + def loadTestsFromName(self, name, module=None): """Return a suite of all tests cases given a string specifier. @@ -118,7 +118,7 @@ class ReloadingTestLoader(TestLoader): obj = module for part in parts: obj = getattr(obj, part) - + if type(obj) == types.ModuleType: return self.loadTestsFromModule(obj) elif (isinstance(obj, (type, types.ClassType)) and @@ -161,15 +161,15 @@ class WebCase(TestCase): PORT = 8000 HTTP_CONN = HTTPConnection PROTOCOL = "HTTP/1.1" - + scheme = "http" url = None - + status = None headers = None body = None time = None - + def get_conn(self, auto_open=False): """Return a connection to our HTTP server.""" if self.scheme == "https": @@ -181,14 +181,14 @@ class WebCase(TestCase): conn.auto_open = auto_open conn.connect() return conn - + def set_persistent(self, on=True, auto_open=False): """Make our HTTP_CONN persistent (or not). - + If the 'on' argument is True (the default), then self.HTTP_CONN will be set to an instance of HTTPConnection (or HTTPS if self.scheme is "https"). This will then persist across requests. - + We only allow for a single open connection, so if you call this and we currently have an open connection, it will be closed. """ @@ -196,7 +196,7 @@ class WebCase(TestCase): self.HTTP_CONN.close() except (TypeError, AttributeError): pass - + if on: self.HTTP_CONN = self.get_conn(auto_open=auto_open) else: @@ -204,24 +204,24 @@ class WebCase(TestCase): self.HTTP_CONN = HTTPSConnection else: self.HTTP_CONN = HTTPConnection - + def _get_persistent(self): return hasattr(self.HTTP_CONN, "__class__") def _set_persistent(self, on): self.set_persistent(on) persistent = property(_get_persistent, _set_persistent) - + def interface(self): """Return an IP address for a client connection. - + If the server is listening on '0.0.0.0' (INADDR_ANY) or '::' (IN6ADDR_ANY), this will return the proper localhost.""" return interface(self.HOST) - + def getPage(self, url, headers=None, method="GET", body=None, protocol=None): """Open the url with debugging support. Return status, headers, body.""" ServerError.on = False - + self.url = url self.time = None start = time.time() @@ -229,26 +229,26 @@ class WebCase(TestCase): self.HTTP_CONN, protocol or self.PROTOCOL) self.time = time.time() - start self.status, self.headers, self.body = result - + # Build a list of request cookies from the previous response cookies. self.cookies = [('Cookie', v) for k, v in self.headers if k.lower() == 'set-cookie'] - + if ServerError.on: raise ServerError() return result - + interactive = True console_height = 30 - + def _handlewebError(self, msg): import cherrypy print("") print(" ERROR: %s" % msg) - + if not self.interactive: raise self.failureException(msg) - + p = " Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> " print p, # ARGH! @@ -284,85 +284,11 @@ class WebCase(TestCase): self.exit() print p, # ARGH - sys.stdout.flush() + sys.stdout.flush() + def exit(self): sys.exit() - - if sys.version_info >= (2, 5): - def __call__(self, result=None): - if result is None: - result = self.defaultTestResult() - result.startTest(self) - testMethod = getattr(self, self._testMethodName) - try: - try: - self.setUp() - except (KeyboardInterrupt, SystemExit): - raise - except: - result.addError(self, self._exc_info()) - return - - ok = 0 - try: - testMethod() - ok = 1 - except self.failureException: - result.addFailure(self, self._exc_info()) - except (KeyboardInterrupt, SystemExit): - raise - except: - result.addError(self, self._exc_info()) - - try: - self.tearDown() - except (KeyboardInterrupt, SystemExit): - raise - except: - result.addError(self, self._exc_info()) - ok = 0 - if ok: - result.addSuccess(self) - finally: - result.stopTest(self) - else: - def __call__(self, result=None): - if result is None: - result = self.defaultTestResult() - result.startTest(self) - testMethod = getattr(self, self._TestCase__testMethodName) - try: - try: - self.setUp() - except (KeyboardInterrupt, SystemExit): - raise - except: - result.addError(self, self._TestCase__exc_info()) - return - - ok = 0 - try: - testMethod() - ok = 1 - except self.failureException: - result.addFailure(self, self._TestCase__exc_info()) - except (KeyboardInterrupt, SystemExit): - raise - except: - result.addError(self, self._TestCase__exc_info()) - - try: - self.tearDown() - except (KeyboardInterrupt, SystemExit): - raise - except: - result.addError(self, self._TestCase__exc_info()) - ok = 0 - if ok: - result.addSuccess(self) - finally: - result.stopTest(self) - + def assertStatus(self, status, msg=None): """Fail if self.status != status.""" if isinstance(status, basestring): @@ -391,7 +317,7 @@ class WebCase(TestCase): if msg is None: msg = 'Status (%r) not in %r' % (self.status, status) self._handlewebError(msg) - + def assertHeader(self, key, value=None, msg=None): """Fail if (key, [value]) not in self.headers.""" lowkey = key.lower() @@ -399,21 +325,21 @@ class WebCase(TestCase): if k.lower() == lowkey: if value is None or str(value) == v: return v - + if msg is None: if value is None: msg = '%r not in headers' % key else: msg = '%r:%r not in headers' % (key, value) self._handlewebError(msg) - + def assertHeaderItemValue(self, key, value, msg=None): """Fail if the header does not contain the specified value""" actual_value = self.assertHeader(key, msg=msg) header_values = map(str.strip, actual_value.split(',')) if value in header_values: return value - + if msg is None: msg = "%r not in %r" % (value, header_values) self._handlewebError(msg) @@ -426,28 +352,28 @@ class WebCase(TestCase): if msg is None: msg = '%r in headers' % key self._handlewebError(msg) - + def assertBody(self, value, msg=None): """Fail if value != self.body.""" if value != self.body: if msg is None: msg = 'expected body:\n%r\n\nactual body:\n%r' % (value, self.body) self._handlewebError(msg) - + def assertInBody(self, value, msg=None): """Fail if value not in self.body.""" if value not in self.body: if msg is None: msg = '%r not in body: %s' % (value, self.body) self._handlewebError(msg) - + def assertNotInBody(self, value, msg=None): """Fail if value in self.body.""" if value in self.body: if msg is None: msg = '%r found in body' % value self._handlewebError(msg) - + def assertMatchesBody(self, pattern, msg=None, flags=0): """Fail if value (a regex pattern) is not in self.body.""" if re.search(pattern, self.body, flags) is None: @@ -462,7 +388,7 @@ def cleanHeaders(headers, method, body, host, port): """Return request headers, with required headers added (if missing).""" if headers is None: headers = [] - + # Add the required Host request header if not present. # [This specifies the host:port of the server, not the client.] found = False @@ -475,7 +401,7 @@ def cleanHeaders(headers, method, body, host, port): headers.append(("Host", host)) else: headers.append(("Host", "%s:%s" % (host, port))) - + if method in methods_with_bodies: # Stick in default type and length headers if not present found = False @@ -486,7 +412,7 @@ def cleanHeaders(headers, method, body, host, port): if not found: headers.append(("Content-Type", "application/x-www-form-urlencoded")) headers.append(("Content-Length", str(len(body or "")))) - + return headers @@ -506,7 +432,7 @@ def shb(response): value = value.strip() if key and value: h.append((key, value)) - + return "%s %s" % (response.status, response.reason), h, response.read() @@ -514,9 +440,9 @@ def openURL(url, headers=None, method="GET", body=None, host="127.0.0.1", port=8000, http_conn=HTTPConnection, protocol="HTTP/1.1"): """Open the given HTTP resource and return status, headers, and body.""" - + headers = cleanHeaders(headers, method, body, host, port) - + # Trying 10 times is simply in case of socket errors. # Normal case--it should run once. for trial in range(10): @@ -529,7 +455,7 @@ def openURL(url, headers=None, method="GET", body=None, conn._http_vsn_str = protocol conn._http_vsn = int("".join([x for x in protocol if x.isdigit()])) - + # skip_accept_encoding argument added in python version 2.4 if sys.version_info < (2, 4): def putheader(self, header, value): @@ -542,23 +468,23 @@ def openURL(url, headers=None, method="GET", body=None, else: conn.putrequest(method.upper(), url, skip_host=True, skip_accept_encoding=True) - + for key, value in headers: conn.putheader(key, value) conn.endheaders() - + if body is not None: conn.send(body) - + # Handle response response = conn.getresponse() - + s, h, b = shb(response) - + if not hasattr(http_conn, "host"): # We made our own conn instance. Close it. conn.close() - + return s, h, b except socket.error: time.sleep(0.5) @@ -580,13 +506,13 @@ class ServerError(Exception): def server_error(exc=None): """Server debug hook. Return True if exception handled, False if ignored. - + You probably want to wrap this, so you can still handle an error using your framework when it's ignored. """ - if exc is None: + if exc is None: exc = sys.exc_info() - + if ignore_all or exc[0] in ignored_exceptions: return False else: |