summaryrefslogtreecommitdiff
path: root/pecan/commands
diff options
context:
space:
mode:
Diffstat (limited to 'pecan/commands')
-rw-r--r--pecan/commands/__init__.py4
-rw-r--r--pecan/commands/base.py166
-rw-r--r--pecan/commands/create.py60
-rw-r--r--pecan/commands/serve.py223
-rw-r--r--pecan/commands/shell.py177
5 files changed, 0 insertions, 630 deletions
diff --git a/pecan/commands/__init__.py b/pecan/commands/__init__.py
deleted file mode 100644
index da02464..0000000
--- a/pecan/commands/__init__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from .base import CommandRunner, BaseCommand # noqa
-from .serve import ServeCommand # noqa
-from .shell import ShellCommand # noqa
-from .create import CreateCommand # noqa
diff --git a/pecan/commands/base.py b/pecan/commands/base.py
deleted file mode 100644
index 441d577..0000000
--- a/pecan/commands/base.py
+++ /dev/null
@@ -1,166 +0,0 @@
-import pkg_resources
-import argparse
-import logging
-import sys
-from warnings import warn
-
-import six
-
-log = logging.getLogger(__name__)
-
-
-class HelpfulArgumentParser(argparse.ArgumentParser):
-
- def error(self, message): # pragma: nocover
- """error(message: string)
-
- Prints a usage message incorporating the message to stderr and
- exits.
-
- If you override this in a subclass, it should not return -- it
- should either exit or raise an exception.
- """
- self.print_help(sys.stderr)
- self._print_message('\n')
- self.exit(2, '%s: %s\n' % (self.prog, message))
-
-
-class CommandManager(object):
- """ Used to discover `pecan.command` entry points. """
-
- def __init__(self):
- self.commands = {}
- self.load_commands()
-
- def load_commands(self):
- for ep in pkg_resources.iter_entry_points('pecan.command'):
- log.debug('%s loading plugin %s', self.__class__.__name__, ep)
- if ep.name in self.commands:
- warn(
- "Duplicate entry points found on `%s` - ignoring %s" % (
- ep.name,
- ep
- ),
- RuntimeWarning
- )
- continue
- try:
- cmd = ep.load()
- cmd.run # ensure existance; catch AttributeError otherwise
- except Exception as e: # pragma: nocover
- warn("Unable to load plugin %s: %s" % (ep, e), RuntimeWarning)
- continue
- self.add({ep.name: cmd})
-
- def add(self, cmd):
- self.commands.update(cmd)
-
-
-class CommandRunner(object):
- """ Dispatches `pecan` command execution requests. """
-
- def __init__(self):
- self.manager = CommandManager()
- self.parser = HelpfulArgumentParser(add_help=True)
- self.parser.add_argument(
- '--version',
- action='version',
- version='Pecan %s' % self.version
- )
- self.parse_sub_commands()
-
- def parse_sub_commands(self):
- subparsers = self.parser.add_subparsers(
- dest='command_name',
- metavar='command'
- )
- for name, cmd in self.commands.items():
- sub = subparsers.add_parser(
- name,
- help=cmd.summary
- )
- for arg in getattr(cmd, 'arguments', tuple()):
- arg = arg.copy()
- if isinstance(arg.get('name'), six.string_types):
- sub.add_argument(arg.pop('name'), **arg)
- elif isinstance(arg.get('name'), list):
- sub.add_argument(*arg.pop('name'), **arg)
-
- def run(self, args):
- ns = self.parser.parse_args(args)
- self.commands[ns.command_name]().run(ns)
-
- @classmethod
- def handle_command_line(cls): # pragma: nocover
- runner = CommandRunner()
- runner.run(sys.argv[1:])
-
- @property
- def version(self):
- return pkg_resources.get_distribution('pecan').version
-
- @property
- def commands(self):
- return self.manager.commands
-
-
-class BaseCommandMeta(type):
-
- @property
- def summary(cls):
- """
- This is used to populate the --help argument on the command line.
-
- This provides a default behavior which takes the first sentence of the
- command's docstring and uses it.
- """
- return cls.__doc__.strip().splitlines()[0].rstrip('.')
-
-
-class BaseCommandParent(object):
- """
- A base interface for Pecan commands.
-
- Can be extended to support ``pecan`` command extensions in individual Pecan
- projects, e.g.,
-
- $ ``pecan my-custom-command config.py``
-
- ::
-
- # myapp/myapp/custom_command.py
- class CustomCommand(pecan.commands.base.BaseCommand):
- '''
- (First) line of the docstring is used to summarize the command.
- '''
-
- arguments = ({
- 'name': '--extra_arg',
- 'help': 'an extra command line argument',
- 'optional': True
- })
-
- def run(self, args):
- super(SomeCommand, self).run(args)
- if args.extra_arg:
- pass
- """
-
- arguments = ({
- 'name': 'config_file',
- 'help': 'a Pecan configuration file',
- 'nargs': '?',
- 'default': None,
- },)
-
- def run(self, args):
- """To be implemented by subclasses."""
- self.args = args
-
- def load_app(self):
- from pecan import load_app
- return load_app(self.args.config_file)
-
-BaseCommand = BaseCommandMeta('BaseCommand', (BaseCommandParent,), {
- '__doc__': BaseCommandParent.__doc__
-})
diff --git a/pecan/commands/create.py b/pecan/commands/create.py
deleted file mode 100644
index 1187489..0000000
--- a/pecan/commands/create.py
+++ /dev/null
@@ -1,60 +0,0 @@
-"""
-Create command for Pecan
-"""
-import pkg_resources
-import logging
-from warnings import warn
-from pecan.commands import BaseCommand
-from pecan.scaffolds import DEFAULT_SCAFFOLD
-
-log = logging.getLogger(__name__)
-
-
-class ScaffoldManager(object):
- """ Used to discover `pecan.scaffold` entry points. """
-
- def __init__(self):
- self.scaffolds = {}
- self.load_scaffolds()
-
- def load_scaffolds(self):
- for ep in pkg_resources.iter_entry_points('pecan.scaffold'):
- log.debug('%s loading scaffold %s', self.__class__.__name__, ep)
- try:
- cmd = ep.load()
- cmd.copy_to # ensure existance; catch AttributeError otherwise
- except Exception as e: # pragma: nocover
- warn(
- "Unable to load scaffold %s: %s" % (ep, e), RuntimeWarning
- )
- continue
- self.add({ep.name: cmd})
-
- def add(self, cmd):
- self.scaffolds.update(cmd)
-
-
-class CreateCommand(BaseCommand):
- """
- Creates the file layout for a new Pecan scaffolded project.
- """
-
- manager = ScaffoldManager()
-
- arguments = ({
- 'name': 'project_name',
- 'help': 'the (package) name of the new project'
- }, {
- 'name': 'template_name',
- 'metavar': 'template_name',
- 'help': 'a registered Pecan template',
- 'nargs': '?',
- 'default': DEFAULT_SCAFFOLD,
- 'choices': manager.scaffolds.keys()
- })
-
- def run(self, args):
- super(CreateCommand, self).run(args)
- self.manager.scaffolds[args.template_name]().copy_to(
- args.project_name
- )
diff --git a/pecan/commands/serve.py b/pecan/commands/serve.py
deleted file mode 100644
index 72a536d..0000000
--- a/pecan/commands/serve.py
+++ /dev/null
@@ -1,223 +0,0 @@
-"""
-Serve command for Pecan.
-"""
-from __future__ import print_function
-import logging
-import os
-import sys
-import time
-import subprocess
-from wsgiref.simple_server import WSGIRequestHandler
-
-
-from pecan.commands import BaseCommand
-from pecan import util
-
-
-logger = logging.getLogger(__name__)
-
-
-class ServeCommand(BaseCommand):
- """
- Serves a Pecan web application.
-
- This command serves a Pecan web application using the provided
- configuration file for the server and application.
- """
-
- arguments = BaseCommand.arguments + ({
- 'name': '--reload',
- 'help': 'Watch for changes and automatically reload.',
- 'default': False,
- 'action': 'store_true'
- },)
-
- def run(self, args):
- super(ServeCommand, self).run(args)
- app = self.load_app()
- self.serve(app, app.config)
-
- def create_subprocess(self):
- self.server_process = subprocess.Popen(
- [arg for arg in sys.argv if arg != '--reload'],
- stdout=sys.stdout, stderr=sys.stderr
- )
-
- def watch_and_spawn(self, conf):
- from watchdog.observers import Observer
- from watchdog.events import (
- FileSystemEventHandler, FileSystemMovedEvent, FileModifiedEvent,
- DirModifiedEvent
- )
-
- print('Monitoring for changes...')
- self.create_subprocess()
-
- parent = self
-
- class AggressiveEventHandler(FileSystemEventHandler):
- def should_reload(self, event):
- for t in (
- FileSystemMovedEvent, FileModifiedEvent, DirModifiedEvent
- ):
- if isinstance(event, t):
- return True
- return False
-
- def on_modified(self, event):
- if self.should_reload(event):
- parent.server_process.kill()
- parent.create_subprocess()
-
- # Determine a list of file paths to monitor
- paths = self.paths_to_monitor(conf)
-
- event_handler = AggressiveEventHandler()
- for path, recurse in paths:
- observer = Observer()
- observer.schedule(
- event_handler,
- path=path,
- recursive=recurse
- )
- observer.start()
-
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- pass
-
- def paths_to_monitor(self, conf):
- paths = []
-
- for package_name in getattr(conf.app, 'modules', []):
- module = __import__(package_name, fromlist=['app'])
- if hasattr(module, 'app') and hasattr(module.app, 'setup_app'):
- paths.append((
- os.path.dirname(module.__file__),
- True
- ))
- break
-
- paths.append((os.path.dirname(conf.__file__), False))
- return paths
-
- def _serve(self, app, conf):
- from wsgiref.simple_server import make_server
-
- host, port = conf.server.host, int(conf.server.port)
- srv = make_server(
- host,
- port,
- app,
- handler_class=PecanWSGIRequestHandler,
- )
-
- print('Starting server in PID %s' % os.getpid())
-
- if host == '0.0.0.0':
- print(
- 'serving on 0.0.0.0:%s, view at http://127.0.0.1:%s' %
- (port, port)
- )
- else:
- print("serving on http://%s:%s" % (host, port))
-
- try:
- srv.serve_forever()
- except KeyboardInterrupt:
- # allow CTRL+C to shutdown
- pass
-
- def serve(self, app, conf):
- """
- A very simple approach for a WSGI server.
- """
-
- if self.args.reload:
- try:
- self.watch_and_spawn(conf)
- except ImportError:
- print('The `--reload` option requires `watchdog` to be '
- 'installed.')
- print(' $ pip install watchdog')
- else:
- self._serve(app, conf)
-
-
-def gunicorn_run():
- """
- The ``gunicorn_pecan`` command for launching ``pecan`` applications
- """
- try:
- from gunicorn.app.wsgiapp import WSGIApplication
- except ImportError as exc:
- args = exc.args
- arg0 = args[0] if args else ''
- arg0 += ' (are you sure `gunicorn` is installed?)'
- exc.args = (arg0,) + args[1:]
- raise
-
- class PecanApplication(WSGIApplication):
-
- def init(self, parser, opts, args):
- if len(args) != 1:
- parser.error("No configuration file was specified.")
-
- self.cfgfname = os.path.normpath(
- os.path.join(os.getcwd(), args[0])
- )
- self.cfgfname = os.path.abspath(self.cfgfname)
- if not os.path.exists(self.cfgfname):
- parser.error("Config file not found: %s" % self.cfgfname)
-
- from pecan.configuration import _runtime_conf, set_config
- set_config(self.cfgfname, overwrite=True)
-
- # If available, use the host and port from the pecan config file
- cfg = {}
- if _runtime_conf.get('server'):
- server = _runtime_conf['server']
- if hasattr(server, 'host') and hasattr(server, 'port'):
- cfg['bind'] = '%s:%s' % (
- server.host, server.port
- )
- return cfg
-
- def load(self):
- from pecan.deploy import deploy
- return deploy(self.cfgfname)
-
- PecanApplication("%(prog)s [OPTIONS] config.py").run()
-
-
-class PecanWSGIRequestHandler(WSGIRequestHandler, object):
- """
- A wsgiref request handler class that allows actual log output depending on
- the application configuration.
- """
-
- def __init__(self, *args, **kwargs):
- # We set self.path to avoid crashes in log_message() on unsupported
- # requests (like "OPTIONS").
- self.path = ''
- super(PecanWSGIRequestHandler, self).__init__(*args, **kwargs)
-
- def log_message(self, format, *args):
- """
- overrides the ``log_message`` method from the wsgiref server so that
- normal logging works with whatever configuration the application has
- been set to.
-
- Levels are inferred from the HTTP status code, 4XX codes are treated as
- warnings, 5XX as errors and everything else as INFO level.
- """
- code = args[1][0]
- levels = {
- '4': 'warning',
- '5': 'error'
- }
-
- log_handler = getattr(logger, levels.get(code, 'info'))
- log_handler(format % args)
diff --git a/pecan/commands/shell.py b/pecan/commands/shell.py
deleted file mode 100644
index 968ded0..0000000
--- a/pecan/commands/shell.py
+++ /dev/null
@@ -1,177 +0,0 @@
-"""
-Shell command for Pecan.
-"""
-from pecan.commands import BaseCommand
-from webtest import TestApp
-from warnings import warn
-import sys
-
-
-class NativePythonShell(object):
- """
- Open an interactive python shell with the Pecan app loaded.
- """
-
- @classmethod
- def invoke(cls, ns, banner): # pragma: nocover
- """
- :param ns: local namespace
- :param banner: interactive shell startup banner
-
- Embed an interactive native python shell.
- """
- import code
- py_prefix = sys.platform.startswith('java') and 'J' or 'P'
- shell_banner = 'Pecan Interactive Shell\n%sython %s\n\n' % \
- (py_prefix, sys.version)
- shell = code.InteractiveConsole(locals=ns)
- try:
- import readline # noqa
- except ImportError:
- pass
- shell.interact(shell_banner + banner)
-
-
-class IPythonShell(object):
- """
- Open an interactive ipython shell with the Pecan app loaded.
- """
-
- @classmethod
- def invoke(cls, ns, banner): # pragma: nocover
- """
- :param ns: local namespace
- :param banner: interactive shell startup banner
-
- Embed an interactive ipython shell.
- Try the InteractiveShellEmbed API first, fall back on
- IPShellEmbed for older IPython versions.
- """
- try:
- from IPython.frontend.terminal.embed import (
- InteractiveShellEmbed
- )
- # try and load their default profile
- from IPython.frontend.terminal.ipapp import (
- load_default_config
- )
- config = load_default_config()
- shell = InteractiveShellEmbed(config=config, banner2=banner)
- shell(local_ns=ns)
- except ImportError:
- # Support for the IPython <= 0.10 shell API
- from IPython.Shell import IPShellEmbed
- shell = IPShellEmbed(argv=[])
- shell.set_banner(shell.IP.BANNER + '\n\n' + banner)
- shell(local_ns=ns, global_ns={})
-
-
-class BPythonShell(object):
- """
- Open an interactive bpython shell with the Pecan app loaded.
- """
-
- @classmethod
- def invoke(cls, ns, banner): # pragma: nocover
- """
- :param ns: local namespace
- :param banner: interactive shell startup banner
-
- Embed an interactive bpython shell.
- """
- from bpython import embed
- embed(ns, ['-i'], banner)
-
-
-class ShellCommand(BaseCommand):
- """
- Open an interactive shell with the Pecan app loaded.
- Attempt to invoke the specified python shell flavor
- (ipython, bpython, etc.). Fall back on the native
- python shell if the requested flavor variance is not
- installed.
- """
-
- SHELLS = {
- 'python': NativePythonShell,
- 'ipython': IPythonShell,
- 'bpython': BPythonShell,
- }
-
- arguments = BaseCommand.arguments + ({
- 'name': ['--shell', '-s'],
- 'help': 'which Python shell to use',
- 'choices': SHELLS.keys(),
- 'default': 'python'
- },)
-
- def run(self, args):
- """
- Load the pecan app, prepare the locals, sets the
- banner, and invokes the python shell.
- """
- super(ShellCommand, self).run(args)
-
- # load the application
- app = self.load_app()
-
- # prepare the locals
- locs = dict(__name__='pecan-admin')
- locs['wsgiapp'] = app
- locs['app'] = TestApp(app)
-
- model = self.load_model(app.config)
- if model:
- locs['model'] = model
-
- # insert the pecan locals
- from pecan import abort, conf, redirect, request, response
- locs['abort'] = abort
- locs['conf'] = conf
- locs['redirect'] = redirect
- locs['request'] = request
- locs['response'] = response
-
- # prepare the banner
- banner = ' The following objects are available:\n'
- banner += ' %-10s - This project\'s WSGI App instance\n' % 'wsgiapp'
- banner += ' %-10s - The current configuration\n' % 'conf'
- banner += ' %-10s - webtest.TestApp wrapped around wsgiapp\n' % 'app'
- if model:
- model_name = getattr(
- model,
- '__module__',
- getattr(model, '__name__', 'model')
- )
- banner += ' %-10s - Models from %s\n' % ('model', model_name)
-
- self.invoke_shell(locs, banner)
-
- def invoke_shell(self, locs, banner):
- """
- Invokes the appropriate flavor of the python shell.
- Falls back on the native python shell if the requested
- flavor (ipython, bpython,etc) is not installed.
- """
- shell = self.SHELLS[self.args.shell]
- try:
- shell().invoke(locs, banner)
- except ImportError as e:
- warn((
- "%s is not installed, `%s`, "
- "falling back to native shell") % (self.args.shell, e),
- RuntimeWarning
- )
- if shell == NativePythonShell:
- raise
- NativePythonShell().invoke(locs, banner)
-
- def load_model(self, config):
- """
- Load the model extension module
- """
- for package_name in getattr(config.app, 'modules', []):
- module = __import__(package_name, fromlist=['model'])
- if hasattr(module, 'model'):
- return module.model
- return None