diff options
Diffstat (limited to 'lib/kernel/src/inet_tcp_dist.erl')
-rw-r--r-- | lib/kernel/src/inet_tcp_dist.erl | 492 |
1 files changed, 268 insertions, 224 deletions
diff --git a/lib/kernel/src/inet_tcp_dist.erl b/lib/kernel/src/inet_tcp_dist.erl index b53de6281b..8c5c8a1cf2 100644 --- a/lib/kernel/src/inet_tcp_dist.erl +++ b/lib/kernel/src/inet_tcp_dist.erl @@ -1,8 +1,8 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1997-2022. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 1997-2023. All Rights Reserved. +%% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at @@ -14,10 +14,11 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. -%% +%% %% %CopyrightEnd% %% -module(inet_tcp_dist). +-feature(maybe_expr, enable). %% Handles the connection setup phase with other Erlang nodes. @@ -30,6 +31,11 @@ %% Generalized dist API -export([gen_listen/3, gen_accept/2, gen_accept_connection/6, gen_setup/6, gen_select/2, gen_address/1]). +-export([fam_select/2, fam_address/1, fam_listen/4, fam_setup/4]). +%% OTP internal (e.g ssl) +-export([gen_hs_data/2, nodelay/0]). + +-export([merge_options/2, merge_options/3]). %% internal exports @@ -42,20 +48,28 @@ -include("dist.hrl"). -include("dist_util.hrl"). +-define(DRIVER, inet_tcp). +-define(PROTOCOL, tcp). + %% ------------------------------------------------------------ %% Select this protocol based on node name %% select(Node) => Bool %% ------------------------------------------------------------ select(Node) -> - gen_select(inet_tcp, Node). + gen_select(?DRIVER, Node). gen_select(Driver, Node) -> + fam_select(Driver:family(), Node). + +fam_select(Family, Node) -> case dist_util:split_node(Node) of {node, Name, Host} -> - case call_epmd_function( - net_kernel:epmd_module(), address_please, - [Name, Host, Driver:family()]) of + EpmdMod = net_kernel:epmd_module(), + case + call_epmd_function( + EpmdMod, address_please, [Name, Host, Family]) + of {ok, _Addr} -> true; {ok, _Addr, _Port, _Creation} -> true; _ -> false @@ -67,47 +81,115 @@ gen_select(Driver, Node) -> %% Get the address family that this distribution uses %% ------------------------------------------------------------ address() -> - gen_address(inet_tcp). + gen_address(?DRIVER). + gen_address(Driver) -> - get_tcp_address(Driver). + fam_address(Driver:family()). + +fam_address(Family) -> + {ok, Host} = inet:gethostname(), + #net_address{ + host = Host, + protocol = ?PROTOCOL, + family = Family + }. + +%% ------------------------------------------------------------ +%% Set up the general fields in #hs_data{} +%% ------------------------------------------------------------ +gen_hs_data(Driver, Socket) -> + %% The only thing Driver actually is used for is to + %% implement non-blocking send of distribution tick + Nodelay = nodelay(), + #hs_data{ + socket = Socket, + f_send = fun Driver:send/2, + f_recv = fun Driver:recv/3, + f_setopts_pre_nodeup = + fun (S) -> + inet:setopts( + S, + [{active, false}, {packet, 4}, Nodelay]) + end, + f_setopts_post_nodeup = + fun (S) -> + inet:setopts( + S, + [{active, true}, {packet,4}, + {deliver, port}, binary, Nodelay]) + end, + f_getll = fun inet:getll/1, + mf_tick = fun (S) -> ?MODULE:tick(Driver, S) end, + mf_getstat = fun ?MODULE:getstat/1, + mf_setopts = fun ?MODULE:setopts/2, + mf_getopts = fun ?MODULE:getopts/2}. %% ------------------------------------------------------------ %% Create the listen socket, i.e. the port that this erlang %% node is accessible through. %% ------------------------------------------------------------ -listen(Name, Host) -> - gen_listen(inet_tcp, Name, Host). - -%% Keep this clause for third-party dist controllers reusing this API +%% Keep this function for third-party dist controllers reusing this API listen(Name) -> {ok, Host} = inet:gethostname(), listen(Name, Host). +listen(Name, Host) -> + gen_listen(?DRIVER, Name, Host). + gen_listen(Driver, Name, Host) -> - ErlEpmd = net_kernel:epmd_module(), - case gen_listen(ErlEpmd, Name, Host, Driver) of - {ok, Socket} -> - TcpAddress = get_tcp_address(Driver, Socket), - {_,Port} = TcpAddress#net_address.address, - case ErlEpmd:register_node(Name, Port, Driver) of - {ok, Creation} -> - {ok, {Socket, TcpAddress, Creation}}; - Error -> - Error - end; - Error -> - Error + ForcedOptions = [{active, false}, {packet,2}, {nodelay, true}], + ListenFun = + fun (First, Last, ListenOptions) -> + listen_loop( + Driver, First, Last, + merge_options(ListenOptions, ForcedOptions)) + end, + Family = Driver:family(), + maybe + %% + {ok, {ListenSocket, Address, Creation}} ?= + fam_listen(Family, Name, Host, ListenFun), + NetAddress = + #net_address{ + host = Host, + protocol = ?PROTOCOL, + family = Family, + address = Address}, + {ok, {ListenSocket, NetAddress, Creation}} + end. + +listen_loop(_Driver, First, Last, _Options) when First > Last -> + {error,eaddrinuse}; +listen_loop(Driver, First, Last, Options) -> + case Driver:listen(First, Options) of + {error, eaddrinuse} -> + listen_loop(Driver, First+1, Last, Options); + Other -> + Other end. -gen_listen(ErlEpmd, Name, Host, Driver) -> - ListenOptions = listen_options(), - case call_epmd_function(ErlEpmd, listen_port_please, [Name, Host]) of - {ok, 0} -> - {First,Last} = get_port_range(), - do_listen(Driver, First, Last, ListenOptions); - {ok, Prt} -> - do_listen(Driver, Prt, Prt, ListenOptions) + +fam_listen(Family, Name, Host, ListenFun) -> + maybe + EpmdMod = net_kernel:epmd_module(), + %% + {ok, ListenSocket} ?= + case + call_epmd_function( + EpmdMod, listen_port_please, [Name, Host]) + of + {ok, 0} -> + {First,Last} = get_port_range(), + ListenFun(First, Last, listen_options()); + {ok, PortNum} -> + ListenFun(PortNum, PortNum, listen_options()) + end, + {ok, {_IP,Port} = Address} = inet:sockname(ListenSocket), + %% + {ok, Creation} ?= + EpmdMod:register_node(Name, Port, Family), + {ok, {ListenSocket, Address, Creation}} end. get_port_range() -> @@ -123,70 +205,89 @@ get_port_range() -> {0,0} end. -do_listen(_Driver, First,Last,_) when First > Last -> - {error,eaddrinuse}; -do_listen(Driver, First,Last,Options) -> - case Driver:listen(First, Options) of - {error, eaddrinuse} -> - do_listen(Driver, First+1,Last,Options); - Other -> - Other - end. listen_options() -> DefaultOpts = [{reuseaddr, true}, {backlog, 128}], ForcedOpts = - [{active, false}, {packet,2} | - case application:get_env(kernel, inet_dist_use_interface) of - {ok, Ip} -> [{ip, Ip}]; - undefined -> [] - end], - Force = maps:from_list(ForcedOpts), + case application:get_env(kernel, inet_dist_use_interface) of + {ok, Ip} -> [{ip, Ip}]; + undefined -> [] + end, InetDistListenOpts = case application:get_env(kernel, inet_dist_listen_options) of {ok, Opts} -> Opts; undefined -> [] end, - ListenOpts = listen_options(InetDistListenOpts, ForcedOpts, Force), - Seen = - maps:from_list( - lists:filter( - fun ({_,_}) -> true; - (_) -> false - end, ListenOpts)), - lists:filter( - fun ({OptName,_}) when is_map_key(OptName, Seen) -> - false; - (_) -> - true - end, DefaultOpts) ++ ListenOpts. - -%% Pass through all but forced -listen_options([Opt | Opts], ForcedOpts, Force) -> - case Opt of - {OptName,_} -> - case is_map_key(OptName, Force) of + merge_options(InetDistListenOpts, ForcedOpts, DefaultOpts). + + +merge_options(Opts, ForcedOpts) -> + merge_options(Opts, ForcedOpts, []). +%% +merge_options(Opts, ForcedOpts, DefaultOpts) -> + Forced = merge_options(ForcedOpts), + Default = merge_options(DefaultOpts), + ForcedOpts ++ merge_options(Opts, Forced, DefaultOpts, Default). + +%% Collect expanded 2-tuple options in a map +merge_options(Opts) -> + lists:foldr( + fun (Opt, Acc) -> + case expand_option(Opt) of + {OptName, OptVal} -> + maps:put(OptName, OptVal, Acc); + _ -> + Acc + end + end, #{}, Opts). + +%% Pass through all options that are not forced, +%% which we already have prepended, +%% and remove options that we see from the Default map +%% +merge_options([Opt | Opts], Forced, DefaultOpts, Default) -> + case expand_option(Opt) of + {OptName, _} -> + %% Remove from the Default map + Default_1 = maps:remove(OptName, Default), + if + is_map_key(OptName, Forced) -> + %% Forced option - do not pass through + merge_options(Opts, Forced, DefaultOpts, Default_1); true -> - listen_options(Opts, ForcedOpts, Force); - false -> + %% Pass through [Opt | - listen_options(Opts, ForcedOpts, Force)] + merge_options(Opts, Forced, DefaultOpts, Default_1)] end; _ -> - [Opt | - listen_options(Opts, ForcedOpts, Force)] + %% Unhandled options e.g {raw, ...} - pass through + [Opt | merge_options(Opts, Forced, DefaultOpts, Default)] end; -listen_options([], ForcedOpts, _Force) -> - %% Append forced - ForcedOpts. - +merge_options([], _Forced, DefaultOpts, Default) -> + %% Append the needed default options (that we have not seen) + [Opt || + Opt <- DefaultOpts, + is_map_key(element(1, expand_option(Opt)), Default)]. + +%% Expand an atom option into its tuple equivalence, +%% pass through others +expand_option(Opt) -> + if + Opt =:= list; Opt =:= binary -> + {mode, Opt}; + Opt =:= inet; Opt =:= inet6; Opt =:= local -> + %% 'family' is not quite an option name, but could/should be + {family, Opt}; + true -> + Opt + end. %% ------------------------------------------------------------ %% Accepts new connection attempts from other Erlang nodes. %% ------------------------------------------------------------ accept(Listen) -> - gen_accept(inet_tcp, Listen). + gen_accept(?DRIVER, Listen). gen_accept(Driver, Listen) -> spawn_opt(?MODULE, accept_loop, [Driver, self(), Listen], [link, {priority, max}]). @@ -194,7 +295,7 @@ gen_accept(Driver, Listen) -> accept_loop(Driver, Kernel, Listen) -> case Driver:accept(Listen) of {ok, Socket} -> - Kernel ! {accept,self(),Socket,Driver:family(),tcp}, + Kernel ! {accept,self(),Socket,Driver:family(),?PROTOCOL}, _ = controller(Driver, Kernel, Socket), accept_loop(Driver, Kernel, Listen); Error -> @@ -230,7 +331,7 @@ flush_controller(Pid, Socket) -> %% ------------------------------------------------------------ accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) -> - gen_accept_connection(inet_tcp, AcceptPid, Socket, MyNode, Allowed, SetupTime). + gen_accept_connection(?DRIVER, AcceptPid, Socket, MyNode, Allowed, SetupTime). gen_accept_connection(Driver, AcceptPid, Socket, MyNode, Allowed, SetupTime) -> spawn_opt(?MODULE, do_accept, @@ -243,40 +344,19 @@ do_accept(Driver, Kernel, AcceptPid, Socket, MyNode, Allowed, SetupTime) -> Timer = dist_util:start_timer(SetupTime), case check_ip(Driver, Socket) of true -> - HSData = #hs_data{ - kernel_pid = Kernel, - this_node = MyNode, - socket = Socket, - timer = Timer, - this_flags = 0, - allowed = Allowed, - f_send = fun Driver:send/2, - f_recv = fun Driver:recv/3, - f_setopts_pre_nodeup = - fun(S) -> - inet:setopts(S, - [{active, false}, - {packet, 4}, - nodelay()]) - end, - f_setopts_post_nodeup = - fun(S) -> - inet:setopts(S, - [{active, true}, - {deliver, port}, - {packet, 4}, - binary, - nodelay()]) - end, - f_getll = fun(S) -> - inet:getll(S) - end, - f_address = fun(S, Node) -> get_remote_id(Driver, S, Node) end, - mf_tick = fun(S) -> ?MODULE:tick(Driver, S) end, - mf_getstat = fun ?MODULE:getstat/1, - mf_setopts = fun ?MODULE:setopts/2, - mf_getopts = fun ?MODULE:getopts/2 - }, + Family = Driver:family(), + HSData = + (gen_hs_data(Driver, Socket)) + #hs_data{ + kernel_pid = Kernel, + this_node = MyNode, + timer = Timer, + this_flags = 0, + allowed = Allowed, + f_address = + fun (S, Node) -> + get_remote_id(Family, S, Node) + end}, dist_util:handshake_other_started(HSData); {false,IP} -> error_msg("** Connection attempt from " @@ -304,13 +384,13 @@ nodelay() -> %% ------------------------------------------------------------ %% Get remote information about a Socket. %% ------------------------------------------------------------ -get_remote_id(Driver, Socket, Node) -> +get_remote_id(Family, Socket, Node) -> case inet:peername(Socket) of {ok,Address} -> case split_node(atom_to_list(Node), $@, []) of [_,Host] -> #net_address{address=Address,host=Host, - protocol=tcp,family=Driver:family()}; + protocol=?PROTOCOL,family=Family}; _ -> %% No '@' or more than one '@' in node name. ?shutdown(no_node) @@ -325,130 +405,109 @@ get_remote_id(Driver, Socket, Node) -> %% ------------------------------------------------------------ setup(Node, Type, MyNode, LongOrShortNames,SetupTime) -> - gen_setup(inet_tcp, Node, Type, MyNode, LongOrShortNames, SetupTime). + gen_setup(?DRIVER, Node, Type, MyNode, LongOrShortNames, SetupTime). gen_setup(Driver, Node, Type, MyNode, LongOrShortNames, SetupTime) -> - spawn_opt(?MODULE, do_setup, + spawn_opt(?MODULE, do_setup, [Driver, self(), Node, Type, MyNode, LongOrShortNames, SetupTime], dist_util:net_ticker_spawn_options()). do_setup(Driver, Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) -> - ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]), - [Name, Address] = splitnode(Driver, Node, LongOrShortNames), - AddressFamily = Driver:family(), - ErlEpmd = net_kernel:epmd_module(), + ?trace("~p~n",[{?MODULE,self(),setup,Node}]), Timer = dist_util:start_timer(SetupTime), - case call_epmd_function(ErlEpmd,address_please,[Name, Address, AddressFamily]) of + Family = Driver:family(), + {#net_address{ address = {Ip, TcpPort} } = NetAddress, + ConnectOptions, + Version} = + fam_setup( + Family, Node, LongOrShortNames, fun Driver:parse_address/1), + dist_util:reset_timer(Timer), + case Driver:connect(Ip, TcpPort, ConnectOptions) of + {ok, Socket} -> + HSData = + (gen_hs_data(Driver, Socket)) + #hs_data{ + kernel_pid = Kernel, + other_node = Node, + this_node = MyNode, + timer = Timer, + this_flags = 0, + other_version = Version, + f_address = + fun(_,_) -> + NetAddress + end, + request_type = Type}, + dist_util:handshake_we_started(HSData); + _ -> + %% Other Node may have closed since + %% discovery ! + ?trace("other node (~p) " + "closed since discovery (port_please).~n", + [Node]), + ?shutdown(Node) + end. + +fam_setup(Family, Node, LongOrShortNames, ParseAddress) -> + ?trace("~p~n",[{?MODULE,self(),?FUNCTION_NAME,Node}]), + [Name, Host] = splitnode(ParseAddress, Node, LongOrShortNames), + ErlEpmd = net_kernel:epmd_module(), + case + call_epmd_function( + ErlEpmd, address_please, [Name, Host, Family]) + of {ok, Ip, TcpPort, Version} -> - ?trace("address_please(~p) -> version ~p~n", - [Node,Version]), - do_setup_connect(Driver, Kernel, Node, Address, AddressFamily, - Ip, TcpPort, Version, Type, MyNode, Timer); - {ok, Ip} -> + ?trace("address_please(~p) -> version ~p~n", [Node,Version]), + fam_setup(Family, Host, Ip, TcpPort, Version); + {ok, Ip} -> case ErlEpmd:port_please(Name, Ip) of {port, TcpPort, Version} -> - ?trace("port_please(~p) -> version ~p~n", + ?trace("port_please(~p) -> version ~p~n", [Node,Version]), - do_setup_connect(Driver, Kernel, Node, Address, AddressFamily, - Ip, TcpPort, Version, Type, MyNode, Timer); + fam_setup(Family, Host, Ip, TcpPort, Version); _ -> ?trace("port_please (~p) failed.~n", [Node]), ?shutdown(Node) end; _Other -> - ?trace("inet_getaddr(~p) " - "failed (~p).~n", [Node,_Other]), + ?trace("inet_getaddr(~p) failed (~p).~n", [Node,_Other]), ?shutdown(Node) end. -%% -%% Actual setup of connection -%% -do_setup_connect(Driver, Kernel, Node, Address, AddressFamily, - Ip, TcpPort, Version, Type, MyNode, Timer) -> - dist_util:reset_timer(Timer), - case - Driver:connect( - Ip, TcpPort, - connect_options([{active, false}, {packet, 2}])) - of - {ok, Socket} -> - HSData = #hs_data{ - kernel_pid = Kernel, - other_node = Node, - this_node = MyNode, - socket = Socket, - timer = Timer, - this_flags = 0, - other_version = Version, - f_send = fun Driver:send/2, - f_recv = fun Driver:recv/3, - f_setopts_pre_nodeup = - fun(S) -> - inet:setopts - (S, - [{active, false}, - {packet, 4}, - nodelay()]) - end, - f_setopts_post_nodeup = - fun(S) -> - inet:setopts - (S, - [{active, true}, - {deliver, port}, - {packet, 4}, - nodelay()]) - end, - - f_getll = fun inet:getll/1, - f_address = - fun(_,_) -> - #net_address{ - address = {Ip,TcpPort}, - host = Address, - protocol = tcp, - family = AddressFamily} - end, - mf_tick = fun(S) -> ?MODULE:tick(Driver, S) end, - mf_getstat = fun ?MODULE:getstat/1, - request_type = Type, - mf_setopts = fun ?MODULE:setopts/2, - mf_getopts = fun ?MODULE:getopts/2 - }, - dist_util:handshake_we_started(HSData); - _ -> - %% Other Node may have closed since - %% discovery ! - ?trace("other node (~p) " - "closed since discovery (port_please).~n", - [Node]), - ?shutdown(Node) - end. - -connect_options(Opts) -> - case application:get_env(kernel, inet_dist_connect_options) of - {ok,ConnectOpts} -> - ConnectOpts ++ Opts; - _ -> - Opts - end. +fam_setup(Family, Host, Ip, TcpPort, Version) -> + NetAddress = + #net_address{ + address = {Ip, TcpPort}, + host = Host, + protocol = ?PROTOCOL, + family = Family}, + {NetAddress, connect_options(), Version}. + +connect_options() -> + merge_options( + case application:get_env(kernel, inet_dist_connect_options) of + {ok, ConnectOpts} -> + ConnectOpts; + _ -> + [] + end, [{active, false}, {packet, 2}]). + %% %% Close a socket. %% close(Socket) -> - inet_tcp:close(Socket). + ?DRIVER:close(Socket). %% If Node is illegal terminate the connection setup!! -splitnode(Driver, Node, LongOrShortNames) -> +splitnode(ParseAddress, Node, LongOrShortNames) -> case split_node(atom_to_list(Node), $@, []) of [Name|Tail] when Tail =/= [] -> Host = lists:append(Tail), case split_node(Host, $., []) of [_] when LongOrShortNames =:= longnames -> - case Driver:parse_address(Host) of + case ParseAddress(Host) of {ok, _} -> [Name, Host]; _ -> @@ -482,21 +541,6 @@ split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]); split_node([], _, Ack) -> [lists:reverse(Ack)]. %% ------------------------------------------------------------ -%% Fetch local information about a Socket. -%% ------------------------------------------------------------ -get_tcp_address(Driver, Socket) -> - {ok, Address} = inet:sockname(Socket), - NetAddr = get_tcp_address(Driver), - NetAddr#net_address{ address = Address }. -get_tcp_address(Driver) -> - {ok, Host} = inet:gethostname(), - #net_address { - host = Host, - protocol = tcp, - family = Driver:family() - }. - -%% ------------------------------------------------------------ %% Determine if EPMD module supports the called functions. %% If not call the builtin erl_epmd %% ------------------------------------------------------------ |