summaryrefslogtreecommitdiff
path: root/lib/kernel/src
diff options
context:
space:
mode:
authorHans Bolinder <hasse@erlang.org>2020-08-21 09:38:25 +0200
committerHans Bolinder <hasse@erlang.org>2020-10-05 08:55:08 +0200
commit0e71844663a10d6b237cc96804cd000f12d22709 (patch)
tree9c6b964cb6c18302d4200060dc1a6402dae5130d /lib/kernel/src
parentcd23d705c0a15740506dfdb9fc7a6375cd13c0ca (diff)
downloaderlang-0e71844663a10d6b237cc96804cd000f12d22709.tar.gz
kernel: Remove support for distributed disk logs
disk_log:all/0 is a new function, which is to be used instead of disk_log:accessible_logs/0. disk_log:accessible_logs/0 and disk_log:lclose/1,2 are deprecated.
Diffstat (limited to 'lib/kernel/src')
-rw-r--r--lib/kernel/src/disk_log.erl124
-rw-r--r--lib/kernel/src/disk_log.hrl6
-rw-r--r--lib/kernel/src/disk_log_server.erl203
3 files changed, 65 insertions, 268 deletions
diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl
index 6b3eb35f92..89a9bd2f51 100644
--- a/lib/kernel/src/disk_log.erl
+++ b/lib/kernel/src/disk_log.erl
@@ -30,7 +30,7 @@
change_notify/3, change_header/2,
chunk/2, chunk/3, bchunk/2, bchunk/3, chunk_step/3, chunk_info/1,
block/1, block/2, unblock/1, info/1, format_error/1,
- accessible_logs/0]).
+ accessible_logs/0, all/0]).
%% Internal exports
-export([init/2, internal_open/2,
@@ -47,6 +47,10 @@
-export_type([continuation/0]).
+-deprecated([{accessible_logs, 0, "use disk_log:all/0 instead"},
+ {lclose, 1, "use disk_log:close/1 instead"},
+ {lclose, 2, "use disk_log:close/1 instead"}]).
+
-type dlog_state_error() :: 'ok' | {'error', term()}.
-record(state, {queue = [],
@@ -102,16 +106,13 @@
| {'invalid_header', invalid_header()}
| {'file_error', file:filename(), file_error()}
| {'node_already_open', Log :: log()}.
--type dist_error_rsn() :: 'nodedown' | open_error_rsn().
--type ret() :: {'ok', Log :: log()}
+-type open_ret() :: {'ok', Log :: log()}
| {'repaired', Log :: log(),
{'recovered', Rec :: non_neg_integer()},
- {'badbytes', Bad :: non_neg_integer()}}.
--type open_ret() :: ret() | {'error', open_error_rsn()}.
--type dist_open_ret() :: {[{node(), ret()}],
- [{node(), {'error', dist_error_rsn()}}]}.
+ {'badbytes', Bad :: non_neg_integer()}}
+ | {'error', open_error_rsn()}.
--spec open(ArgL) -> open_ret() | dist_open_ret() when
+-spec open(ArgL) -> open_ret() when
ArgL :: dlog_options().
open(A) ->
disk_log_server:open(check_arg(A, #arg{options = A})).
@@ -195,8 +196,10 @@ lclose(Log) ->
-spec lclose(Log, Node) -> 'ok' | {'error', lclose_error_rsn()} when
Log :: log(),
Node :: node().
-lclose(Log, Node) ->
- lreq(Log, close, Node).
+lclose(Log, Node) when node() =:= Node ->
+ req(Log, close);
+lclose(_Log, _Node) ->
+ {error, no_such_log}.
-type trunc_error_rsn() :: 'no_such_log' | 'nonode'
| {'read_only_mode', log()}
@@ -337,7 +340,6 @@ format_error(Error) ->
| {status, Status ::
ok | {blocked, QueueLogRecords :: boolean()}}
| {node, Node :: node()}
- | {distributed, Dist :: local | [node()]}
| {head, Head :: none
| {head, binary()}
| (MFA :: {atom(), atom(), list()})}
@@ -353,7 +355,7 @@ format_error(Error) ->
Log :: log(),
InfoList :: [dlog_info()].
info(Log) ->
- sreq(Log, info).
+ req(Log, info).
-spec pid2name(Pid) -> {'ok', Log} | 'undefined' when
Pid :: pid(),
@@ -401,7 +403,7 @@ chunk(Log, Cont, N) when is_integer(N), N > 0 ->
ichunk(Log, Cont, N).
ichunk(Log, start, N) ->
- R = sreq(Log, {chunk, 0, [], N}),
+ R = req(Log, {chunk, 0, [], N}),
ichunk_end(R, Log);
ichunk(Log, More, N) when is_record(More, continuation) ->
R = req2(More#continuation.pid,
@@ -484,7 +486,7 @@ bchunk(Log, Cont, N) when is_integer(N), N > 0 ->
bichunk(Log, Cont, N).
bichunk(Log, start, N) ->
- R = sreq(Log, {chunk, 0, [], N}),
+ R = req(Log, {chunk, 0, [], N}),
bichunk_end(R);
bichunk(_Log, #continuation{pid = Pid, pos = Pos, b = B}, N) ->
R = req2(Pid, {chunk, Pos, B, N}),
@@ -511,7 +513,7 @@ chunk_step(Log, Cont, N) when is_integer(N) ->
ichunk_step(Log, Cont, N).
ichunk_step(Log, start, N) ->
- sreq(Log, {chunk_step, 0, N});
+ req(Log, {chunk_step, 0, N});
ichunk_step(_Log, More, N) when is_record(More, continuation) ->
req2(More#continuation.pid, {chunk_step, More#continuation.pos, N});
ichunk_step(_Log, _, _) ->
@@ -526,11 +528,15 @@ chunk_info(More = #continuation{}) ->
chunk_info(BadCont) ->
{error, {no_continuation, BadCont}}.
--spec accessible_logs() -> {[LocalLog], [DistributedLog]} when
- LocalLog :: log(),
- DistributedLog :: log().
+-spec accessible_logs() -> {[Log], []} when
+ Log :: log().
accessible_logs() ->
- disk_log_server:accessible_logs().
+ {disk_log_server:all(), []}.
+
+-spec all() -> [Log] when
+ Log :: log().
+all() ->
+ disk_log_server:all().
istart_link(Server) ->
{ok, proc_lib:spawn_link(disk_log, init, [self(), Server])}.
@@ -622,10 +628,6 @@ check_arg([{format, internal}|Tail], Res) ->
check_arg(Tail, Res#arg{format = internal});
check_arg([{format, external}|Tail], Res) ->
check_arg(Tail, Res#arg{format = external});
-check_arg([{distributed, []}|Tail], Res) ->
- check_arg(Tail, Res#arg{distributed = false});
-check_arg([{distributed, Nodes}|Tail], Res) when is_list(Nodes) ->
- check_arg(Tail, Res#arg{distributed = {true, Nodes}});
check_arg([{notify, true}|Tail], Res) ->
check_arg(Tail, Res#arg{notify = true});
check_arg([{notify, false}|Tail], Res) ->
@@ -1519,9 +1521,6 @@ do_format_error({arg_mismatch, Option, FirstValue, ArgValue}) ->
"the current value ~tp~n", [ArgValue, Option, FirstValue]);
do_format_error({name_already_open, Log}) ->
io_lib:format("The disk log ~tp has already opened another file~n", [Log]);
-do_format_error({node_already_open, Log}) ->
- io_lib:format("The distribution option of the disk log ~tp does not match "
- "already open log~n", [Log]);
do_format_error({open_read_write, Log}) ->
io_lib:format("The disk log ~tp has already been opened read-write~n",
[Log]);
@@ -1573,15 +1572,6 @@ do_info(L, Cnt) ->
halt ->
Extra#halt.size
end,
- Distribution =
- case disk_log_server:get_log_pids(Name) of
- {local, _Pid} ->
- local;
- {distributed, Pids} ->
- [node(P) || P <- Pids];
- undefined -> % "cannot happen"
- []
- end,
RW = case Type of
wrap when Mode =:= read_write ->
#handle{curB = CurB, curF = CurF,
@@ -1629,8 +1619,7 @@ do_info(L, Cnt) ->
HeadL ++
[{mode, Mode},
{status, Status},
- {node, node()},
- {distributed, Distribution}
+ {node, node()}
],
Common ++ RW.
@@ -1861,65 +1850,13 @@ reply(To, Reply, S) ->
loop(S).
req(Log, R) ->
- case disk_log_server:get_log_pids(Log) of
- {local, Pid} ->
- monitor_request(Pid, R);
- undefined ->
- {error, no_such_log};
- {distributed, Pids} ->
- multi_req({self(), R}, Pids)
- end.
-
-multi_req(Msg, Pids) ->
- Refs =
- lists:map(fun(Pid) ->
- Ref = erlang:monitor(process, Pid),
- Pid ! Msg,
- {Pid, Ref}
- end, Pids),
- lists:foldl(fun({Pid, Ref}, Reply) ->
- receive
- {'DOWN', Ref, process, Pid, _Info} ->
- Reply;
- {disk_log, Pid, _Reply} ->
- erlang:demonitor(Ref, [flush]),
- ok
- end
- end, {error, nonode}, Refs).
-
-sreq(Log, R) ->
- case nearby_pid(Log, node()) of
+ case disk_log_server:get_log_pid(Log) of
undefined ->
{error, no_such_log};
Pid ->
monitor_request(Pid, R)
end.
-%% Local req - always talk to log on Node
-lreq(Log, R, Node) ->
- case nearby_pid(Log, Node) of
- Pid when is_pid(Pid), node(Pid) =:= Node ->
- monitor_request(Pid, R);
- _Else ->
- {error, no_such_log}
- end.
-
-nearby_pid(Log, Node) ->
- case disk_log_server:get_log_pids(Log) of
- undefined ->
- undefined;
- {local, Pid} ->
- Pid;
- {distributed, Pids} ->
- get_near_pid(Pids, Node)
- end.
-
--spec get_near_pid([pid(),...], node()) -> pid().
-
-get_near_pid([Pid | _], Node) when node(Pid) =:= Node -> Pid;
-get_near_pid([Pid], _ ) -> Pid;
-get_near_pid([_ | T], Node) -> get_near_pid(T, Node).
-
monitor_request(Pid, Req) ->
Ref = erlang:monitor(process, Pid),
Pid ! {self(), Req},
@@ -1964,14 +1901,11 @@ add_ext(File, Ext) ->
lists:concat([File, ".", Ext]).
notify(Log, R) ->
- case disk_log_server:get_log_pids(Log) of
+ case disk_log_server:get_log_pid(Log) of
undefined ->
{error, no_such_log};
- {local, Pid} ->
+ Pid ->
Pid ! R,
- ok;
- {distributed, Pids} ->
- lists:foreach(fun(Pid) -> Pid ! R end, Pids),
ok
end.
diff --git a/lib/kernel/src/disk_log.hrl b/lib/kernel/src/disk_log.hrl
index a362881f40..3cd124b8a7 100644
--- a/lib/kernel/src/disk_log.hrl
+++ b/lib/kernel/src/disk_log.hrl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1997-2017. All Rights Reserved.
+%% Copyright Ericsson AB 1997-2020. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -63,7 +63,7 @@
-type dlog_mode() :: 'read_only' | 'read_write'.
-type dlog_name() :: atom() | string().
-type dlog_optattr() :: 'name' | 'file' | 'linkto' | 'repair' | 'type'
- | 'format' | 'size' | 'distributed' | 'notify'
+ | 'format' | 'size' | 'notify'
| 'head' | 'head_func' | 'mode'.
-type dlog_option() :: {name, Log :: log()}
| {file, FileName :: file:filename()}
@@ -72,7 +72,6 @@
| {type, Type :: dlog_type()}
| {format, Format :: dlog_format()}
| {size, Size :: dlog_size()}
- | {distributed, Nodes :: [node()]}
| {notify, boolean()}
| {head, Head :: dlog_head_opt()}
| {head_func, MFA :: {atom(), atom(), list()}}
@@ -97,7 +96,6 @@
repair = true :: dlog_repair(),
size = infinity :: dlog_size(),
type = halt :: dlog_type(),
- distributed = false :: 'false' | {'true', [node()]},
format = internal :: dlog_format(),
linkto = self() :: 'none' | pid(),
head = none,
diff --git a/lib/kernel/src/disk_log_server.erl b/lib/kernel/src/disk_log_server.erl
index 2e22f28b14..ea8cbc808e 100644
--- a/lib/kernel/src/disk_log_server.erl
+++ b/lib/kernel/src/disk_log_server.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1997-2016. All Rights Reserved.
+%% Copyright Ericsson AB 1997-2020. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -21,10 +21,7 @@
-behaviour(gen_server).
-export([start_link/0, start/0, open/1, close/1,
- get_log_pids/1, accessible_logs/0]).
-
-%% Local export.
--export([dist_open/1, get_local_pid/1]).
+ get_log_pid/1, all/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_info/2, terminate/2]).
@@ -32,22 +29,15 @@
-include("disk_log.hrl").
--compile({inline,[{do_get_log_pids,1}]}).
+-compile({inline,[{do_get_log_pid,1}]}).
-record(pending, {log, pid, req, from, attach, clients}). % [{Request,From}]
-record(state, {pending = [] :: [#pending{}]}).
--compile({nowarn_deprecated_function, [{pg2, create, 1}]}).
--compile({nowarn_deprecated_function, [{pg2, join, 2}]}).
--compile({nowarn_deprecated_function, [{pg2, leave, 2}]}).
--compile({nowarn_deprecated_function, [{pg2, which_groups, 0}]}).
--compile({nowarn_deprecated_function, [{pg2, get_members, 1}]}).
-
%%%-----------------------------------------------------------------
%%% This module implements the disk_log server. Its primary purpose
-%%% is to keep the ets table 'disk_log_names' updated and to handle
-%%% distribution data (pids) using the module pg2.
+%%% is to keep the ets table 'disk_log_names' updated.
%%%-----------------------------------------------------------------
%%%----------------------------------------------------------------------
%%% API
@@ -60,42 +50,32 @@ start() ->
open({ok, A}) ->
ensure_started(),
- gen_server:call(disk_log_server, {open, local, A}, infinity);
+ gen_server:call(disk_log_server, {open, A}, infinity);
open(Other) ->
Other.
-%% To be used from this module only.
-dist_open(A) ->
- ensure_started(),
- gen_server:call(disk_log_server, {open, distr, A}, infinity).
-
close(Pid) ->
gen_server:call(disk_log_server, {close, Pid}, infinity).
-get_log_pids(LogName) ->
- do_get_log_pids(LogName).
+get_log_pid(LogName) ->
+ do_get_log_pid(LogName).
-accessible_logs() ->
+all() ->
ensure_started(),
- do_accessible_logs().
+ do_all().
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
-%% It would have been really nice to have a tag for disk log groups,
-%% like {distributed_disk_log, Log}, but backward compatibility makes
-%% it hard to introduce.
--define(group(Log), Log).
-
init([]) ->
process_flag(trap_exit, true),
_ = ets:new(?DISK_LOG_NAME_TABLE, [named_table, set]),
_= ets:new(?DISK_LOG_PID_TABLE, [named_table, set]),
{ok, #state{}}.
-handle_call({open, W, A}, From, State) ->
- open([{{open, W, A}, From}], State);
+handle_call({open, A}, From, State) ->
+ open([{{open, A}, From}], State);
handle_call({close, Pid}, _From, State) ->
Reply = do_close(Pid),
{reply, Reply, State}.
@@ -120,17 +100,10 @@ handle_info({pending_reply, Pid, Result0}, State) ->
_ ->
put(Pid, Name),
link(Pid),
- {_, Locality, _} = Request,
ets:insert(?DISK_LOG_PID_TABLE, {Pid, Name}),
- ets:insert(?DISK_LOG_NAME_TABLE, {Name, Pid, Locality}),
- if
- Locality =:= distr ->
- ok = pg2:join(?group(Name), Pid);
- true ->
- ok
- end
+ ets:insert(?DISK_LOG_NAME_TABLE, {Name, Pid})
end,
- gen_server:reply(From, result(Request, Result0)),
+ gen_server:reply(From, Result0),
open(Clients, State1)
end;
handle_info({'EXIT', Pid, _Reason}, State) ->
@@ -199,89 +172,29 @@ open([], State) ->
%% -> {OpenRet, NewState} | {{node(),OpenRet}, NewState} |
%% {pending, NewState}
-do_open({open, W, #arg{name = Name}=A}=Req, From, State) ->
+do_open({open, #arg{name = Name}}=Req, From, State) ->
case check_pending(Name, From, State, Req) of
{pending, NewState} ->
{pending, NewState};
- false when W =:= local ->
- case A#arg.distributed of
- {true, Nodes} ->
- Fun = open_distr_rpc_fun(Nodes, A, From),
- _Pid = spawn(Fun),
- %% No pending reply is expected, but don't reply yet.
- {pending, State};
- false ->
- case get_local_pid(Name) of
- {local, Pid} ->
- do_internal_open(Name, Pid, From, Req, true,State);
- {distributed, _Pid} ->
- {{error, {node_already_open, Name}}, State};
- undefined ->
- start_log(Name, Req, From, State)
- end
- end;
- false when W =:= distr ->
- ok = pg2:create(?group(Name)),
- case get_local_pid(Name) of
+ false ->
+ case do_get_log_pid(Name) of
undefined ->
start_log(Name, Req, From, State);
- {local, _Pid} ->
- {{node(),{error, {node_already_open, Name}}}, State};
- {distributed, Pid} ->
- do_internal_open(Name, Pid, From, Req, true, State)
+ Pid ->
+ do_internal_open(Name, Pid, From, Req, true,State)
end
end.
--spec open_distr_rpc_fun([node()], _, _) -> % XXX: underspecified
- fun(() -> no_return()).
-
-open_distr_rpc_fun(Nodes, A, From) ->
- fun() -> open_distr_rpc(Nodes, A, From) end.
-
-%% Spawning a process is a means to avoid deadlock when
-%% disk_log_servers mutually open disk_logs.
-
-open_distr_rpc(Nodes, A, From) ->
- {AllReplies, BadNodes} = rpc:multicall(Nodes, ?MODULE, dist_open, [A]),
- {Ok, Bad} = cr(AllReplies, [], []),
- Old = find_old_nodes(Nodes, AllReplies, BadNodes),
- NotOk = [{BadNode, {error, nodedown}} || BadNode <- BadNodes ++ Old],
- Reply = {Ok, Bad ++ NotOk},
- %% Send the reply to the waiting client:
- gen_server:reply(From, Reply),
- exit(normal).
-
-cr([{badrpc, {'EXIT', _}} | T], Nodes, Bad) ->
- %% This clause can be removed in next release.
- cr(T, Nodes, Bad);
-cr([R={_Node, {error, _}} | T], Nodes, Bad) ->
- cr(T, Nodes, [R | Bad]);
-cr([Reply | T], Nodes, Bad) ->
- cr(T, [Reply | Nodes], Bad);
-cr([], Nodes, Bad) ->
- {Nodes, Bad}.
-
-%% If a "new" node (one that calls dist_open/1) tries to open a log
-%% on an old node (one that does not have dist_open/1), then the old
-%% node is considered 'down'. In next release, this test will not be
-%% needed since all nodes can be assumed to be "new" by then.
-%% One more thing: if an old node tries to open a log on a new node,
-%% the new node is also considered 'down'.
-find_old_nodes(Nodes, Replies, BadNodes) ->
- R = [X || {X, _} <- Replies],
- ordsets:to_list(ordsets:subtract(ordsets:from_list(Nodes),
- ordsets:from_list(R ++ BadNodes))).
-
start_log(Name, Req, From, State) ->
Server = self(),
case supervisor:start_child(disk_log_sup, [Server]) of
{ok, Pid} ->
do_internal_open(Name, Pid, From, Req, false, State);
Error ->
- {result(Req, Error), State}
+ {Error, State}
end.
-do_internal_open(Name, Pid, From, {open, _W, A}=Req, Attach, State) ->
+do_internal_open(Name, Pid, From, {open, A}=Req, Attach, State) ->
Server = self(),
F = fun() ->
Res = disk_log:internal_open(Pid, A),
@@ -303,11 +216,6 @@ check_pending(Name, From, State, Req) ->
false
end.
-result({_, distr, _}, R) ->
- {node(), R};
-result({_, local, _}, R) ->
- R.
-
do_close(Pid) ->
case get(Pid) of
undefined ->
@@ -319,71 +227,28 @@ do_close(Pid) ->
end.
erase_log(Name, Pid) ->
- case get_local_pid(Name) of
+ case do_get_log_pid(Name) of
undefined ->
ok;
- {local, Pid} ->
+ Pid ->
true = ets:delete(?DISK_LOG_NAME_TABLE, Name),
- true = ets:delete(?DISK_LOG_PID_TABLE, Pid);
- {distributed, Pid} ->
- true = ets:delete(?DISK_LOG_NAME_TABLE, Name),
- true = ets:delete(?DISK_LOG_PID_TABLE, Pid),
- ok = pg2:leave(?group(Name), Pid)
+ true = ets:delete(?DISK_LOG_PID_TABLE, Pid)
end,
erase(Pid).
-do_accessible_logs() ->
+do_all() ->
LocalSpec = {'$1','_',local},
Local0 = [hd(L) || L <- ets:match(?DISK_LOG_NAME_TABLE, LocalSpec)],
- Local = lists:sort(Local0),
- Groups0 = ordsets:from_list(pg2:which_groups()),
- Groups = ordsets:to_list(ordsets:subtract(Groups0, Local)),
- Dist = [L || L <- Groups, dist_pids(L) =/= []],
- {Local, Dist}.
-
-get_local_pid(LogName) ->
- case ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of
- [{LogName, Pid, local}] ->
- {local, Pid};
- [{LogName, Pid, distr}] ->
- {distributed, Pid};
- [] ->
- undefined
- end.
+ lists:sort(Local0).
%% Inlined.
-do_get_log_pids(LogName) ->
- case catch ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of
- [{LogName, Pid, local}] ->
- {local, Pid};
- [{LogName, _Pid, distr}] ->
- case pg2:get_members(?group(LogName)) of
- [] -> % The disk_log process has died recently
- undefined;
- Members ->
- {distributed, Members}
- end;
- _EmptyOrError ->
- case dist_pids(LogName) of
- [] -> undefined;
- Pids -> {distributed, Pids}
- end
- end.
-
-dist_pids(LogName) ->
- %% Would be much simpler if disk log group names were tagged.
- GroupName = ?group(LogName),
- case catch pg2:get_members(GroupName) of
- [Pid | _] = Pids ->
- case rpc:call(node(Pid), ?MODULE, get_local_pid, [LogName]) of
- undefined -> % does not seem to be a disk_log group
- case catch lists:member(Pid,pg2:get_members(GroupName)) of
- true -> [];
- _ -> dist_pids(LogName)
- end;
- _ -> % badrpc if get_local_pid is not exported
- Pids
- end;
- _ ->
- []
+do_get_log_pid(LogName) ->
+ try ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of
+ [{LogName, Pid}] ->
+ Pid;
+ [] ->
+ undefined
+ catch
+ _:_ ->
+ undefined
end.