summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Peveler <matt.peveler@gmail.com>2022-02-17 10:09:43 -0500
committerMatthew Peveler <matt.peveler@gmail.com>2022-02-17 10:35:58 -0500
commit1173c768b0524c2dcb0a13a18dcf5dc440bcbc4b (patch)
tree6cddd890269b4249e59db82116b5d5e16a0ddc2d
parent84630fa680a18deb534391f01fd1d0231755259e (diff)
downloadasciidoc-py3-1173c768b0524c2dcb0a13a18dcf5dc440bcbc4b.tar.gz
Move Plugin to its own module
Signed-off-by: Matthew Peveler <matt.peveler@gmail.com>
-rw-r--r--.gitignore3
-rw-r--r--asciidoc/asciidoc.py211
-rw-r--r--asciidoc/plugin.py215
-rw-r--r--tests/__test_data__/plugin/backends/__pycache__/.gitkeep0
-rw-r--r--tests/__test_data__/plugin/backends/bar/.gitkeep0
-rw-r--r--tests/__test_data__/plugin/backends/file0
-rw-r--r--tests/__test_data__/plugin/backends/foo/.gitkeep0
-rw-r--r--tests/__test_data__/plugin/filters/baz/.gitkeep0
-rw-r--r--tests/__test_data__/plugin/filters/qux/.gitkeep0
-rw-r--r--tests/test_plugin.py18
-rw-r--r--tests/utils.py17
11 files changed, 260 insertions, 204 deletions
diff --git a/.gitignore b/.gitignore
index 79d42f8..6ae909b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,6 +28,9 @@ build/
dist/
*.egg-info/
+# test data
+!tests/__test_data__/plugin/backends/__pycache__
+
# test artifacts
.coverage
coverage.xml
diff --git a/asciidoc/asciidoc.py b/asciidoc/asciidoc.py
index 8550609..ef486fb 100644
--- a/asciidoc/asciidoc.py
+++ b/asciidoc/asciidoc.py
@@ -22,7 +22,6 @@ import getopt
import io
import os
import re
-import shutil
import subprocess
import sys
import tempfile
@@ -30,7 +29,6 @@ import time
import typing
import traceback
import unicodedata
-import zipfile
from collections import OrderedDict
@@ -38,6 +36,7 @@ from .blocks.table import parse_table_span_spec, Cell, Column
from .collections import AttrDict, InsensitiveDict
from .exceptions import EAsciiDoc
from .message import Message
+from .plugin import Plugin
from . import utils
CONF_DIR = os.path.join(os.path.dirname(__file__), 'resources')
@@ -5522,207 +5521,11 @@ class Tables_OLD(AbstractBlocks):
# End of deprecated old table classes.
# ---------------------------------------------------------------------------
-# ---------------------------------------------------------------------------
-# filter and theme plugin commands.
-# ---------------------------------------------------------------------------
def die(msg: str) -> typing.NoReturn:
message.stderr(msg)
sys.exit(1)
-def extract_zip(zip_file, destdir):
- """
- Unzip Zip file to destination directory.
- Throws exception if error occurs.
- """
- zipo = zipfile.ZipFile(zip_file, 'r')
- try:
- for zi in zipo.infolist():
- outfile = zi.filename
- if not outfile.endswith('/'):
- d, outfile = os.path.split(outfile)
- directory = os.path.normpath(os.path.join(destdir, d))
- if not os.path.isdir(directory):
- os.makedirs(directory)
- outfile = os.path.join(directory, outfile)
- perms = (zi.external_attr >> 16) & 0o777
- message.verbose('extracting: %s' % outfile)
- flags = os.O_CREAT | os.O_WRONLY
- if sys.platform == 'win32':
- flags |= os.O_BINARY
- if perms == 0:
- # Zip files created under Windows do not include permissions.
- fh = os.open(outfile, flags)
- else:
- fh = os.open(outfile, flags, perms)
- try:
- os.write(fh, zipo.read(zi.filename))
- finally:
- os.close(fh)
- finally:
- zipo.close()
-
-
-def create_zip(zip_file, src, skip_hidden=False):
- """
- Create Zip file. If src is a directory archive all contained files and
- subdirectories, if src is a file archive the src file.
- Files and directories names starting with . are skipped
- if skip_hidden is True.
- Throws exception if error occurs.
- """
- zipo = zipfile.ZipFile(zip_file, 'w')
- try:
- if os.path.isfile(src):
- arcname = os.path.basename(src)
- message.verbose('archiving: %s' % arcname)
- zipo.write(src, arcname, zipfile.ZIP_DEFLATED)
- elif os.path.isdir(src):
- srcdir = os.path.abspath(src)
- if srcdir[-1] != os.path.sep:
- srcdir += os.path.sep
- for root, dirs, files in os.walk(srcdir):
- arcroot = os.path.abspath(root)[len(srcdir):]
- if skip_hidden:
- for d in dirs[:]:
- if d.startswith('.'):
- message.verbose('skipping: %s' % os.path.join(arcroot, d))
- del dirs[dirs.index(d)]
- for f in files:
- filename = os.path.join(root, f)
- arcname = os.path.join(arcroot, f)
- if skip_hidden and f.startswith('.'):
- message.verbose('skipping: %s' % arcname)
- continue
- message.verbose('archiving: %s' % arcname)
- zipo.write(filename, arcname, zipfile.ZIP_DEFLATED)
- else:
- raise ValueError('src must specify directory or file: %s' % src)
- finally:
- zipo.close()
-
-
-class Plugin:
- """
- --filter and --theme option commands.
- """
- CMDS = ('install', 'remove', 'list', 'build')
-
- type = None # 'backend', 'filter' or 'theme'.
-
- @staticmethod
- def reset_class():
- Plugin.type = None
-
- @staticmethod
- def get_dir():
- """
- Return plugins path (.asciidoc/filters or .asciidoc/themes) in user's
- home directory or None if user home not defined.
- """
- result = utils.userdir()
- if result:
- result = os.path.join(result, '.asciidoc', Plugin.type + 's')
- return result
-
- @staticmethod
- def install(args):
- """
- Install plugin Zip file.
- args[0] is plugin zip file path.
- args[1] is optional destination plugins directory.
- """
- if len(args) not in (1, 2):
- die('invalid number of arguments: --%s install %s' % (Plugin.type, ' '.join(args)))
- zip_file = args[0]
- if not os.path.isfile(zip_file):
- die('file not found: %s' % zip_file)
- reo = re.match(r'^\w+', os.path.split(zip_file)[1])
- if not reo:
- die('file name does not start with legal %s name: %s' % (Plugin.type, zip_file))
- plugin_name = reo.group()
- if len(args) == 2:
- plugins_dir = args[1]
- if not os.path.isdir(plugins_dir):
- die('directory not found: %s' % plugins_dir)
- else:
- plugins_dir = Plugin.get_dir()
- if not plugins_dir:
- die('user home directory is not defined')
- plugin_dir = os.path.join(plugins_dir, plugin_name)
- if os.path.exists(plugin_dir):
- die('%s is already installed: %s' % (Plugin.type, plugin_dir))
- try:
- os.makedirs(plugin_dir)
- except Exception as e:
- die('failed to create %s directory: %s' % (Plugin.type, str(e)))
- try:
- extract_zip(zip_file, plugin_dir)
- except Exception as e:
- if os.path.isdir(plugin_dir):
- shutil.rmtree(plugin_dir)
- die('failed to extract %s: %s' % (Plugin.type, str(e)))
-
- @staticmethod
- def remove(args):
- """
- Delete plugin directory.
- args[0] is plugin name.
- args[1] is optional plugin directory (defaults to ~/.asciidoc/<plugin_name>).
- """
- if len(args) not in (1, 2):
- die('invalid number of arguments: --%s remove %s' % (Plugin.type, ' '.join(args)))
- plugin_name = args[0]
- if not re.match(r'^\w+$', plugin_name):
- die('illegal %s name: %s' % (Plugin.type, plugin_name))
- if len(args) == 2:
- d = args[1]
- if not os.path.isdir(d):
- die('directory not found: %s' % d)
- else:
- d = Plugin.get_dir()
- if not d:
- die('user directory is not defined')
- plugin_dir = os.path.join(d, plugin_name)
- if not os.path.isdir(plugin_dir):
- die('cannot find %s: %s' % (Plugin.type, plugin_dir))
- try:
- message.verbose('removing: %s' % plugin_dir)
- shutil.rmtree(plugin_dir)
- except Exception as e:
- die('failed to delete %s: %s' % (Plugin.type, str(e)))
-
- @staticmethod
- def list(args):
- """
- List all plugin directories (global and local).
- """
- for d in [os.path.join(d, Plugin.type + 's') for d in config.get_load_dirs()]:
- if os.path.isdir(d):
- for f in sorted(filter(os.path.isdir, [os.path.join(d, o) for o in os.listdir(d)])):
- if f.endswith('__pycache__'):
- continue
- message.stdout(f)
-
- @staticmethod
- def build(args):
- """
- Create plugin Zip file.
- args[0] is Zip file name.
- args[1] is plugin directory.
- """
- if len(args) != 2:
- die('invalid number of arguments: --%s build %s' % (Plugin.type, ' '.join(args)))
- zip_file = args[0]
- plugin_source = args[1]
- if not (os.path.isdir(plugin_source) or os.path.isfile(plugin_source)):
- die('plugin source not found: %s' % plugin_source)
- try:
- create_zip(zip_file, plugin_source, skip_hidden=True)
- except Exception as e:
- die('failed to create %s: %s' % (zip_file, str(e)))
-
-
# ---------------------------------------------------------------------------
# Application code.
# ---------------------------------------------------------------------------
@@ -5784,7 +5587,6 @@ def reset_asciidoc():
BlockTitle.reset_class()
Section.reset_class()
AbstractBlock.reset_class()
- Plugin.reset_class()
def asciidoc(backend, doctype, confiles, infile, outfile, options):
@@ -6140,11 +5942,12 @@ def cli(argv=None):
sys.exit(1)
# Look for plugin management commands.
count = 0
+ plugin_type = ''
for o, v in opts:
if o in ('-b', '--backend', '--filter', '--theme'):
if o == '-b':
o = '--backend'
- plugin = o[2:]
+ plugin_type = o[2:]
cmd = v
if cmd not in Plugin.CMDS:
continue
@@ -6154,13 +5957,13 @@ def cli(argv=None):
if count == 1:
# Execute plugin management commands.
if not cmd:
- die('missing --%s command' % plugin)
+ die('missing --%s command' % plugin_type)
if cmd not in Plugin.CMDS:
- die('illegal --%s command: %s' % (plugin, cmd))
- Plugin.type = plugin
+ die('illegal --%s command: %s' % (plugin_type, cmd))
config.init()
config.verbose = bool(set(['-v', '--verbose']) & set(opt_names))
- getattr(Plugin, cmd)(args)
+ plugin = Plugin(plugin_type, message, config)
+ getattr(plugin, cmd)(args)
else:
# Execute asciidoc.
try:
diff --git a/asciidoc/plugin.py b/asciidoc/plugin.py
new file mode 100644
index 0000000..87e50f9
--- /dev/null
+++ b/asciidoc/plugin.py
@@ -0,0 +1,215 @@
+import os
+import re
+import shutil
+import sys
+from typing import NoReturn
+import zipfile
+
+from .utils import userdir
+
+
+class Plugin:
+ """
+ --filter and --theme option commands.
+ """
+ CMDS = ('install', 'remove', 'list', 'build')
+
+ def __init__(self, type: str, message, config):
+ self.type = type
+ self.message = message
+ self.config = config
+
+ def die(self, msg: str) -> NoReturn:
+ self.message.stderr(msg)
+ sys.exit(1)
+
+ def get_dir(self) -> str:
+ """
+ Return plugins path (.asciidoc/filters or .asciidoc/themes) in user's
+ home directory or None if user home not defined.
+ """
+ result = userdir()
+ if result:
+ result = os.path.join(result, '.asciidoc', self.type + 's')
+ return result
+
+ def install(self, args) -> None:
+ """
+ Install plugin Zip file.
+ args[0] is plugin zip file path.
+ args[1] is optional destination plugins directory.
+ """
+ if len(args) not in (1, 2):
+ self.die(
+ 'invalid number of arguments: --{} install {}'.format(
+ self.type,
+ ' '.join(args)
+ )
+ )
+ zip_file = args[0]
+ if not os.path.isfile(zip_file):
+ self.die('file not found: %s' % zip_file)
+ reo = re.match(r'^\w+', os.path.split(zip_file)[1])
+ if not reo:
+ self.die('file name does not start with legal {} name: {}'.format(
+ self.type,
+ zip_file
+ ))
+ plugin_name = reo.group()
+ if len(args) == 2:
+ plugins_dir = args[1]
+ if not os.path.isdir(plugins_dir):
+ self.die('directory not found: %s' % plugins_dir)
+ else:
+ plugins_dir = Plugin.get_dir()
+ if not plugins_dir:
+ self.die('user home directory is not defined')
+ plugin_dir = os.path.join(plugins_dir, plugin_name)
+ if os.path.exists(plugin_dir):
+ self.die('%s is already installed: %s' % (self.type, plugin_dir))
+ try:
+ os.makedirs(plugin_dir)
+ except Exception as e:
+ self.die('failed to create %s directory: %s' % (self.type, str(e)))
+ try:
+ self.extract_zip(zip_file, plugin_dir)
+ except Exception as e:
+ if os.path.isdir(plugin_dir):
+ shutil.rmtree(plugin_dir)
+ self.die('failed to extract %s: %s' % (self.type, str(e)))
+
+ def remove(self, args) -> None:
+ """
+ Delete plugin directory.
+ args[0] is plugin name.
+ args[1] is optional plugin directory (defaults to ~/.asciidoc/<plugin_name>).
+ """
+ if len(args) not in (1, 2):
+ self.die('invalid number of arguments: --{} remove {}'.format(
+ self.type,
+ ' '.join(args)
+ ))
+ plugin_name = args[0]
+ if not re.match(r'^\w+$', plugin_name):
+ self.die('illegal %s name: %s' % (self.type, plugin_name))
+ if len(args) == 2:
+ d = args[1]
+ if not os.path.isdir(d):
+ self.die('directory not found: %s' % d)
+ else:
+ d = Plugin.get_dir()
+ if not d:
+ self.die('user directory is not defined')
+ plugin_dir = os.path.join(d, plugin_name)
+ if not os.path.isdir(plugin_dir):
+ self.die('cannot find %s: %s' % (self.type, plugin_dir))
+ try:
+ self.message.verbose('removing: %s' % plugin_dir)
+ shutil.rmtree(plugin_dir)
+ except Exception as e:
+ self.die('failed to delete %s: %s' % (self.type, str(e)))
+
+ def list(self, _) -> None:
+ """
+ List all plugin directories (global and local).
+ """
+ dirs = [os.path.join(d, self.type + 's') for d in self.config.get_load_dirs()]
+ for d in dirs:
+ if os.path.isdir(d):
+ plugin_dirs = [os.path.join(d, o) for o in os.listdir(d)]
+ for f in sorted(filter(os.path.isdir, plugin_dirs)):
+ if f.endswith('__pycache__'):
+ continue
+ self.message.stdout(f)
+
+ def build(self, args) -> None:
+ """
+ Create plugin Zip file.
+ args[0] is Zip file name.
+ args[1] is plugin directory.
+ """
+ if len(args) != 2:
+ self.die('invalid number of arguments: --{} build {}'.format(
+ self.type,
+ ' '.join(args)
+ ))
+ zip_file = args[0]
+ plugin_source = args[1]
+ if not (os.path.isdir(plugin_source) or os.path.isfile(plugin_source)):
+ self.die('plugin source not found: %s' % plugin_source)
+ try:
+ self.create_zip(zip_file, plugin_source, skip_hidden=True)
+ except Exception as e:
+ self.die('failed to create %s: %s' % (zip_file, str(e)))
+
+ def extract_zip(self, zip_file: str, destdir: str) -> None:
+ """
+ Unzip Zip file to destination directory.
+ Throws exception if error occurs.
+ """
+ zipo = zipfile.ZipFile(zip_file, 'r')
+ try:
+ for zi in zipo.infolist():
+ outfile = zi.filename
+ if not outfile.endswith('/'):
+ d, outfile = os.path.split(outfile)
+ directory = os.path.normpath(os.path.join(destdir, d))
+ if not os.path.isdir(directory):
+ os.makedirs(directory)
+ outfile = os.path.join(directory, outfile)
+ perms = (zi.external_attr >> 16) & 0o777
+ self.message.verbose('extracting: %s' % outfile)
+ flags = os.O_CREAT | os.O_WRONLY
+ if sys.platform == 'win32':
+ flags |= os.O_BINARY
+ if perms == 0:
+ # Zip files created under Windows do not include permissions.
+ fh = os.open(outfile, flags)
+ else:
+ fh = os.open(outfile, flags, perms)
+ try:
+ os.write(fh, zipo.read(zi.filename))
+ finally:
+ os.close(fh)
+ finally:
+ zipo.close()
+
+ def create_zip(self, zip_file: str, src: str, skip_hidden: bool = False) -> None:
+ """
+ Create Zip file. If src is a directory archive all contained files and
+ subdirectories, if src is a file archive the src file.
+ Files and directories names starting with . are skipped
+ if skip_hidden is True.
+ Throws exception if error occurs.
+ """
+ zipo = zipfile.ZipFile(zip_file, 'w')
+ try:
+ if os.path.isfile(src):
+ arcname = os.path.basename(src)
+ self.message.verbose('archiving: %s' % arcname)
+ zipo.write(src, arcname, zipfile.ZIP_DEFLATED)
+ elif os.path.isdir(src):
+ srcdir = os.path.abspath(src)
+ if srcdir[-1] != os.path.sep:
+ srcdir += os.path.sep
+ for root, dirs, files in os.walk(srcdir):
+ arcroot = os.path.abspath(root)[len(srcdir):]
+ if skip_hidden:
+ for d in dirs[:]:
+ if d.startswith('.'):
+ self.message.verbose(
+ 'skipping: %s' % os.path.join(arcroot, d)
+ )
+ del dirs[dirs.index(d)]
+ for f in files:
+ filename = os.path.join(root, f)
+ arcname = os.path.join(arcroot, f)
+ if skip_hidden and f.startswith('.'):
+ self.message.verbose('skipping: %s' % arcname)
+ continue
+ self.message.verbose('archiving: %s' % arcname)
+ zipo.write(filename, arcname, zipfile.ZIP_DEFLATED)
+ else:
+ raise ValueError('src must specify directory or file: %s' % src)
+ finally:
+ zipo.close()
diff --git a/tests/__test_data__/plugin/backends/__pycache__/.gitkeep b/tests/__test_data__/plugin/backends/__pycache__/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__test_data__/plugin/backends/__pycache__/.gitkeep
diff --git a/tests/__test_data__/plugin/backends/bar/.gitkeep b/tests/__test_data__/plugin/backends/bar/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__test_data__/plugin/backends/bar/.gitkeep
diff --git a/tests/__test_data__/plugin/backends/file b/tests/__test_data__/plugin/backends/file
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__test_data__/plugin/backends/file
diff --git a/tests/__test_data__/plugin/backends/foo/.gitkeep b/tests/__test_data__/plugin/backends/foo/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__test_data__/plugin/backends/foo/.gitkeep
diff --git a/tests/__test_data__/plugin/filters/baz/.gitkeep b/tests/__test_data__/plugin/filters/baz/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__test_data__/plugin/filters/baz/.gitkeep
diff --git a/tests/__test_data__/plugin/filters/qux/.gitkeep b/tests/__test_data__/plugin/filters/qux/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__test_data__/plugin/filters/qux/.gitkeep
diff --git a/tests/test_plugin.py b/tests/test_plugin.py
new file mode 100644
index 0000000..c9333f7
--- /dev/null
+++ b/tests/test_plugin.py
@@ -0,0 +1,18 @@
+
+
+from asciidoc.message import Message
+from asciidoc.plugin import Plugin
+
+from .utils import Struct, TEST_DIR
+
+PLUGIN_DIR = TEST_DIR / 'plugin'
+CONFIG = Struct(get_load_dirs=lambda: [str(PLUGIN_DIR)], verbose=True)
+
+
+def test_plugin_list(capsys) -> None:
+ plugin = Plugin('backend', Message(None, None, CONFIG, None), CONFIG)
+ plugin.list([])
+ captured = capsys.readouterr()
+ backend_dir = PLUGIN_DIR / 'backends'
+ assert captured.out == "{}\n{}\n".format(backend_dir / 'bar', backend_dir / 'foo')
+ assert captured.err == ''
diff --git a/tests/utils.py b/tests/utils.py
new file mode 100644
index 0000000..427c728
--- /dev/null
+++ b/tests/utils.py
@@ -0,0 +1,17 @@
+from pathlib import Path
+
+TEST_DIR = Path(__file__).resolve().parent / '__test_data__'
+
+
+class Struct:
+ """
+ Use this to make "mock" version of asciidoc classes. Usage is passing in kwargs,
+ and these are set to the properties of the class.
+
+ >>> a = Struct(foo=1, bar=2)
+ >>> a.foo
+ 1
+ >>> a.bar
+ 2
+ """
+ def __init__(self, **entries): self.__dict__.update(entries)