From 1173c768b0524c2dcb0a13a18dcf5dc440bcbc4b Mon Sep 17 00:00:00 2001 From: Matthew Peveler Date: Thu, 17 Feb 2022 10:09:43 -0500 Subject: Move Plugin to its own module Signed-off-by: Matthew Peveler --- .gitignore | 3 + asciidoc/asciidoc.py | 211 +------------------- asciidoc/plugin.py | 215 +++++++++++++++++++++ .../plugin/backends/__pycache__/.gitkeep | 0 tests/__test_data__/plugin/backends/bar/.gitkeep | 0 tests/__test_data__/plugin/backends/file | 0 tests/__test_data__/plugin/backends/foo/.gitkeep | 0 tests/__test_data__/plugin/filters/baz/.gitkeep | 0 tests/__test_data__/plugin/filters/qux/.gitkeep | 0 tests/test_plugin.py | 18 ++ tests/utils.py | 17 ++ 11 files changed, 260 insertions(+), 204 deletions(-) create mode 100644 asciidoc/plugin.py create mode 100644 tests/__test_data__/plugin/backends/__pycache__/.gitkeep create mode 100644 tests/__test_data__/plugin/backends/bar/.gitkeep create mode 100644 tests/__test_data__/plugin/backends/file create mode 100644 tests/__test_data__/plugin/backends/foo/.gitkeep create mode 100644 tests/__test_data__/plugin/filters/baz/.gitkeep create mode 100644 tests/__test_data__/plugin/filters/qux/.gitkeep create mode 100644 tests/test_plugin.py create mode 100644 tests/utils.py diff --git a/.gitignore b/.gitignore index 79d42f8..6ae909b 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,9 @@ build/ dist/ *.egg-info/ +# test data +!tests/__test_data__/plugin/backends/__pycache__ + # test artifacts .coverage coverage.xml diff --git a/asciidoc/asciidoc.py b/asciidoc/asciidoc.py index 8550609..ef486fb 100644 --- a/asciidoc/asciidoc.py +++ b/asciidoc/asciidoc.py @@ -22,7 +22,6 @@ import getopt import io import os import re -import shutil import subprocess import sys import tempfile @@ -30,7 +29,6 @@ import time import typing import traceback import unicodedata -import zipfile from collections import OrderedDict @@ -38,6 +36,7 @@ from .blocks.table import parse_table_span_spec, Cell, Column from .collections import AttrDict, InsensitiveDict from .exceptions import EAsciiDoc from .message import Message +from .plugin import Plugin from . import utils CONF_DIR = os.path.join(os.path.dirname(__file__), 'resources') @@ -5522,207 +5521,11 @@ class Tables_OLD(AbstractBlocks): # End of deprecated old table classes. # --------------------------------------------------------------------------- -# --------------------------------------------------------------------------- -# filter and theme plugin commands. -# --------------------------------------------------------------------------- def die(msg: str) -> typing.NoReturn: message.stderr(msg) sys.exit(1) -def extract_zip(zip_file, destdir): - """ - Unzip Zip file to destination directory. - Throws exception if error occurs. - """ - zipo = zipfile.ZipFile(zip_file, 'r') - try: - for zi in zipo.infolist(): - outfile = zi.filename - if not outfile.endswith('/'): - d, outfile = os.path.split(outfile) - directory = os.path.normpath(os.path.join(destdir, d)) - if not os.path.isdir(directory): - os.makedirs(directory) - outfile = os.path.join(directory, outfile) - perms = (zi.external_attr >> 16) & 0o777 - message.verbose('extracting: %s' % outfile) - flags = os.O_CREAT | os.O_WRONLY - if sys.platform == 'win32': - flags |= os.O_BINARY - if perms == 0: - # Zip files created under Windows do not include permissions. - fh = os.open(outfile, flags) - else: - fh = os.open(outfile, flags, perms) - try: - os.write(fh, zipo.read(zi.filename)) - finally: - os.close(fh) - finally: - zipo.close() - - -def create_zip(zip_file, src, skip_hidden=False): - """ - Create Zip file. If src is a directory archive all contained files and - subdirectories, if src is a file archive the src file. - Files and directories names starting with . are skipped - if skip_hidden is True. - Throws exception if error occurs. - """ - zipo = zipfile.ZipFile(zip_file, 'w') - try: - if os.path.isfile(src): - arcname = os.path.basename(src) - message.verbose('archiving: %s' % arcname) - zipo.write(src, arcname, zipfile.ZIP_DEFLATED) - elif os.path.isdir(src): - srcdir = os.path.abspath(src) - if srcdir[-1] != os.path.sep: - srcdir += os.path.sep - for root, dirs, files in os.walk(srcdir): - arcroot = os.path.abspath(root)[len(srcdir):] - if skip_hidden: - for d in dirs[:]: - if d.startswith('.'): - message.verbose('skipping: %s' % os.path.join(arcroot, d)) - del dirs[dirs.index(d)] - for f in files: - filename = os.path.join(root, f) - arcname = os.path.join(arcroot, f) - if skip_hidden and f.startswith('.'): - message.verbose('skipping: %s' % arcname) - continue - message.verbose('archiving: %s' % arcname) - zipo.write(filename, arcname, zipfile.ZIP_DEFLATED) - else: - raise ValueError('src must specify directory or file: %s' % src) - finally: - zipo.close() - - -class Plugin: - """ - --filter and --theme option commands. - """ - CMDS = ('install', 'remove', 'list', 'build') - - type = None # 'backend', 'filter' or 'theme'. - - @staticmethod - def reset_class(): - Plugin.type = None - - @staticmethod - def get_dir(): - """ - Return plugins path (.asciidoc/filters or .asciidoc/themes) in user's - home directory or None if user home not defined. - """ - result = utils.userdir() - if result: - result = os.path.join(result, '.asciidoc', Plugin.type + 's') - return result - - @staticmethod - def install(args): - """ - Install plugin Zip file. - args[0] is plugin zip file path. - args[1] is optional destination plugins directory. - """ - if len(args) not in (1, 2): - die('invalid number of arguments: --%s install %s' % (Plugin.type, ' '.join(args))) - zip_file = args[0] - if not os.path.isfile(zip_file): - die('file not found: %s' % zip_file) - reo = re.match(r'^\w+', os.path.split(zip_file)[1]) - if not reo: - die('file name does not start with legal %s name: %s' % (Plugin.type, zip_file)) - plugin_name = reo.group() - if len(args) == 2: - plugins_dir = args[1] - if not os.path.isdir(plugins_dir): - die('directory not found: %s' % plugins_dir) - else: - plugins_dir = Plugin.get_dir() - if not plugins_dir: - die('user home directory is not defined') - plugin_dir = os.path.join(plugins_dir, plugin_name) - if os.path.exists(plugin_dir): - die('%s is already installed: %s' % (Plugin.type, plugin_dir)) - try: - os.makedirs(plugin_dir) - except Exception as e: - die('failed to create %s directory: %s' % (Plugin.type, str(e))) - try: - extract_zip(zip_file, plugin_dir) - except Exception as e: - if os.path.isdir(plugin_dir): - shutil.rmtree(plugin_dir) - die('failed to extract %s: %s' % (Plugin.type, str(e))) - - @staticmethod - def remove(args): - """ - Delete plugin directory. - args[0] is plugin name. - args[1] is optional plugin directory (defaults to ~/.asciidoc/). - """ - if len(args) not in (1, 2): - die('invalid number of arguments: --%s remove %s' % (Plugin.type, ' '.join(args))) - plugin_name = args[0] - if not re.match(r'^\w+$', plugin_name): - die('illegal %s name: %s' % (Plugin.type, plugin_name)) - if len(args) == 2: - d = args[1] - if not os.path.isdir(d): - die('directory not found: %s' % d) - else: - d = Plugin.get_dir() - if not d: - die('user directory is not defined') - plugin_dir = os.path.join(d, plugin_name) - if not os.path.isdir(plugin_dir): - die('cannot find %s: %s' % (Plugin.type, plugin_dir)) - try: - message.verbose('removing: %s' % plugin_dir) - shutil.rmtree(plugin_dir) - except Exception as e: - die('failed to delete %s: %s' % (Plugin.type, str(e))) - - @staticmethod - def list(args): - """ - List all plugin directories (global and local). - """ - for d in [os.path.join(d, Plugin.type + 's') for d in config.get_load_dirs()]: - if os.path.isdir(d): - for f in sorted(filter(os.path.isdir, [os.path.join(d, o) for o in os.listdir(d)])): - if f.endswith('__pycache__'): - continue - message.stdout(f) - - @staticmethod - def build(args): - """ - Create plugin Zip file. - args[0] is Zip file name. - args[1] is plugin directory. - """ - if len(args) != 2: - die('invalid number of arguments: --%s build %s' % (Plugin.type, ' '.join(args))) - zip_file = args[0] - plugin_source = args[1] - if not (os.path.isdir(plugin_source) or os.path.isfile(plugin_source)): - die('plugin source not found: %s' % plugin_source) - try: - create_zip(zip_file, plugin_source, skip_hidden=True) - except Exception as e: - die('failed to create %s: %s' % (zip_file, str(e))) - - # --------------------------------------------------------------------------- # Application code. # --------------------------------------------------------------------------- @@ -5784,7 +5587,6 @@ def reset_asciidoc(): BlockTitle.reset_class() Section.reset_class() AbstractBlock.reset_class() - Plugin.reset_class() def asciidoc(backend, doctype, confiles, infile, outfile, options): @@ -6140,11 +5942,12 @@ def cli(argv=None): sys.exit(1) # Look for plugin management commands. count = 0 + plugin_type = '' for o, v in opts: if o in ('-b', '--backend', '--filter', '--theme'): if o == '-b': o = '--backend' - plugin = o[2:] + plugin_type = o[2:] cmd = v if cmd not in Plugin.CMDS: continue @@ -6154,13 +5957,13 @@ def cli(argv=None): if count == 1: # Execute plugin management commands. if not cmd: - die('missing --%s command' % plugin) + die('missing --%s command' % plugin_type) if cmd not in Plugin.CMDS: - die('illegal --%s command: %s' % (plugin, cmd)) - Plugin.type = plugin + die('illegal --%s command: %s' % (plugin_type, cmd)) config.init() config.verbose = bool(set(['-v', '--verbose']) & set(opt_names)) - getattr(Plugin, cmd)(args) + plugin = Plugin(plugin_type, message, config) + getattr(plugin, cmd)(args) else: # Execute asciidoc. try: diff --git a/asciidoc/plugin.py b/asciidoc/plugin.py new file mode 100644 index 0000000..87e50f9 --- /dev/null +++ b/asciidoc/plugin.py @@ -0,0 +1,215 @@ +import os +import re +import shutil +import sys +from typing import NoReturn +import zipfile + +from .utils import userdir + + +class Plugin: + """ + --filter and --theme option commands. + """ + CMDS = ('install', 'remove', 'list', 'build') + + def __init__(self, type: str, message, config): + self.type = type + self.message = message + self.config = config + + def die(self, msg: str) -> NoReturn: + self.message.stderr(msg) + sys.exit(1) + + def get_dir(self) -> str: + """ + Return plugins path (.asciidoc/filters or .asciidoc/themes) in user's + home directory or None if user home not defined. + """ + result = userdir() + if result: + result = os.path.join(result, '.asciidoc', self.type + 's') + return result + + def install(self, args) -> None: + """ + Install plugin Zip file. + args[0] is plugin zip file path. + args[1] is optional destination plugins directory. + """ + if len(args) not in (1, 2): + self.die( + 'invalid number of arguments: --{} install {}'.format( + self.type, + ' '.join(args) + ) + ) + zip_file = args[0] + if not os.path.isfile(zip_file): + self.die('file not found: %s' % zip_file) + reo = re.match(r'^\w+', os.path.split(zip_file)[1]) + if not reo: + self.die('file name does not start with legal {} name: {}'.format( + self.type, + zip_file + )) + plugin_name = reo.group() + if len(args) == 2: + plugins_dir = args[1] + if not os.path.isdir(plugins_dir): + self.die('directory not found: %s' % plugins_dir) + else: + plugins_dir = Plugin.get_dir() + if not plugins_dir: + self.die('user home directory is not defined') + plugin_dir = os.path.join(plugins_dir, plugin_name) + if os.path.exists(plugin_dir): + self.die('%s is already installed: %s' % (self.type, plugin_dir)) + try: + os.makedirs(plugin_dir) + except Exception as e: + self.die('failed to create %s directory: %s' % (self.type, str(e))) + try: + self.extract_zip(zip_file, plugin_dir) + except Exception as e: + if os.path.isdir(plugin_dir): + shutil.rmtree(plugin_dir) + self.die('failed to extract %s: %s' % (self.type, str(e))) + + def remove(self, args) -> None: + """ + Delete plugin directory. + args[0] is plugin name. + args[1] is optional plugin directory (defaults to ~/.asciidoc/). + """ + if len(args) not in (1, 2): + self.die('invalid number of arguments: --{} remove {}'.format( + self.type, + ' '.join(args) + )) + plugin_name = args[0] + if not re.match(r'^\w+$', plugin_name): + self.die('illegal %s name: %s' % (self.type, plugin_name)) + if len(args) == 2: + d = args[1] + if not os.path.isdir(d): + self.die('directory not found: %s' % d) + else: + d = Plugin.get_dir() + if not d: + self.die('user directory is not defined') + plugin_dir = os.path.join(d, plugin_name) + if not os.path.isdir(plugin_dir): + self.die('cannot find %s: %s' % (self.type, plugin_dir)) + try: + self.message.verbose('removing: %s' % plugin_dir) + shutil.rmtree(plugin_dir) + except Exception as e: + self.die('failed to delete %s: %s' % (self.type, str(e))) + + def list(self, _) -> None: + """ + List all plugin directories (global and local). + """ + dirs = [os.path.join(d, self.type + 's') for d in self.config.get_load_dirs()] + for d in dirs: + if os.path.isdir(d): + plugin_dirs = [os.path.join(d, o) for o in os.listdir(d)] + for f in sorted(filter(os.path.isdir, plugin_dirs)): + if f.endswith('__pycache__'): + continue + self.message.stdout(f) + + def build(self, args) -> None: + """ + Create plugin Zip file. + args[0] is Zip file name. + args[1] is plugin directory. + """ + if len(args) != 2: + self.die('invalid number of arguments: --{} build {}'.format( + self.type, + ' '.join(args) + )) + zip_file = args[0] + plugin_source = args[1] + if not (os.path.isdir(plugin_source) or os.path.isfile(plugin_source)): + self.die('plugin source not found: %s' % plugin_source) + try: + self.create_zip(zip_file, plugin_source, skip_hidden=True) + except Exception as e: + self.die('failed to create %s: %s' % (zip_file, str(e))) + + def extract_zip(self, zip_file: str, destdir: str) -> None: + """ + Unzip Zip file to destination directory. + Throws exception if error occurs. + """ + zipo = zipfile.ZipFile(zip_file, 'r') + try: + for zi in zipo.infolist(): + outfile = zi.filename + if not outfile.endswith('/'): + d, outfile = os.path.split(outfile) + directory = os.path.normpath(os.path.join(destdir, d)) + if not os.path.isdir(directory): + os.makedirs(directory) + outfile = os.path.join(directory, outfile) + perms = (zi.external_attr >> 16) & 0o777 + self.message.verbose('extracting: %s' % outfile) + flags = os.O_CREAT | os.O_WRONLY + if sys.platform == 'win32': + flags |= os.O_BINARY + if perms == 0: + # Zip files created under Windows do not include permissions. + fh = os.open(outfile, flags) + else: + fh = os.open(outfile, flags, perms) + try: + os.write(fh, zipo.read(zi.filename)) + finally: + os.close(fh) + finally: + zipo.close() + + def create_zip(self, zip_file: str, src: str, skip_hidden: bool = False) -> None: + """ + Create Zip file. If src is a directory archive all contained files and + subdirectories, if src is a file archive the src file. + Files and directories names starting with . are skipped + if skip_hidden is True. + Throws exception if error occurs. + """ + zipo = zipfile.ZipFile(zip_file, 'w') + try: + if os.path.isfile(src): + arcname = os.path.basename(src) + self.message.verbose('archiving: %s' % arcname) + zipo.write(src, arcname, zipfile.ZIP_DEFLATED) + elif os.path.isdir(src): + srcdir = os.path.abspath(src) + if srcdir[-1] != os.path.sep: + srcdir += os.path.sep + for root, dirs, files in os.walk(srcdir): + arcroot = os.path.abspath(root)[len(srcdir):] + if skip_hidden: + for d in dirs[:]: + if d.startswith('.'): + self.message.verbose( + 'skipping: %s' % os.path.join(arcroot, d) + ) + del dirs[dirs.index(d)] + for f in files: + filename = os.path.join(root, f) + arcname = os.path.join(arcroot, f) + if skip_hidden and f.startswith('.'): + self.message.verbose('skipping: %s' % arcname) + continue + self.message.verbose('archiving: %s' % arcname) + zipo.write(filename, arcname, zipfile.ZIP_DEFLATED) + else: + raise ValueError('src must specify directory or file: %s' % src) + finally: + zipo.close() diff --git a/tests/__test_data__/plugin/backends/__pycache__/.gitkeep b/tests/__test_data__/plugin/backends/__pycache__/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/__test_data__/plugin/backends/bar/.gitkeep b/tests/__test_data__/plugin/backends/bar/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/__test_data__/plugin/backends/file b/tests/__test_data__/plugin/backends/file new file mode 100644 index 0000000..e69de29 diff --git a/tests/__test_data__/plugin/backends/foo/.gitkeep b/tests/__test_data__/plugin/backends/foo/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/__test_data__/plugin/filters/baz/.gitkeep b/tests/__test_data__/plugin/filters/baz/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/__test_data__/plugin/filters/qux/.gitkeep b/tests/__test_data__/plugin/filters/qux/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_plugin.py b/tests/test_plugin.py new file mode 100644 index 0000000..c9333f7 --- /dev/null +++ b/tests/test_plugin.py @@ -0,0 +1,18 @@ + + +from asciidoc.message import Message +from asciidoc.plugin import Plugin + +from .utils import Struct, TEST_DIR + +PLUGIN_DIR = TEST_DIR / 'plugin' +CONFIG = Struct(get_load_dirs=lambda: [str(PLUGIN_DIR)], verbose=True) + + +def test_plugin_list(capsys) -> None: + plugin = Plugin('backend', Message(None, None, CONFIG, None), CONFIG) + plugin.list([]) + captured = capsys.readouterr() + backend_dir = PLUGIN_DIR / 'backends' + assert captured.out == "{}\n{}\n".format(backend_dir / 'bar', backend_dir / 'foo') + assert captured.err == '' diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..427c728 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,17 @@ +from pathlib import Path + +TEST_DIR = Path(__file__).resolve().parent / '__test_data__' + + +class Struct: + """ + Use this to make "mock" version of asciidoc classes. Usage is passing in kwargs, + and these are set to the properties of the class. + + >>> a = Struct(foo=1, bar=2) + >>> a.foo + 1 + >>> a.bar + 2 + """ + def __init__(self, **entries): self.__dict__.update(entries) -- cgit v1.2.1