diff options
Diffstat (limited to 'src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/content.py')
-rw-r--r-- | src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/content.py | 385 |
1 files changed, 385 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/content.py b/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/content.py new file mode 100644 index 00000000000..09f44844524 --- /dev/null +++ b/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/content.py @@ -0,0 +1,385 @@ +# Copyright (c) 2009-2012 testtools developers. See LICENSE for details. + +"""Content - a MIME-like Content object.""" + +__all__ = [ + 'attach_file', + 'Content', + 'content_from_file', + 'content_from_stream', + 'json_content', + 'text_content', + 'TracebackContent', + ] + +import codecs +import inspect +import json +import os +import sys +import traceback + +from extras import try_import + +from testtools.compat import ( + _b, + _format_exception_only, + _format_stack_list, + _TB_HEADER, + _u, + str_is_unicode, +) +from testtools.content_type import ContentType, JSON, UTF8_TEXT + + +functools = try_import('functools') + +_join_b = _b("").join + + +DEFAULT_CHUNK_SIZE = 4096 + +STDOUT_LINE = '\nStdout:\n%s' +STDERR_LINE = '\nStderr:\n%s' + + +def _iter_chunks(stream, chunk_size, seek_offset=None, seek_whence=0): + """Read 'stream' in chunks of 'chunk_size'. + + :param stream: A file-like object to read from. + :param chunk_size: The size of each read from 'stream'. + :param seek_offset: If non-None, seek before iterating. + :param seek_whence: Pass through to the seek call, if seeking. + """ + if seek_offset is not None: + stream.seek(seek_offset, seek_whence) + chunk = stream.read(chunk_size) + while chunk: + yield chunk + chunk = stream.read(chunk_size) + + +class Content(object): + """A MIME-like Content object. + + Content objects can be serialised to bytes using the iter_bytes method. + If the Content-Type is recognised by other code, they are welcome to + look for richer contents that mere byte serialisation - for example in + memory object graphs etc. However, such code MUST be prepared to receive + a generic Content object that has been reconstructed from a byte stream. + + :ivar content_type: The content type of this Content. + """ + + def __init__(self, content_type, get_bytes): + """Create a ContentType.""" + if None in (content_type, get_bytes): + raise ValueError("None not permitted in %r, %r" % ( + content_type, get_bytes)) + self.content_type = content_type + self._get_bytes = get_bytes + + def __eq__(self, other): + return (self.content_type == other.content_type and + _join_b(self.iter_bytes()) == _join_b(other.iter_bytes())) + + def as_text(self): + """Return all of the content as text. + + This is only valid where ``iter_text`` is. It will load all of the + content into memory. Where this is a concern, use ``iter_text`` + instead. + """ + return _u('').join(self.iter_text()) + + def iter_bytes(self): + """Iterate over bytestrings of the serialised content.""" + return self._get_bytes() + + def iter_text(self): + """Iterate over the text of the serialised content. + + This is only valid for text MIME types, and will use ISO-8859-1 if + no charset parameter is present in the MIME type. (This is somewhat + arbitrary, but consistent with RFC2617 3.7.1). + + :raises ValueError: If the content type is not text/\*. + """ + if self.content_type.type != "text": + raise ValueError("Not a text type %r" % self.content_type) + return self._iter_text() + + def _iter_text(self): + """Worker for iter_text - does the decoding.""" + encoding = self.content_type.parameters.get('charset', 'ISO-8859-1') + try: + # 2.5+ + decoder = codecs.getincrementaldecoder(encoding)() + for bytes in self.iter_bytes(): + yield decoder.decode(bytes) + final = decoder.decode(_b(''), True) + if final: + yield final + except AttributeError: + # < 2.5 + bytes = ''.join(self.iter_bytes()) + yield bytes.decode(encoding) + + def __repr__(self): + return "<Content type=%r, value=%r>" % ( + self.content_type, _join_b(self.iter_bytes())) + + +class StackLinesContent(Content): + """Content object for stack lines. + + This adapts a list of "preprocessed" stack lines into a content object. + The stack lines are most likely produced from ``traceback.extract_stack`` + or ``traceback.extract_tb``. + + text/x-traceback;language=python is used for the mime type, in order to + provide room for other languages to format their tracebacks differently. + """ + + # Whether or not to hide layers of the stack trace that are + # unittest/testtools internal code. Defaults to True since the + # system-under-test is rarely unittest or testtools. + HIDE_INTERNAL_STACK = True + + def __init__(self, stack_lines, prefix_content="", postfix_content=""): + """Create a StackLinesContent for ``stack_lines``. + + :param stack_lines: A list of preprocessed stack lines, probably + obtained by calling ``traceback.extract_stack`` or + ``traceback.extract_tb``. + :param prefix_content: If specified, a unicode string to prepend to the + text content. + :param postfix_content: If specified, a unicode string to append to the + text content. + """ + content_type = ContentType('text', 'x-traceback', + {"language": "python", "charset": "utf8"}) + value = prefix_content + \ + self._stack_lines_to_unicode(stack_lines) + \ + postfix_content + super(StackLinesContent, self).__init__( + content_type, lambda: [value.encode("utf8")]) + + def _stack_lines_to_unicode(self, stack_lines): + """Converts a list of pre-processed stack lines into a unicode string. + """ + + # testtools customization. When str is unicode (e.g. IronPython, + # Python 3), traceback.format_exception returns unicode. For Python 2, + # it returns bytes. We need to guarantee unicode. + if str_is_unicode: + format_stack_lines = traceback.format_list + else: + format_stack_lines = _format_stack_list + + msg_lines = format_stack_lines(stack_lines) + + return ''.join(msg_lines) + + +def TracebackContent(err, test): + """Content object for tracebacks. + + This adapts an exc_info tuple to the Content interface. + text/x-traceback;language=python is used for the mime type, in order to + provide room for other languages to format their tracebacks differently. + """ + if err is None: + raise ValueError("err may not be None") + + exctype, value, tb = err + # Skip test runner traceback levels + if StackLinesContent.HIDE_INTERNAL_STACK: + while tb and '__unittest' in tb.tb_frame.f_globals: + tb = tb.tb_next + + # testtools customization. When str is unicode (e.g. IronPython, + # Python 3), traceback.format_exception_only returns unicode. For Python 2, + # it returns bytes. We need to guarantee unicode. + if str_is_unicode: + format_exception_only = traceback.format_exception_only + else: + format_exception_only = _format_exception_only + + limit = None + # Disabled due to https://bugs.launchpad.net/testtools/+bug/1188420 + if (False + and StackLinesContent.HIDE_INTERNAL_STACK + and test.failureException + and isinstance(value, test.failureException)): + # Skip assert*() traceback levels + limit = 0 + while tb and not self._is_relevant_tb_level(tb): + limit += 1 + tb = tb.tb_next + + prefix = _TB_HEADER + stack_lines = traceback.extract_tb(tb, limit) + postfix = ''.join(format_exception_only(exctype, value)) + + return StackLinesContent(stack_lines, prefix, postfix) + + +def StacktraceContent(prefix_content="", postfix_content=""): + """Content object for stack traces. + + This function will create and return a content object that contains a + stack trace. + + The mime type is set to 'text/x-traceback;language=python', so other + languages can format their stack traces differently. + + :param prefix_content: A unicode string to add before the stack lines. + :param postfix_content: A unicode string to add after the stack lines. + """ + stack = inspect.stack()[1:] + + if StackLinesContent.HIDE_INTERNAL_STACK: + limit = 1 + while limit < len(stack) and '__unittest' not in stack[limit][0].f_globals: + limit += 1 + else: + limit = -1 + + frames_only = [line[0] for line in stack[:limit]] + processed_stack = [ ] + for frame in reversed(frames_only): + filename, line, function, context, _ = inspect.getframeinfo(frame) + context = ''.join(context) + processed_stack.append((filename, line, function, context)) + return StackLinesContent(processed_stack, prefix_content, postfix_content) + + +def json_content(json_data): + """Create a JSON `Content` object from JSON-encodeable data.""" + data = json.dumps(json_data) + if str_is_unicode: + # The json module perversely returns native str not bytes + data = data.encode('utf8') + return Content(JSON, lambda: [data]) + + +def text_content(text): + """Create a `Content` object from some text. + + This is useful for adding details which are short strings. + """ + return Content(UTF8_TEXT, lambda: [text.encode('utf8')]) + + +def maybe_wrap(wrapper, func): + """Merge metadata for func into wrapper if functools is present.""" + if functools is not None: + wrapper = functools.update_wrapper(wrapper, func) + return wrapper + + +def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE, + buffer_now=False, seek_offset=None, seek_whence=0): + """Create a `Content` object from a file on disk. + + Note that unless 'read_now' is explicitly passed in as True, the file + will only be read from when ``iter_bytes`` is called. + + :param path: The path to the file to be used as content. + :param content_type: The type of content. If not specified, defaults + to UTF8-encoded text/plain. + :param chunk_size: The size of chunks to read from the file. + Defaults to ``DEFAULT_CHUNK_SIZE``. + :param buffer_now: If True, read the file from disk now and keep it in + memory. Otherwise, only read when the content is serialized. + :param seek_offset: If non-None, seek within the stream before reading it. + :param seek_whence: If supplied, pass to stream.seek() when seeking. + """ + if content_type is None: + content_type = UTF8_TEXT + def reader(): + # This should be try:finally:, but python2.4 makes that hard. When + # We drop older python support we can make this use a context manager + # for maximum simplicity. + stream = open(path, 'rb') + for chunk in _iter_chunks(stream, chunk_size, seek_offset, seek_whence): + yield chunk + stream.close() + return content_from_reader(reader, content_type, buffer_now) + + +def content_from_stream(stream, content_type=None, + chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=False, + seek_offset=None, seek_whence=0): + """Create a `Content` object from a file-like stream. + + Note that the stream will only be read from when ``iter_bytes`` is + called. + + :param stream: A file-like object to read the content from. The stream + is not closed by this function or the content object it returns. + :param content_type: The type of content. If not specified, defaults + to UTF8-encoded text/plain. + :param chunk_size: The size of chunks to read from the file. + Defaults to ``DEFAULT_CHUNK_SIZE``. + :param buffer_now: If True, reads from the stream right now. Otherwise, + only reads when the content is serialized. Defaults to False. + :param seek_offset: If non-None, seek within the stream before reading it. + :param seek_whence: If supplied, pass to stream.seek() when seeking. + """ + if content_type is None: + content_type = UTF8_TEXT + reader = lambda: _iter_chunks(stream, chunk_size, seek_offset, seek_whence) + return content_from_reader(reader, content_type, buffer_now) + + +def content_from_reader(reader, content_type, buffer_now): + """Create a Content object that will obtain the content from reader. + + :param reader: A callback to read the content. Should return an iterable of + bytestrings. + :param content_type: The content type to create. + :param buffer_now: If True the reader is evaluated immediately and + buffered. + """ + if content_type is None: + content_type = UTF8_TEXT + if buffer_now: + contents = list(reader()) + reader = lambda: contents + return Content(content_type, reader) + + +def attach_file(detailed, path, name=None, content_type=None, + chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=True): + """Attach a file to this test as a detail. + + This is a convenience method wrapping around ``addDetail``. + + Note that unless 'read_now' is explicitly passed in as True, the file + *must* exist when the test result is called with the results of this + test, after the test has been torn down. + + :param detailed: An object with details + :param path: The path to the file to attach. + :param name: The name to give to the detail for the attached file. + :param content_type: The content type of the file. If not provided, + defaults to UTF8-encoded text/plain. + :param chunk_size: The size of chunks to read from the file. Defaults + to something sensible. + :param buffer_now: If False the file content is read when the content + object is evaluated rather than when attach_file is called. + Note that this may be after any cleanups that obj_with_details has, so + if the file is a temporary file disabling buffer_now may cause the file + to be read after it is deleted. To handle those cases, using + attach_file as a cleanup is recommended because it guarantees a + sequence for when the attach_file call is made:: + + detailed.addCleanup(attach_file, 'foo.txt', detailed) + """ + if name is None: + name = os.path.basename(path) + content_object = content_from_file( + path, content_type, chunk_size, buffer_now) + detailed.addDetail(name, content_object) |