diff options
Diffstat (limited to 'lib/yaml/stream.py')
-rw-r--r-- | lib/yaml/stream.py | 226 |
1 files changed, 212 insertions, 14 deletions
diff --git a/lib/yaml/stream.py b/lib/yaml/stream.py index 47f72a2..644d815 100644 --- a/lib/yaml/stream.py +++ b/lib/yaml/stream.py @@ -1,31 +1,229 @@ +# This module contains abstractions for the input stream. You don't have to +# looks further, there are no pretty code. +# +# We define two classes here. +# +# Marker(source, line, column) +# It's just a record and its only use is producing nice error messages. +# Parser does not use it for any other purposes. +# +# Stream(source, data) +# Stream determines the encoding of `data` and converts it to unicode. +# Stream provides the following methods and attributes: +# stream.peek(length=1) - return the next `length` characters +# stream.forward(length=1) - move the current position to `length` characters. +# stream.index - the number of the current character. +# stream.line, stream.column - the line and the column of the current character. -from marker import Marker + +from error import YAMLError + +import codecs, re + +# Unfortunately, codec functions in Python 2.3 does not support the `finish` +# arguments, so we have to write our own wrappers. + +try: + codecs.utf_8_decode('', 'strict', False) + from codecs import utf_8_decode, utf_16_le_decode, utf_16_be_decode + +except TypeError: + + def utf_16_le_decode(data, errors, finish=False): + if not finish and len(data) % 2 == 1: + data = data[:-1] + return codecs.utf_16_le_decode(data, errors) + + def utf_16_be_decode(data, errors, finish=False): + if not finish and len(data) % 2 == 1: + data = data[:-1] + return codecs.utf_16_be_decode(data, errors) + + def utf_8_decode(data, errors, finish=False): + if not finish: + # We are trying to remove a possible incomplete multibyte character + # from the suffix of the data. + # The first byte of a multi-byte sequence is in the range 0xc0 to 0xfd. + # All further bytes are in the range 0x80 to 0xbf. + # UTF-8 encoded UCS characters may be up to six bytes long. + count = 0 + while count < 5 and count < len(data) \ + and '\x80' <= data[-count-1] <= '\xBF': + count -= 1 + if count < 5 and count < len(data) \ + and '\xC0' <= data[-count-1] <= '\xFD': + data = data[:-count-1] + return codecs.utf_8_decode(data, errors) + +class Marker: + + def __init__(self, source, line, column, buffer, pointer): + self.source = source + self.line = line + self.column = column + self.buffer = buffer + self.pointer = pointer + + def get_snippet(self, max_length=79): + if self.buffer is None: + return None + head = '' + start = self.pointer + while start > 0 and self.buffer[start-1] not in u'\0\r\n\x85\u2028\u2029': + start -= 1 + if self.pointer-start > max_length/2-1: + head = ' ... ' + start += 5 + break + tail = '' + end = self.pointer + while end < len(self.buffer) and self.buffer[end] not in u'\0\r\n\x85\u2028\u2029': + end += 1 + if end-self.pointer > max_length/2-1: + tail = ' ... ' + end -= 5 + break + snippet = self.buffer[start:end].encode('utf-8') + return head + snippet + tail + '\n' \ + + ' '*(self.pointer-start+len(head)) + '^' + '\n' + +class StreamError(YAMLError): + + def __init__(self, source, encoding, character, position, reason): + self.source = source + self.encoding = encoding + self.character = character + self.position = position + self.reason = reason + + def __str__(self): + if isinstance(self.character, str): + return "'%s' codec can't decode byte #x%02x: %s\n" \ + "\tin file '%s', position %d." \ + % (self.encoding, ord(self.character), self.reason, + self.source, self.position) + else: + return "unacceptable character #x%04x: %s\n" \ + "\tin file '%s', position %d." \ + % (ord(self.character), self.reason, + self.source, self.position) class Stream: + # Stream: + # - determines the data encoding and converts it to unicode, + # - checks if characters are in allowed range, + # - adds '\0' to the end. + + # Yeah, it's ugly and slow. def __init__(self, source, data): self.source = source - self.data = unicode(data, 'utf-8')+u'\0' + self.stream = None + self.stream_pointer = 0 + self.eof = True + self.buffer = u'' + self.pointer = 0 + self.raw_buffer = None + self.raw_decoder = None self.index = 0 self.line = 0 self.column = 0 + if isinstance(data, unicode): + self.check_printable(data) + self.buffer = data+u'\0' + elif isinstance(data, str): + self.raw_buffer = data + self.determine_encoding() + else: + self.stream = data + self.eof = False + self.raw_buffer = '' + self.determine_encoding() - def peek(self, k=1): - return self.data[self.index:self.index+k] + def peek(self, length=1): + if self.pointer+length >= len(self.buffer): + self.update(length) + return self.buffer[self.pointer:self.pointer+length] - def read(self, k=1): - value = self.data[self.index:self.index+k] - for i in range(k): - if self.index >= len(self.data): - break - if self.data[self.index] in u'\r\n\x85\u2028\u2029': + def forward(self, length=1): + if self.pointer+length+1 >= len(self.buffer): + self.update(length+1) + for k in range(length): + ch = self.buffer[self.pointer] + self.pointer += 1 + self.index += 1 + if ch in u'\n\x85\u2028\u2029' \ + or (ch == u'\r' and self.buffer[self.pointer+1] != u'\n'): self.line += 1 self.column = 0 - else: + elif ch != u'\uFEFF': self.column += 1 - self.index += 1 - return value def get_marker(self): - return Marker(self.source, self.data, self.index, self.line, self.column) + if self.stream is None: + return Marker(self.source, self.line, self.column, + self.buffer, self.pointer) + else: + return Marker(self.source, self.line, self.column, None, None) + + def determine_encoding(self): + while not self.eof and len(self.raw_buffer) < 2: + self.update_raw() + if self.raw_buffer.startswith(codecs.BOM_UTF16_LE): + self.raw_decode = utf_16_le_decode + elif self.raw_buffer.startswith(codecs.BOM_UTF16_BE): + self.raw_decode = utf_16_be_decode + else: + self.raw_decode = utf_8_decode + self.update(1) + + NON_PRINTABLE = re.compile(u'[^\x09\x0A\x0D\x20-\x7E\x85\xA0-\uD7FF\uE000-\uFFFD]') + def check_printable(self, data): + match = self.NON_PRINTABLE.search(data) + if match: + character = match.group() + position = self.index+(len(self.buffer)-self.pointer)+match.start() + raise StreamError(self.source, 'unicode', character, position, + "control characters are not allowed") + + def update(self, length): + if self.raw_buffer is None: + return + self.buffer = self.buffer[self.pointer:] + self.pointer = 0 + while len(self.buffer) < length: + if not self.eof: + self.update_raw() + try: + data, converted = self.raw_decode(self.raw_buffer, + 'strict', self.eof) + except UnicodeDecodeError, exc: + character = exc.object[exc.start] + if self.stream is not None: + position = self.stream_pointer-len(self.raw_buffer)+exc.start + else: + position = exc.start + raise StreamError(self.source, exc.encoding, + character, position, exc.reason) + self.check_printable(data) + self.buffer += data + self.raw_buffer = self.raw_buffer[converted:] + if self.eof: + self.buffer += u'\0' + self.raw_buffer = None + break + + def update_raw(self, size=1024): + data = self.stream.read(size) + if data: + self.raw_buffer += data + self.stream_pointer += len(data) + else: + self.eof = True + +#try: +# import psyco +# psyco.bind(Stream) +#except ImportError: +# pass |