# -*- coding: utf-8 -*- """ pluginbase ~~~~~~~~~~ Pluginbase is a module for Python that provides a system for building plugin based applications. :copyright: (c) Copyright 2014 by Armin Ronacher. :license: BSD, see LICENSE for more details. """ import os import sys import uuid import errno import pkgutil import hashlib import threading from types import ModuleType from weakref import ref as weakref PY2 = sys.version_info[0] == 2 if PY2: text_type = unicode string_types = (unicode, str) from cStringIO import StringIO as NativeBytesIO else: text_type = str string_types = (str,) from io import BytesIO as NativeBytesIO __version__ = '0.4b0' _local = threading.local() _internalspace = ModuleType(__name__ + '._internalspace') _internalspace.__path__ = [] sys.modules[_internalspace.__name__] = _internalspace def get_plugin_source(module=None, stacklevel=None): """Returns the :class:`PluginSource` for the current module or the given module. The module can be provided by name (in which case an import will be attempted) or as a module object. If no plugin source can be discovered, the return value from this method is `None`. This function can be very useful if additional data has been attached to the plugin source. For instance this could allow plugins to get access to a back reference to the application that created them. :param module: optionally the module to locate the plugin source of. :param stacklevel: defines how many levels up the module should search for before it discovers the plugin frame. The default is 0. This can be useful for writing wrappers around this function. """ if module is None: frm = sys._getframe((stacklevel or 0) + 1) name = frm.f_globals['__name__'] glob = frm.f_globals elif isinstance(module, string_types): frm = sys._getframe(1) name = module glob = __import__(module, frm.f_globals, frm.f_locals, ['__dict__']).__dict__ else: name = module.__name__ glob = module.__dict__ return _discover_space(name, glob) def _discover_space(name, globals): try: return _local.space_stack[-1] except (AttributeError, IndexError): pass if '__pluginbase_state__' in globals: return globals['__pluginbase_state__'].source mod_name = globals.get('__name__') if mod_name is not None and \ mod_name.startswith(_internalspace.__name__ + '.'): end = mod_name.find('.', len(_internalspace.__name__) + 1) space = sys.modules.get(mod_name[:end]) if space is not None: return space.__pluginbase_state__.source def _shutdown_module(mod): members = list(mod.__dict__.items()) for key, value in members: if key[:1] != '_': setattr(mod, key, None) for key, value in members: setattr(mod, key, None) def _to_bytes(s): if isinstance(s, text_type): return s.encode('utf-8') return s class _IntentionallyEmptyModule(ModuleType): def __getattr__(self, name): try: return ModuleType.__getattr__(self, name) except AttributeError: if name[:2] == '__': raise raise RuntimeError( 'Attempted to import from a plugin base module (%s) without ' 'having a plugin source activated. To solve this error ' 'you have to move the import into a "with" block of the ' 'associated plugin source.' % self.__name__) class _PluginSourceModule(ModuleType): def __init__(self, source): modname = '%s.%s' % (_internalspace.__name__, source.spaceid) ModuleType.__init__(self, modname) self.__pluginbase_state__ = PluginBaseState(source) @property def __path__(self): try: ps = self.__pluginbase_state__.source except AttributeError: return [] return ps.searchpath + ps.base.searchpath def _setup_base_package(module_name): try: mod = __import__(module_name, None, None, ['__name__']) except ImportError: mod = None if '.' in module_name: parent_mod = __import__(module_name.rsplit('.', 1)[0], None, None, ['__name__']) else: parent_mod = None if mod is None: mod = _IntentionallyEmptyModule(module_name) if parent_mod is not None: setattr(parent_mod, module_name.rsplit('.', 1)[-1], mod) sys.modules[module_name] = mod class PluginBase(object): """The plugin base acts as a control object around a dummy Python package that acts as a container for plugins. Usually each application creates exactly one base object for all plugins. :param package: the name of the package that acts as the plugin base. Usually this module does not exist. Unless you know what you are doing you should not create this module on the file system. :param searchpath: optionally a shared search path for modules that will be used by all plugin sources registered. """ def __init__(self, package, searchpath=None): #: the name of the dummy package. self.package = package if searchpath is None: searchpath = [] #: the default search path shared by all plugins as list. self.searchpath = searchpath _setup_base_package(package) def make_plugin_source(self, *args, **kwargs): """Creats a plugin source for this plugin base and returns it. All parameters are forwarded to :class:`PluginSource`. """ return PluginSource(self, *args, **kwargs) class PluginSource(object): """The plugin source is what ultimately decides where plugins are loaded from. Plugin bases can have multiple plugin sources which act as isolation layer. While this is not a security system it generally is not possible for plugins from different sources to accidentally cross talk. Once a plugin source has been created it can be used in a ``with`` statement to change the behavior of the ``import`` statement in the block to define which source to load the plugins from:: plugin_source = plugin_base.make_plugin_source( searchpath=['./path/to/plugins', './path/to/more/plugins']) with plugin_source: from myapplication.plugins import my_plugin :param base: the base this plugin source belongs to. :param identifier: optionally a stable identifier. If it's not defined a random identifier is picked. It's useful to set this to a stable value to have consistent tracebacks between restarts and to support pickle. :param searchpath: a list of paths where plugins are looked for. :param persist: optionally this can be set to `True` and the plugins will not be cleaned up when the plugin source gets garbage collected. """ # Set these here to false by default so that a completely failing # constructor does not fuck up the destructor. persist = False mod = None def __init__(self, base, identifier=None, searchpath=None, persist=False): #: indicates if this plugin source persists or not. self.persist = persist if identifier is None: identifier = str(uuid.uuid4()) #: the identifier for this source. self.identifier = identifier #: A reference to the plugin base that created this source. self.base = base #: a list of paths where plugins are searched in. self.searchpath = searchpath #: The internal module name of the plugin source as it appears #: in the :mod:`pluginsource._internalspace`. self.spaceid = '_sp' + hashlib.md5( _to_bytes(self.base.package) + b'|' + _to_bytes(identifier), ).hexdigest() #: a reference to the module on the internal #: :mod:`pluginsource._internalspace`. self.mod = _PluginSourceModule(self) if hasattr(_internalspace, self.spaceid): raise RuntimeError('This plugin source already exists.') sys.modules[self.mod.__name__] = self.mod setattr(_internalspace, self.spaceid, self.mod) def __del__(self): if not self.persist: self.cleanup() def list_plugins(self): """Returns a sorted list of all plugins that are available in this plugin source. This can be useful to automatically discover plugins that are available and is usually used together with :meth:`load_plugin`. """ rv = [] for _, modname, ispkg in pkgutil.iter_modules(self.mod.__path__): rv.append(modname) return sorted(rv) def load_plugin(self, name): """This automatically loads a plugin by the given name from the current source and returns the module. This is a convenient alternative to the import statement and saves you from invoking ``__import__`` or a similar function yourself. :param name: the name of the plugin to load. """ if '.' in name: raise ImportError('Plugin names cannot contain dots.') with self: return __import__(self.base.package + '.' + name, globals(), {}, ['__name__']) def open_resource(self, plugin, filename): """This function locates a resource inside the plugin and returns a byte stream to the contents of it. If the resource cannot be loaded an :exc:`IOError` will be raised. Only plugins that are real Python packages can contain resources. Plain old Python modules do not allow this for obvious reasons. .. versionadded:: 0.3 :param plugin: the name of the plugin to open the resource of. :param filename: the name of the file within the plugin to open. """ mod = self.load_plugin(plugin) fn = getattr(mod, '__file__', None) if fn is not None: if fn.endswith(('.pyc', '.pyo')): fn = fn[:-1] if os.path.isfile(fn): return open(os.path.join(os.path.dirname(fn), filename), 'rb') buf = pkgutil.get_data(self.mod.__name__ + '.' + plugin, filename) if buf is None: raise IOError(errno.ENOENT, 'Could not find resource') return NativeBytesIO(buf) def cleanup(self): """Cleans up all loaded plugins manually. This is necessary to call only if :attr:`persist` is enabled. Otherwise this happens automatically when the source gets garbage collected. """ self.__cleanup() def __cleanup(self, _sys=sys, _shutdown_module=_shutdown_module): # The default parameters are necessary because this can be fired # from the destructor and so late when the interpreter shuts down # that these functions and modules might be gone. if self.mod is None: return modname = self.mod.__name__ self.mod.__pluginbase_state__ = None self.mod = None try: delattr(_internalspace, self.spaceid) except AttributeError: pass prefix = modname + '.' # avoid the bug described in issue #6 if modname in _sys.modules: del _sys.modules[modname] for key, value in list(_sys.modules.items()): if not key.startswith(prefix): continue mod = _sys.modules.pop(key, None) if mod is None: continue _shutdown_module(mod) def __assert_not_cleaned_up(self): if self.mod is None: raise RuntimeError('The plugin source was already cleaned up.') def __enter__(self): self.__assert_not_cleaned_up() _local.__dict__.setdefault('space_stack', []).append(self) return self def __exit__(self, exc_type, exc_value, tb): try: _local.space_stack.pop() except (AttributeError, IndexError): pass def _rewrite_module_path(self, modname): self.__assert_not_cleaned_up() if modname == self.base.package: return self.mod.__name__ elif modname.startswith(self.base.package + '.'): pieces = modname.split('.') return self.mod.__name__ + '.' + '.'.join( pieces[self.base.package.count('.') + 1:]) class PluginBaseState(object): __slots__ = ('_source',) def __init__(self, source): if source.persist: self._source = lambda: source else: self._source = weakref(source) @property def source(self): rv = self._source() if rv is None: raise AttributeError('Plugin source went away') return rv class _ImportHook(ModuleType): def __init__(self, name, system_import): ModuleType.__init__(self, name) self._system_import = system_import self.enabled = True def enable(self): """Enables the import hook which drives the plugin base system. This is the default. """ self.enabled = True def disable(self): """Disables the import hook and restores the default import system behavior. This effectively breaks pluginbase but can be useful for testing purposes. """ self.enabled = False def plugin_import(self, name, globals=None, locals=None, fromlist=None, level=None): if level is None: # set the level to the default value specific to this python version level = -1 if PY2 else 0 import_name = name if self.enabled: ref_globals = globals if ref_globals is None: ref_globals = sys._getframe(1).f_globals space = _discover_space(name, ref_globals) if space is not None: actual_name = space._rewrite_module_path(name) if actual_name is not None: import_name = actual_name return self._system_import(import_name, globals, locals, fromlist, level) try: import __builtin__ as builtins except ImportError: import builtins import_hook = _ImportHook(__name__ + '.import_hook', builtins.__import__) builtins.__import__ = import_hook.plugin_import sys.modules[import_hook.__name__] = import_hook del builtins