diff options
Diffstat (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl')
-rw-r--r-- | lib/dialyzer/src/dialyzer_coordinator.erl | 122 |
1 files changed, 73 insertions, 49 deletions
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 2ca3acc4cb..085c4e938c 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -27,6 +27,9 @@ %%% Exports for the typesig and dataflow analysis workers -export([wait_for_success_typings/2]). +%% Exports to handle SCC labels +-export([get_job_label/2, get_job_input/2]). + %%% Exports for the compilation workers -export([get_next_label/2]). @@ -36,9 +39,9 @@ -type collector() :: pid(). -type regulator() :: pid(). --type scc_to_pid() :: ets:tid() | 'none'. +-type job_labels_to_pid() :: ets:tid() | 'none'. --opaque coordinator() :: {collector(), regulator(), scc_to_pid()}. +-opaque coordinator() :: {collector(), regulator(), job_labels_to_pid()}. -type timing() :: dialyzer_timing:timing_server(). -type scc() :: [mfa_or_funlbl()]. @@ -46,7 +49,7 @@ 'contract_remote_types' | 'record_remote_types'. -type compile_job() :: file:filename(). --type typesig_job() :: scc(). +-type typesig_job() :: {integer(),scc()}. -type dataflow_job() :: module(). -type warnings_job() :: module(). -type contract_remote_types_job() :: module(). @@ -98,7 +101,7 @@ job_fun :: fun(), init_data :: init_data(), regulator :: regulator(), - scc_to_pid :: scc_to_pid() + job_labels_to_pid :: job_labels_to_pid() }). -include("dialyzer.hrl"). @@ -128,49 +131,26 @@ parallel_job(Mode, Jobs, InitData, Timing) -> %%-------------------------------------------------------------------- %% API functions for workers (dialyzer_worker). --spec request_activation(coordinator()) -> ok. - -request_activation({_Collector, Regulator, _SCCtoPid}) -> - Regulator ! {req, self()}, - wait_activation(). - --spec job_done(job(), job_result(), coordinator()) -> ok. - -job_done(Job, Result, {Collector, Regulator, _SCCtoPid}) -> - Regulator ! done, - Collector ! {done, Job, Result}, - ok. - --spec get_next_label(integer(), coordinator()) -> integer(). - -%% For the 'compile' worker. -get_next_label(EstimatedSize, {Collector, _Regulator, _SCCtoPid}) -> - Collector ! {next_label_request, EstimatedSize, self()}, - receive - {next_label_reply, NextLabel} -> NextLabel - end. - --spec wait_for_success_typings([scc() | module()], coordinator()) -> +-spec wait_for_success_typings([job_label()], coordinator()) -> 'ok'. %% Helper for 'sigtype' and 'dataflow' workers. -wait_for_success_typings(SCCs, {_Collector, _Regulator, SCCtoPid}) -> - F = fun(SCC) -> - %% The SCCs that SCC depends on have always been started. - try ets:lookup_element(SCCtoPid, SCC, 2) of +wait_for_success_typings(Labels, {_Collector, _Regulator, JobLabelsToPid}) -> + F = fun(JobLabel) -> + %% The jobs that job depends on have always been started. + case ets:lookup_element(JobLabelsToPid, JobLabel, 2, ok) of Pid when is_pid(Pid) -> Ref = erlang:monitor(process, Pid), receive {'DOWN', Ref, process, Pid, _Info} -> ok - end - catch - _:_ -> + end; + ok -> %% Already finished. ok end end, - lists:foreach(F, SCCs). + lists:foreach(F, Labels). %%-------------------------------------------------------------------- @@ -179,18 +159,16 @@ wait_for_success_typings(SCCs, {_Collector, _Regulator, SCCtoPid}) -> spawn_jobs(Mode, Jobs, InitData, Timing) -> Collector = self(), Regulator = spawn_regulator(), - - SCCtoPid = + JobLabelsToPid = if Mode =:= 'typesig'; Mode =:= 'dataflow' -> - ets:new(scc_to_pid, [{read_concurrency, true}]); + ets:new(job_labels_to_pid, [{read_concurrency, true}]); true -> none end, + Coordinator = {Collector, Regulator, JobLabelsToPid}, - Coordinator = {Collector, Regulator, SCCtoPid}, - - JobFun = job_fun(SCCtoPid, Mode, InitData, Coordinator), + JobFun = job_fun(JobLabelsToPid, Mode, InitData, Coordinator), %% Limit the number of processes we start in order to save memory. MaxNumberOfInitJobs = 20 * dialyzer_utils:parallelism(), @@ -212,7 +190,8 @@ spawn_jobs(Mode, Jobs, InitData, Timing) -> #state{mode = Mode, active = JobCount, result = InitResult, next_label = 0, job_fun = JobFun, jobs = RestJobs, - init_data = InitData, regulator = Regulator, scc_to_pid = SCCtoPid}. + init_data = InitData, regulator = Regulator, + job_labels_to_pid = JobLabelsToPid}. launch_jobs(Jobs, _JobFun, 0) -> Jobs; @@ -227,17 +206,18 @@ job_fun(none, Mode, InitData, Coordinator) -> _ = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), ok end; -job_fun(SCCtoPid, Mode, InitData, Coordinator) -> +job_fun(JobLabelsToPid, Mode, InitData, Coordinator) -> fun(Job) -> + JobLabel = get_job_label(Mode, Job), Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), - true = ets:insert(SCCtoPid, {Job, Pid}), + true = ets:insert(JobLabelsToPid, {JobLabel, Pid}), ok end. collect_result(#state{mode = Mode, active = Active, result = Result, next_label = NextLabel, init_data = InitData, jobs = JobsLeft, job_fun = JobFun, - regulator = Regulator, scc_to_pid = SCCtoPid} = State) -> + regulator = Regulator, job_labels_to_pid = JobLabelsToPID} = State) -> receive {next_label_request, Estimation, Pid} -> Pid ! {next_label_reply, NextLabel}, @@ -253,15 +233,15 @@ collect_result(#state{mode = Mode, active = Active, result = Result, {NewResult, NextLabel}; _ -> if - SCCtoPid =:= none -> ok; - true -> ets:delete(SCCtoPid) + JobLabelsToPID =:= none -> ok; + true -> ets:delete(JobLabelsToPID) end, NewResult end; N -> if - SCCtoPid =:= none -> ok; - true -> true = ets:delete(SCCtoPid, Job) + JobLabelsToPID =:= none -> ok; + true -> true = ets:delete(JobLabelsToPID, get_job_label(Mode, Job)) end, NewJobsLeft = case JobsLeft of @@ -288,6 +268,44 @@ update_result(Mode, InitData, Job, Data, Result) -> Data ++ Result end. + +-type job_label() :: integer() | module(). + +-type job_input() :: scc() | module(). + +-spec get_job_label(mode(), job()) -> job_label(). + +get_job_label(typesig, {Label, _Input}) -> Label; +get_job_label(dataflow, Job) -> Job; +get_job_label(contract_remote_types, Job) -> Job; +get_job_label(record_remote_types, Job) -> Job; +get_job_label(warnings, Job) -> Job; +get_job_label(compile, Job) -> Job. + +-spec get_job_input(mode(), job()) -> job_input(). + +get_job_input(typesig, {_Label, Input}) -> Input; +get_job_input(dataflow, Job) -> Job; +get_job_input(contract_remote_types, Job) -> Job; +get_job_input(record_remote_types, Job) -> Job; +get_job_input(warnings, Job) -> Job; +get_job_input(compile, Job) -> Job. + +-spec job_done(job(), job_result(), coordinator()) -> ok. + +job_done(Job, Result, {Collector, Regulator, _JobLabelsToPID}) -> + Regulator ! done, + Collector ! {done, Job, Result}, + ok. + +-spec get_next_label(integer(), coordinator()) -> integer(). + +get_next_label(EstimatedSize, {Collector, _Regulator, _JobLabelsToPID}) -> + Collector ! {next_label_request, EstimatedSize, self()}, + receive + {next_label_reply, NextLabel} -> NextLabel + end. + %%-------------------------------------------------------------------- %% The regulator server %% @@ -304,6 +322,12 @@ wait_activation() -> activate_pid(Pid) -> Pid ! activate. +-spec request_activation(coordinator()) -> ok. + +request_activation({_Collector, Regulator, _JobLabelsToPID}) -> + Regulator ! {req, self()}, + wait_activation(). + spawn_regulator() -> InitTickets = dialyzer_utils:parallelism(), spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end). |