author     Frank Harrison <frank@doublethefish.com>    2020-03-26 11:41:22 +0000
committer  Pierre Sassoulas <pierre.sassoulas@gmail.com>    2021-01-02 09:56:39 +0100
commit     b41e8d940dbd0a92d2805a99eb0f97c01f620197 (patch)
tree       b0b0ca5d36531b469e4573c17c79b3ab0bd4c37c
parent     579b58d3583fb0efac58aaa8e4d63f6dcb05b0bb (diff)
download   pylint-git-b41e8d940dbd0a92d2805a99eb0f97c01f620197.tar.gz
mapreduce| Fixes -jN for map/reduce Checkers (e.g. SimilarChecker)
This integrates the map/reduce functionality into lint.check_process(). We previously had `map` being invoked; here we add `reduce` support.

We do this by collecting the map-data by worker and then passing it to a reducer function on the Checker object, if available - determined by whether the checker conforms to the `mapreduce_checker.MapReduceMixin` mixin interface or not. This allows Checker objects to function across file-streams when using multiprocessing (-j2+). For example, SimilarChecker needs to be able to compare data across all files.

The tests, which we also add here, check that a Checker instance returns and reports the expected data and errors, such as error messages and stats - at least in an exit-ok (0) situation.

On a personal note: as we are copying more data across process boundaries, I suspect the memory implications of this might cause issues for large projects that already run with -jN and duplicate-code detection enabled. That said, given that it already takes a long time to lint large code bases, that is an issue for the [near?] future and likely part of the performance work. Either way, let's get it working first and deal with memory and performance considerations later - there are many quick wins we can make here, e.g. file-batching, hashing lines, data compression and so on.
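To make the shape of that flow concrete, here is a minimal, self-contained sketch; `TotalLinesChecker`, `worker_results` and the collation loop are illustrative stand-ins, with only the `get_map_data`/`reduce_map_data` hook names taken from the patch below:

    import collections

    class TotalLinesChecker:
        """Hypothetical checker needing a cross-file total, hence map/reduce."""
        name = "total-lines"

        def __init__(self):
            self.linecount = 0  # accumulated per worker while checking files

        def get_map_data(self):
            # "map": hand per-worker state back to the main process
            return self.linecount

        @classmethod
        def reduce_map_data(cls, linter, data):
            # "reduce": see the data from every mapped run at once
            print("total lines:", sum(data))

    # Boiled-down version of what the main process now does: collate the
    # per-run dicts by checker name, then hand each checker its merged list.
    worker_results = [{"total-lines": [10]}, {"total-lines": [32]}]  # fake data
    collated = collections.defaultdict(list)
    for run_data in worker_results:
        for name, data in run_data.items():
            collated[name].extend(data)
    TotalLinesChecker.reduce_map_data(linter=None, data=collated["total-lines"])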
-rw-r--r--  pylint/checkers/__init__.py            10
-rw-r--r--  pylint/checkers/mapreduce_checker.py   18
-rw-r--r--  pylint/checkers/similar.py              4
-rw-r--r--  pylint/lint/parallel.py                55
-rw-r--r--  tests/test_check_parallel.py           27
-rw-r--r--  tests/test_self.py                     12
6 files changed, 105 insertions(+), 21 deletions(-)
diff --git a/pylint/checkers/__init__.py b/pylint/checkers/__init__.py
index bfde22b3d..31d2df522 100644
--- a/pylint/checkers/__init__.py
+++ b/pylint/checkers/__init__.py
@@ -10,6 +10,7 @@
# Copyright (c) 2018-2019 Pierre Sassoulas <pierre.sassoulas@gmail.com>
# Copyright (c) 2018 ssolanki <sushobhitsolanki@gmail.com>
# Copyright (c) 2019 Bruno P. Kinoshita <kinow@users.noreply.github.com>
+# Copyright (c) 2020 Frank Harrison <doublethefish@gmail.com>
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
@@ -43,6 +44,7 @@ messages nor reports. XXX not true, emit a 07 report !
"""
from pylint.checkers.base_checker import BaseChecker, BaseTokenChecker
+from pylint.checkers.mapreduce_checker import MapReduceMixin
from pylint.utils import register_plugins
@@ -64,4 +66,10 @@ def initialize(linter):
register_plugins(linter, __path__[0])
-__all__ = ["BaseChecker", "BaseTokenChecker", "initialize", "register_plugins"]
+__all__ = [
+ "BaseChecker",
+ "BaseTokenChecker",
+ "initialize",
+ "MapReduceMixin",
+ "register_plugins",
+]
diff --git a/pylint/checkers/mapreduce_checker.py b/pylint/checkers/mapreduce_checker.py
new file mode 100644
index 000000000..17a1090bf
--- /dev/null
+++ b/pylint/checkers/mapreduce_checker.py
@@ -0,0 +1,18 @@
+# Copyright (c) 2020 Frank Harrison <doublethefish@gmail.com>
+
+# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
+# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
+import abc
+
+
+class MapReduceMixin(metaclass=abc.ABCMeta):
+ """ A mixin design to allow multiprocess/threaded runs of a Checker """
+
+ @abc.abstractmethod
+ def get_map_data(self):
+ """ Returns mergable/reducible data that will be examined """
+
+ @classmethod
+ @abc.abstractmethod
+ def reduce_map_data(cls, linter, data):
+ """ For a given Checker, receives data for all mapped runs """
diff --git a/pylint/checkers/similar.py b/pylint/checkers/similar.py
index 3ac071bb3..1f817ada2 100644
--- a/pylint/checkers/similar.py
+++ b/pylint/checkers/similar.py
@@ -31,7 +31,7 @@ from itertools import groupby
import astroid
-from pylint.checkers import BaseChecker, table_lines_from_stats
+from pylint.checkers import BaseChecker, MapReduceMixin, table_lines_from_stats
from pylint.interfaces import IRawChecker
from pylint.reporters.ureports.nodes import Table
from pylint.utils import decoding_stream
@@ -302,7 +302,7 @@ def report_similarities(sect, stats, old_stats):
# wrapper to get a pylint checker from the similar class
-class SimilarChecker(BaseChecker, Similar):
+class SimilarChecker(BaseChecker, Similar, MapReduceMixin):
"""checks for similarities and duplicated code. This computation may be
memory / CPU intensive, so you should disable it if you experience some
problems.
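The bodies of SimilarChecker's own map/reduce hooks are not part of this excerpt, but the reason it needs the mixin is easy to show: duplicated stretches can live in files that were handed to different worker processes, so no single worker ever sees both copies. A toy, self-contained illustration of that - all names here are hypothetical, not pylint's:

    import collections

    def map_file(lines):
        # per-worker "map": record the lines a single file contributes
        return [(lineno, line.strip()) for lineno, line in enumerate(lines)]

    def reduce_runs(runs):
        # main-process "reduce": only here can cross-file duplicates be seen
        locations = collections.defaultdict(list)
        for run_idx, mapped in enumerate(runs):
            for lineno, line in mapped:
                locations[line].append((run_idx, lineno))
        return {line: locs for line, locs in locations.items() if len(locs) > 1}

    runs = [map_file(["x = 1", "print(x)"]), map_file(["print(x)", "y = 2"])]
    print(reduce_runs(runs))  # {'print(x)': [(0, 1), (1, 0)]}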
diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py
index fa1e65d8f..fe9ce0605 100644
--- a/pylint/lint/parallel.py
+++ b/pylint/lint/parallel.py
@@ -67,28 +67,59 @@ def _worker_check_single_file(file_item):
_worker_linter.open()
_worker_linter.check_single_file(name, filepath, modname)
-
+ mapreduce_data = collections.defaultdict(list)
+ for checker in _worker_linter.get_checkers():
+ try:
+ data = checker.get_map_data()
+ except AttributeError:
+ continue
+ mapreduce_data[checker.name].append(data)
msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
_worker_linter.reporter.reset()
return (
+ id(multiprocessing.current_process()),
_worker_linter.current_name,
filepath,
_worker_linter.file_state.base_name,
msgs,
_worker_linter.stats,
_worker_linter.msg_status,
+ mapreduce_data,
)
+def _merge_mapreduce_data(linter, all_mapreduce_data):
+ """ Merges map/reduce data across workers, invoking relevant APIs on checkers """
+ # First collate the data, preparing it so we can send it to the checkers for
+ # validation. The intent here is to collect all the mapreduce data for all checker-
+ # runs across processes - that will then be passed to a static method on the
+ # checkers to be reduced and further processed.
+ collated_map_reduce_data = collections.defaultdict(list)
+ for linter_data in all_mapreduce_data.values():
+ for run_data in linter_data:
+ for checker_name, data in run_data.items():
+ collated_map_reduce_data[checker_name].extend(data)
+
+ # Send the data to checkers that support/require consolidated data
+ original_checkers = linter.get_checkers()
+ for checker in original_checkers:
+ if checker.name in collated_map_reduce_data:
+ # Assume that if the checker has returned map/reduce data then it has the
+ # reducer function
+ checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
+
+
def check_parallel(linter, jobs, files, arguments=None):
- """Use the given linter to lint the files with given amount of workers (jobs)"""
- # The reporter does not need to be passed to worker processess, i.e. the reporter does
- # not need to be pickleable
+ """Use the given linter to lint the files with given amount of workers (jobs)
+ This splits the work filestream-by-filestream. If you need to do work across
+ multiple files, as in the similarity-checker, then inherit from MapReduceMixin and
+ implement the map/reduce mixin functionality"""
+ # The reporter does not need to be passed to worker processes, i.e. the reporter does
original_reporter = linter.reporter
linter.reporter = None
# The linter is inherited by all the pool's workers, i.e. the linter
- # is identical to the linter object here. This is requred so that
+ # is identical to the linter object here. This is required so that
# a custom PyLinter object can be used.
initializer = functools.partial(_worker_initialize, arguments=arguments)
pool = multiprocessing.Pool(jobs, initializer=initializer, initargs=[linter])
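The `initializer`/`initargs` pair above is what gives every worker its own linter: the initializer runs once per worker process, not once per file. A minimal standalone demonstration of the same pattern (names hypothetical; `functools.partial` is kept only to mirror the patch):

    import functools
    import multiprocessing

    _worker_state = None  # per-process global, set once by the initializer

    def _initialize(state):
        global _worker_state
        _worker_state = state

    def _check_one(item):
        # every task run in this process sees the state installed at startup
        return "%s:%d" % (_worker_state, item)

    if __name__ == "__main__":
        initializer = functools.partial(_initialize)
        with multiprocessing.Pool(
            2, initializer=initializer, initargs=["shared-config"]
        ) as pool:
            print(sorted(pool.imap_unordered(_check_one, [1, 2, 3])))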
@@ -97,30 +128,36 @@ def check_parallel(linter, jobs, files, arguments=None):
# correct reporter
linter.set_reporter(original_reporter)
linter.open()
-
- all_stats = []
-
try:
+ all_stats = []
+ all_mapreduce_data = collections.defaultdict(list)
+
+ # Maps each file to be worked on by a single _worker_check_single_file() call,
+ # collecting any map/reduce data by checker module so that we can 'reduce' it
+ # later.
for (
+ worker_idx, # used to merge map/reduce data across workers
module,
file_path,
base_name,
messages,
stats,
msg_status,
+ mapreduce_data,
) in pool.imap_unordered(_worker_check_single_file, files):
linter.file_state.base_name = base_name
linter.set_current_module(module, file_path)
for msg in messages:
msg = Message(*msg)
linter.reporter.handle_message(msg)
-
all_stats.append(stats)
+ all_mapreduce_data[worker_idx].append(mapreduce_data)
linter.msg_status |= msg_status
finally:
pool.close()
pool.join()
+ _merge_mapreduce_data(linter, all_mapreduce_data)
linter.stats = _merge_stats(all_stats)
# Insert stats data to local checkers.
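Note that the worker side (the `try`/`except AttributeError` in `_worker_check_single_file` above) duck-types rather than isinstance-checking against MapReduceMixin: any checker that has a `get_map_data` method takes part. A short runnable sketch of that dispatch, with stand-in checker classes:

    import collections

    class PlainChecker:  # hypothetical: no map/reduce support
        name = "plain"

    class CountingChecker:  # hypothetical: opts in simply by having the hook
        name = "counting"

        def get_map_data(self):
            return 3

    mapreduce_data = collections.defaultdict(list)
    for checker in (PlainChecker(), CountingChecker()):
        try:
            data = checker.get_map_data()
        except AttributeError:  # checker does not take part in map/reduce
            continue
        mapreduce_data[checker.name].append(data)

    print(dict(mapreduce_data))  # {'counting': [3]}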
diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py
index e8f67f4b6..c45b0b3b9 100644
--- a/tests/test_check_parallel.py
+++ b/tests/test_check_parallel.py
@@ -103,9 +103,17 @@ class TestCheckParallelFramework:
def test_worker_check_single_file_no_checkers(self):
linter = PyLinter(reporter=Reporter())
worker_initialize(linter=linter)
- (name, _, _, msgs, stats, msg_status) = worker_check_single_file(
- _gen_file_data()
- )
+
+ (
+ _, # proc-id
+ name,
+ _, # file_path
+ _, # base_name
+ msgs,
+ stats,
+ msg_status,
+ _, # mapreduce_data
+ ) = worker_check_single_file(_gen_file_data())
assert name == "--test-file_data-name-0--"
assert [] == msgs
no_errors_status = 0
@@ -140,9 +148,16 @@ class TestCheckParallelFramework:
# Add the only checker we care about in this test
linter.register_checker(SequentialTestChecker(linter))
- (name, _, _, msgs, stats, msg_status) = worker_check_single_file(
- _gen_file_data()
- )
+ (
+ _, # proc-id
+ name,
+ _, # file_path
+ _, # base_name
+ msgs,
+ stats,
+ msg_status,
+ _, # mapreduce_data
+ ) = worker_check_single_file(_gen_file_data())
# Ensure we return the same data as the single_file_no_checkers test
assert name == "--test-file_data-name-0--"
diff --git a/tests/test_self.py b/tests/test_self.py
index cd138e789..e8f9f848f 100644
--- a/tests/test_self.py
+++ b/tests/test_self.py
@@ -46,7 +46,7 @@ from unittest import mock
import pytest
-from pylint.constants import MAIN_CHECKER_NAME
+from pylint.constants import MAIN_CHECKER_NAME, MSG_TYPES_STATUS
from pylint.lint import Run
from pylint.reporters import JSONReporter
from pylint.reporters.text import BaseReporter, ColorizedTextReporter, TextReporter
@@ -243,13 +243,19 @@ class TestRunTC:
)
def test_parallel_execution(self):
+ out = StringIO()
self._runtest(
[
"-j 2",
join(HERE, "functional", "a", "arguments.py"),
- join(HERE, "functional", "a", "arguments.py"),
],
- code=2,
+ out=out,
+ # We expect the similarities check to fail and an error to be raised
+ code=MSG_TYPES_STATUS["E"],
+ )
+ assert (
+ "Unexpected keyword argument 'fourth' in function call"
+ in out.getvalue().strip()
)
def test_parallel_execution_missing_arguments(self):
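For context on the switch to `code=MSG_TYPES_STATUS["E"]`: pylint composes its exit status from per-message-category bit flags, so the expectation now reads as "an error-class message was emitted" rather than a bare magic number. The mapping, as I understand pylint/constants.py at this point in its history (worth verifying against your checkout):

    # pylint/constants.py (approximate, for illustration)
    MSG_TYPES_STATUS = {"I": 0, "C": 16, "R": 8, "W": 4, "E": 2, "F": 1}
    assert MSG_TYPES_STATUS["E"] == 2  # equivalent to the old hard-coded code=2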