author     Ashley Whetter <ashley@awhetter.co.uk>  2016-06-23 22:52:59 +0100
committer  Ashley Whetter <ashley@awhetter.co.uk>  2019-02-09 13:22:35 -0800
commit     82889d6a1295318d36adce352ff9af7b7855c69c (patch)
tree       aa627252078013cbc84e48d0b3658d4db03d9345
parent     4552fab95fcb35f54d4578c788534e9a9d4d7e5a (diff)
download   pylint-git-82889d6a1295318d36adce352ff9af7b7855c69c.tar.gz
Moved parallel code outside of PyLinter
-rw-r--r--  pylint/lint.py | 222
1 file changed, 119 insertions(+), 103 deletions(-)
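
The commit summary above describes pulling the multiprocessing machinery (the child linter processes, the task/result queues, and the stats merging) out of PyLinter and into the runner. Below is a minimal, self-contained sketch of the queue-based worker pattern this moved code is built on; the Worker class and run_parallel function are illustrative stand-ins, not pylint's actual API.

import multiprocessing


class Worker(multiprocessing.Process):
    """Illustrative child process: consumes paths, produces results."""

    def __init__(self, tasks_queue, results_queue):
        super().__init__()
        self._tasks_queue = tasks_queue
        self._results_queue = results_queue

    def run(self):
        # Consume tasks until the "STOP" sentinel arrives.
        for path in iter(self._tasks_queue.get, "STOP"):
            self._results_queue.put(("checked", path))


def run_parallel(paths, jobs):
    manager = multiprocessing.Manager()
    tasks_queue = manager.Queue()
    results_queue = manager.Queue()

    workers = [Worker(tasks_queue, results_queue) for _ in range(jobs)]
    for worker in workers:
        worker.start()

    for path in paths:
        tasks_queue.put(path)

    # Exactly one result is expected per submitted path.
    results = [results_queue.get() for _ in paths]

    # Tell every worker to exit, then wait for them.
    for _ in range(jobs):
        tasks_queue.put("STOP")
    for worker in workers:
        worker.join()

    return results


if __name__ == "__main__":
    print(run_parallel(["a.py", "b.py", "c.py"], jobs=2))
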
diff --git a/pylint/lint.py b/pylint/lint.py
index bbdeada14..0e351e51d 100644
--- a/pylint/lint.py
+++ b/pylint/lint.py
@@ -228,7 +228,7 @@ def _cpu_count() -> int:
if multiprocessing is not None:
- class ChildLinter(multiprocessing.Process):
+ class ChildRunner(multiprocessing.Process):
def run(self):
# pylint: disable=no-member, unbalanced-tuple-unpacking
tasks_queue, results_queue, self._config = self._args
@@ -919,113 +919,23 @@ class PyLinter(
# pylint: enable=unused-argument
+ def _init_msg_states(self):
+ for msg in self.msgs_store.messages:
+ if not msg.may_be_emitted():
+ self._msgs_state[msg.msgid] = False
+
def check(self, files_or_modules):
"""main checking entry: check a list of files or modules from their
name.
"""
# initialize msgs_state now that all messages have been registered into
# the store
- for msg in self.msgs_store.messages:
- if not msg.may_be_emitted():
- self._msgs_state[msg.msgid] = False
+ self._init_msg_states()
if not isinstance(files_or_modules, (list, tuple)):
files_or_modules = (files_or_modules,)
- if self.config.jobs == 1:
- self._do_check(files_or_modules)
- else:
- self._parallel_check(files_or_modules)
-
- def _get_jobs_config(self):
- child_config = collections.OrderedDict()
- filter_options = {"long-help"}
- filter_options.update((opt_name for opt_name, _ in self._external_opts))
- for opt_providers in self._all_options.values():
- for optname, optdict, val in opt_providers.options_and_values():
- if optdict.get("deprecated"):
- continue
-
- if optname not in filter_options:
- child_config[optname] = utils._format_option_value(optdict, val)
- child_config["python3_porting_mode"] = self._python3_porting_mode
- child_config["plugins"] = self._dynamic_plugins
- return child_config
-
- def _parallel_task(self, files_or_modules):
- # Prepare configuration for child linters.
- child_config = self._get_jobs_config()
-
- children = []
- manager = multiprocessing.Manager()
- tasks_queue = manager.Queue()
- results_queue = manager.Queue()
-
- # Send files to child linters.
- expanded_files = utils.expand_files(
- files_or_modules, self, self.config.black_list, self.config.black_list_re
- )
-
- # do not start more jobs than needed
- for _ in range(min(self.config.jobs, len(expanded_files))):
- child_linter = ChildLinter(args=(tasks_queue, results_queue, child_config))
- child_linter.start()
- children.append(child_linter)
-
- for module_desc in expanded_files:
- tasks_queue.put([module_desc.path])
-
- # collect results from child linters
- failed = False
- for _ in expanded_files:
- try:
- result = results_queue.get()
- except Exception as ex:
- print(
- "internal error while receiving results from child linter",
- file=sys.stderr,
- )
- print(ex, file=sys.stderr)
- failed = True
- break
- yield result
-
- # Stop child linters and wait for their completion.
- for _ in range(self.config.jobs):
- tasks_queue.put("STOP")
- for child in children:
- child.join()
-
- if failed:
- print("Error occurred, stopping the linter.", file=sys.stderr)
- sys.exit(32)
-
- def _parallel_check(self, files_or_modules):
- # Reset stats.
- self.open()
-
- all_stats = []
- module = None
- for result in self._parallel_task(files_or_modules):
- if not result:
- continue
- (_, self.file_state.base_name, module, messages, stats, msg_status) = result
-
- for msg in messages:
- msg = utils.Message(*msg)
- self.set_current_module(module)
- self.reporter.handle_message(msg)
-
- all_stats.append(stats)
- self.msg_status |= msg_status
-
- self.stats = _merge_stats(all_stats)
- self.current_name = module
-
- # Insert stats data to local checkers.
- for checker in self.get_checkers():
- if checker is not self:
- checker.stats = self.stats
+ self._do_check(files_or_modules)
def _do_check(self, files_or_modules):
walker = utils.PyLintASTWalker(self)
@@ -1584,18 +1494,21 @@ group are mutually exclusive.",
file=sys.stderr,
)
linter.set_option("jobs", 1)
- else:
- if linter.config.jobs == 0:
- linter.config.jobs = _cpu_count()
+ elif linter.config.jobs == 0:
+ linter.config.jobs = _cpu_count()
# We have loaded configuration from config file and command line. Now, we can
# load plugin specific configuration.
- linter.load_plugin_configuration()
+ #linter.load_plugin_configuration()
# insert current working directory to the python path to have a correct
# behaviour
with fix_import_path(args):
- linter.check(args)
+ if linter.config.jobs == 1:
+ linter.check(args)
+ else:
+ self._parallel_run(args)
+
linter.generate_reports()
if do_exit:
if linter.config.exit_zero:
@@ -1603,6 +1516,109 @@ group are mutually exclusive.",
else:
sys.exit(self.linter.msg_status)
+ def _parallel_run(self, files_or_modules):
+ with _patch_sysmodules():
+ self.linter._init_msg_states()
+ self._parallel_check(files_or_modules)
+
+ def _parallel_task(self, files_or_modules):
+ # Prepare configuration for child linters.
+ child_config = self._get_jobs_config()
+
+ children = []
+ manager = multiprocessing.Manager()
+ tasks_queue = manager.Queue()
+ results_queue = manager.Queue()
+
+ for _ in range(self.linter.config.jobs):
+ child_linter = ChildRunner(args=(tasks_queue, results_queue, child_config))
+ child_linter.start()
+ children.append(child_linter)
+
+ # Send files to child linters.
+ expanded_files = utils.expand_files(
+ files_or_modules,
+ self.linter,
+ self.linter.config.black_list,
+ self.linter.config.black_list_re,
+ )
+ for module_desc in expanded_files:
+ tasks_queue.put([module_desc.path])
+
+ # collect results from child linters
+ failed = False
+ for _ in expanded_files:
+ try:
+ result = results_queue.get()
+ except Exception as ex:
+ print(
+ "internal error while receiving results from child linter",
+ file=sys.stderr,
+ )
+ print(ex, file=sys.stderr)
+ failed = True
+ break
+ yield result
+
+ # Stop child linters and wait for their completion.
+ for _ in range(self.linter.config.jobs):
+ tasks_queue.put("STOP")
+ for child in children:
+ child.join()
+
+ if failed:
+ print("Error occured, stopping the linter.", file=sys.stderr)
+ sys.exit(32)
+
+ def _parallel_check(self, files_or_modules):
+ # Reset stats.
+ self.linter.open()
+
+ all_stats = []
+ module = None
+ for result in self._parallel_task(files_or_modules):
+ if not result:
+ continue
+ (
+ _,
+ self.linter.file_state.base_name,
+ module,
+ messages,
+ stats,
+ msg_status,
+ ) = result
+
+ for msg in messages:
+ msg = utils.Message(*msg)
+ self.linter.set_current_module(module)
+ self.linter.reporter.handle_message(msg)
+
+ all_stats.append(stats)
+ self.linter.msg_status |= msg_status
+
+ self.linter.stats = _merge_stats(all_stats)
+ self.linter.current_name = module
+
+ # Insert stats data to local checkers.
+ for checker in self.linter.get_checkers():
+ if checker is not self.linter:
+ checker.stats = self.linter.stats
+
+ def _get_jobs_config(self):
+ child_config = collections.OrderedDict()
+ filter_options = {"long-help"}
+ filter_options.update((opt_name for opt_name, _ in self.linter._external_opts))
+ for opt_providers in six.itervalues(self.linter._all_options):
+ for optname, optdict, val in opt_providers.options_and_values():
+ if optdict.get("deprecated"):
+ continue
+
+ if optname not in filter_options:
+ child_config[optname] = utils._format_option_value(optdict, val)
+ child_config["python3_porting_mode"] = self.linter._python3_porting_mode
+ child_config["plugins"] = self.linter._dynamic_plugins
+ return child_config
+
def cb_set_rcfile(self, name, value):
"""callback for option preprocessing (i.e. before option parsing)"""
self._rcfile = value
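
The _parallel_check method added above folds each child's stats dictionary into a single set of totals via pylint's _merge_stats helper, which is defined elsewhere in lint.py and not shown in this diff. As a rough illustration of that step only, assuming plain numeric counters, a merge could look like the sketch below (merge_stats here is a hypothetical stand-in, not the real helper):

import collections


def merge_stats(all_stats):
    # Sum numeric counters across the per-child stats dictionaries.
    merged = collections.Counter()
    for stats in all_stats:
        for key, value in stats.items():
            if isinstance(value, (int, float)):
                merged[key] += value
    return dict(merged)


# e.g. two children reporting their own message counts:
print(merge_stats([{"error": 1, "warning": 4}, {"error": 0, "warning": 2}]))
# -> {'error': 1, 'warning': 6}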