author | Frank Harrison <frank@doublethefish.com> | 2020-03-26 11:41:22 +0000
committer | Pierre Sassoulas <pierre.sassoulas@gmail.com> | 2021-01-02 09:56:39 +0100
commit | b41e8d940dbd0a92d2805a99eb0f97c01f620197 (patch)
tree | b0b0ca5d36531b469e4573c17c79b3ab0bd4c37c
parent | 579b58d3583fb0efac58aaa8e4d63f6dcb05b0bb (diff)
download | pylint-git-b41e8d940dbd0a92d2805a99eb0f97c01f620197.tar.gz
mapreduce| Fixes -jN for map/reduce Checkers (e.g. SimilarChecker)
This integrates the map/reduce functionality into lint.check_parallel().
We previously had `map` being invoked; here we add `reduce` support.
We do this by collecting the map data per worker and then passing it to a
reducer function on the Checker object, if available, as determined by
whether the checker conforms to the `mapreduce_checker.MapReduceMixin`
interface or not.
This allows Checker objects to function across file-streams when using
multiprocessing (-j2+). For example, SimilarChecker needs to be able to
compare data across all files.
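As an illustration of the contract (a sketch only - this checker does not
exist in pylint, and its name, stats key, and counting logic are invented
for the example), a cross-file checker would implement the mixin roughly
like so:

    from pylint.checkers import BaseChecker, MapReduceMixin


    class TotalLineCountChecker(BaseChecker, MapReduceMixin):
        """Hypothetical checker needing data from every file-stream."""

        name = "total-line-count"  # invented name, for illustration only

        def __init__(self, linter=None):
            super().__init__(linter)
            self._lines = 0

        def process_module(self, node):
            # Runs per file, inside whichever worker process received it.
            with node.stream() as stream:
                self._lines = sum(1 for _ in stream)

        def get_map_data(self):
            # Worker side: the return value is pickled back to the parent
            # process, keyed by `self.name`.
            return self._lines

        @classmethod
        def reduce_map_data(cls, linter, data):
            # Parent side: `data` is the list of map data from every run.
            linter.stats["total_lines"] = sum(data)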
The tests, which we also add here, check that a Checker instance returns
and reports the expected data and errors, such as error messages and stats,
at least in an exit-ok (0) situation.
On a personal note, as we are copying more data across process
boundaries, I suspect that the memory implications of this might cause
issues for large projects already running with -jN and duplicate-code
detection on. That said, given that it takes a long time to lint large
code bases, that is an issue for the [near?] future and likely to be part
of the performance work. Either way, let's get it working first and deal
with memory and performance considerations later; there are many quick
wins we can make here, e.g. file batching, hashing lines, data
compression and so on.
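To make the collation concrete, here is a minimal, self-contained sketch
of what `_merge_mapreduce_data()` (added below) does with the per-worker
results - plain Python, with an invented checker name and payloads:

    import collections

    # One entry per worker process, each holding one dict per checked
    # file, keyed by checker name.
    all_mapreduce_data = {
        101: [{"similarities": ["stream-a-data"]}],
        102: [{"similarities": ["stream-b-data"]}],
    }

    # Flatten across workers and runs; each checker whose name appears
    # here would then receive the combined list via
    # `checker.reduce_map_data(linter, collated["similarities"])`.
    collated = collections.defaultdict(list)
    for runs in all_mapreduce_data.values():
        for run_data in runs:
            for checker_name, data in run_data.items():
                collated[checker_name].extend(data)

    assert collated["similarities"] == ["stream-a-data", "stream-b-data"]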
-rw-r--r-- | pylint/checkers/__init__.py          | 10
-rw-r--r-- | pylint/checkers/mapreduce_checker.py | 18
-rw-r--r-- | pylint/checkers/similar.py           |  4
-rw-r--r-- | pylint/lint/parallel.py              | 55
-rw-r--r-- | tests/test_check_parallel.py         | 27
-rw-r--r-- | tests/test_self.py                   | 12
6 files changed, 105 insertions, 21 deletions
diff --git a/pylint/checkers/__init__.py b/pylint/checkers/__init__.py
index bfde22b3d..31d2df522 100644
--- a/pylint/checkers/__init__.py
+++ b/pylint/checkers/__init__.py
@@ -10,6 +10,7 @@
 # Copyright (c) 2018-2019 Pierre Sassoulas <pierre.sassoulas@gmail.com>
 # Copyright (c) 2018 ssolanki <sushobhitsolanki@gmail.com>
 # Copyright (c) 2019 Bruno P. Kinoshita <kinow@users.noreply.github.com>
+# Copyright (c) 2020 Frank Harrison <doublethefish@gmail.com>

 # Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
 # For details: https://github.com/PyCQA/pylint/blob/master/COPYING
@@ -43,6 +44,7 @@ messages nor reports. XXX not true, emit a 07 report !
 """
 from pylint.checkers.base_checker import BaseChecker, BaseTokenChecker
+from pylint.checkers.mapreduce_checker import MapReduceMixin
 from pylint.utils import register_plugins
@@ -64,4 +66,10 @@ def initialize(linter):
     register_plugins(linter, __path__[0])

-__all__ = ["BaseChecker", "BaseTokenChecker", "initialize", "register_plugins"]
+__all__ = [
+    "BaseChecker",
+    "BaseTokenChecker",
+    "initialize",
+    "MapReduceMixin",
+    "register_plugins",
+]
diff --git a/pylint/checkers/mapreduce_checker.py b/pylint/checkers/mapreduce_checker.py
new file mode 100644
index 000000000..17a1090bf
--- /dev/null
+++ b/pylint/checkers/mapreduce_checker.py
@@ -0,0 +1,18 @@
+# Copyright (c) 2020 Frank Harrison <doublethefish@gmail.com>
+
+# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
+# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
+import abc
+
+
+class MapReduceMixin(metaclass=abc.ABCMeta):
+    """ A mixin design to allow multiprocess/threaded runs of a Checker """
+
+    @abc.abstractmethod
+    def get_map_data(self):
+        """ Returns mergable/reducible data that will be examined """
+
+    @classmethod
+    @abc.abstractmethod
+    def reduce_map_data(cls, linter, data):
+        """ For a given Checker, receives data for all mapped runs """
diff --git a/pylint/checkers/similar.py b/pylint/checkers/similar.py
index 3ac071bb3..1f817ada2 100644
--- a/pylint/checkers/similar.py
+++ b/pylint/checkers/similar.py
@@ -31,7 +31,7 @@ from itertools import groupby

 import astroid

-from pylint.checkers import BaseChecker, table_lines_from_stats
+from pylint.checkers import BaseChecker, MapReduceMixin, table_lines_from_stats
 from pylint.interfaces import IRawChecker
 from pylint.reporters.ureports.nodes import Table
 from pylint.utils import decoding_stream
@@ -302,7 +302,7 @@ def report_similarities(sect, stats, old_stats):


 # wrapper to get a pylint checker from the similar class
-class SimilarChecker(BaseChecker, Similar):
+class SimilarChecker(BaseChecker, Similar, MapReduceMixin):
     """checks for similarities and duplicated code. This computation may be
     memory / CPU intensive, so you should disable it if you experiment some
     problems.
diff --git a/pylint/lint/parallel.py b/pylint/lint/parallel.py
index fa1e65d8f..fe9ce0605 100644
--- a/pylint/lint/parallel.py
+++ b/pylint/lint/parallel.py
@@ -67,28 +67,59 @@ def _worker_check_single_file(file_item):
     _worker_linter.open()
     _worker_linter.check_single_file(name, filepath, modname)
-
+    mapreduce_data = collections.defaultdict(list)
+    for checker in _worker_linter.get_checkers():
+        try:
+            data = checker.get_map_data()
+        except AttributeError:
+            continue
+        mapreduce_data[checker.name].append(data)
     msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
     _worker_linter.reporter.reset()
     return (
+        id(multiprocessing.current_process()),
         _worker_linter.current_name,
         filepath,
         _worker_linter.file_state.base_name,
         msgs,
         _worker_linter.stats,
         _worker_linter.msg_status,
+        mapreduce_data,
     )


+def _merge_mapreduce_data(linter, all_mapreduce_data):
+    """ Merges map/reduce data across workers, invoking relevant APIs on checkers """
+    # First collate the data, preparing it so we can send it to the checkers for
+    # validation. The intent here is to collect all the mapreduce data for all checker-
+    # runs across processes - that will then be passed to a static method on the
+    # checkers to be reduced and further processed.
+    collated_map_reduce_data = collections.defaultdict(list)
+    for linter_data in all_mapreduce_data.values():
+        for run_data in linter_data:
+            for checker_name, data in run_data.items():
+                collated_map_reduce_data[checker_name].extend(data)
+
+    # Send the data to checkers that support/require consolidated data
+    original_checkers = linter.get_checkers()
+    for checker in original_checkers:
+        if checker.name in collated_map_reduce_data:
+            # Assume that if the check has returned map/reduce data that it has the
+            # reducer function
+            checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
+
+
 def check_parallel(linter, jobs, files, arguments=None):
-    """Use the given linter to lint the files with given amount of workers (jobs)"""
-    # The reporter does not need to be passed to worker processess, i.e. the reporter does
-    # not need to be pickleable
+    """Use the given linter to lint the files with given amount of workers (jobs)
+    This splits the work filestream-by-filestream. If you need to do work across
+    multiple files, as in the similarity-checker, then inherit from MapReduceMixin and
+    implement the map/reduce mixin functionality"""
+    # The reporter does not need to be passed to worker processes, i.e. the reporter does
     original_reporter = linter.reporter
     linter.reporter = None

     # The linter is inherited by all the pool's workers, i.e. the linter
-    # is identical to the linter object here. This is requred so that
+    # is identical to the linter object here. This is required so that
     # a custom PyLinter object can be used.
     initializer = functools.partial(_worker_initialize, arguments=arguments)
     pool = multiprocessing.Pool(jobs, initializer=initializer, initargs=[linter])
@@ -97,30 +128,36 @@ def check_parallel(linter, jobs, files, arguments=None):
     # correct reporter
     linter.set_reporter(original_reporter)
     linter.open()
-
-    all_stats = []
-
     try:
+        all_stats = []
+        all_mapreduce_data = collections.defaultdict(list)
+
+        # Maps each file to be worked on by a single _worker_check_single_file() call,
+        # collecting any map/reduce data by checker module so that we can 'reduce' it
+        # later.
         for (
+            worker_idx,  # used to merge map/reduce data across workers
             module,
             file_path,
             base_name,
             messages,
             stats,
             msg_status,
+            mapreduce_data,
         ) in pool.imap_unordered(_worker_check_single_file, files):
             linter.file_state.base_name = base_name
             linter.set_current_module(module, file_path)
             for msg in messages:
                 msg = Message(*msg)
                 linter.reporter.handle_message(msg)
-
             all_stats.append(stats)
+            all_mapreduce_data[worker_idx].append(mapreduce_data)
             linter.msg_status |= msg_status
     finally:
         pool.close()
         pool.join()

+    _merge_mapreduce_data(linter, all_mapreduce_data)
     linter.stats = _merge_stats(all_stats)

     # Insert stats data to local checkers.
diff --git a/tests/test_check_parallel.py b/tests/test_check_parallel.py
index e8f67f4b6..c45b0b3b9 100644
--- a/tests/test_check_parallel.py
+++ b/tests/test_check_parallel.py
@@ -103,9 +103,17 @@ class TestCheckParallelFramework:
     def test_worker_check_single_file_no_checkers(self):
         linter = PyLinter(reporter=Reporter())
         worker_initialize(linter=linter)
-        (name, _, _, msgs, stats, msg_status) = worker_check_single_file(
-            _gen_file_data()
-        )
+
+        (
+            _,  # proc-id
+            name,
+            _,  # file_path
+            _,  # base_name
+            msgs,
+            stats,
+            msg_status,
+            _,  # mapreduce_data
+        ) = worker_check_single_file(_gen_file_data())
         assert name == "--test-file_data-name-0--"
         assert [] == msgs
         no_errors_status = 0
@@ -140,9 +148,16 @@ class TestCheckParallelFramework:

         # Add the only checker we care about in this test
         linter.register_checker(SequentialTestChecker(linter))
-        (name, _, _, msgs, stats, msg_status) = worker_check_single_file(
-            _gen_file_data()
-        )
+        (
+            _,  # proc-id
+            name,
+            _,  # file_path
+            _,  # base_name
+            msgs,
+            stats,
+            msg_status,
+            _,  # mapreduce_data
+        ) = worker_check_single_file(_gen_file_data())

         # Ensure we return the same data as the single_file_no_checkers test
         assert name == "--test-file_data-name-0--"
diff --git a/tests/test_self.py b/tests/test_self.py
index cd138e789..e8f9f848f 100644
--- a/tests/test_self.py
+++ b/tests/test_self.py
@@ -46,7 +46,7 @@ from unittest import mock

 import pytest

-from pylint.constants import MAIN_CHECKER_NAME
+from pylint.constants import MAIN_CHECKER_NAME, MSG_TYPES_STATUS
 from pylint.lint import Run
 from pylint.reporters import JSONReporter
 from pylint.reporters.text import BaseReporter, ColorizedTextReporter, TextReporter
@@ -243,13 +243,19 @@ class TestRunTC:
     )

     def test_parallel_execution(self):
+        out = StringIO()
         self._runtest(
             [
                 "-j 2",
                 join(HERE, "functional", "a", "arguments.py"),
-                join(HERE, "functional", "a", "arguments.py"),
             ],
-            code=2,
+            out=out,
+            # We expect similarities to fail and an error
+            code=MSG_TYPES_STATUS["E"],
+        )
+        assert (
+            "Unexpected keyword argument 'fourth' in function call"
+            in out.getvalue().strip()
         )

     def test_parallel_execution_missing_arguments(self):