diff options
author | Hans Bolinder <hasse@erlang.org> | 2020-08-21 09:38:25 +0200 |
---|---|---|
committer | Hans Bolinder <hasse@erlang.org> | 2020-10-05 08:55:08 +0200 |
commit | 0e71844663a10d6b237cc96804cd000f12d22709 (patch) | |
tree | 9c6b964cb6c18302d4200060dc1a6402dae5130d /lib/kernel/src | |
parent | cd23d705c0a15740506dfdb9fc7a6375cd13c0ca (diff) | |
download | erlang-0e71844663a10d6b237cc96804cd000f12d22709.tar.gz |
kernel: Remove support for distributed disk logs
disk_log:all/0 is a new function, which is to be used instead of
disk_log:accessible_logs/0.
disk_log:accessible_logs/0 and disk_log:lclose/1,2 are deprecated.
Diffstat (limited to 'lib/kernel/src')
-rw-r--r-- | lib/kernel/src/disk_log.erl | 124 | ||||
-rw-r--r-- | lib/kernel/src/disk_log.hrl | 6 | ||||
-rw-r--r-- | lib/kernel/src/disk_log_server.erl | 203 |
3 files changed, 65 insertions, 268 deletions
diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 6b3eb35f92..89a9bd2f51 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -30,7 +30,7 @@ change_notify/3, change_header/2, chunk/2, chunk/3, bchunk/2, bchunk/3, chunk_step/3, chunk_info/1, block/1, block/2, unblock/1, info/1, format_error/1, - accessible_logs/0]). + accessible_logs/0, all/0]). %% Internal exports -export([init/2, internal_open/2, @@ -47,6 +47,10 @@ -export_type([continuation/0]). +-deprecated([{accessible_logs, 0, "use disk_log:all/0 instead"}, + {lclose, 1, "use disk_log:close/1 instead"}, + {lclose, 2, "use disk_log:close/1 instead"}]). + -type dlog_state_error() :: 'ok' | {'error', term()}. -record(state, {queue = [], @@ -102,16 +106,13 @@ | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()} | {'node_already_open', Log :: log()}. --type dist_error_rsn() :: 'nodedown' | open_error_rsn(). --type ret() :: {'ok', Log :: log()} +-type open_ret() :: {'ok', Log :: log()} | {'repaired', Log :: log(), {'recovered', Rec :: non_neg_integer()}, - {'badbytes', Bad :: non_neg_integer()}}. --type open_ret() :: ret() | {'error', open_error_rsn()}. --type dist_open_ret() :: {[{node(), ret()}], - [{node(), {'error', dist_error_rsn()}}]}. + {'badbytes', Bad :: non_neg_integer()}} + | {'error', open_error_rsn()}. --spec open(ArgL) -> open_ret() | dist_open_ret() when +-spec open(ArgL) -> open_ret() when ArgL :: dlog_options(). open(A) -> disk_log_server:open(check_arg(A, #arg{options = A})). @@ -195,8 +196,10 @@ lclose(Log) -> -spec lclose(Log, Node) -> 'ok' | {'error', lclose_error_rsn()} when Log :: log(), Node :: node(). -lclose(Log, Node) -> - lreq(Log, close, Node). +lclose(Log, Node) when node() =:= Node -> + req(Log, close); +lclose(_Log, _Node) -> + {error, no_such_log}. -type trunc_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} @@ -337,7 +340,6 @@ format_error(Error) -> | {status, Status :: ok | {blocked, QueueLogRecords :: boolean()}} | {node, Node :: node()} - | {distributed, Dist :: local | [node()]} | {head, Head :: none | {head, binary()} | (MFA :: {atom(), atom(), list()})} @@ -353,7 +355,7 @@ format_error(Error) -> Log :: log(), InfoList :: [dlog_info()]. info(Log) -> - sreq(Log, info). + req(Log, info). -spec pid2name(Pid) -> {'ok', Log} | 'undefined' when Pid :: pid(), @@ -401,7 +403,7 @@ chunk(Log, Cont, N) when is_integer(N), N > 0 -> ichunk(Log, Cont, N). ichunk(Log, start, N) -> - R = sreq(Log, {chunk, 0, [], N}), + R = req(Log, {chunk, 0, [], N}), ichunk_end(R, Log); ichunk(Log, More, N) when is_record(More, continuation) -> R = req2(More#continuation.pid, @@ -484,7 +486,7 @@ bchunk(Log, Cont, N) when is_integer(N), N > 0 -> bichunk(Log, Cont, N). bichunk(Log, start, N) -> - R = sreq(Log, {chunk, 0, [], N}), + R = req(Log, {chunk, 0, [], N}), bichunk_end(R); bichunk(_Log, #continuation{pid = Pid, pos = Pos, b = B}, N) -> R = req2(Pid, {chunk, Pos, B, N}), @@ -511,7 +513,7 @@ chunk_step(Log, Cont, N) when is_integer(N) -> ichunk_step(Log, Cont, N). ichunk_step(Log, start, N) -> - sreq(Log, {chunk_step, 0, N}); + req(Log, {chunk_step, 0, N}); ichunk_step(_Log, More, N) when is_record(More, continuation) -> req2(More#continuation.pid, {chunk_step, More#continuation.pos, N}); ichunk_step(_Log, _, _) -> @@ -526,11 +528,15 @@ chunk_info(More = #continuation{}) -> chunk_info(BadCont) -> {error, {no_continuation, BadCont}}. --spec accessible_logs() -> {[LocalLog], [DistributedLog]} when - LocalLog :: log(), - DistributedLog :: log(). +-spec accessible_logs() -> {[Log], []} when + Log :: log(). accessible_logs() -> - disk_log_server:accessible_logs(). + {disk_log_server:all(), []}. + +-spec all() -> [Log] when + Log :: log(). +all() -> + disk_log_server:all(). istart_link(Server) -> {ok, proc_lib:spawn_link(disk_log, init, [self(), Server])}. @@ -622,10 +628,6 @@ check_arg([{format, internal}|Tail], Res) -> check_arg(Tail, Res#arg{format = internal}); check_arg([{format, external}|Tail], Res) -> check_arg(Tail, Res#arg{format = external}); -check_arg([{distributed, []}|Tail], Res) -> - check_arg(Tail, Res#arg{distributed = false}); -check_arg([{distributed, Nodes}|Tail], Res) when is_list(Nodes) -> - check_arg(Tail, Res#arg{distributed = {true, Nodes}}); check_arg([{notify, true}|Tail], Res) -> check_arg(Tail, Res#arg{notify = true}); check_arg([{notify, false}|Tail], Res) -> @@ -1519,9 +1521,6 @@ do_format_error({arg_mismatch, Option, FirstValue, ArgValue}) -> "the current value ~tp~n", [ArgValue, Option, FirstValue]); do_format_error({name_already_open, Log}) -> io_lib:format("The disk log ~tp has already opened another file~n", [Log]); -do_format_error({node_already_open, Log}) -> - io_lib:format("The distribution option of the disk log ~tp does not match " - "already open log~n", [Log]); do_format_error({open_read_write, Log}) -> io_lib:format("The disk log ~tp has already been opened read-write~n", [Log]); @@ -1573,15 +1572,6 @@ do_info(L, Cnt) -> halt -> Extra#halt.size end, - Distribution = - case disk_log_server:get_log_pids(Name) of - {local, _Pid} -> - local; - {distributed, Pids} -> - [node(P) || P <- Pids]; - undefined -> % "cannot happen" - [] - end, RW = case Type of wrap when Mode =:= read_write -> #handle{curB = CurB, curF = CurF, @@ -1629,8 +1619,7 @@ do_info(L, Cnt) -> HeadL ++ [{mode, Mode}, {status, Status}, - {node, node()}, - {distributed, Distribution} + {node, node()} ], Common ++ RW. @@ -1861,65 +1850,13 @@ reply(To, Reply, S) -> loop(S). req(Log, R) -> - case disk_log_server:get_log_pids(Log) of - {local, Pid} -> - monitor_request(Pid, R); - undefined -> - {error, no_such_log}; - {distributed, Pids} -> - multi_req({self(), R}, Pids) - end. - -multi_req(Msg, Pids) -> - Refs = - lists:map(fun(Pid) -> - Ref = erlang:monitor(process, Pid), - Pid ! Msg, - {Pid, Ref} - end, Pids), - lists:foldl(fun({Pid, Ref}, Reply) -> - receive - {'DOWN', Ref, process, Pid, _Info} -> - Reply; - {disk_log, Pid, _Reply} -> - erlang:demonitor(Ref, [flush]), - ok - end - end, {error, nonode}, Refs). - -sreq(Log, R) -> - case nearby_pid(Log, node()) of + case disk_log_server:get_log_pid(Log) of undefined -> {error, no_such_log}; Pid -> monitor_request(Pid, R) end. -%% Local req - always talk to log on Node -lreq(Log, R, Node) -> - case nearby_pid(Log, Node) of - Pid when is_pid(Pid), node(Pid) =:= Node -> - monitor_request(Pid, R); - _Else -> - {error, no_such_log} - end. - -nearby_pid(Log, Node) -> - case disk_log_server:get_log_pids(Log) of - undefined -> - undefined; - {local, Pid} -> - Pid; - {distributed, Pids} -> - get_near_pid(Pids, Node) - end. - --spec get_near_pid([pid(),...], node()) -> pid(). - -get_near_pid([Pid | _], Node) when node(Pid) =:= Node -> Pid; -get_near_pid([Pid], _ ) -> Pid; -get_near_pid([_ | T], Node) -> get_near_pid(T, Node). - monitor_request(Pid, Req) -> Ref = erlang:monitor(process, Pid), Pid ! {self(), Req}, @@ -1964,14 +1901,11 @@ add_ext(File, Ext) -> lists:concat([File, ".", Ext]). notify(Log, R) -> - case disk_log_server:get_log_pids(Log) of + case disk_log_server:get_log_pid(Log) of undefined -> {error, no_such_log}; - {local, Pid} -> + Pid -> Pid ! R, - ok; - {distributed, Pids} -> - lists:foreach(fun(Pid) -> Pid ! R end, Pids), ok end. diff --git a/lib/kernel/src/disk_log.hrl b/lib/kernel/src/disk_log.hrl index a362881f40..3cd124b8a7 100644 --- a/lib/kernel/src/disk_log.hrl +++ b/lib/kernel/src/disk_log.hrl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1997-2017. All Rights Reserved. +%% Copyright Ericsson AB 1997-2020. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ -type dlog_mode() :: 'read_only' | 'read_write'. -type dlog_name() :: atom() | string(). -type dlog_optattr() :: 'name' | 'file' | 'linkto' | 'repair' | 'type' - | 'format' | 'size' | 'distributed' | 'notify' + | 'format' | 'size' | 'notify' | 'head' | 'head_func' | 'mode'. -type dlog_option() :: {name, Log :: log()} | {file, FileName :: file:filename()} @@ -72,7 +72,6 @@ | {type, Type :: dlog_type()} | {format, Format :: dlog_format()} | {size, Size :: dlog_size()} - | {distributed, Nodes :: [node()]} | {notify, boolean()} | {head, Head :: dlog_head_opt()} | {head_func, MFA :: {atom(), atom(), list()}} @@ -97,7 +96,6 @@ repair = true :: dlog_repair(), size = infinity :: dlog_size(), type = halt :: dlog_type(), - distributed = false :: 'false' | {'true', [node()]}, format = internal :: dlog_format(), linkto = self() :: 'none' | pid(), head = none, diff --git a/lib/kernel/src/disk_log_server.erl b/lib/kernel/src/disk_log_server.erl index 2e22f28b14..ea8cbc808e 100644 --- a/lib/kernel/src/disk_log_server.erl +++ b/lib/kernel/src/disk_log_server.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1997-2016. All Rights Reserved. +%% Copyright Ericsson AB 1997-2020. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -21,10 +21,7 @@ -behaviour(gen_server). -export([start_link/0, start/0, open/1, close/1, - get_log_pids/1, accessible_logs/0]). - -%% Local export. --export([dist_open/1, get_local_pid/1]). + get_log_pid/1, all/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_info/2, terminate/2]). @@ -32,22 +29,15 @@ -include("disk_log.hrl"). --compile({inline,[{do_get_log_pids,1}]}). +-compile({inline,[{do_get_log_pid,1}]}). -record(pending, {log, pid, req, from, attach, clients}). % [{Request,From}] -record(state, {pending = [] :: [#pending{}]}). --compile({nowarn_deprecated_function, [{pg2, create, 1}]}). --compile({nowarn_deprecated_function, [{pg2, join, 2}]}). --compile({nowarn_deprecated_function, [{pg2, leave, 2}]}). --compile({nowarn_deprecated_function, [{pg2, which_groups, 0}]}). --compile({nowarn_deprecated_function, [{pg2, get_members, 1}]}). - %%%----------------------------------------------------------------- %%% This module implements the disk_log server. Its primary purpose -%%% is to keep the ets table 'disk_log_names' updated and to handle -%%% distribution data (pids) using the module pg2. +%%% is to keep the ets table 'disk_log_names' updated. %%%----------------------------------------------------------------- %%%---------------------------------------------------------------------- %%% API @@ -60,42 +50,32 @@ start() -> open({ok, A}) -> ensure_started(), - gen_server:call(disk_log_server, {open, local, A}, infinity); + gen_server:call(disk_log_server, {open, A}, infinity); open(Other) -> Other. -%% To be used from this module only. -dist_open(A) -> - ensure_started(), - gen_server:call(disk_log_server, {open, distr, A}, infinity). - close(Pid) -> gen_server:call(disk_log_server, {close, Pid}, infinity). -get_log_pids(LogName) -> - do_get_log_pids(LogName). +get_log_pid(LogName) -> + do_get_log_pid(LogName). -accessible_logs() -> +all() -> ensure_started(), - do_accessible_logs(). + do_all(). %%%---------------------------------------------------------------------- %%% Callback functions from gen_server %%%---------------------------------------------------------------------- -%% It would have been really nice to have a tag for disk log groups, -%% like {distributed_disk_log, Log}, but backward compatibility makes -%% it hard to introduce. --define(group(Log), Log). - init([]) -> process_flag(trap_exit, true), _ = ets:new(?DISK_LOG_NAME_TABLE, [named_table, set]), _= ets:new(?DISK_LOG_PID_TABLE, [named_table, set]), {ok, #state{}}. -handle_call({open, W, A}, From, State) -> - open([{{open, W, A}, From}], State); +handle_call({open, A}, From, State) -> + open([{{open, A}, From}], State); handle_call({close, Pid}, _From, State) -> Reply = do_close(Pid), {reply, Reply, State}. @@ -120,17 +100,10 @@ handle_info({pending_reply, Pid, Result0}, State) -> _ -> put(Pid, Name), link(Pid), - {_, Locality, _} = Request, ets:insert(?DISK_LOG_PID_TABLE, {Pid, Name}), - ets:insert(?DISK_LOG_NAME_TABLE, {Name, Pid, Locality}), - if - Locality =:= distr -> - ok = pg2:join(?group(Name), Pid); - true -> - ok - end + ets:insert(?DISK_LOG_NAME_TABLE, {Name, Pid}) end, - gen_server:reply(From, result(Request, Result0)), + gen_server:reply(From, Result0), open(Clients, State1) end; handle_info({'EXIT', Pid, _Reason}, State) -> @@ -199,89 +172,29 @@ open([], State) -> %% -> {OpenRet, NewState} | {{node(),OpenRet}, NewState} | %% {pending, NewState} -do_open({open, W, #arg{name = Name}=A}=Req, From, State) -> +do_open({open, #arg{name = Name}}=Req, From, State) -> case check_pending(Name, From, State, Req) of {pending, NewState} -> {pending, NewState}; - false when W =:= local -> - case A#arg.distributed of - {true, Nodes} -> - Fun = open_distr_rpc_fun(Nodes, A, From), - _Pid = spawn(Fun), - %% No pending reply is expected, but don't reply yet. - {pending, State}; - false -> - case get_local_pid(Name) of - {local, Pid} -> - do_internal_open(Name, Pid, From, Req, true,State); - {distributed, _Pid} -> - {{error, {node_already_open, Name}}, State}; - undefined -> - start_log(Name, Req, From, State) - end - end; - false when W =:= distr -> - ok = pg2:create(?group(Name)), - case get_local_pid(Name) of + false -> + case do_get_log_pid(Name) of undefined -> start_log(Name, Req, From, State); - {local, _Pid} -> - {{node(),{error, {node_already_open, Name}}}, State}; - {distributed, Pid} -> - do_internal_open(Name, Pid, From, Req, true, State) + Pid -> + do_internal_open(Name, Pid, From, Req, true,State) end end. --spec open_distr_rpc_fun([node()], _, _) -> % XXX: underspecified - fun(() -> no_return()). - -open_distr_rpc_fun(Nodes, A, From) -> - fun() -> open_distr_rpc(Nodes, A, From) end. - -%% Spawning a process is a means to avoid deadlock when -%% disk_log_servers mutually open disk_logs. - -open_distr_rpc(Nodes, A, From) -> - {AllReplies, BadNodes} = rpc:multicall(Nodes, ?MODULE, dist_open, [A]), - {Ok, Bad} = cr(AllReplies, [], []), - Old = find_old_nodes(Nodes, AllReplies, BadNodes), - NotOk = [{BadNode, {error, nodedown}} || BadNode <- BadNodes ++ Old], - Reply = {Ok, Bad ++ NotOk}, - %% Send the reply to the waiting client: - gen_server:reply(From, Reply), - exit(normal). - -cr([{badrpc, {'EXIT', _}} | T], Nodes, Bad) -> - %% This clause can be removed in next release. - cr(T, Nodes, Bad); -cr([R={_Node, {error, _}} | T], Nodes, Bad) -> - cr(T, Nodes, [R | Bad]); -cr([Reply | T], Nodes, Bad) -> - cr(T, [Reply | Nodes], Bad); -cr([], Nodes, Bad) -> - {Nodes, Bad}. - -%% If a "new" node (one that calls dist_open/1) tries to open a log -%% on an old node (one that does not have dist_open/1), then the old -%% node is considered 'down'. In next release, this test will not be -%% needed since all nodes can be assumed to be "new" by then. -%% One more thing: if an old node tries to open a log on a new node, -%% the new node is also considered 'down'. -find_old_nodes(Nodes, Replies, BadNodes) -> - R = [X || {X, _} <- Replies], - ordsets:to_list(ordsets:subtract(ordsets:from_list(Nodes), - ordsets:from_list(R ++ BadNodes))). - start_log(Name, Req, From, State) -> Server = self(), case supervisor:start_child(disk_log_sup, [Server]) of {ok, Pid} -> do_internal_open(Name, Pid, From, Req, false, State); Error -> - {result(Req, Error), State} + {Error, State} end. -do_internal_open(Name, Pid, From, {open, _W, A}=Req, Attach, State) -> +do_internal_open(Name, Pid, From, {open, A}=Req, Attach, State) -> Server = self(), F = fun() -> Res = disk_log:internal_open(Pid, A), @@ -303,11 +216,6 @@ check_pending(Name, From, State, Req) -> false end. -result({_, distr, _}, R) -> - {node(), R}; -result({_, local, _}, R) -> - R. - do_close(Pid) -> case get(Pid) of undefined -> @@ -319,71 +227,28 @@ do_close(Pid) -> end. erase_log(Name, Pid) -> - case get_local_pid(Name) of + case do_get_log_pid(Name) of undefined -> ok; - {local, Pid} -> + Pid -> true = ets:delete(?DISK_LOG_NAME_TABLE, Name), - true = ets:delete(?DISK_LOG_PID_TABLE, Pid); - {distributed, Pid} -> - true = ets:delete(?DISK_LOG_NAME_TABLE, Name), - true = ets:delete(?DISK_LOG_PID_TABLE, Pid), - ok = pg2:leave(?group(Name), Pid) + true = ets:delete(?DISK_LOG_PID_TABLE, Pid) end, erase(Pid). -do_accessible_logs() -> +do_all() -> LocalSpec = {'$1','_',local}, Local0 = [hd(L) || L <- ets:match(?DISK_LOG_NAME_TABLE, LocalSpec)], - Local = lists:sort(Local0), - Groups0 = ordsets:from_list(pg2:which_groups()), - Groups = ordsets:to_list(ordsets:subtract(Groups0, Local)), - Dist = [L || L <- Groups, dist_pids(L) =/= []], - {Local, Dist}. - -get_local_pid(LogName) -> - case ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of - [{LogName, Pid, local}] -> - {local, Pid}; - [{LogName, Pid, distr}] -> - {distributed, Pid}; - [] -> - undefined - end. + lists:sort(Local0). %% Inlined. -do_get_log_pids(LogName) -> - case catch ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of - [{LogName, Pid, local}] -> - {local, Pid}; - [{LogName, _Pid, distr}] -> - case pg2:get_members(?group(LogName)) of - [] -> % The disk_log process has died recently - undefined; - Members -> - {distributed, Members} - end; - _EmptyOrError -> - case dist_pids(LogName) of - [] -> undefined; - Pids -> {distributed, Pids} - end - end. - -dist_pids(LogName) -> - %% Would be much simpler if disk log group names were tagged. - GroupName = ?group(LogName), - case catch pg2:get_members(GroupName) of - [Pid | _] = Pids -> - case rpc:call(node(Pid), ?MODULE, get_local_pid, [LogName]) of - undefined -> % does not seem to be a disk_log group - case catch lists:member(Pid,pg2:get_members(GroupName)) of - true -> []; - _ -> dist_pids(LogName) - end; - _ -> % badrpc if get_local_pid is not exported - Pids - end; - _ -> - [] +do_get_log_pid(LogName) -> + try ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of + [{LogName, Pid}] -> + Pid; + [] -> + undefined + catch + _:_ -> + undefined end. |