diff options
author | Matthew Peveler <matt.peveler@gmail.com> | 2022-06-27 02:57:30 +0000 |
---|---|---|
committer | Matthew Peveler <matt.peveler@gmail.com> | 2022-06-27 03:05:37 +0000 |
commit | 6aed4614ef7ae53495ad3bcb1fb820f880070a62 (patch) | |
tree | 7372def0fa577e39dad50ead017ee40ea31f1bf5 /asciidoc/io.py | |
parent | ac3dbb71aa6840c115218dd531fdf2d64a55437a (diff) | |
download | asciidoc-py3-6aed4614ef7ae53495ad3bcb1fb820f880070a62.tar.gz |
Move Reader/Writer to asciidoc.io module [matt-chore-io]
Signed-off-by: Matthew Peveler <matt.peveler@gmail.com>
Diffstat (limited to 'asciidoc/io.py')
-rw-r--r-- | asciidoc/io.py | 493 |
1 files changed, 493 insertions, 0 deletions
# diff --git a/asciidoc/io.py b/asciidoc/io.py
# new file mode 100644, index 0000000..12118d0 (@@ -0,0 +1,493 @@)
from asciidoc import utils
from asciidoc.asciidoc import DEFAULT_NEWLINE, Config, Document, Lex, Macros, Trace
from asciidoc.asciidoc import is_attr_defined, safe, subs_tag, safe_filename
from asciidoc.asciidoc import subs_attrs, system
from asciidoc.attrs import parse_attributes
from asciidoc.exceptions import EAsciiDoc
from asciidoc.message import Message
import os
import re
import sys


UTF8_BOM = b'\xef\xbb\xbf'.decode('utf-8')


