summary refs log tree commit diff
path: root/pylint/lint/parallel.py
diff options
context:
space:
mode:
Diffstat (limited to 'pylint/lint/parallel.py')
-rw-r--r--  pylint/lint/parallel.py  55
1 files changed, 46 insertions, 9 deletions
diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py
index fa1e65d8f..fe9ce0605 100644
--- a/pylint/lint/parallel.py
+++ b/pylint/lint/parallel.py
@@ -67,28 +67,59 @@ def _worker_check_single_file(file_item):
_worker_linter.open()
_worker_linter.check_single_file(name, filepath, modname)
-
+ mapreduce_data = collections.defaultdict(list)
+ for checker in _worker_linter.get_checkers():
+ try:
+ data = checker.get_map_data()
+ except AttributeError:
+ continue
+ mapreduce_data[checker.name].append(data)
msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
_worker_linter.reporter.reset()
return (
+ id(multiprocessing.current_process()),
_worker_linter.current_name,
filepath,
_worker_linter.file_state.base_name,
msgs,
_worker_linter.stats,
_worker_linter.msg_status,
+ mapreduce_data,
)
+def _merge_mapreduce_data(linter, all_mapreduce_data):
+    """Merge map/reduce data across workers, invoking the relevant APIs on checkers."""
+ # First collate the data, preparing it so we can send it to the checkers for
+ # validation. The intent here is to collect all the mapreduce data for all checker-
+ # runs across processes - that will then be passed to a static method on the
+ # checkers to be reduced and further processed.
+ collated_map_reduce_data = collections.defaultdict(list)
+ for linter_data in all_mapreduce_data.values():
+ for run_data in linter_data:
+ for checker_name, data in run_data.items():
+ collated_map_reduce_data[checker_name].extend(data)
+
+ # Send the data to checkers that support/require consolidated data
+ original_checkers = linter.get_checkers()
+ for checker in original_checkers:
+ if checker.name in collated_map_reduce_data:
+            # Assume that if the checker has returned map/reduce data, it also
+            # provides the corresponding reducer function
+ checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
+
+
def check_parallel(linter, jobs, files, arguments=None):
- """Use the given linter to lint the files with given amount of workers (jobs)"""
- # The reporter does not need to be passed to worker processess, i.e. the reporter does
- # not need to be pickleable
+    """Use the given linter to lint the files with the given number of workers (jobs)
+ This splits the work filestream-by-filestream. If you need to do work across
+ multiple files, as in the similarity-checker, then inherit from MapReduceMixin and
+ implement the map/reduce mixin functionality"""
+    # The reporter does not need to be passed to worker processes, i.e. it need not be picklable
original_reporter = linter.reporter
linter.reporter = None
# The linter is inherited by all the pool's workers, i.e. the linter
- # is identical to the linter object here. This is requred so that
+ # is identical to the linter object here. This is required so that
# a custom PyLinter object can be used.
initializer = functools.partial(_worker_initialize, arguments=arguments)
pool = multiprocessing.Pool(jobs, initializer=initializer, initargs=[linter])
@@ -97,30 +128,36 @@ def check_parallel(linter, jobs, files, arguments=None):
# correct reporter
linter.set_reporter(original_reporter)
linter.open()
-
- all_stats = []
-
try:
+ all_stats = []
+ all_mapreduce_data = collections.defaultdict(list)
+
+ # Maps each file to be worked on by a single _worker_check_single_file() call,
+ # collecting any map/reduce data by checker module so that we can 'reduce' it
+ # later.
for (
+ worker_idx, # used to merge map/reduce data across workers
module,
file_path,
base_name,
messages,
stats,
msg_status,
+ mapreduce_data,
) in pool.imap_unordered(_worker_check_single_file, files):
linter.file_state.base_name = base_name
linter.set_current_module(module, file_path)
for msg in messages:
msg = Message(*msg)
linter.reporter.handle_message(msg)
-
all_stats.append(stats)
+ all_mapreduce_data[worker_idx].append(mapreduce_data)
linter.msg_status |= msg_status
finally:
pool.close()
pool.join()
+ _merge_mapreduce_data(linter, all_mapreduce_data)
linter.stats = _merge_stats(all_stats)
# Insert stats data to local checkers.