diff options
author | Ashley Whetter <ashley@awhetter.co.uk> | 2016-06-23 22:52:59 +0100 |
---|---|---|
committer | Ashley Whetter <ashley@awhetter.co.uk> | 2019-02-09 13:22:35 -0800 |
commit | 82889d6a1295318d36adce352ff9af7b7855c69c (patch) | |
tree | aa627252078013cbc84e48d0b3658d4db03d9345 | |
parent | 4552fab95fcb35f54d4578c788534e9a9d4d7e5a (diff) | |
download | pylint-git-82889d6a1295318d36adce352ff9af7b7855c69c.tar.gz |
Moved parallel code outside of PyLinter
-rw-r--r-- | pylint/lint.py | 222 |
1 files changed, 119 insertions, 103 deletions
diff --git a/pylint/lint.py b/pylint/lint.py index bbdeada14..0e351e51d 100644 --- a/pylint/lint.py +++ b/pylint/lint.py @@ -228,7 +228,7 @@ def _cpu_count() -> int: if multiprocessing is not None: - class ChildLinter(multiprocessing.Process): + class ChildRunner(multiprocessing.Process): def run(self): # pylint: disable=no-member, unbalanced-tuple-unpacking tasks_queue, results_queue, self._config = self._args @@ -919,113 +919,23 @@ class PyLinter( # pylint: enable=unused-argument + def _init_msg_states(self): + for msg in self.msgs_store.messages: + if not msg.may_be_emitted(): + self._msgs_state[msg.msgid] = False + def check(self, files_or_modules): """main checking entry: check a list of files or modules from their name. """ # initialize msgs_state now that all messages have been registered into # the store - for msg in self.msgs_store.messages: - if not msg.may_be_emitted(): - self._msgs_state[msg.msgid] = False + self._init_msg_states() if not isinstance(files_or_modules, (list, tuple)): files_or_modules = (files_or_modules,) - if self.config.jobs == 1: - self._do_check(files_or_modules) - else: - self._parallel_check(files_or_modules) - - def _get_jobs_config(self): - child_config = collections.OrderedDict() - filter_options = {"long-help"} - filter_options.update((opt_name for opt_name, _ in self._external_opts)) - for opt_providers in self._all_options.values(): - for optname, optdict, val in opt_providers.options_and_values(): - if optdict.get("deprecated"): - continue - - if optname not in filter_options: - child_config[optname] = utils._format_option_value(optdict, val) - child_config["python3_porting_mode"] = self._python3_porting_mode - child_config["plugins"] = self._dynamic_plugins - return child_config - - def _parallel_task(self, files_or_modules): - # Prepare configuration for child linters. - child_config = self._get_jobs_config() - - children = [] - manager = multiprocessing.Manager() - tasks_queue = manager.Queue() - results_queue = manager.Queue() - - # Send files to child linters. - expanded_files = utils.expand_files( - files_or_modules, self, self.config.black_list, self.config.black_list_re - ) - - # do not start more jobs than needed - for _ in range(min(self.config.jobs, len(expanded_files))): - child_linter = ChildLinter(args=(tasks_queue, results_queue, child_config)) - child_linter.start() - children.append(child_linter) - - for module_desc in expanded_files: - tasks_queue.put([module_desc.path]) - - # collect results from child linters - failed = False - for _ in expanded_files: - try: - result = results_queue.get() - except Exception as ex: - print( - "internal error while receiving results from child linter", - file=sys.stderr, - ) - print(ex, file=sys.stderr) - failed = True - break - yield result - - # Stop child linters and wait for their completion. - for _ in range(self.config.jobs): - tasks_queue.put("STOP") - for child in children: - child.join() - - if failed: - print("Error occurred, stopping the linter.", file=sys.stderr) - sys.exit(32) - - def _parallel_check(self, files_or_modules): - # Reset stats. - self.open() - - all_stats = [] - module = None - for result in self._parallel_task(files_or_modules): - if not result: - continue - (_, self.file_state.base_name, module, messages, stats, msg_status) = result - - for msg in messages: - msg = utils.Message(*msg) - self.set_current_module(module) - self.reporter.handle_message(msg) - - all_stats.append(stats) - self.msg_status |= msg_status - - self.stats = _merge_stats(all_stats) - self.current_name = module - - # Insert stats data to local checkers. - for checker in self.get_checkers(): - if checker is not self: - checker.stats = self.stats + self._do_check(files_or_modules) def _do_check(self, files_or_modules): walker = utils.PyLintASTWalker(self) @@ -1584,18 +1494,21 @@ group are mutually exclusive.", file=sys.stderr, ) linter.set_option("jobs", 1) - else: - if linter.config.jobs == 0: - linter.config.jobs = _cpu_count() + elif linter.config.jobs == 0: + linter.config.jobs = _cpu_count() # We have loaded configuration from config file and command line. Now, we can # load plugin specific configuration. - linter.load_plugin_configuration() + #linter.load_plugin_configuration() # insert current working directory to the python path to have a correct # behaviour with fix_import_path(args): - linter.check(args) + if linter.config.jobs == 1: + linter.check(args) + else: + self._parallel_run(args) + linter.generate_reports() if do_exit: if linter.config.exit_zero: @@ -1603,6 +1516,109 @@ group are mutually exclusive.", else: sys.exit(self.linter.msg_status) + def _parallel_run(self, files_or_modules): + with _patch_sysmodules(): + self.linter._init_msg_states() + self._parallel_check(files_or_modules) + + def _parallel_task(self, files_or_modules): + # Prepare configuration for child linters. + child_config = self._get_jobs_config() + + children = [] + manager = multiprocessing.Manager() + tasks_queue = manager.Queue() + results_queue = manager.Queue() + + for _ in range(self.linter.config.jobs): + child_linter = ChildRunner(args=(tasks_queue, results_queue, child_config)) + child_linter.start() + children.append(child_linter) + + # Send files to child linters. + expanded_files = utils.expand_files( + files_or_modules, + self.linter, + self.linter.config.black_list, + self.linter.config.black_list_re, + ) + for module_desc in expanded_files: + tasks_queue.put([module_desc.path]) + + # collect results from child linters + failed = False + for _ in expanded_files: + try: + result = results_queue.get() + except Exception as ex: + print( + "internal error while receiving results from child linter", + file=sys.stderr, + ) + print(ex, file=sys.stderr) + failed = True + break + yield result + + # Stop child linters and wait for their completion. + for _ in range(self.linter.config.jobs): + tasks_queue.put("STOP") + for child in children: + child.join() + + if failed: + print("Error occured, stopping the linter.", file=sys.stderr) + sys.exit(32) + + def _parallel_check(self, files_or_modules): + # Reset stats. + self.linter.open() + + all_stats = [] + module = None + for result in self._parallel_task(files_or_modules): + if not result: + continue + ( + _, + self.linter.file_state.base_name, + module, + messages, + stats, + msg_status, + ) = result + + for msg in messages: + msg = utils.Message(*msg) + self.linter.set_current_module(module) + self.linter.reporter.handle_message(msg) + + all_stats.append(stats) + self.linter.msg_status |= msg_status + + self.linter.stats = _merge_stats(all_stats) + self.linter.current_name = module + + # Insert stats data to local checkers. + for checker in self.linter.get_checkers(): + if checker is not self.linter: + checker.stats = self.linter.stats + + def _get_jobs_config(self): + child_config = collections.OrderedDict() + filter_options = {"long-help"} + filter_options.update((opt_name for opt_name, _ in self.linter._external_opts)) + for opt_providers in six.itervalues(self.linter._all_options): + for optname, optdict, val in opt_providers.options_and_values(): + if optdict.get("deprecated"): + continue + + if optname not in filter_options: + child_config[optname] = utils._format_option_value(optdict, val) + child_config["python3_porting_mode"] = self.linter._python3_porting_mode + child_config["plugins"] = self.linter._dynamic_plugins + return child_config + def cb_set_rcfile(self, name, value): """callback for option preprocessing (i.e. before option parsing)""" self._rcfile = value |