class Reader1:
    """Line oriented AsciiDoc input file reader. Processes include and
    conditional inclusion system macros. Tabs are expanded and lines are right
    trimmed."""
    # This class is not used directly, use Reader class instead.
    READ_BUFFER_MIN = 10  # Read buffer low level.

    def __init__(
        self,
        message: Message,
        document: Document,
        macros: Macros,
        config: Config,
    ):
        """Create an unopened reader bound to its collaborators.

        message, document, macros and config are stored as-is and used for
        diagnostics, the 'infile'/'indir' document attributes, macro matching
        and include settings respectively.
        """
        self.f = None  # Input file object.
        self.fname = None  # Input file name.
        # Read ahead buffer containing [filename,linenumber,linetext] lists.
        self.next = []
        self.cursor = None  # Last read() [filename,linenumber,linetext].
        self.tabsize = 8  # Tab expansion number of spaces.
        self.parent = None  # Included reader's parent reader.
        self._lineno = 0  # The last line read from file object f.
        self.line_ranges = None  # line ranges to include
        self.current_depth = 0  # Current include depth.
        self.max_depth = 10  # Initial maximum allowed include depth.
        self.bom = None  # Byte order mark (BOM).
        self.infile = None  # Saved document 'infile' attribute.
        self.indir = None  # Saved document 'indir' attribute.
        self.message = message
        self.document = document
        self.macros = macros
        self.config = config

    def open(self, fname):
        """Open fname ('<stdin>' selects sys.stdin) and prime the read buffer.

        Also publishes the 'infile' and 'indir' document attributes and strips
        a leading UTF-8 BOM from the first line, remembering it in self.bom.
        """
        self.fname = fname
        self.message.verbose('reading: ' + fname)
        if fname == '<stdin>':
            self.f = sys.stdin
            self.infile = None
            self.indir = None
        else:
            self.f = open(fname, 'r', encoding='utf-8')
            self.infile = fname
            self.indir = os.path.dirname(fname)
        self.document.attributes['infile'] = self.infile
        self.document.attributes['indir'] = self.indir
        self._lineno = 0  # The last line read from file object f.
        self.next = []
        # Pre-fill buffer by reading the first line and then pushing it back.
        # NOTE(review): an empty first line is falsy here and so is not
        # pushed back -- confirm that dropping it is intended.
        if self.read():
            if self.cursor[2].startswith(UTF8_BOM):
                self.cursor[2] = self.cursor[2][len(UTF8_BOM):]
                self.bom = UTF8_BOM
            self.unread(self.cursor)
            self.cursor = None

    def closefile(self):
        """Used by class methods to close nested include files."""
        # Guard so that closing a reader that was never opened is harmless.
        if self.f is not None:
            self.f.close()
        self.next = []

    def close(self):
        """Close the input file and reset the reader to its initial state."""
        self.closefile()
        # __init__ requires the collaborators; re-pass the ones we hold
        # (calling it with no arguments raised TypeError).
        self.__init__(self.message, self.document, self.macros, self.config)

    def _line_selected(self, lineno):
        """Return True if lineno falls inside self.line_ranges (or no ranges
        are set). A range is [n] or [first, last]; last == -1 is open ended."""
        if self.line_ranges is None:
            return True
        for line_range in self.line_ranges:
            if len(line_range) == 1:
                if lineno == line_range[0]:
                    return True
            elif len(line_range) == 2:
                if line_range[0] <= lineno and \
                        (line_range[1] == -1 or lineno <= line_range[1]):
                    return True
        return False

    def readline(self):
        """Return the next raw line from the file that is selected by
        self.line_ranges, or an empty string at end of file."""
        while True:
            s = self.f.readline()
            if not s:
                break  # EOF.
            self._lineno = self._lineno + 1
            if self._line_selected(self._lineno):
                break
        return s

    def _apply_include_attrs(self, attrs):
        """Apply the include macro 'tabsize', 'depth' and 'lines' attributes
        to this (child) reader. Raises EAsciiDoc on an illegal value."""
        if 'tabsize' in attrs:
            try:
                val = int(attrs['tabsize'])
                if not val >= 0:
                    raise ValueError('not >= 0')
                self.tabsize = val
            except ValueError:
                raise EAsciiDoc('illegal include macro tabsize argument')
        else:
            self.tabsize = self.config.tabsize
        if 'depth' in attrs:
            try:
                val = int(attrs['depth'])
                if not val >= 1:
                    raise ValueError('not >= 1')
                self.max_depth = self.current_depth + val
            except ValueError:
                raise EAsciiDoc("include macro: illegal 'depth' argument")
        if 'lines' in attrs:
            try:
                # Ranges are separated by ';' if present, otherwise by ','.
                if ';' in attrs['lines']:
                    ranges = attrs['lines'].split(';')
                else:
                    ranges = attrs['lines'].split(',')
                self.line_ranges = [
                    [int(x) for x in line_range.split('..')]
                    for line_range in ranges
                ]
            except ValueError:
                raise EAsciiDoc("include macro: illegal 'lines' argument")

    def read(self, skip=False):
        """Read next line. Return None if EOF. Expand tabs. Strip trailing
        white space. Maintain self.next read ahead buffer. If skip=True then
        conditional exclusion is active (ifdef and ifndef macros)."""
        # Top up buffer.
        if len(self.next) <= self.READ_BUFFER_MIN:
            s = self.readline()
            while s:
                if self.tabsize != 0:
                    s = s.expandtabs(self.tabsize)
                s = s.rstrip()
                self.next.append([self.fname, self._lineno, s])
                if len(self.next) > self.READ_BUFFER_MIN:
                    break
                s = self.readline()
        # Return first (oldest) buffer entry.
        if len(self.next) > 0:
            self.cursor = self.next[0]
            del self.next[0]
            result = self.cursor[2]
            # Check for include macro.
            mo = self.macros.match('+', r'^include[1]?$', result)
            if mo and not skip:
                # Parse include macro attributes.
                attrs = {}
                parse_attributes(mo.group('attrlist'), attrs)
                warnings = attrs.get('warnings', True)
                # Don't process include macro once the maximum depth is reached.
                if self.current_depth >= self.max_depth:
                    self.message.warning('maximum include depth exceeded')
                    return result
                # Perform attribute substitution on include macro file name.
                fname = subs_attrs(mo.group('target'))
                if not fname:
                    return Reader1.read(self)  # Return next input line.
                if self.fname != '<stdin>':
                    fname = os.path.expandvars(os.path.expanduser(fname))
                    fname = safe_filename(fname, os.path.dirname(self.fname))
                    if not fname:
                        return Reader1.read(self)  # Return next input line.
                    if not os.path.isfile(fname):
                        if warnings:
                            self.message.warning('include file not found: %s' % fname)
                        return Reader1.read(self)  # Return next input line.
                    if mo.group('name') == 'include1':
                        if not self.config.dumping:
                            if fname not in self.config.include1:
                                self.message.verbose(
                                    'include1: ' + fname,
                                    linenos=False,
                                )
                                # Store the include file in memory for later
                                # retrieval by the {include1:} system attribute.
                                with open(fname, encoding='utf-8') as f:
                                    self.config.include1[fname] = [
                                        s.rstrip() for s in f
                                    ]
                            return '{include1:%s}' % fname
                        else:
                            # This is a configuration dump, just pass the macro
                            # call through.
                            return result
                # Clone self and set as parent (self assumes the role of child).
                # Reader1() with no arguments raised TypeError; the clone's
                # state is overwritten by utils.assign() straight afterwards.
                parent = Reader1(
                    self.message, self.document, self.macros, self.config,
                )
                utils.assign(parent, self)
                self.parent = parent
                # Set attributes in child.
                self._apply_include_attrs(attrs)
                # Process included file.
                self.message.verbose('include: ' + fname, linenos=False)
                self.open(fname)
                self.current_depth = self.current_depth + 1
                result = Reader1.read(self)
        else:
            if not Reader1.eof(self):
                result = Reader1.read(self)
            else:
                result = None
        return result

    def eof(self):
        """Returns True if all lines have been read."""
        if len(self.next) == 0:
            # End of current file.
            if self.parent:
                self.closefile()
                utils.assign(self, self.parent)  # Restore parent reader.
                self.document.attributes['infile'] = self.infile
                self.document.attributes['indir'] = self.indir
                return Reader1.eof(self)
            else:
                return True
        else:
            return False

    def read_next(self):
        """Like read() but does not advance file pointer."""
        if Reader1.eof(self):
            return None
        else:
            return self.next[0][2]

    def unread(self, cursor):
        """Push the line (filename,linenumber,linetext) tuple back into the read
        buffer. Note that it's up to the caller to restore the previous
        cursor."""
        assert cursor
        self.next.insert(0, cursor)


