1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
import unittest
import buildscripts.resmokelib.logging.handlers as under_test
class TestExceptionExtractor(unittest.TestCase):
def get_exception_extractor(self, truncate=under_test.Truncate.FIRST):
return under_test.ExceptionExtractor("START", "END", truncate)
def test_successful_extraction(self):
logs = [
"not captured",
"START",
"captured",
"END",
"not captured",
]
expected_exception = [
"START",
"captured",
"END",
]
exception_extractor = self.get_exception_extractor()
for log in logs:
exception_extractor.process_log_line(log)
assert exception_extractor.exception_detected is True
assert exception_extractor.get_exception() == expected_exception
def test_partial_extraction(self):
logs = [
"not captured",
"START",
"captured",
]
expected_current_exception = [
"START",
"captured",
]
exception_extractor = self.get_exception_extractor()
for log in logs:
exception_extractor.process_log_line(log)
assert exception_extractor.active is True
assert exception_extractor.exception_detected is False
assert list(exception_extractor.current_exception) == expected_current_exception
assert exception_extractor.get_exception() == []
def test_no_extraction(self):
logs = [
"not captured",
"not captured",
]
expected_current_exception = []
exception_extractor = self.get_exception_extractor()
for log in logs:
exception_extractor.process_log_line(log)
assert exception_extractor.active is False
assert exception_extractor.exception_detected is False
assert list(exception_extractor.current_exception) == expected_current_exception
assert exception_extractor.get_exception() == []
def test_successful_extraction_truncate_first(self):
logs = ["START"] + ["not captured"
] + ["captured"] * (under_test.MAX_EXCEPTION_LENGTH - 1) + ["END"]
expected_exception = ["[LAST Part of Exception]"
] + ["captured"] * (under_test.MAX_EXCEPTION_LENGTH - 1) + ["END"]
exception_extractor = self.get_exception_extractor()
for log in logs:
exception_extractor.process_log_line(log)
assert exception_extractor.exception_detected is True
assert exception_extractor.get_exception() == expected_exception
def test_successful_extraction_truncate_last(self):
logs = ["START"] + ["captured"] * (under_test.MAX_EXCEPTION_LENGTH - 1) + ["not captured"
] + ["END"]
expected_exception = ["[FIRST Part of Exception]"
] + ["START"] + ["captured"] * (under_test.MAX_EXCEPTION_LENGTH - 1)
exception_extractor = self.get_exception_extractor(under_test.Truncate.LAST)
for log in logs:
exception_extractor.process_log_line(log)
assert exception_extractor.exception_detected is True
assert exception_extractor.get_exception() == expected_exception
|