diff options
Diffstat (limited to 'pecan/commands')
-rw-r--r-- | pecan/commands/__init__.py | 4 | ||||
-rw-r--r-- | pecan/commands/base.py | 166 | ||||
-rw-r--r-- | pecan/commands/create.py | 60 | ||||
-rw-r--r-- | pecan/commands/serve.py | 223 | ||||
-rw-r--r-- | pecan/commands/shell.py | 177 |
5 files changed, 0 insertions, 630 deletions
diff --git a/pecan/commands/__init__.py b/pecan/commands/__init__.py deleted file mode 100644 index da02464..0000000 --- a/pecan/commands/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .base import CommandRunner, BaseCommand # noqa -from .serve import ServeCommand # noqa -from .shell import ShellCommand # noqa -from .create import CreateCommand # noqa diff --git a/pecan/commands/base.py b/pecan/commands/base.py deleted file mode 100644 index 441d577..0000000 --- a/pecan/commands/base.py +++ /dev/null @@ -1,166 +0,0 @@ -import pkg_resources -import argparse -import logging -import sys -from warnings import warn - -import six - -log = logging.getLogger(__name__) - - -class HelpfulArgumentParser(argparse.ArgumentParser): - - def error(self, message): # pragma: nocover - """error(message: string) - - Prints a usage message incorporating the message to stderr and - exits. - - If you override this in a subclass, it should not return -- it - should either exit or raise an exception. - """ - self.print_help(sys.stderr) - self._print_message('\n') - self.exit(2, '%s: %s\n' % (self.prog, message)) - - -class CommandManager(object): - """ Used to discover `pecan.command` entry points. """ - - def __init__(self): - self.commands = {} - self.load_commands() - - def load_commands(self): - for ep in pkg_resources.iter_entry_points('pecan.command'): - log.debug('%s loading plugin %s', self.__class__.__name__, ep) - if ep.name in self.commands: - warn( - "Duplicate entry points found on `%s` - ignoring %s" % ( - ep.name, - ep - ), - RuntimeWarning - ) - continue - try: - cmd = ep.load() - cmd.run # ensure existance; catch AttributeError otherwise - except Exception as e: # pragma: nocover - warn("Unable to load plugin %s: %s" % (ep, e), RuntimeWarning) - continue - self.add({ep.name: cmd}) - - def add(self, cmd): - self.commands.update(cmd) - - -class CommandRunner(object): - """ Dispatches `pecan` command execution requests. """ - - def __init__(self): - self.manager = CommandManager() - self.parser = HelpfulArgumentParser(add_help=True) - self.parser.add_argument( - '--version', - action='version', - version='Pecan %s' % self.version - ) - self.parse_sub_commands() - - def parse_sub_commands(self): - subparsers = self.parser.add_subparsers( - dest='command_name', - metavar='command' - ) - for name, cmd in self.commands.items(): - sub = subparsers.add_parser( - name, - help=cmd.summary - ) - for arg in getattr(cmd, 'arguments', tuple()): - arg = arg.copy() - if isinstance(arg.get('name'), six.string_types): - sub.add_argument(arg.pop('name'), **arg) - elif isinstance(arg.get('name'), list): - sub.add_argument(*arg.pop('name'), **arg) - - def run(self, args): - ns = self.parser.parse_args(args) - self.commands[ns.command_name]().run(ns) - - @classmethod - def handle_command_line(cls): # pragma: nocover - runner = CommandRunner() - runner.run(sys.argv[1:]) - - @property - def version(self): - return pkg_resources.get_distribution('pecan').version - - @property - def commands(self): - return self.manager.commands - - -class BaseCommandMeta(type): - - @property - def summary(cls): - """ - This is used to populate the --help argument on the command line. - - This provides a default behavior which takes the first sentence of the - command's docstring and uses it. - """ - return cls.__doc__.strip().splitlines()[0].rstrip('.') - - -class BaseCommandParent(object): - """ - A base interface for Pecan commands. - - Can be extended to support ``pecan`` command extensions in individual Pecan - projects, e.g., - - $ ``pecan my-custom-command config.py`` - - :: - - # myapp/myapp/custom_command.py - class CustomCommand(pecan.commands.base.BaseCommand): - ''' - (First) line of the docstring is used to summarize the command. - ''' - - arguments = ({ - 'name': '--extra_arg', - 'help': 'an extra command line argument', - 'optional': True - }) - - def run(self, args): - super(SomeCommand, self).run(args) - if args.extra_arg: - pass - """ - - arguments = ({ - 'name': 'config_file', - 'help': 'a Pecan configuration file', - 'nargs': '?', - 'default': None, - },) - - def run(self, args): - """To be implemented by subclasses.""" - self.args = args - - def load_app(self): - from pecan import load_app - return load_app(self.args.config_file) - -BaseCommand = BaseCommandMeta('BaseCommand', (BaseCommandParent,), { - '__doc__': BaseCommandParent.__doc__ -}) diff --git a/pecan/commands/create.py b/pecan/commands/create.py deleted file mode 100644 index 1187489..0000000 --- a/pecan/commands/create.py +++ /dev/null @@ -1,60 +0,0 @@ -""" -Create command for Pecan -""" -import pkg_resources -import logging -from warnings import warn -from pecan.commands import BaseCommand -from pecan.scaffolds import DEFAULT_SCAFFOLD - -log = logging.getLogger(__name__) - - -class ScaffoldManager(object): - """ Used to discover `pecan.scaffold` entry points. """ - - def __init__(self): - self.scaffolds = {} - self.load_scaffolds() - - def load_scaffolds(self): - for ep in pkg_resources.iter_entry_points('pecan.scaffold'): - log.debug('%s loading scaffold %s', self.__class__.__name__, ep) - try: - cmd = ep.load() - cmd.copy_to # ensure existance; catch AttributeError otherwise - except Exception as e: # pragma: nocover - warn( - "Unable to load scaffold %s: %s" % (ep, e), RuntimeWarning - ) - continue - self.add({ep.name: cmd}) - - def add(self, cmd): - self.scaffolds.update(cmd) - - -class CreateCommand(BaseCommand): - """ - Creates the file layout for a new Pecan scaffolded project. - """ - - manager = ScaffoldManager() - - arguments = ({ - 'name': 'project_name', - 'help': 'the (package) name of the new project' - }, { - 'name': 'template_name', - 'metavar': 'template_name', - 'help': 'a registered Pecan template', - 'nargs': '?', - 'default': DEFAULT_SCAFFOLD, - 'choices': manager.scaffolds.keys() - }) - - def run(self, args): - super(CreateCommand, self).run(args) - self.manager.scaffolds[args.template_name]().copy_to( - args.project_name - ) diff --git a/pecan/commands/serve.py b/pecan/commands/serve.py deleted file mode 100644 index 72a536d..0000000 --- a/pecan/commands/serve.py +++ /dev/null @@ -1,223 +0,0 @@ -""" -Serve command for Pecan. -""" -from __future__ import print_function -import logging -import os -import sys -import time -import subprocess -from wsgiref.simple_server import WSGIRequestHandler - - -from pecan.commands import BaseCommand -from pecan import util - - -logger = logging.getLogger(__name__) - - -class ServeCommand(BaseCommand): - """ - Serves a Pecan web application. - - This command serves a Pecan web application using the provided - configuration file for the server and application. - """ - - arguments = BaseCommand.arguments + ({ - 'name': '--reload', - 'help': 'Watch for changes and automatically reload.', - 'default': False, - 'action': 'store_true' - },) - - def run(self, args): - super(ServeCommand, self).run(args) - app = self.load_app() - self.serve(app, app.config) - - def create_subprocess(self): - self.server_process = subprocess.Popen( - [arg for arg in sys.argv if arg != '--reload'], - stdout=sys.stdout, stderr=sys.stderr - ) - - def watch_and_spawn(self, conf): - from watchdog.observers import Observer - from watchdog.events import ( - FileSystemEventHandler, FileSystemMovedEvent, FileModifiedEvent, - DirModifiedEvent - ) - - print('Monitoring for changes...') - self.create_subprocess() - - parent = self - - class AggressiveEventHandler(FileSystemEventHandler): - def should_reload(self, event): - for t in ( - FileSystemMovedEvent, FileModifiedEvent, DirModifiedEvent - ): - if isinstance(event, t): - return True - return False - - def on_modified(self, event): - if self.should_reload(event): - parent.server_process.kill() - parent.create_subprocess() - - # Determine a list of file paths to monitor - paths = self.paths_to_monitor(conf) - - event_handler = AggressiveEventHandler() - for path, recurse in paths: - observer = Observer() - observer.schedule( - event_handler, - path=path, - recursive=recurse - ) - observer.start() - - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - pass - - def paths_to_monitor(self, conf): - paths = [] - - for package_name in getattr(conf.app, 'modules', []): - module = __import__(package_name, fromlist=['app']) - if hasattr(module, 'app') and hasattr(module.app, 'setup_app'): - paths.append(( - os.path.dirname(module.__file__), - True - )) - break - - paths.append((os.path.dirname(conf.__file__), False)) - return paths - - def _serve(self, app, conf): - from wsgiref.simple_server import make_server - - host, port = conf.server.host, int(conf.server.port) - srv = make_server( - host, - port, - app, - handler_class=PecanWSGIRequestHandler, - ) - - print('Starting server in PID %s' % os.getpid()) - - if host == '0.0.0.0': - print( - 'serving on 0.0.0.0:%s, view at http://127.0.0.1:%s' % - (port, port) - ) - else: - print("serving on http://%s:%s" % (host, port)) - - try: - srv.serve_forever() - except KeyboardInterrupt: - # allow CTRL+C to shutdown - pass - - def serve(self, app, conf): - """ - A very simple approach for a WSGI server. - """ - - if self.args.reload: - try: - self.watch_and_spawn(conf) - except ImportError: - print('The `--reload` option requires `watchdog` to be ' - 'installed.') - print(' $ pip install watchdog') - else: - self._serve(app, conf) - - -def gunicorn_run(): - """ - The ``gunicorn_pecan`` command for launching ``pecan`` applications - """ - try: - from gunicorn.app.wsgiapp import WSGIApplication - except ImportError as exc: - args = exc.args - arg0 = args[0] if args else '' - arg0 += ' (are you sure `gunicorn` is installed?)' - exc.args = (arg0,) + args[1:] - raise - - class PecanApplication(WSGIApplication): - - def init(self, parser, opts, args): - if len(args) != 1: - parser.error("No configuration file was specified.") - - self.cfgfname = os.path.normpath( - os.path.join(os.getcwd(), args[0]) - ) - self.cfgfname = os.path.abspath(self.cfgfname) - if not os.path.exists(self.cfgfname): - parser.error("Config file not found: %s" % self.cfgfname) - - from pecan.configuration import _runtime_conf, set_config - set_config(self.cfgfname, overwrite=True) - - # If available, use the host and port from the pecan config file - cfg = {} - if _runtime_conf.get('server'): - server = _runtime_conf['server'] - if hasattr(server, 'host') and hasattr(server, 'port'): - cfg['bind'] = '%s:%s' % ( - server.host, server.port - ) - return cfg - - def load(self): - from pecan.deploy import deploy - return deploy(self.cfgfname) - - PecanApplication("%(prog)s [OPTIONS] config.py").run() - - -class PecanWSGIRequestHandler(WSGIRequestHandler, object): - """ - A wsgiref request handler class that allows actual log output depending on - the application configuration. - """ - - def __init__(self, *args, **kwargs): - # We set self.path to avoid crashes in log_message() on unsupported - # requests (like "OPTIONS"). - self.path = '' - super(PecanWSGIRequestHandler, self).__init__(*args, **kwargs) - - def log_message(self, format, *args): - """ - overrides the ``log_message`` method from the wsgiref server so that - normal logging works with whatever configuration the application has - been set to. - - Levels are inferred from the HTTP status code, 4XX codes are treated as - warnings, 5XX as errors and everything else as INFO level. - """ - code = args[1][0] - levels = { - '4': 'warning', - '5': 'error' - } - - log_handler = getattr(logger, levels.get(code, 'info')) - log_handler(format % args) diff --git a/pecan/commands/shell.py b/pecan/commands/shell.py deleted file mode 100644 index 968ded0..0000000 --- a/pecan/commands/shell.py +++ /dev/null @@ -1,177 +0,0 @@ -""" -Shell command for Pecan. -""" -from pecan.commands import BaseCommand -from webtest import TestApp -from warnings import warn -import sys - - -class NativePythonShell(object): - """ - Open an interactive python shell with the Pecan app loaded. - """ - - @classmethod - def invoke(cls, ns, banner): # pragma: nocover - """ - :param ns: local namespace - :param banner: interactive shell startup banner - - Embed an interactive native python shell. - """ - import code - py_prefix = sys.platform.startswith('java') and 'J' or 'P' - shell_banner = 'Pecan Interactive Shell\n%sython %s\n\n' % \ - (py_prefix, sys.version) - shell = code.InteractiveConsole(locals=ns) - try: - import readline # noqa - except ImportError: - pass - shell.interact(shell_banner + banner) - - -class IPythonShell(object): - """ - Open an interactive ipython shell with the Pecan app loaded. - """ - - @classmethod - def invoke(cls, ns, banner): # pragma: nocover - """ - :param ns: local namespace - :param banner: interactive shell startup banner - - Embed an interactive ipython shell. - Try the InteractiveShellEmbed API first, fall back on - IPShellEmbed for older IPython versions. - """ - try: - from IPython.frontend.terminal.embed import ( - InteractiveShellEmbed - ) - # try and load their default profile - from IPython.frontend.terminal.ipapp import ( - load_default_config - ) - config = load_default_config() - shell = InteractiveShellEmbed(config=config, banner2=banner) - shell(local_ns=ns) - except ImportError: - # Support for the IPython <= 0.10 shell API - from IPython.Shell import IPShellEmbed - shell = IPShellEmbed(argv=[]) - shell.set_banner(shell.IP.BANNER + '\n\n' + banner) - shell(local_ns=ns, global_ns={}) - - -class BPythonShell(object): - """ - Open an interactive bpython shell with the Pecan app loaded. - """ - - @classmethod - def invoke(cls, ns, banner): # pragma: nocover - """ - :param ns: local namespace - :param banner: interactive shell startup banner - - Embed an interactive bpython shell. - """ - from bpython import embed - embed(ns, ['-i'], banner) - - -class ShellCommand(BaseCommand): - """ - Open an interactive shell with the Pecan app loaded. - Attempt to invoke the specified python shell flavor - (ipython, bpython, etc.). Fall back on the native - python shell if the requested flavor variance is not - installed. - """ - - SHELLS = { - 'python': NativePythonShell, - 'ipython': IPythonShell, - 'bpython': BPythonShell, - } - - arguments = BaseCommand.arguments + ({ - 'name': ['--shell', '-s'], - 'help': 'which Python shell to use', - 'choices': SHELLS.keys(), - 'default': 'python' - },) - - def run(self, args): - """ - Load the pecan app, prepare the locals, sets the - banner, and invokes the python shell. - """ - super(ShellCommand, self).run(args) - - # load the application - app = self.load_app() - - # prepare the locals - locs = dict(__name__='pecan-admin') - locs['wsgiapp'] = app - locs['app'] = TestApp(app) - - model = self.load_model(app.config) - if model: - locs['model'] = model - - # insert the pecan locals - from pecan import abort, conf, redirect, request, response - locs['abort'] = abort - locs['conf'] = conf - locs['redirect'] = redirect - locs['request'] = request - locs['response'] = response - - # prepare the banner - banner = ' The following objects are available:\n' - banner += ' %-10s - This project\'s WSGI App instance\n' % 'wsgiapp' - banner += ' %-10s - The current configuration\n' % 'conf' - banner += ' %-10s - webtest.TestApp wrapped around wsgiapp\n' % 'app' - if model: - model_name = getattr( - model, - '__module__', - getattr(model, '__name__', 'model') - ) - banner += ' %-10s - Models from %s\n' % ('model', model_name) - - self.invoke_shell(locs, banner) - - def invoke_shell(self, locs, banner): - """ - Invokes the appropriate flavor of the python shell. - Falls back on the native python shell if the requested - flavor (ipython, bpython,etc) is not installed. - """ - shell = self.SHELLS[self.args.shell] - try: - shell().invoke(locs, banner) - except ImportError as e: - warn(( - "%s is not installed, `%s`, " - "falling back to native shell") % (self.args.shell, e), - RuntimeWarning - ) - if shell == NativePythonShell: - raise - NativePythonShell().invoke(locs, banner) - - def load_model(self, config): - """ - Load the model extension module - """ - for package_name in getattr(config.app, 'modules', []): - module = __import__(package_name, fromlist=['model']) - if hasattr(module, 'model'): - return module.model - return None |