class Reader(Reader1):
    """ Wraps (well, sort of) Reader1 class and implements conditional text
    inclusion."""
    def __init__(
        self,
        message: Message,
        document: Document,
        macros: Macros,
        config: Config,
    ):
        """Create an unopened reader with no conditional nesting active."""
        Reader1.__init__(self, message, document, macros, config)
        self.depth = 0  # if nesting depth.
        self.skip = False  # true if we're skipping ifdef...endif.
        self.skipname = ''  # Name of current endif macro target.
        self.skipto = -1  # The depth at which skipping is re-enabled.

    def read_super(self):
        """Read one line via Reader1.read(); raise EAsciiDoc if the input ends
        while a conditional block is still being skipped."""
        result = Reader1.read(self, self.skip)
        if result is None and self.skip:
            raise EAsciiDoc('missing endif::%s[]' % self.skipname)
        return result

    def read(self):
        """Return the next line after applying conditional inclusion macros
        (ifdef, ifndef, ifeval, endif) and expanding eval/sys/sys2 macros.
        Return None at end of input. Raises EAsciiDoc on malformed or
        mismatched conditional macros."""
        result = self.read_super()
        if result is None:
            return None
        # Skip lines until the endif that closes the excluded block, keeping
        # track of conditionals nested inside it.
        while self.skip:
            mo = self.macros.match('+', r'ifdef|ifndef|ifeval|endif', result)
            if mo:
                name = mo.group('name')
                target = mo.group('target')
                attrlist = mo.group('attrlist')
                if name == 'endif':
                    self.depth -= 1
                    if self.depth < 0:
                        raise EAsciiDoc('mismatched macro: %s' % result)
                    if self.depth == self.skipto:
                        self.skip = False
                        if target and self.skipname != target:
                            raise EAsciiDoc('mismatched macro: %s' % result)
                else:
                    if name in ('ifdef', 'ifndef'):
                        if not target:
                            raise EAsciiDoc('missing macro target: %s' % result)
                        if not attrlist:
                            self.depth += 1
                    elif name == 'ifeval':
                        if not attrlist:
                            raise EAsciiDoc('missing ifeval condition: %s' % result)
                        self.depth += 1
            result = self.read_super()
            if result is None:
                return None
        mo = self.macros.match('+', r'ifdef|ifndef|ifeval|endif', result)
        if mo:
            name = mo.group('name')
            target = mo.group('target')
            attrlist = mo.group('attrlist')
            if name == 'endif':
                self.depth = self.depth - 1
            else:
                if not target and name in ('ifdef', 'ifndef'):
                    raise EAsciiDoc('missing macro target: %s' % result)
                defined = is_attr_defined(target, self.document.attributes)
                if name == 'ifdef':
                    if attrlist:
                        # Single-line form: the attribute list is the text.
                        if defined:
                            return attrlist
                    else:
                        self.skip = not defined
                elif name == 'ifndef':
                    if attrlist:
                        if not defined:
                            return attrlist
                    else:
                        self.skip = defined
                elif name == 'ifeval':
                    if safe():
                        self.message.unsafe('ifeval invalid')
                        raise EAsciiDoc('ifeval invalid safe document')
                    if not attrlist:
                        raise EAsciiDoc('missing ifeval condition: %s' % result)
                    cond = False
                    attrlist = subs_attrs(attrlist)
                    if attrlist:
                        try:
                            # SECURITY: eval() runs document-supplied text;
                            # only reached when safe() is false (see above).
                            cond = eval(attrlist)
                        except Exception as e:
                            raise EAsciiDoc(
                                'error evaluating ifeval condition: %s: %s' % (
                                    result,
                                    str(e)
                                )
                            )
                        self.message.verbose('ifeval: %s: %r' % (attrlist, cond))
                    self.skip = not cond
                if not attrlist or name == 'ifeval':
                    if self.skip:
                        self.skipto = self.depth
                        self.skipname = target
                    self.depth = self.depth + 1
            result = self.read()
        if result:
            # Expand executable block macros.
            mo = self.macros.match('+', r'eval|sys|sys2', result)
            if mo:
                action = mo.group('name')
                cmd = mo.group('attrlist')
                result = system(action, cmd, is_macro=True)
                self.cursor[2] = result  # So we don't re-evaluate.
        if result:
            # Unescape escaped system macros.
            if self.macros.match('+', r'\\eval|\\sys|\\sys2|\\ifdef|\\ifndef|\\endif|\\include|\\include1', result):  # noqa=E501
                result = result[1:]
        return result

    def eof(self):
        """Return True if there are no more lines to read."""
        return self.read_next() is None

    def read_next(self):
        """Like read() but pushes the line back and restores the cursor."""
        save_cursor = self.cursor
        result = self.read()
        if result is not None:
            self.unread(self.cursor)
            self.cursor = save_cursor
        return result

    def read_lines(self, count=1):
        """Return tuple containing count lines (fewer if input runs out)."""
        result = []
        i = 0
        while i < count and not self.eof():
            result.append(self.read())
            # The counter was never advanced, so every call read to EOF.
            i = i + 1
        return tuple(result)

    def read_ahead(self, count=1):
        """Same as read_lines() but does not advance the file pointer."""
        result = []
        putback = []
        save_cursor = self.cursor
        try:
            i = 0
            while i < count and not self.eof():
                result.append(self.read())
                putback.append(self.cursor)
                i = i + 1
            # Push back in reverse order so the buffer order is preserved.
            while putback:
                self.unread(putback.pop())
        finally:
            self.cursor = save_cursor
        return tuple(result)

    def skip_blank_lines(self):
        """Consume lines up to (not including) the first non-blank line."""
        self.read_until(r'\s*\S+')

    def read_until(self, terminators, same_file=False):
        """Like read() but reads lines up to (but not including) the first line
        that matches the terminator regular expression, regular expression
        object or list of regular expression objects. If same_file is True then
        the terminating pattern must occur in the file that was being read when
        the routine was called. Returns a tuple of the lines read."""
        fname = self.cursor[0] if same_file else None
        result = []
        if not isinstance(terminators, list):
            if isinstance(terminators, str):
                terminators = [re.compile(terminators)]
            else:
                terminators = [terminators]
        while not self.eof():
            save_cursor = self.cursor
            s = self.read()
            if not same_file or fname == self.cursor[0]:
                for reo in terminators:
                    if reo.match(s):
                        self.unread(self.cursor)
                        self.cursor = save_cursor
                        return tuple(result)
            result.append(s)
        return tuple(result)


