1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
#!/usr/bin/env python3
# Copyright 2022 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import re
import sys
ANALYSIS_FAILURE_STATES = ['NULL', 'Unknown', '', None]
STACKFRAME_REGEX = r'^#\d+.*'
FILE_LOCATION_REGEX = r'.*\:\d+\:\d+$'
def fake_clusterfuzz_imports(modules):
for module in modules:
sys.modules[module] = __import__('clusterfuzz_fakes')
def pre_clusterfuzz_import():
local_path = os.path.dirname(os.path.abspath(__file__))
sys.path.append(local_path)
fake_clusterfuzz_imports([
'clusterfuzz._internal.platforms.android',
'clusterfuzz._internal.google_cloud_utils',
'clusterfuzz._internal.config.local_config',
])
class CustomStackParser:
def __init__(self):
pre_clusterfuzz_import()
from clusterfuzz.stacktraces import StackParser, MAX_CRASH_STATE_FRAMES
from clusterfuzz.stacktraces import llvm_test_one_input_override
from clusterfuzz.stacktraces import constants
self.MAX_CRASH_STATE_FRAMES = MAX_CRASH_STATE_FRAMES
self.stack_parser = StackParser()
self.constants = constants
self.llvm_test_one_input_override = llvm_test_one_input_override
def analyze_crash(self, stderr):
try:
stderr_analysis = self.stack_parser.parse(stderr)
self.custom_analyzer(stderr_analysis)
return {
"crash_state": stderr_analysis.crash_state.strip(),
"crash_type": stderr_analysis.crash_type.strip(),
}
except Exception as e:
logging.info(e)
return {
"crash_state": "Unknown",
"crash_type": "Unknown",
}
def _extract_function_name(self, frame):
split_frame = frame.split()
start_loc = 1
end_loc = -1
if len(split_frame) > 2 and split_frame[2] == 'in':
start_loc = 3
split_frame = split_frame[start_loc:]
for idx, item in enumerate(split_frame):
# exclude file name and everything after it
if re.match(FILE_LOCATION_REGEX, item):
end_loc = idx
break
if end_loc != -1:
return ' '.join(split_frame[:end_loc])
return None
def _fallback_crash_state(self, stacktrace):
status = []
for line in [l.strip() for l in stacktrace.splitlines()]:
if re.match(STACKFRAME_REGEX, line):
frame = self._extract_function_name(line)
if not self.stack_parser.ignore_stack_frame(frame):
status.append(frame)
if len(status) >= self.MAX_CRASH_STATE_FRAMES:
break
return '\n'.join(status)
def custom_analyzer(self, crash_info):
if crash_info.crash_state in ANALYSIS_FAILURE_STATES:
fallback_state = self._fallback_crash_state(crash_info.crash_stacktrace)
crash_info.crash_state = fallback_state or crash_info.crash_state
class EmptyStackParser:
def analyze_crash(self, stderr):
return {}
def create_stack_parser():
try:
return CustomStackParser()
except ImportError as e:
logging.info(e)
return EmptyStackParser()
|