summaryrefslogtreecommitdiff
path: root/lib/dialyzer/src/dialyzer_coordinator.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl')
-rw-r--r--lib/dialyzer/src/dialyzer_coordinator.erl122
1 files changed, 73 insertions, 49 deletions
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl
index 2ca3acc4cb..085c4e938c 100644
--- a/lib/dialyzer/src/dialyzer_coordinator.erl
+++ b/lib/dialyzer/src/dialyzer_coordinator.erl
@@ -27,6 +27,9 @@
%%% Exports for the typesig and dataflow analysis workers
-export([wait_for_success_typings/2]).
+%% Exports to handle SCC labels
+-export([get_job_label/2, get_job_input/2]).
+
%%% Exports for the compilation workers
-export([get_next_label/2]).
@@ -36,9 +39,9 @@
-type collector() :: pid().
-type regulator() :: pid().
--type scc_to_pid() :: ets:tid() | 'none'.
+-type job_labels_to_pid() :: ets:tid() | 'none'.
--opaque coordinator() :: {collector(), regulator(), scc_to_pid()}.
+-opaque coordinator() :: {collector(), regulator(), job_labels_to_pid()}.
-type timing() :: dialyzer_timing:timing_server().
-type scc() :: [mfa_or_funlbl()].
@@ -46,7 +49,7 @@
'contract_remote_types' | 'record_remote_types'.
-type compile_job() :: file:filename().
--type typesig_job() :: scc().
+-type typesig_job() :: {integer(),scc()}.
-type dataflow_job() :: module().
-type warnings_job() :: module().
-type contract_remote_types_job() :: module().
@@ -98,7 +101,7 @@
job_fun :: fun(),
init_data :: init_data(),
regulator :: regulator(),
- scc_to_pid :: scc_to_pid()
+ job_labels_to_pid :: job_labels_to_pid()
}).
-include("dialyzer.hrl").
@@ -128,49 +131,26 @@ parallel_job(Mode, Jobs, InitData, Timing) ->
%%--------------------------------------------------------------------
%% API functions for workers (dialyzer_worker).
--spec request_activation(coordinator()) -> ok.
-
-request_activation({_Collector, Regulator, _SCCtoPid}) ->
- Regulator ! {req, self()},
- wait_activation().
-
--spec job_done(job(), job_result(), coordinator()) -> ok.
-
-job_done(Job, Result, {Collector, Regulator, _SCCtoPid}) ->
- Regulator ! done,
- Collector ! {done, Job, Result},
- ok.
-
--spec get_next_label(integer(), coordinator()) -> integer().
-
-%% For the 'compile' worker.
-get_next_label(EstimatedSize, {Collector, _Regulator, _SCCtoPid}) ->
- Collector ! {next_label_request, EstimatedSize, self()},
- receive
- {next_label_reply, NextLabel} -> NextLabel
- end.
-
--spec wait_for_success_typings([scc() | module()], coordinator()) ->
+-spec wait_for_success_typings([job_label()], coordinator()) ->
'ok'.
%% Helper for 'sigtype' and 'dataflow' workers.
-wait_for_success_typings(SCCs, {_Collector, _Regulator, SCCtoPid}) ->
- F = fun(SCC) ->
- %% The SCCs that SCC depends on have always been started.
- try ets:lookup_element(SCCtoPid, SCC, 2) of
+wait_for_success_typings(Labels, {_Collector, _Regulator, JobLabelsToPid}) ->
+ F = fun(JobLabel) ->
+ %% The jobs that job depends on have always been started.
+ case ets:lookup_element(JobLabelsToPid, JobLabel, 2, ok) of
Pid when is_pid(Pid) ->
Ref = erlang:monitor(process, Pid),
receive
{'DOWN', Ref, process, Pid, _Info} ->
ok
- end
- catch
- _:_ ->
+ end;
+ ok ->
%% Already finished.
ok
end
end,
- lists:foreach(F, SCCs).
+ lists:foreach(F, Labels).
%%--------------------------------------------------------------------
@@ -179,18 +159,16 @@ wait_for_success_typings(SCCs, {_Collector, _Regulator, SCCtoPid}) ->
spawn_jobs(Mode, Jobs, InitData, Timing) ->
Collector = self(),
Regulator = spawn_regulator(),
-
- SCCtoPid =
+ JobLabelsToPid =
if
Mode =:= 'typesig'; Mode =:= 'dataflow' ->
- ets:new(scc_to_pid, [{read_concurrency, true}]);
+ ets:new(job_labels_to_pid, [{read_concurrency, true}]);
true ->
none
end,
+ Coordinator = {Collector, Regulator, JobLabelsToPid},
- Coordinator = {Collector, Regulator, SCCtoPid},
-
- JobFun = job_fun(SCCtoPid, Mode, InitData, Coordinator),
+ JobFun = job_fun(JobLabelsToPid, Mode, InitData, Coordinator),
%% Limit the number of processes we start in order to save memory.
MaxNumberOfInitJobs = 20 * dialyzer_utils:parallelism(),
@@ -212,7 +190,8 @@ spawn_jobs(Mode, Jobs, InitData, Timing) ->
#state{mode = Mode, active = JobCount, result = InitResult,
next_label = 0, job_fun = JobFun, jobs = RestJobs,
- init_data = InitData, regulator = Regulator, scc_to_pid = SCCtoPid}.
+ init_data = InitData, regulator = Regulator,
+ job_labels_to_pid = JobLabelsToPid}.
launch_jobs(Jobs, _JobFun, 0) ->
Jobs;
@@ -227,17 +206,18 @@ job_fun(none, Mode, InitData, Coordinator) ->
_ = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
ok
end;
-job_fun(SCCtoPid, Mode, InitData, Coordinator) ->
+job_fun(JobLabelsToPid, Mode, InitData, Coordinator) ->
fun(Job) ->
+ JobLabel = get_job_label(Mode, Job),
Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
- true = ets:insert(SCCtoPid, {Job, Pid}),
+ true = ets:insert(JobLabelsToPid, {JobLabel, Pid}),
ok
end.
collect_result(#state{mode = Mode, active = Active, result = Result,
next_label = NextLabel, init_data = InitData,
jobs = JobsLeft, job_fun = JobFun,
- regulator = Regulator, scc_to_pid = SCCtoPid} = State) ->
+ regulator = Regulator, job_labels_to_pid = JobLabelsToPID} = State) ->
receive
{next_label_request, Estimation, Pid} ->
Pid ! {next_label_reply, NextLabel},
@@ -253,15 +233,15 @@ collect_result(#state{mode = Mode, active = Active, result = Result,
{NewResult, NextLabel};
_ ->
if
- SCCtoPid =:= none -> ok;
- true -> ets:delete(SCCtoPid)
+ JobLabelsToPID =:= none -> ok;
+ true -> ets:delete(JobLabelsToPID)
end,
NewResult
end;
N ->
if
- SCCtoPid =:= none -> ok;
- true -> true = ets:delete(SCCtoPid, Job)
+ JobLabelsToPID =:= none -> ok;
+ true -> true = ets:delete(JobLabelsToPID, get_job_label(Mode, Job))
end,
NewJobsLeft =
case JobsLeft of
@@ -288,6 +268,44 @@ update_result(Mode, InitData, Job, Data, Result) ->
Data ++ Result
end.
+
+-type job_label() :: integer() | module().
+
+-type job_input() :: scc() | module().
+
+-spec get_job_label(mode(), job()) -> job_label().
+
+get_job_label(typesig, {Label, _Input}) -> Label;
+get_job_label(dataflow, Job) -> Job;
+get_job_label(contract_remote_types, Job) -> Job;
+get_job_label(record_remote_types, Job) -> Job;
+get_job_label(warnings, Job) -> Job;
+get_job_label(compile, Job) -> Job.
+
+-spec get_job_input(mode(), job()) -> job_input().
+
+get_job_input(typesig, {_Label, Input}) -> Input;
+get_job_input(dataflow, Job) -> Job;
+get_job_input(contract_remote_types, Job) -> Job;
+get_job_input(record_remote_types, Job) -> Job;
+get_job_input(warnings, Job) -> Job;
+get_job_input(compile, Job) -> Job.
+
+-spec job_done(job(), job_result(), coordinator()) -> ok.
+
+job_done(Job, Result, {Collector, Regulator, _JobLabelsToPID}) ->
+ Regulator ! done,
+ Collector ! {done, Job, Result},
+ ok.
+
+-spec get_next_label(integer(), coordinator()) -> integer().
+
+get_next_label(EstimatedSize, {Collector, _Regulator, _JobLabelsToPID}) ->
+ Collector ! {next_label_request, EstimatedSize, self()},
+ receive
+ {next_label_reply, NextLabel} -> NextLabel
+ end.
+
%%--------------------------------------------------------------------
%% The regulator server
%%
@@ -304,6 +322,12 @@ wait_activation() ->
activate_pid(Pid) ->
Pid ! activate.
+-spec request_activation(coordinator()) -> ok.
+
+request_activation({_Collector, Regulator, _JobLabelsToPID}) ->
+ Regulator ! {req, self()},
+ wait_activation().
+
spawn_regulator() ->
InitTickets = dialyzer_utils:parallelism(),
spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end).