class Writer:
    """Writes lines to output file."""
    def __init__(self, message: Message, trace: Trace, config: Config):
        """Create an unopened writer bound to its collaborators."""
        self.newline = DEFAULT_NEWLINE  # End of line terminator.
        self.f = None  # Output file object.
        self.fname = None  # Output file name.
        self.lines_out = 0  # Number of lines written.
        self.skip_blank_lines = False  # If True don't output blank lines.
        self.message = message
        self.trace = trace
        self.config = config

    def open(self, fname, bom=None):
        """
        Open fname for writing ('<stdout>' selects sys.stdout).
        bom is optional byte order mark.
        http://en.wikipedia.org/wiki/Byte-order_mark
        """
        self.fname = fname
        if fname == '<stdout>':
            self.f = sys.stdout
        else:
            self.f = open(fname, 'w+', encoding='utf-8', newline="")
        self.message.verbose('writing: ' + self.fname, False)
        if bom:
            self.f.write(bom)
        self.lines_out = 0

    def close(self):
        """Close the output file; sys.stdout is left open."""
        if self.fname != '<stdout>' and self.f is not None:
            self.f.close()

    def write_line(self, line=None):
        """Write line followed by self.newline; None writes a blank line.
        Blank lines are dropped when self.skip_blank_lines is set."""
        if self.skip_blank_lines and (not line or not line.strip()):
            return
        if line is None:
            # re.sub() raises TypeError on None, so handle it explicitly.
            text = ''
        else:
            # Replace out any escaped attributes with non-escaped versions
            text = re.sub(r'\\({[a-zA-Z0-9_][a-zA-Z0-9_\-]*)', '\\1', line)
        self.f.write(text + self.newline)
        self.lines_out = self.lines_out + 1

    def write(self, *args, **kwargs):
        """Iterates arguments, writes tuple and list arguments one line per
        element, else writes argument as single line. If no arguments writes
        blank line. If argument is None nothing is written. self.newline is
        appended to each line."""
        if 'trace' in kwargs and len(args) > 0:
            self.trace(kwargs['trace'], args[0])
        if len(args) == 0:
            # write_line() already counts the line in self.lines_out.
            self.write_line()
        else:
            for arg in args:
                if utils.is_array(arg):
                    for s in arg:
                        self.write_line(s)
                elif arg is not None:
                    self.write_line(arg)

    def write_tag(self, tag, content, subs=None, d=None, **kwargs):
        """Write content enveloped by tag.
        Substitutions specified in the 'subs' list are performed on the
        'content'. subs defaults to self.config.subsnormal."""
        if subs is None:
            subs = self.config.subsnormal
        stag, etag = subs_tag(tag, d)
        content = Lex.subs(content, subs)
        if 'trace' in kwargs:
            # list() keeps the concatenation valid whatever sequence type
            # (or None) Lex.subs returned.
            self.trace(kwargs['trace'], [stag] + list(content or ()) + [etag])
        if stag:
            self.write(stag)
        if content:
            self.write(content)
        if etag:
            self.write(etag)