summaryrefslogtreecommitdiff
path: root/asciidoc/asciidoc.py
diff options
context:
space:
mode:
Diffstat (limited to 'asciidoc/asciidoc.py')
-rw-r--r--asciidoc/asciidoc.py211
1 files changed, 7 insertions, 204 deletions
diff --git a/asciidoc/asciidoc.py b/asciidoc/asciidoc.py
index 8550609..ef486fb 100644
--- a/asciidoc/asciidoc.py
+++ b/asciidoc/asciidoc.py
@@ -22,7 +22,6 @@ import getopt
import io
import os
import re
-import shutil
import subprocess
import sys
import tempfile
@@ -30,7 +29,6 @@ import time
import typing
import traceback
import unicodedata
-import zipfile
from collections import OrderedDict
@@ -38,6 +36,7 @@ from .blocks.table import parse_table_span_spec, Cell, Column
from .collections import AttrDict, InsensitiveDict
from .exceptions import EAsciiDoc
from .message import Message
+from .plugin import Plugin
from . import utils
CONF_DIR = os.path.join(os.path.dirname(__file__), 'resources')
@@ -5522,207 +5521,11 @@ class Tables_OLD(AbstractBlocks):
# End of deprecated old table classes.
# ---------------------------------------------------------------------------
-# ---------------------------------------------------------------------------
-# filter and theme plugin commands.
-# ---------------------------------------------------------------------------
def die(msg: str) -> typing.NoReturn:
message.stderr(msg)
sys.exit(1)
-def extract_zip(zip_file, destdir):
- """
- Unzip Zip file to destination directory.
- Throws exception if error occurs.
- """
- zipo = zipfile.ZipFile(zip_file, 'r')
- try:
- for zi in zipo.infolist():
- outfile = zi.filename
- if not outfile.endswith('/'):
- d, outfile = os.path.split(outfile)
- directory = os.path.normpath(os.path.join(destdir, d))
- if not os.path.isdir(directory):
- os.makedirs(directory)
- outfile = os.path.join(directory, outfile)
- perms = (zi.external_attr >> 16) & 0o777
- message.verbose('extracting: %s' % outfile)
- flags = os.O_CREAT | os.O_WRONLY
- if sys.platform == 'win32':
- flags |= os.O_BINARY
- if perms == 0:
- # Zip files created under Windows do not include permissions.
- fh = os.open(outfile, flags)
- else:
- fh = os.open(outfile, flags, perms)
- try:
- os.write(fh, zipo.read(zi.filename))
- finally:
- os.close(fh)
- finally:
- zipo.close()
-
-
-def create_zip(zip_file, src, skip_hidden=False):
- """
- Create Zip file. If src is a directory archive all contained files and
- subdirectories, if src is a file archive the src file.
- Files and directories names starting with . are skipped
- if skip_hidden is True.
- Throws exception if error occurs.
- """
- zipo = zipfile.ZipFile(zip_file, 'w')
- try:
- if os.path.isfile(src):
- arcname = os.path.basename(src)
- message.verbose('archiving: %s' % arcname)
- zipo.write(src, arcname, zipfile.ZIP_DEFLATED)
- elif os.path.isdir(src):
- srcdir = os.path.abspath(src)
- if srcdir[-1] != os.path.sep:
- srcdir += os.path.sep
- for root, dirs, files in os.walk(srcdir):
- arcroot = os.path.abspath(root)[len(srcdir):]
- if skip_hidden:
- for d in dirs[:]:
- if d.startswith('.'):
- message.verbose('skipping: %s' % os.path.join(arcroot, d))
- del dirs[dirs.index(d)]
- for f in files:
- filename = os.path.join(root, f)
- arcname = os.path.join(arcroot, f)
- if skip_hidden and f.startswith('.'):
- message.verbose('skipping: %s' % arcname)
- continue
- message.verbose('archiving: %s' % arcname)
- zipo.write(filename, arcname, zipfile.ZIP_DEFLATED)
- else:
- raise ValueError('src must specify directory or file: %s' % src)
- finally:
- zipo.close()
-
-
-class Plugin:
- """
- --filter and --theme option commands.
- """
- CMDS = ('install', 'remove', 'list', 'build')
-
- type = None # 'backend', 'filter' or 'theme'.
-
- @staticmethod
- def reset_class():
- Plugin.type = None
-
- @staticmethod
- def get_dir():
- """
- Return plugins path (.asciidoc/filters or .asciidoc/themes) in user's
- home directory or None if user home not defined.
- """
- result = utils.userdir()
- if result:
- result = os.path.join(result, '.asciidoc', Plugin.type + 's')
- return result
-
- @staticmethod
- def install(args):
- """
- Install plugin Zip file.
- args[0] is plugin zip file path.
- args[1] is optional destination plugins directory.
- """
- if len(args) not in (1, 2):
- die('invalid number of arguments: --%s install %s' % (Plugin.type, ' '.join(args)))
- zip_file = args[0]
- if not os.path.isfile(zip_file):
- die('file not found: %s' % zip_file)
- reo = re.match(r'^\w+', os.path.split(zip_file)[1])
- if not reo:
- die('file name does not start with legal %s name: %s' % (Plugin.type, zip_file))
- plugin_name = reo.group()
- if len(args) == 2:
- plugins_dir = args[1]
- if not os.path.isdir(plugins_dir):
- die('directory not found: %s' % plugins_dir)
- else:
- plugins_dir = Plugin.get_dir()
- if not plugins_dir:
- die('user home directory is not defined')
- plugin_dir = os.path.join(plugins_dir, plugin_name)
- if os.path.exists(plugin_dir):
- die('%s is already installed: %s' % (Plugin.type, plugin_dir))
- try:
- os.makedirs(plugin_dir)
- except Exception as e:
- die('failed to create %s directory: %s' % (Plugin.type, str(e)))
- try:
- extract_zip(zip_file, plugin_dir)
- except Exception as e:
- if os.path.isdir(plugin_dir):
- shutil.rmtree(plugin_dir)
- die('failed to extract %s: %s' % (Plugin.type, str(e)))
-
- @staticmethod
- def remove(args):
- """
- Delete plugin directory.
- args[0] is plugin name.
- args[1] is optional plugin directory (defaults to ~/.asciidoc/<plugin_name>).
- """
- if len(args) not in (1, 2):
- die('invalid number of arguments: --%s remove %s' % (Plugin.type, ' '.join(args)))
- plugin_name = args[0]
- if not re.match(r'^\w+$', plugin_name):
- die('illegal %s name: %s' % (Plugin.type, plugin_name))
- if len(args) == 2:
- d = args[1]
- if not os.path.isdir(d):
- die('directory not found: %s' % d)
- else:
- d = Plugin.get_dir()
- if not d:
- die('user directory is not defined')
- plugin_dir = os.path.join(d, plugin_name)
- if not os.path.isdir(plugin_dir):
- die('cannot find %s: %s' % (Plugin.type, plugin_dir))
- try:
- message.verbose('removing: %s' % plugin_dir)
- shutil.rmtree(plugin_dir)
- except Exception as e:
- die('failed to delete %s: %s' % (Plugin.type, str(e)))
-
- @staticmethod
- def list(args):
- """
- List all plugin directories (global and local).
- """
- for d in [os.path.join(d, Plugin.type + 's') for d in config.get_load_dirs()]:
- if os.path.isdir(d):
- for f in sorted(filter(os.path.isdir, [os.path.join(d, o) for o in os.listdir(d)])):
- if f.endswith('__pycache__'):
- continue
- message.stdout(f)
-
- @staticmethod
- def build(args):
- """
- Create plugin Zip file.
- args[0] is Zip file name.
- args[1] is plugin directory.
- """
- if len(args) != 2:
- die('invalid number of arguments: --%s build %s' % (Plugin.type, ' '.join(args)))
- zip_file = args[0]
- plugin_source = args[1]
- if not (os.path.isdir(plugin_source) or os.path.isfile(plugin_source)):
- die('plugin source not found: %s' % plugin_source)
- try:
- create_zip(zip_file, plugin_source, skip_hidden=True)
- except Exception as e:
- die('failed to create %s: %s' % (zip_file, str(e)))
-
-
# ---------------------------------------------------------------------------
# Application code.
# ---------------------------------------------------------------------------
@@ -5784,7 +5587,6 @@ def reset_asciidoc():
BlockTitle.reset_class()
Section.reset_class()
AbstractBlock.reset_class()
- Plugin.reset_class()
def asciidoc(backend, doctype, confiles, infile, outfile, options):
@@ -6140,11 +5942,12 @@ def cli(argv=None):
sys.exit(1)
# Look for plugin management commands.
count = 0
+ plugin_type = ''
for o, v in opts:
if o in ('-b', '--backend', '--filter', '--theme'):
if o == '-b':
o = '--backend'
- plugin = o[2:]
+ plugin_type = o[2:]
cmd = v
if cmd not in Plugin.CMDS:
continue
@@ -6154,13 +5957,13 @@ def cli(argv=None):
if count == 1:
# Execute plugin management commands.
if not cmd:
- die('missing --%s command' % plugin)
+ die('missing --%s command' % plugin_type)
if cmd not in Plugin.CMDS:
- die('illegal --%s command: %s' % (plugin, cmd))
- Plugin.type = plugin
+ die('illegal --%s command: %s' % (plugin_type, cmd))
config.init()
config.verbose = bool(set(['-v', '--verbose']) & set(opt_names))
- getattr(Plugin, cmd)(args)
+ plugin = Plugin(plugin_type, message, config)
+ getattr(plugin, cmd)(args)
else:
# Execute asciidoc.
try: