summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Peveler <matt.peveler@gmail.com>2022-06-27 02:57:30 +0000
committerMatthew Peveler <matt.peveler@gmail.com>2022-06-27 03:05:37 +0000
commit6aed4614ef7ae53495ad3bcb1fb820f880070a62 (patch)
tree7372def0fa577e39dad50ead017ee40ea31f1bf5
parentac3dbb71aa6840c115218dd531fdf2d64a55437a (diff)
downloadasciidoc-py3-matt-chore-io.tar.gz
Move Reader/Writer to asciidoc.io modulematt-chore-io
Signed-off-by: Matthew Peveler <matt.peveler@gmail.com>
-rw-r--r--asciidoc/asciidoc.py467
-rw-r--r--asciidoc/io.py493
2 files changed, 499 insertions, 461 deletions
diff --git a/asciidoc/asciidoc.py b/asciidoc/asciidoc.py
index 0ccf24d..ac97da7 100644
--- a/asciidoc/asciidoc.py
+++ b/asciidoc/asciidoc.py
@@ -43,6 +43,7 @@ from .attrs import parse_attributes
from .blocks.table import parse_table_span_spec, Cell, Column
from .collections import AttrDict, InsensitiveDict
from .exceptions import EAsciiDoc
+from .io import Reader, Writer
from .message import Message
from .plugin import Plugin
@@ -3818,463 +3819,6 @@ class CalloutMap:
if listindex > maxlistindex:
message.warning('callout refers to non-existent list item ' + str(listindex))
-# ---------------------------------------------------------------------------
-# Input stream Reader and output stream writer classes.
-# ---------------------------------------------------------------------------
-
-
-UTF8_BOM = b'\xef\xbb\xbf'.decode('utf-8')
-
-
-class Reader1:
- """Line oriented AsciiDoc input file reader. Processes include and
- conditional inclusion system macros. Tabs are expanded and lines are right
- trimmed."""
- # This class is not used directly, use Reader class instead.
- READ_BUFFER_MIN = 10 # Read buffer low level.
-
- def __init__(self):
- self.f = None # Input file object.
- self.fname = None # Input file name.
- self.next = [] # Read ahead buffer containing [filename,linenumber,linetext] lists.
- self.cursor = None # Last read() [filename,linenumber,linetext].
- self.tabsize = 8 # Tab expansion number of spaces.
- self.parent = None # Included reader's parent reader.
- self._lineno = 0 # The last line read from file object f.
- self.line_ranges = None # line ranges to include
- self.current_depth = 0 # Current include depth.
- self.max_depth = 10 # Initial maximum allowed include depth.
- self.bom = None # Byte order mark (BOM).
- self.infile = None # Saved document 'infile' attribute.
- self.indir = None # Saved document 'indir' attribute.
-
- def open(self, fname):
- self.fname = fname
- message.verbose('reading: ' + fname)
- if fname == '<stdin>':
- self.f = sys.stdin
- self.infile = None
- self.indir = None
- else:
- self.f = open(fname, 'r', encoding='utf-8')
- self.infile = fname
- self.indir = os.path.dirname(fname)
- document.attributes['infile'] = self.infile
- document.attributes['indir'] = self.indir
- self._lineno = 0 # The last line read from file object f.
- self.next = []
- # Pre-fill buffer by reading the first line and then pushing it back.
- if self.read():
- if self.cursor[2].startswith(UTF8_BOM):
- self.cursor[2] = self.cursor[2][len(UTF8_BOM):]
- self.bom = UTF8_BOM
- self.unread(self.cursor)
- self.cursor = None
-
- def closefile(self):
- """Used by class methods to close nested include files."""
- self.f.close()
- self.next = []
-
- def close(self):
- self.closefile()
- self.__init__()
-
- def readline(self):
- while True:
- s = self.f.readline()
- if s:
- self._lineno = self._lineno + 1
- else:
- break
-
- if self.line_ranges is not None:
- for line_range in self.line_ranges:
- if len(line_range) == 1 and self._lineno == line_range[0]:
- break
- elif len(line_range) == 2 and line_range[0] <= self._lineno and (line_range[1] == -1 or self._lineno <= line_range[1]):
- break
- else:
- continue
- break
- else:
- break
- return s
-
- def read(self, skip=False):
- """Read next line. Return None if EOF. Expand tabs. Strip trailing
- white space. Maintain self.next read ahead buffer. If skip=True then
- conditional exclusion is active (ifdef and ifndef macros)."""
- # Top up buffer.
- if len(self.next) <= self.READ_BUFFER_MIN:
- s = self.readline()
- while s:
- if self.tabsize != 0:
- s = s.expandtabs(self.tabsize)
- s = s.rstrip()
- self.next.append([self.fname, self._lineno, s])
- if len(self.next) > self.READ_BUFFER_MIN:
- break
- s = self.readline()
- # Return first (oldest) buffer entry.
- if len(self.next) > 0:
- self.cursor = self.next[0]
- del self.next[0]
- result = self.cursor[2]
- # Check for include macro.
- mo = macros.match('+', r'^include[1]?$', result)
- if mo and not skip:
- # Parse include macro attributes.
- attrs = {}
- parse_attributes(mo.group('attrlist'), attrs)
- warnings = attrs.get('warnings', True)
- # Don't process include macro once the maximum depth is reached.
- if self.current_depth >= self.max_depth:
- message.warning('maximum include depth exceeded')
- return result
- # Perform attribute substitution on include macro file name.
- fname = subs_attrs(mo.group('target'))
- if not fname:
- return Reader1.read(self) # Return next input line.
- if self.fname != '<stdin>':
- fname = os.path.expandvars(os.path.expanduser(fname))
- fname = safe_filename(fname, os.path.dirname(self.fname))
- if not fname:
- return Reader1.read(self) # Return next input line.
- if not os.path.isfile(fname):
- if warnings:
- message.warning('include file not found: %s' % fname)
- return Reader1.read(self) # Return next input line.
- if mo.group('name') == 'include1':
- if not config.dumping:
- if fname not in config.include1:
- message.verbose('include1: ' + fname, linenos=False)
- # Store the include file in memory for later
- # retrieval by the {include1:} system attribute.
- with open(fname, encoding='utf-8') as f:
- config.include1[fname] = [s.rstrip() for s in f]
- return '{include1:%s}' % fname
- else:
- # This is a configuration dump, just pass the macro
- # call through.
- return result
- # Clone self and set as parent (self assumes the role of child).
- parent = Reader1()
- utils.assign(parent, self)
- self.parent = parent
- # Set attributes in child.
- if 'tabsize' in attrs:
- try:
- val = int(attrs['tabsize'])
- if not val >= 0:
- raise ValueError('not >= 0')
- self.tabsize = val
- except ValueError:
- raise EAsciiDoc('illegal include macro tabsize argument')
- else:
- self.tabsize = config.tabsize
- if 'depth' in attrs:
- try:
- val = int(attrs['depth'])
- if not val >= 1:
- raise ValueError('not >= 1')
- self.max_depth = self.current_depth + val
- except ValueError:
- raise EAsciiDoc("include macro: illegal 'depth' argument")
- if 'lines' in attrs:
- try:
- if ';' in attrs['lines']:
- ranges = attrs['lines'].split(';')
- else:
- ranges = attrs['lines'].split(',')
- for idx in range(len(ranges)):
- ranges[idx] = [int(x) for x in ranges[idx].split('..')]
- self.line_ranges = ranges
- except ValueError:
- raise EAsciiDoc("include macro: illegal 'lines' argument")
- # Process included file.
- message.verbose('include: ' + fname, linenos=False)
- self.open(fname)
- self.current_depth = self.current_depth + 1
- result = Reader1.read(self)
- else:
- if not Reader1.eof(self):
- result = Reader1.read(self)
- else:
- result = None
- return result
-
- def eof(self):
- """Returns True if all lines have been read."""
- if len(self.next) == 0:
- # End of current file.
- if self.parent:
- self.closefile()
- utils.assign(self, self.parent) # Restore parent reader.
- document.attributes['infile'] = self.infile
- document.attributes['indir'] = self.indir
- return Reader1.eof(self)
- else:
- return True
- else:
- return False
-
- def read_next(self):
- """Like read() but does not advance file pointer."""
- if Reader1.eof(self):
- return None
- else:
- return self.next[0][2]
-
- def unread(self, cursor):
- """Push the line (filename,linenumber,linetext) tuple back into the read
- buffer. Note that it's up to the caller to restore the previous
- cursor."""
- assert cursor
- self.next.insert(0, cursor)
-
-
-class Reader(Reader1):
- """ Wraps (well, sought of) Reader1 class and implements conditional text
- inclusion."""
- def __init__(self):
- Reader1.__init__(self)
- self.depth = 0 # if nesting depth.
- self.skip = False # true if we're skipping ifdef...endif.
- self.skipname = '' # Name of current endif macro target.
- self.skipto = -1 # The depth at which skipping is re-enabled.
-
- def read_super(self):
- result = Reader1.read(self, self.skip)
- if result is None and self.skip:
- raise EAsciiDoc('missing endif::%s[]' % self.skipname)
- return result
-
- def read(self):
- result = self.read_super()
- if result is None:
- return None
- while self.skip:
- mo = macros.match('+', r'ifdef|ifndef|ifeval|endif', result)
- if mo:
- name = mo.group('name')
- target = mo.group('target')
- attrlist = mo.group('attrlist')
- if name == 'endif':
- self.depth -= 1
- if self.depth < 0:
- raise EAsciiDoc('mismatched macro: %s' % result)
- if self.depth == self.skipto:
- self.skip = False
- if target and self.skipname != target:
- raise EAsciiDoc('mismatched macro: %s' % result)
- else:
- if name in ('ifdef', 'ifndef'):
- if not target:
- raise EAsciiDoc('missing macro target: %s' % result)
- if not attrlist:
- self.depth += 1
- elif name == 'ifeval':
- if not attrlist:
- raise EAsciiDoc('missing ifeval condition: %s' % result)
- self.depth += 1
- result = self.read_super()
- if result is None:
- return None
- mo = macros.match('+', r'ifdef|ifndef|ifeval|endif', result)
- if mo:
- name = mo.group('name')
- target = mo.group('target')
- attrlist = mo.group('attrlist')
- if name == 'endif':
- self.depth = self.depth - 1
- else:
- if not target and name in ('ifdef', 'ifndef'):
- raise EAsciiDoc('missing macro target: %s' % result)
- defined = is_attr_defined(target, document.attributes)
- if name == 'ifdef':
- if attrlist:
- if defined:
- return attrlist
- else:
- self.skip = not defined
- elif name == 'ifndef':
- if attrlist:
- if not defined:
- return attrlist
- else:
- self.skip = defined
- elif name == 'ifeval':
- if safe():
- message.unsafe('ifeval invalid')
- raise EAsciiDoc('ifeval invalid safe document')
- if not attrlist:
- raise EAsciiDoc('missing ifeval condition: %s' % result)
- cond = False
- attrlist = subs_attrs(attrlist)
- if attrlist:
- try:
- cond = eval(attrlist)
- except Exception as e:
- raise EAsciiDoc('error evaluating ifeval condition: %s: %s' % (result, str(e)))
- message.verbose('ifeval: %s: %r' % (attrlist, cond))
- self.skip = not cond
- if not attrlist or name == 'ifeval':
- if self.skip:
- self.skipto = self.depth
- self.skipname = target
- self.depth = self.depth + 1
- result = self.read()
- if result:
- # Expand executable block macros.
- mo = macros.match('+', r'eval|sys|sys2', result)
- if mo:
- action = mo.group('name')
- cmd = mo.group('attrlist')
- result = system(action, cmd, is_macro=True)
- self.cursor[2] = result # So we don't re-evaluate.
- if result:
- # Un=escape escaped system macros.
- if macros.match('+', r'\\eval|\\sys|\\sys2|\\ifdef|\\ifndef|\\endif|\\include|\\include1', result):
- result = result[1:]
- return result
-
- def eof(self):
- return self.read_next() is None
-
- def read_next(self):
- save_cursor = self.cursor
- result = self.read()
- if result is not None:
- self.unread(self.cursor)
- self.cursor = save_cursor
- return result
-
- def read_lines(self, count=1):
- """Return tuple containing count lines."""
- result = []
- i = 0
- while i < count and not self.eof():
- result.append(self.read())
- return tuple(result)
-
- def read_ahead(self, count=1):
- """Same as read_lines() but does not advance the file pointer."""
- result = []
- putback = []
- save_cursor = self.cursor
- try:
- i = 0
- while i < count and not self.eof():
- result.append(self.read())
- putback.append(self.cursor)
- i = i + 1
- while putback:
- self.unread(putback.pop())
- finally:
- self.cursor = save_cursor
- return tuple(result)
-
- @staticmethod
- def skip_blank_lines():
- reader.read_until(r'\s*\S+')
-
- def read_until(self, terminators, same_file=False):
- """Like read() but reads lines up to (but not including) the first line
- that matches the terminator regular expression, regular expression
- object or list of regular expression objects. If same_file is True then
- the terminating pattern must occur in the file the was being read when
- the routine was called."""
- if same_file:
- fname = self.cursor[0]
- result = []
- if not isinstance(terminators, list):
- if isinstance(terminators, str):
- terminators = [re.compile(terminators)]
- else:
- terminators = [terminators]
- while not self.eof():
- save_cursor = self.cursor
- s = self.read()
- if not same_file or fname == self.cursor[0]:
- for reo in terminators:
- if reo.match(s):
- self.unread(self.cursor)
- self.cursor = save_cursor
- return tuple(result)
- result.append(s)
- return tuple(result)
-
-
-class Writer:
- """Writes lines to output file."""
- def __init__(self):
- self.newline = DEFAULT_NEWLINE # End of line terminator.
- self.f = None # Output file object.
- self.fname = None # Output file name.
- self.lines_out = 0 # Number of lines written.
- self.skip_blank_lines = False # If True don't output blank lines.
-
- def open(self, fname, bom=None):
- """
- bom is optional byte order mark.
- http://en.wikipedia.org/wiki/Byte-order_mark
- """
- self.fname = fname
- if fname == '<stdout>':
- self.f = sys.stdout
- else:
- self.f = open(fname, 'w+', encoding='utf-8', newline="")
- message.verbose('writing: ' + writer.fname, False)
- if bom:
- self.f.write(bom)
- self.lines_out = 0
-
- def close(self):
- if self.fname != '<stdout>':
- self.f.close()
-
- def write_line(self, line=None):
- if not (self.skip_blank_lines and (not line or not line.strip())):
- # Replace out any escaped attributes with non-escaped versions
- self.f.write((re.sub(r'\\({[a-zA-Z0-9_][a-zA-Z0-9_\-]*)', '\\1', line) or '') + self.newline)
- self.lines_out = self.lines_out + 1
-
- def write(self, *args, **kwargs):
- """Iterates arguments, writes tuple and list arguments one line per
- element, else writes argument as single line. If no arguments writes
- blank line. If argument is None nothing is written. self.newline is
- appended to each line."""
- if 'trace' in kwargs and len(args) > 0:
- trace(kwargs['trace'], args[0])
- if len(args) == 0:
- self.write_line()
- self.lines_out = self.lines_out + 1
- else:
- for arg in args:
- if utils.is_array(arg):
- for s in arg:
- self.write_line(s)
- elif arg is not None:
- self.write_line(arg)
-
- def write_tag(self, tag, content, subs=None, d=None, **kwargs):
- """Write content enveloped by tag.
- Substitutions specified in the 'subs' list are perform on the
- 'content'."""
- if subs is None:
- subs = config.subsnormal
- stag, etag = subs_tag(tag, d)
- content = Lex.subs(content, subs)
- if 'trace' in kwargs:
- trace(kwargs['trace'], [stag] + content + [etag])
- if stag:
- self.write(stag)
- if content:
- self.write(content)
- if etag:
- self.write(etag)
-
# ---------------------------------------------------------------------------
# Configuration file processing.
@@ -5488,8 +5032,7 @@ APPLICATION_CALLER = __name__
# -------
document = Document() # The document being processed.
config = Config() # Configuration file reader.
-reader = Reader() # Input stream line reader.
-writer = Writer() # Output stream line writer.
+
message = Message(APPLICATION_CALLER, document, config, reader) # Message functions.
paragraphs = Paragraphs() # Paragraph definitions.
lists = Lists() # List definitions.
@@ -5499,6 +5042,8 @@ tables = Tables() # Table definitions.
macros = Macros() # Macro definitions.
calloutmap = CalloutMap() # Coordinates callouts and callout list.
trace = Trace() # Implements trace attribute processing.
+reader = Reader(message, document, macros, config) # Input stream line reader.
+writer = Writer(message, trace, config) # Output stream line writer.
# Used by asciidocapi.py #
# List of message strings written to stderr.
@@ -5518,8 +5063,6 @@ def reset_asciidoc():
document = Document()
config = Config()
- reader = Reader()
- writer = Writer()
message = Message(APPLICATION_CALLER, document, config, reader)
paragraphs = Paragraphs()
lists = Lists()
@@ -5529,6 +5072,8 @@ def reset_asciidoc():
macros = Macros()
calloutmap = CalloutMap()
trace = Trace()
+ reader = Reader(message, document, macros, config)
+ writer = Writer(message, trace, config)
Lex.reset_class()
AttributeEntry.reset_class()
diff --git a/asciidoc/io.py b/asciidoc/io.py
new file mode 100644
index 0000000..12118d0
--- /dev/null
+++ b/asciidoc/io.py
@@ -0,0 +1,493 @@
+from asciidoc import utils
+from asciidoc.asciidoc import DEFAULT_NEWLINE, Config, Document, Lex, Macros, Trace
+from asciidoc.asciidoc import is_attr_defined, safe, subs_tag, safe_filename
+from asciidoc.asciidoc import subs_attrs, system
+from asciidoc.attrs import parse_attributes
+from asciidoc.exceptions import EAsciiDoc
+from asciidoc.message import Message
+import os
+import re
+import sys
+
+
+UTF8_BOM = b'\xef\xbb\xbf'.decode('utf-8')
+
+
+class Reader1:
+ """Line oriented AsciiDoc input file reader. Processes include and
+ conditional inclusion system macros. Tabs are expanded and lines are right
+ trimmed."""
+ # This class is not used directly, use Reader class instead.
+ READ_BUFFER_MIN = 10 # Read buffer low level.
+
+ def __init__(
+ self,
+ message: Message,
+ document: Document,
+ macros: Macros,
+ config: Config,
+ ):
+ self.f = None # Input file object.
+ self.fname = None # Input file name.
+ # Read ahead buffer containing [filename,linenumber,linetext] lists.
+ self.next = []
+ self.cursor = None # Last read() [filename,linenumber,linetext].
+ self.tabsize = 8 # Tab expansion number of spaces.
+ self.parent = None # Included reader's parent reader.
+ self._lineno = 0 # The last line read from file object f.
+ self.line_ranges = None # line ranges to include
+ self.current_depth = 0 # Current include depth.
+ self.max_depth = 10 # Initial maximum allowed include depth.
+ self.bom = None # Byte order mark (BOM).
+ self.infile = None # Saved document 'infile' attribute.
+ self.indir = None # Saved document 'indir' attribute.
+ self.message = message
+ self.document = document
+ self.macros = macros
+ self.config = config
+
+ def open(self, fname):
+ self.fname = fname
+ self.message.verbose('reading: ' + fname)
+ if fname == '<stdin>':
+ self.f = sys.stdin
+ self.infile = None
+ self.indir = None
+ else:
+ self.f = open(fname, 'r', encoding='utf-8')
+ self.infile = fname
+ self.indir = os.path.dirname(fname)
+ self.document.attributes['infile'] = self.infile
+ self.document.attributes['indir'] = self.indir
+ self._lineno = 0 # The last line read from file object f.
+ self.next = []
+ # Pre-fill buffer by reading the first line and then pushing it back.
+ if self.read():
+ if self.cursor[2].startswith(UTF8_BOM):
+ self.cursor[2] = self.cursor[2][len(UTF8_BOM):]
+ self.bom = UTF8_BOM
+ self.unread(self.cursor)
+ self.cursor = None
+
+ def closefile(self):
+ """Used by class methods to close nested include files."""
+ self.f.close()
+ self.next = []
+
+ def close(self):
+ self.closefile()
+ self.__init__()
+
+ def readline(self):
+ while True:
+ s = self.f.readline()
+ if s:
+ self._lineno = self._lineno + 1
+ else:
+ break
+
+ if self.line_ranges is not None:
+ for line_range in self.line_ranges:
+ if len(line_range) == 1 and self._lineno == line_range[0]:
+ break
+ elif len(line_range) == 2 and line_range[0] <= self._lineno and \
+ (line_range[1] == -1 or self._lineno <= line_range[1]):
+ break
+ else:
+ continue
+ break
+ else:
+ break
+ return s
+
+ def read(self, skip=False):
+ """Read next line. Return None if EOF. Expand tabs. Strip trailing
+ white space. Maintain self.next read ahead buffer. If skip=True then
+ conditional exclusion is active (ifdef and ifndef macros)."""
+ # Top up buffer.
+ if len(self.next) <= self.READ_BUFFER_MIN:
+ s = self.readline()
+ while s:
+ if self.tabsize != 0:
+ s = s.expandtabs(self.tabsize)
+ s = s.rstrip()
+ self.next.append([self.fname, self._lineno, s])
+ if len(self.next) > self.READ_BUFFER_MIN:
+ break
+ s = self.readline()
+ # Return first (oldest) buffer entry.
+ if len(self.next) > 0:
+ self.cursor = self.next[0]
+ del self.next[0]
+ result = self.cursor[2]
+ # Check for include macro.
+ mo = self.macros.match('+', r'^include[1]?$', result)
+ if mo and not skip:
+ # Parse include macro attributes.
+ attrs = {}
+ parse_attributes(mo.group('attrlist'), attrs)
+ warnings = attrs.get('warnings', True)
+ # Don't process include macro once the maximum depth is reached.
+ if self.current_depth >= self.max_depth:
+ self.message.warning('maximum include depth exceeded')
+ return result
+ # Perform attribute substitution on include macro file name.
+ fname = subs_attrs(mo.group('target'))
+ if not fname:
+ return Reader1.read(self) # Return next input line.
+ if self.fname != '<stdin>':
+ fname = os.path.expandvars(os.path.expanduser(fname))
+ fname = safe_filename(fname, os.path.dirname(self.fname))
+ if not fname:
+ return Reader1.read(self) # Return next input line.
+ if not os.path.isfile(fname):
+ if warnings:
+ self.message.warning('include file not found: %s' % fname)
+ return Reader1.read(self) # Return next input line.
+ if mo.group('name') == 'include1':
+ if not self.config.dumping:
+ if fname not in self.config.include1:
+ self.message.verbose(
+ 'include1: ' + fname,
+ linenos=False,
+ )
+ # Store the include file in memory for later
+ # retrieval by the {include1:} system attribute.
+ with open(fname, encoding='utf-8') as f:
+ self.config.include1[fname] = [
+ s.rstrip() for s in f
+ ]
+ return '{include1:%s}' % fname
+ else:
+ # This is a configuration dump, just pass the macro
+ # call through.
+ return result
+ # Clone self and set as parent (self assumes the role of child).
+ parent = Reader1()
+ utils.assign(parent, self)
+ self.parent = parent
+ # Set attributes in child.
+ if 'tabsize' in attrs:
+ try:
+ val = int(attrs['tabsize'])
+ if not val >= 0:
+ raise ValueError('not >= 0')
+ self.tabsize = val
+ except ValueError:
+ raise EAsciiDoc('illegal include macro tabsize argument')
+ else:
+ self.tabsize = self.config.tabsize
+ if 'depth' in attrs:
+ try:
+ val = int(attrs['depth'])
+ if not val >= 1:
+ raise ValueError('not >= 1')
+ self.max_depth = self.current_depth + val
+ except ValueError:
+ raise EAsciiDoc("include macro: illegal 'depth' argument")
+ if 'lines' in attrs:
+ try:
+ if ';' in attrs['lines']:
+ ranges = attrs['lines'].split(';')
+ else:
+ ranges = attrs['lines'].split(',')
+ for idx in range(len(ranges)):
+ ranges[idx] = [int(x) for x in ranges[idx].split('..')]
+ self.line_ranges = ranges
+ except ValueError:
+ raise EAsciiDoc("include macro: illegal 'lines' argument")
+ # Process included file.
+ self.message.verbose('include: ' + fname, linenos=False)
+ self.open(fname)
+ self.current_depth = self.current_depth + 1
+ result = Reader1.read(self)
+ else:
+ if not Reader1.eof(self):
+ result = Reader1.read(self)
+ else:
+ result = None
+ return result
+
+ def eof(self):
+ """Returns True if all lines have been read."""
+ if len(self.next) == 0:
+ # End of current file.
+ if self.parent:
+ self.closefile()
+ utils.assign(self, self.parent) # Restore parent reader.
+ self.document.attributes['infile'] = self.infile
+ self.document.attributes['indir'] = self.indir
+ return Reader1.eof(self)
+ else:
+ return True
+ else:
+ return False
+
+ def read_next(self):
+ """Like read() but does not advance file pointer."""
+ if Reader1.eof(self):
+ return None
+ else:
+ return self.next[0][2]
+
+ def unread(self, cursor):
+ """Push the line (filename,linenumber,linetext) tuple back into the read
+ buffer. Note that it's up to the caller to restore the previous
+ cursor."""
+ assert cursor
+ self.next.insert(0, cursor)
+
+
+class Reader(Reader1):
+ """ Wraps (well, sought of) Reader1 class and implements conditional text
+ inclusion."""
+ def __init__(
+ self,
+ message: Message,
+ document: Document,
+ macros: Macros,
+ config: Config,
+ ):
+ Reader1.__init__(self, message, document, macros, config)
+ self.depth = 0 # if nesting depth.
+ self.skip = False # true if we're skipping ifdef...endif.
+ self.skipname = '' # Name of current endif macro target.
+ self.skipto = -1 # The depth at which skipping is re-enabled.
+
+ def read_super(self):
+ result = Reader1.read(self, self.skip)
+ if result is None and self.skip:
+ raise EAsciiDoc('missing endif::%s[]' % self.skipname)
+ return result
+
+ def read(self):
+ result = self.read_super()
+ if result is None:
+ return None
+ while self.skip:
+ mo = self.macros.match('+', r'ifdef|ifndef|ifeval|endif', result)
+ if mo:
+ name = mo.group('name')
+ target = mo.group('target')
+ attrlist = mo.group('attrlist')
+ if name == 'endif':
+ self.depth -= 1
+ if self.depth < 0:
+ raise EAsciiDoc('mismatched macro: %s' % result)
+ if self.depth == self.skipto:
+ self.skip = False
+ if target and self.skipname != target:
+ raise EAsciiDoc('mismatched macro: %s' % result)
+ else:
+ if name in ('ifdef', 'ifndef'):
+ if not target:
+ raise EAsciiDoc('missing macro target: %s' % result)
+ if not attrlist:
+ self.depth += 1
+ elif name == 'ifeval':
+ if not attrlist:
+ raise EAsciiDoc('missing ifeval condition: %s' % result)
+ self.depth += 1
+ result = self.read_super()
+ if result is None:
+ return None
+ mo = self.macros.match('+', r'ifdef|ifndef|ifeval|endif', result)
+ if mo:
+ name = mo.group('name')
+ target = mo.group('target')
+ attrlist = mo.group('attrlist')
+ if name == 'endif':
+ self.depth = self.depth - 1
+ else:
+ if not target and name in ('ifdef', 'ifndef'):
+ raise EAsciiDoc('missing macro target: %s' % result)
+ defined = is_attr_defined(target, self.document.attributes)
+ if name == 'ifdef':
+ if attrlist:
+ if defined:
+ return attrlist
+ else:
+ self.skip = not defined
+ elif name == 'ifndef':
+ if attrlist:
+ if not defined:
+ return attrlist
+ else:
+ self.skip = defined
+ elif name == 'ifeval':
+ if safe():
+ self.message.unsafe('ifeval invalid')
+ raise EAsciiDoc('ifeval invalid safe document')
+ if not attrlist:
+ raise EAsciiDoc('missing ifeval condition: %s' % result)
+ cond = False
+ attrlist = subs_attrs(attrlist)
+ if attrlist:
+ try:
+ cond = eval(attrlist)
+ except Exception as e:
+ raise EAsciiDoc(
+ 'error evaluating ifeval condition: %s: %s' % (
+ result,
+ str(e)
+ )
+ )
+ self.message.verbose('ifeval: %s: %r' % (attrlist, cond))
+ self.skip = not cond
+ if not attrlist or name == 'ifeval':
+ if self.skip:
+ self.skipto = self.depth
+ self.skipname = target
+ self.depth = self.depth + 1
+ result = self.read()
+ if result:
+ # Expand executable block macros.
+ mo = self.macros.match('+', r'eval|sys|sys2', result)
+ if mo:
+ action = mo.group('name')
+ cmd = mo.group('attrlist')
+ result = system(action, cmd, is_macro=True)
+ self.cursor[2] = result # So we don't re-evaluate.
+ if result:
+ # Un=escape escaped system macros.
+ if self.macros.match('+', r'\\eval|\\sys|\\sys2|\\ifdef|\\ifndef|\\endif|\\include|\\include1', result): # noqa=E501
+ result = result[1:]
+ return result
+
+ def eof(self):
+ return self.read_next() is None
+
+ def read_next(self):
+ save_cursor = self.cursor
+ result = self.read()
+ if result is not None:
+ self.unread(self.cursor)
+ self.cursor = save_cursor
+ return result
+
+ def read_lines(self, count=1):
+ """Return tuple containing count lines."""
+ result = []
+ i = 0
+ while i < count and not self.eof():
+ result.append(self.read())
+ return tuple(result)
+
+ def read_ahead(self, count=1):
+ """Same as read_lines() but does not advance the file pointer."""
+ result = []
+ putback = []
+ save_cursor = self.cursor
+ try:
+ i = 0
+ while i < count and not self.eof():
+ result.append(self.read())
+ putback.append(self.cursor)
+ i = i + 1
+ while putback:
+ self.unread(putback.pop())
+ finally:
+ self.cursor = save_cursor
+ return tuple(result)
+
+ def skip_blank_lines(self):
+ self.read_until(r'\s*\S+')
+
+ def read_until(self, terminators, same_file=False):
+ """Like read() but reads lines up to (but not including) the first line
+ that matches the terminator regular expression, regular expression
+ object or list of regular expression objects. If same_file is True then
+ the terminating pattern must occur in the file the was being read when
+ the routine was called."""
+ if same_file:
+ fname = self.cursor[0]
+ result = []
+ if not isinstance(terminators, list):
+ if isinstance(terminators, str):
+ terminators = [re.compile(terminators)]
+ else:
+ terminators = [terminators]
+ while not self.eof():
+ save_cursor = self.cursor
+ s = self.read()
+ if not same_file or fname == self.cursor[0]:
+ for reo in terminators:
+ if reo.match(s):
+ self.unread(self.cursor)
+ self.cursor = save_cursor
+ return tuple(result)
+ result.append(s)
+ return tuple(result)
+
+
+class Writer:
+ """Writes lines to output file."""
+ def __init__(self, message: Message, trace: Trace, config: Config):
+ self.newline = DEFAULT_NEWLINE # End of line terminator.
+ self.f = None # Output file object.
+ self.fname = None # Output file name.
+ self.lines_out = 0 # Number of lines written.
+ self.skip_blank_lines = False # If True don't output blank lines.
+ self.message = message
+ self.trace = trace
+ self.config = config
+
+ def open(self, fname, bom=None):
+ """
+ bom is optional byte order mark.
+ http://en.wikipedia.org/wiki/Byte-order_mark
+ """
+ self.fname = fname
+ if fname == '<stdout>':
+ self.f = sys.stdout
+ else:
+ self.f = open(fname, 'w+', encoding='utf-8', newline="")
+ self.message.verbose('writing: ' + self.fname, False)
+ if bom:
+ self.f.write(bom)
+ self.lines_out = 0
+
+ def close(self):
+ if self.fname != '<stdout>':
+ self.f.close()
+
+ def write_line(self, line=None):
+ if not (self.skip_blank_lines and (not line or not line.strip())):
+ # Replace out any escaped attributes with non-escaped versions
+ self.f.write((re.sub(r'\\({[a-zA-Z0-9_][a-zA-Z0-9_\-]*)', '\\1', line) or '') + self.newline) # noqa=E501
+ self.lines_out = self.lines_out + 1
+
+ def write(self, *args, **kwargs):
+ """Iterates arguments, writes tuple and list arguments one line per
+ element, else writes argument as single line. If no arguments writes
+ blank line. If argument is None nothing is written. self.newline is
+ appended to each line."""
+ if 'trace' in kwargs and len(args) > 0:
+ self.trace(kwargs['trace'], args[0])
+ if len(args) == 0:
+ self.write_line()
+ self.lines_out = self.lines_out + 1
+ else:
+ for arg in args:
+ if utils.is_array(arg):
+ for s in arg:
+ self.write_line(s)
+ elif arg is not None:
+ self.write_line(arg)
+
+ def write_tag(self, tag, content, subs=None, d=None, **kwargs):
+ """Write content enveloped by tag.
+ Substitutions specified in the 'subs' list are perform on the
+ 'content'."""
+ if subs is None:
+ subs = self.config.subsnormal
+ stag, etag = subs_tag(tag, d)
+ content = Lex.subs(content, subs)
+ if 'trace' in kwargs:
+ self.trace(kwargs['trace'], [stag] + content + [etag])
+ if stag:
+ self.write(stag)
+ if content:
+ self.write(content)
+ if etag:
+ self.write(etag)