summaryrefslogtreecommitdiff
path: root/_test/lib/canonical.py
diff options
context:
space:
mode:
Diffstat (limited to '_test/lib/canonical.py')
-rw-r--r--_test/lib/canonical.py60
1 files changed, 29 insertions, 31 deletions
diff --git a/_test/lib/canonical.py b/_test/lib/canonical.py
index 64f3153..af2c3cf 100644
--- a/_test/lib/canonical.py
+++ b/_test/lib/canonical.py
@@ -11,7 +11,6 @@ class CanonicalError(ruamel.yaml.YAMLError):
class CanonicalScanner:
-
def __init__(self, data):
try:
if PY3:
@@ -20,7 +19,7 @@ class CanonicalScanner:
else:
data = unicode(data, 'utf-8') # NOQA
except UnicodeDecodeError:
- raise CanonicalError("utf-8 stream is expected")
+ raise CanonicalError('utf-8 stream is expected')
self.data = data + u'\0'
self.index = 0
self.tokens = []
@@ -48,7 +47,7 @@ class CanonicalScanner:
self.scan()
token = self.tokens.pop(0)
if choice and not isinstance(token, choice):
- raise CanonicalError("unexpected token " + repr(token))
+ raise CanonicalError('unexpected token ' + repr(token))
return token
def get_token_value(self):
@@ -65,7 +64,7 @@ class CanonicalScanner:
break
elif ch == u'%':
self.tokens.append(self.scan_directive())
- elif ch == u'-' and self.data[self.index:self.index + 3] == u'---':
+ elif ch == u'-' and self.data[self.index : self.index + 3] == u'---':
self.index += 3
self.tokens.append(ruamel.yaml.DocumentStartToken(None, None))
elif ch == u'[':
@@ -96,18 +95,20 @@ class CanonicalScanner:
elif ch == u'"':
self.tokens.append(self.scan_scalar())
else:
- raise CanonicalError("invalid token")
+ raise CanonicalError('invalid token')
self.scanned = True
DIRECTIVE = u'%YAML 1.1'
def scan_directive(self):
- if self.data[self.index:self.index + len(self.DIRECTIVE)] == self.DIRECTIVE and \
- self.data[self.index + len(self.DIRECTIVE)] in u' \n\0':
+ if (
+ self.data[self.index : self.index + len(self.DIRECTIVE)] == self.DIRECTIVE
+ and self.data[self.index + len(self.DIRECTIVE)] in u' \n\0'
+ ):
self.index += len(self.DIRECTIVE)
return ruamel.yaml.DirectiveToken('YAML', (1, 1), None, None)
else:
- raise CanonicalError("invalid directive")
+ raise CanonicalError('invalid directive')
def scan_alias(self):
if self.data[self.index] == u'*':
@@ -118,7 +119,7 @@ class CanonicalScanner:
start = self.index
while self.data[self.index] not in u', \n\0':
self.index += 1
- value = self.data[start:self.index]
+ value = self.data[start : self.index]
return TokenClass(value, None, None)
def scan_tag(self):
@@ -126,7 +127,7 @@ class CanonicalScanner:
start = self.index
while self.data[self.index] not in u' \n\0':
self.index += 1
- value = self.data[start:self.index]
+ value = self.data[start : self.index]
if not value:
value = u'!'
elif value[0] == u'!':
@@ -137,15 +138,11 @@ class CanonicalScanner:
value = u'!' + value
return ruamel.yaml.TagToken(value, None, None)
- QUOTE_CODES = {
- 'x': 2,
- 'u': 4,
- 'U': 8,
- }
+ QUOTE_CODES = {'x': 2, 'u': 4, 'U': 8}
QUOTE_REPLACES = {
u'\\': u'\\',
- u'\"': u'\"',
+ u'"': u'"',
u' ': u' ',
u'a': u'\x07',
u'b': u'\x08',
@@ -160,7 +157,6 @@ class CanonicalScanner:
u'P': u'\u2029',
u'_': u'_',
u'0': u'\x00',
-
}
def scan_scalar(self):
@@ -171,7 +167,7 @@ class CanonicalScanner:
while self.data[self.index] != u'"':
if self.data[self.index] == u'\\':
ignore_spaces = False
- chunks.append(self.data[start:self.index])
+ chunks.append(self.data[start : self.index])
self.index += 1
ch = self.data[self.index]
self.index += 1
@@ -179,16 +175,16 @@ class CanonicalScanner:
ignore_spaces = True
elif ch in self.QUOTE_CODES:
length = self.QUOTE_CODES[ch]
- code = int(self.data[self.index:self.index + length], 16)
+ code = int(self.data[self.index : self.index + length], 16)
chunks.append(unichr(code))
self.index += length
else:
if ch not in self.QUOTE_REPLACES:
- raise CanonicalError("invalid escape code")
+ raise CanonicalError('invalid escape code')
chunks.append(self.QUOTE_REPLACES[ch])
start = self.index
elif self.data[self.index] == u'\n':
- chunks.append(self.data[start:self.index])
+ chunks.append(self.data[start : self.index])
chunks.append(u' ')
self.index += 1
start = self.index
@@ -199,9 +195,9 @@ class CanonicalScanner:
else:
ignore_spaces = False
self.index += 1
- chunks.append(self.data[start:self.index])
+ chunks.append(self.data[start : self.index])
self.index += 1
- return ruamel.yaml.ScalarToken(u''.join(chunks), False, None, None)
+ return ruamel.yaml.ScalarToken("".join(chunks), False, None, None)
def find_token(self):
found = False
@@ -233,7 +229,7 @@ class CanonicalParser:
if self.check_token(ruamel.yaml.DirectiveToken, ruamel.yaml.DocumentStartToken):
self.parse_document()
else:
- raise CanonicalError("document is expected, got " + repr(self.tokens[0]))
+ raise CanonicalError('document is expected, got ' + repr(self.tokens[0]))
self.get_token(ruamel.yaml.StreamEndToken)
self.events.append(ruamel.yaml.StreamEndEvent(None, None))
@@ -259,8 +255,11 @@ class CanonicalParser:
if self.check_token(ruamel.yaml.TagToken):
tag = self.get_token_value()
if self.check_token(ruamel.yaml.ScalarToken):
- self.events.append(ruamel.yaml.ScalarEvent(anchor, tag, (False, False),
- self.get_token_value(), None, None))
+ self.events.append(
+ ruamel.yaml.ScalarEvent(
+ anchor, tag, (False, False), self.get_token_value(), None, None
+ )
+ )
elif self.check_token(ruamel.yaml.FlowSequenceStartToken):
self.events.append(ruamel.yaml.SequenceStartEvent(anchor, tag, None, None))
self.parse_sequence()
@@ -268,8 +267,9 @@ class CanonicalParser:
self.events.append(ruamel.yaml.MappingStartEvent(anchor, tag, None, None))
self.parse_mapping()
else:
- raise CanonicalError("SCALAR, '[', or '{' is expected, got " +
- repr(self.tokens[0]))
+ raise CanonicalError(
+ "SCALAR, '[', or '{' is expected, got " + repr(self.tokens[0])
+ )
# sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
def parse_sequence(self):
@@ -328,9 +328,7 @@ class CanonicalParser:
return self.events[0]
-class CanonicalLoader(CanonicalScanner, CanonicalParser,
- Composer, Constructor, Resolver):
-
+class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
def __init__(self, stream):
if hasattr(stream, 'read'):
stream = stream.read()