summaryrefslogtreecommitdiff
path: root/lib/kernel/src/inet_tcp_dist.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/src/inet_tcp_dist.erl')
-rw-r--r--lib/kernel/src/inet_tcp_dist.erl492
1 files changed, 268 insertions, 224 deletions
diff --git a/lib/kernel/src/inet_tcp_dist.erl b/lib/kernel/src/inet_tcp_dist.erl
index b53de6281b..8c5c8a1cf2 100644
--- a/lib/kernel/src/inet_tcp_dist.erl
+++ b/lib/kernel/src/inet_tcp_dist.erl
@@ -1,8 +1,8 @@
%%
%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 1997-2022. All Rights Reserved.
-%%
+%%
+%% Copyright Ericsson AB 1997-2023. All Rights Reserved.
+%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
@@ -14,10 +14,11 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-%%
+%%
%% %CopyrightEnd%
%%
-module(inet_tcp_dist).
+-feature(maybe_expr, enable).
%% Handles the connection setup phase with other Erlang nodes.
@@ -30,6 +31,11 @@
%% Generalized dist API
-export([gen_listen/3, gen_accept/2, gen_accept_connection/6,
gen_setup/6, gen_select/2, gen_address/1]).
+-export([fam_select/2, fam_address/1, fam_listen/4, fam_setup/4]).
+%% OTP internal (e.g ssl)
+-export([gen_hs_data/2, nodelay/0]).
+
+-export([merge_options/2, merge_options/3]).
%% internal exports
@@ -42,20 +48,28 @@
-include("dist.hrl").
-include("dist_util.hrl").
+-define(DRIVER, inet_tcp).
+-define(PROTOCOL, tcp).
+
%% ------------------------------------------------------------
%% Select this protocol based on node name
%% select(Node) => Bool
%% ------------------------------------------------------------
select(Node) ->
- gen_select(inet_tcp, Node).
+ gen_select(?DRIVER, Node).
gen_select(Driver, Node) ->
+ fam_select(Driver:family(), Node).
+
+fam_select(Family, Node) ->
case dist_util:split_node(Node) of
{node, Name, Host} ->
- case call_epmd_function(
- net_kernel:epmd_module(), address_please,
- [Name, Host, Driver:family()]) of
+ EpmdMod = net_kernel:epmd_module(),
+ case
+ call_epmd_function(
+ EpmdMod, address_please, [Name, Host, Family])
+ of
{ok, _Addr} -> true;
{ok, _Addr, _Port, _Creation} -> true;
_ -> false
@@ -67,47 +81,115 @@ gen_select(Driver, Node) ->
%% Get the address family that this distribution uses
%% ------------------------------------------------------------
address() ->
- gen_address(inet_tcp).
+ gen_address(?DRIVER).
+
gen_address(Driver) ->
- get_tcp_address(Driver).
+ fam_address(Driver:family()).
+
+fam_address(Family) ->
+ {ok, Host} = inet:gethostname(),
+ #net_address{
+ host = Host,
+ protocol = ?PROTOCOL,
+ family = Family
+ }.
+
+%% ------------------------------------------------------------
+%% Set up the general fields in #hs_data{}
+%% ------------------------------------------------------------
+gen_hs_data(Driver, Socket) ->
+ %% The only thing Driver actually is used for is to
+ %% implement non-blocking send of distribution tick
+ Nodelay = nodelay(),
+ #hs_data{
+ socket = Socket,
+ f_send = fun Driver:send/2,
+ f_recv = fun Driver:recv/3,
+ f_setopts_pre_nodeup =
+ fun (S) ->
+ inet:setopts(
+ S,
+ [{active, false}, {packet, 4}, Nodelay])
+ end,
+ f_setopts_post_nodeup =
+ fun (S) ->
+ inet:setopts(
+ S,
+ [{active, true}, {packet,4},
+ {deliver, port}, binary, Nodelay])
+ end,
+ f_getll = fun inet:getll/1,
+ mf_tick = fun (S) -> ?MODULE:tick(Driver, S) end,
+ mf_getstat = fun ?MODULE:getstat/1,
+ mf_setopts = fun ?MODULE:setopts/2,
+ mf_getopts = fun ?MODULE:getopts/2}.
%% ------------------------------------------------------------
%% Create the listen socket, i.e. the port that this erlang
%% node is accessible through.
%% ------------------------------------------------------------
-listen(Name, Host) ->
- gen_listen(inet_tcp, Name, Host).
-
-%% Keep this clause for third-party dist controllers reusing this API
+%% Keep this function for third-party dist controllers reusing this API
listen(Name) ->
{ok, Host} = inet:gethostname(),
listen(Name, Host).
+listen(Name, Host) ->
+ gen_listen(?DRIVER, Name, Host).
+
gen_listen(Driver, Name, Host) ->
- ErlEpmd = net_kernel:epmd_module(),
- case gen_listen(ErlEpmd, Name, Host, Driver) of
- {ok, Socket} ->
- TcpAddress = get_tcp_address(Driver, Socket),
- {_,Port} = TcpAddress#net_address.address,
- case ErlEpmd:register_node(Name, Port, Driver) of
- {ok, Creation} ->
- {ok, {Socket, TcpAddress, Creation}};
- Error ->
- Error
- end;
- Error ->
- Error
+ ForcedOptions = [{active, false}, {packet,2}, {nodelay, true}],
+ ListenFun =
+ fun (First, Last, ListenOptions) ->
+ listen_loop(
+ Driver, First, Last,
+ merge_options(ListenOptions, ForcedOptions))
+ end,
+ Family = Driver:family(),
+ maybe
+ %%
+ {ok, {ListenSocket, Address, Creation}} ?=
+ fam_listen(Family, Name, Host, ListenFun),
+ NetAddress =
+ #net_address{
+ host = Host,
+ protocol = ?PROTOCOL,
+ family = Family,
+ address = Address},
+ {ok, {ListenSocket, NetAddress, Creation}}
+ end.
+
+listen_loop(_Driver, First, Last, _Options) when First > Last ->
+ {error,eaddrinuse};
+listen_loop(Driver, First, Last, Options) ->
+ case Driver:listen(First, Options) of
+ {error, eaddrinuse} ->
+ listen_loop(Driver, First+1, Last, Options);
+ Other ->
+ Other
end.
-gen_listen(ErlEpmd, Name, Host, Driver) ->
- ListenOptions = listen_options(),
- case call_epmd_function(ErlEpmd, listen_port_please, [Name, Host]) of
- {ok, 0} ->
- {First,Last} = get_port_range(),
- do_listen(Driver, First, Last, ListenOptions);
- {ok, Prt} ->
- do_listen(Driver, Prt, Prt, ListenOptions)
+
+fam_listen(Family, Name, Host, ListenFun) ->
+ maybe
+ EpmdMod = net_kernel:epmd_module(),
+ %%
+ {ok, ListenSocket} ?=
+ case
+ call_epmd_function(
+ EpmdMod, listen_port_please, [Name, Host])
+ of
+ {ok, 0} ->
+ {First,Last} = get_port_range(),
+ ListenFun(First, Last, listen_options());
+ {ok, PortNum} ->
+ ListenFun(PortNum, PortNum, listen_options())
+ end,
+ {ok, {_IP,Port} = Address} = inet:sockname(ListenSocket),
+ %%
+ {ok, Creation} ?=
+ EpmdMod:register_node(Name, Port, Family),
+ {ok, {ListenSocket, Address, Creation}}
end.
get_port_range() ->
@@ -123,70 +205,89 @@ get_port_range() ->
{0,0}
end.
-do_listen(_Driver, First,Last,_) when First > Last ->
- {error,eaddrinuse};
-do_listen(Driver, First,Last,Options) ->
- case Driver:listen(First, Options) of
- {error, eaddrinuse} ->
- do_listen(Driver, First+1,Last,Options);
- Other ->
- Other
- end.
listen_options() ->
DefaultOpts = [{reuseaddr, true}, {backlog, 128}],
ForcedOpts =
- [{active, false}, {packet,2} |
- case application:get_env(kernel, inet_dist_use_interface) of
- {ok, Ip} -> [{ip, Ip}];
- undefined -> []
- end],
- Force = maps:from_list(ForcedOpts),
+ case application:get_env(kernel, inet_dist_use_interface) of
+ {ok, Ip} -> [{ip, Ip}];
+ undefined -> []
+ end,
InetDistListenOpts =
case application:get_env(kernel, inet_dist_listen_options) of
{ok, Opts} -> Opts;
undefined -> []
end,
- ListenOpts = listen_options(InetDistListenOpts, ForcedOpts, Force),
- Seen =
- maps:from_list(
- lists:filter(
- fun ({_,_}) -> true;
- (_) -> false
- end, ListenOpts)),
- lists:filter(
- fun ({OptName,_}) when is_map_key(OptName, Seen) ->
- false;
- (_) ->
- true
- end, DefaultOpts) ++ ListenOpts.
-
-%% Pass through all but forced
-listen_options([Opt | Opts], ForcedOpts, Force) ->
- case Opt of
- {OptName,_} ->
- case is_map_key(OptName, Force) of
+ merge_options(InetDistListenOpts, ForcedOpts, DefaultOpts).
+
+
+merge_options(Opts, ForcedOpts) ->
+ merge_options(Opts, ForcedOpts, []).
+%%
+merge_options(Opts, ForcedOpts, DefaultOpts) ->
+ Forced = merge_options(ForcedOpts),
+ Default = merge_options(DefaultOpts),
+ ForcedOpts ++ merge_options(Opts, Forced, DefaultOpts, Default).
+
+%% Collect expanded 2-tuple options in a map
+merge_options(Opts) ->
+ lists:foldr(
+ fun (Opt, Acc) ->
+ case expand_option(Opt) of
+ {OptName, OptVal} ->
+ maps:put(OptName, OptVal, Acc);
+ _ ->
+ Acc
+ end
+ end, #{}, Opts).
+
+%% Pass through all options that are not forced,
+%% which we already have prepended,
+%% and remove options that we see from the Default map
+%%
+merge_options([Opt | Opts], Forced, DefaultOpts, Default) ->
+ case expand_option(Opt) of
+ {OptName, _} ->
+ %% Remove from the Default map
+ Default_1 = maps:remove(OptName, Default),
+ if
+ is_map_key(OptName, Forced) ->
+ %% Forced option - do not pass through
+ merge_options(Opts, Forced, DefaultOpts, Default_1);
true ->
- listen_options(Opts, ForcedOpts, Force);
- false ->
+ %% Pass through
[Opt |
- listen_options(Opts, ForcedOpts, Force)]
+ merge_options(Opts, Forced, DefaultOpts, Default_1)]
end;
_ ->
- [Opt |
- listen_options(Opts, ForcedOpts, Force)]
+ %% Unhandled options e.g {raw, ...} - pass through
+ [Opt | merge_options(Opts, Forced, DefaultOpts, Default)]
end;
-listen_options([], ForcedOpts, _Force) ->
- %% Append forced
- ForcedOpts.
-
+merge_options([], _Forced, DefaultOpts, Default) ->
+ %% Append the needed default options (that we have not seen)
+ [Opt ||
+ Opt <- DefaultOpts,
+ is_map_key(element(1, expand_option(Opt)), Default)].
+
+%% Expand an atom option into its tuple equivalence,
+%% pass through others
+expand_option(Opt) ->
+ if
+ Opt =:= list; Opt =:= binary ->
+ {mode, Opt};
+ Opt =:= inet; Opt =:= inet6; Opt =:= local ->
+ %% 'family' is not quite an option name, but could/should be
+ {family, Opt};
+ true ->
+ Opt
+ end.
%% ------------------------------------------------------------
%% Accepts new connection attempts from other Erlang nodes.
%% ------------------------------------------------------------
accept(Listen) ->
- gen_accept(inet_tcp, Listen).
+ gen_accept(?DRIVER, Listen).
gen_accept(Driver, Listen) ->
spawn_opt(?MODULE, accept_loop, [Driver, self(), Listen], [link, {priority, max}]).
@@ -194,7 +295,7 @@ gen_accept(Driver, Listen) ->
accept_loop(Driver, Kernel, Listen) ->
case Driver:accept(Listen) of
{ok, Socket} ->
- Kernel ! {accept,self(),Socket,Driver:family(),tcp},
+ Kernel ! {accept,self(),Socket,Driver:family(),?PROTOCOL},
_ = controller(Driver, Kernel, Socket),
accept_loop(Driver, Kernel, Listen);
Error ->
@@ -230,7 +331,7 @@ flush_controller(Pid, Socket) ->
%% ------------------------------------------------------------
accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
- gen_accept_connection(inet_tcp, AcceptPid, Socket, MyNode, Allowed, SetupTime).
+ gen_accept_connection(?DRIVER, AcceptPid, Socket, MyNode, Allowed, SetupTime).
gen_accept_connection(Driver, AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
spawn_opt(?MODULE, do_accept,
@@ -243,40 +344,19 @@ do_accept(Driver, Kernel, AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
Timer = dist_util:start_timer(SetupTime),
case check_ip(Driver, Socket) of
true ->
- HSData = #hs_data{
- kernel_pid = Kernel,
- this_node = MyNode,
- socket = Socket,
- timer = Timer,
- this_flags = 0,
- allowed = Allowed,
- f_send = fun Driver:send/2,
- f_recv = fun Driver:recv/3,
- f_setopts_pre_nodeup =
- fun(S) ->
- inet:setopts(S,
- [{active, false},
- {packet, 4},
- nodelay()])
- end,
- f_setopts_post_nodeup =
- fun(S) ->
- inet:setopts(S,
- [{active, true},
- {deliver, port},
- {packet, 4},
- binary,
- nodelay()])
- end,
- f_getll = fun(S) ->
- inet:getll(S)
- end,
- f_address = fun(S, Node) -> get_remote_id(Driver, S, Node) end,
- mf_tick = fun(S) -> ?MODULE:tick(Driver, S) end,
- mf_getstat = fun ?MODULE:getstat/1,
- mf_setopts = fun ?MODULE:setopts/2,
- mf_getopts = fun ?MODULE:getopts/2
- },
+ Family = Driver:family(),
+ HSData =
+ (gen_hs_data(Driver, Socket))
+ #hs_data{
+ kernel_pid = Kernel,
+ this_node = MyNode,
+ timer = Timer,
+ this_flags = 0,
+ allowed = Allowed,
+ f_address =
+ fun (S, Node) ->
+ get_remote_id(Family, S, Node)
+ end},
dist_util:handshake_other_started(HSData);
{false,IP} ->
error_msg("** Connection attempt from "
@@ -304,13 +384,13 @@ nodelay() ->
%% ------------------------------------------------------------
%% Get remote information about a Socket.
%% ------------------------------------------------------------
-get_remote_id(Driver, Socket, Node) ->
+get_remote_id(Family, Socket, Node) ->
case inet:peername(Socket) of
{ok,Address} ->
case split_node(atom_to_list(Node), $@, []) of
[_,Host] ->
#net_address{address=Address,host=Host,
- protocol=tcp,family=Driver:family()};
+ protocol=?PROTOCOL,family=Family};
_ ->
%% No '@' or more than one '@' in node name.
?shutdown(no_node)
@@ -325,130 +405,109 @@ get_remote_id(Driver, Socket, Node) ->
%% ------------------------------------------------------------
setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
- gen_setup(inet_tcp, Node, Type, MyNode, LongOrShortNames, SetupTime).
+ gen_setup(?DRIVER, Node, Type, MyNode, LongOrShortNames, SetupTime).
gen_setup(Driver, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
- spawn_opt(?MODULE, do_setup,
+ spawn_opt(?MODULE, do_setup,
[Driver, self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
dist_util:net_ticker_spawn_options()).
do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
- ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]),
- [Name, Address] = splitnode(Driver, Node, LongOrShortNames),
- AddressFamily = Driver:family(),
- ErlEpmd = net_kernel:epmd_module(),
+ ?trace("~p~n",[{?MODULE,self(),setup,Node}]),
Timer = dist_util:start_timer(SetupTime),
- case call_epmd_function(ErlEpmd,address_please,[Name, Address, AddressFamily]) of
+ Family = Driver:family(),
+ {#net_address{ address = {Ip, TcpPort} } = NetAddress,
+ ConnectOptions,
+ Version} =
+ fam_setup(
+ Family, Node, LongOrShortNames, fun Driver:parse_address/1),
+ dist_util:reset_timer(Timer),
+ case Driver:connect(Ip, TcpPort, ConnectOptions) of
+ {ok, Socket} ->
+ HSData =
+ (gen_hs_data(Driver, Socket))
+ #hs_data{
+ kernel_pid = Kernel,
+ other_node = Node,
+ this_node = MyNode,
+ timer = Timer,
+ this_flags = 0,
+ other_version = Version,
+ f_address =
+ fun(_,_) ->
+ NetAddress
+ end,
+ request_type = Type},
+ dist_util:handshake_we_started(HSData);
+ _ ->
+ %% Other Node may have closed since
+ %% discovery !
+ ?trace("other node (~p) "
+ "closed since discovery (port_please).~n",
+ [Node]),
+ ?shutdown(Node)
+ end.
+
+fam_setup(Family, Node, LongOrShortNames, ParseAddress) ->
+ ?trace("~p~n",[{?MODULE,self(),?FUNCTION_NAME,Node}]),
+ [Name, Host] = splitnode(ParseAddress, Node, LongOrShortNames),
+ ErlEpmd = net_kernel:epmd_module(),
+ case
+ call_epmd_function(
+ ErlEpmd, address_please, [Name, Host, Family])
+ of
{ok, Ip, TcpPort, Version} ->
- ?trace("address_please(~p) -> version ~p~n",
- [Node,Version]),
- do_setup_connect(Driver, Kernel, Node, Address, AddressFamily,
- Ip, TcpPort, Version, Type, MyNode, Timer);
- {ok, Ip} ->
+ ?trace("address_please(~p) -> version ~p~n", [Node,Version]),
+ fam_setup(Family, Host, Ip, TcpPort, Version);
+ {ok, Ip} ->
case ErlEpmd:port_please(Name, Ip) of
{port, TcpPort, Version} ->
- ?trace("port_please(~p) -> version ~p~n",
+ ?trace("port_please(~p) -> version ~p~n",
[Node,Version]),
- do_setup_connect(Driver, Kernel, Node, Address, AddressFamily,
- Ip, TcpPort, Version, Type, MyNode, Timer);
+ fam_setup(Family, Host, Ip, TcpPort, Version);
_ ->
?trace("port_please (~p) failed.~n", [Node]),
?shutdown(Node)
end;
_Other ->
- ?trace("inet_getaddr(~p) "
- "failed (~p).~n", [Node,_Other]),
+ ?trace("inet_getaddr(~p) failed (~p).~n", [Node,_Other]),
?shutdown(Node)
end.
-%%
-%% Actual setup of connection
-%%
-do_setup_connect(Driver, Kernel, Node, Address, AddressFamily,
- Ip, TcpPort, Version, Type, MyNode, Timer) ->
- dist_util:reset_timer(Timer),
- case
- Driver:connect(
- Ip, TcpPort,
- connect_options([{active, false}, {packet, 2}]))
- of
- {ok, Socket} ->
- HSData = #hs_data{
- kernel_pid = Kernel,
- other_node = Node,
- this_node = MyNode,
- socket = Socket,
- timer = Timer,
- this_flags = 0,
- other_version = Version,
- f_send = fun Driver:send/2,
- f_recv = fun Driver:recv/3,
- f_setopts_pre_nodeup =
- fun(S) ->
- inet:setopts
- (S,
- [{active, false},
- {packet, 4},
- nodelay()])
- end,
- f_setopts_post_nodeup =
- fun(S) ->
- inet:setopts
- (S,
- [{active, true},
- {deliver, port},
- {packet, 4},
- nodelay()])
- end,
-
- f_getll = fun inet:getll/1,
- f_address =
- fun(_,_) ->
- #net_address{
- address = {Ip,TcpPort},
- host = Address,
- protocol = tcp,
- family = AddressFamily}
- end,
- mf_tick = fun(S) -> ?MODULE:tick(Driver, S) end,
- mf_getstat = fun ?MODULE:getstat/1,
- request_type = Type,
- mf_setopts = fun ?MODULE:setopts/2,
- mf_getopts = fun ?MODULE:getopts/2
- },
- dist_util:handshake_we_started(HSData);
- _ ->
- %% Other Node may have closed since
- %% discovery !
- ?trace("other node (~p) "
- "closed since discovery (port_please).~n",
- [Node]),
- ?shutdown(Node)
- end.
-
-connect_options(Opts) ->
- case application:get_env(kernel, inet_dist_connect_options) of
- {ok,ConnectOpts} ->
- ConnectOpts ++ Opts;
- _ ->
- Opts
- end.
+fam_setup(Family, Host, Ip, TcpPort, Version) ->
+ NetAddress =
+ #net_address{
+ address = {Ip, TcpPort},
+ host = Host,
+ protocol = ?PROTOCOL,
+ family = Family},
+ {NetAddress, connect_options(), Version}.
+
+connect_options() ->
+ merge_options(
+ case application:get_env(kernel, inet_dist_connect_options) of
+ {ok, ConnectOpts} ->
+ ConnectOpts;
+ _ ->
+ []
+ end, [{active, false}, {packet, 2}]).
+
%%
%% Close a socket.
%%
close(Socket) ->
- inet_tcp:close(Socket).
+ ?DRIVER:close(Socket).
%% If Node is illegal terminate the connection setup!!
-splitnode(Driver, Node, LongOrShortNames) ->
+splitnode(ParseAddress, Node, LongOrShortNames) ->
case split_node(atom_to_list(Node), $@, []) of
[Name|Tail] when Tail =/= [] ->
Host = lists:append(Tail),
case split_node(Host, $., []) of
[_] when LongOrShortNames =:= longnames ->
- case Driver:parse_address(Host) of
+ case ParseAddress(Host) of
{ok, _} ->
[Name, Host];
_ ->
@@ -482,21 +541,6 @@ split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]);
split_node([], _, Ack) -> [lists:reverse(Ack)].
%% ------------------------------------------------------------
-%% Fetch local information about a Socket.
-%% ------------------------------------------------------------
-get_tcp_address(Driver, Socket) ->
- {ok, Address} = inet:sockname(Socket),
- NetAddr = get_tcp_address(Driver),
- NetAddr#net_address{ address = Address }.
-get_tcp_address(Driver) ->
- {ok, Host} = inet:gethostname(),
- #net_address {
- host = Host,
- protocol = tcp,
- family = Driver:family()
- }.
-
-%% ------------------------------------------------------------
%% Determine if EPMD module supports the called functions.
%% If not call the builtin erl_epmd
%% ------------------------------------------------------------