summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/kernel/src/dist_util.erl11
-rw-r--r--lib/kernel/src/net_kernel.erl160
2 files changed, 99 insertions, 72 deletions
diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl
index 9495ac282a..3b9d88da6a 100644
--- a/lib/kernel/src/dist_util.erl
+++ b/lib/kernel/src/dist_util.erl
@@ -360,22 +360,11 @@ shutdown(_Module, _Line, _Data, Reason) ->
?shutdown_trace("Net Kernel 2: shutting down connection "
"~p:~p, data ~p,reason ~p~n",
[_Module,_Line, _Data, Reason]),
- flush_down(),
exit(Reason).
%% Use this line to debug connection.
%% Set net_kernel verbose = 1 as well.
%% exit({Reason, ?MODULE, _Line, _Data, erlang:timestamp()}).
-
-flush_down() ->
- receive
- {From, get_status} ->
- From ! {self(), get_status, error},
- flush_down()
- after 0 ->
- ok
- end.
-
handshake_we_started(#hs_data{request_type=ReqType,
other_node=Node,
add_flags=AddFlgs0,
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index 4a68e6676d..e4b214a6c5 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -121,17 +121,23 @@
-define(LISTEN_ID, #listen.listen).
-define(ACCEPT_ID, #listen.accept).
+-type connection_state() :: pending | up | up_pending.
+-type connection_type() :: normal | hidden.
+
+-include("net_address.hrl").
+
+%% Relaxed typing to allow ets:select without Dialyzer complains.
-record(connection, {
- node, %% remote node name
- conn_id, %% Connection identity
- state, %% pending | up | up_pending
- owner, %% owner pid
- ctrlr, %% Controller port or pid
- pending_owner, %% possible new owner
- address, %% #net_address
- waiting = [], %% queued processes
- type %% normal | hidden
- }).
+ node :: node() | '_', %% remote node name
+ conn_id, %% Connection identity
+ state :: connection_state() | '_',
+ owner :: pid() | '_', %% owner pid
+ ctrlr, %% Controller port or pid
+ pending_owner :: pid() | '_' | undefined, %% possible new owner
+ address :: #net_address{} | '_',
+ waiting = [], %% queued processes
+ type :: connection_type() | '_'
+}).
-record(barred_connection, {
node %% remote node name
@@ -151,8 +157,6 @@
%% the connection setup.
-define(SETUPTIME, 7000).
--include("net_address.hrl").
-
%%% BIF
-export([dflag_unicode_io/1]).
@@ -180,9 +184,31 @@ longnames() -> request(longnames).
Reason :: not_allowed | not_found.
stop() -> erl_distribution:stop().
-node_info(Node) -> get_node_info(Node).
-node_info(Node, Key) -> get_node_info(Node, Key).
-nodes_info() -> get_nodes_info().
+-type node_info() ::
+ {address, #net_address{}} |
+ {type, connection_type()} |
+ {in, non_neg_integer()} |
+ {out, non_neg_integer()} |
+ {owner, pid()} |
+ {state, connection_state()}.
+
+-spec node_info(node()) -> {ok, [node_info()]} | {error, bad_node}.
+node_info(Node) ->
+ get_node_info(Node).
+
+-spec node_info(node(), address) -> {ok, Address} | {error, bad_node} when Address :: #net_address{};
+ (node(), type) -> {ok, Type} | {error, bad_node} when Type :: connection_type();
+ (node(), in | out) -> {ok, Bytes} | {error, bad_node} when Bytes :: non_neg_integer();
+ (node(), owner) -> {ok, Owner} | {error, bad_node} when Owner :: pid();
+ (node(), state) -> {ok, State} | {error, bad_node} when State :: connection_state().
+ %(node(), term()) -> {error, invalid_key} | {error, bad_node}.
+node_info(Node, Key) ->
+ get_node_info(Node, Key).
+
+-spec nodes_info() -> {ok, [{node(), [node_info()]}]}.
+nodes_info() ->
+ get_nodes_info().
+
i() -> print_info().
i(Node) -> print_info(Node).
@@ -1621,61 +1647,73 @@ connecttime() ->
get_node_info(Node) ->
case ets:lookup(sys_dist, Node) of
- [Conn = #connection{owner = Owner, state = State}] ->
- case get_status(Owner, Node, State) of
- {ok, In, Out} ->
- {ok, [{owner, Owner},
- {state, State},
- {address, Conn#connection.address},
- {type, Conn#connection.type},
- {in, In},
- {out, Out}]};
+ [#connection{owner = Owner, state = up, address = Addr, type = Type}] ->
+ MRef = monitor(process, Owner),
+ Owner ! {self(), get_status},
+ receive
+ {Owner, get_status, {ok, Read, Write}} ->
+ demonitor(MRef, [flush]),
+ {ok, [{owner, Owner}, {state, up}, {address, Addr},
+ {type, Type}, {in, Read}, {out, Write}]};
+ {'DOWN', MRef, process, Owner, _Info} ->
+ {error, bad_node}
+ end;
+ [#connection{owner = Owner, state = State, address = Addr, type = Type}] ->
+ {ok, [{owner, Owner}, {state, State}, {address, Addr},
+ {type, Type}, {in, 0}, {out, 0}]};
_ ->
{error, bad_node}
- end;
- _ ->
- {error, bad_node}
end.
-%%
-%% We can't do monitor_node here incase the node is pending,
-%% the monitor_node/2 call hangs until the connection is ready.
-%% We will not ask about in/out information either for pending
-%% connections as this also would block this call awhile.
-%%
-get_status(Owner, Node, up) ->
- monitor_node(Node, true),
- Owner ! {self(), get_status},
- receive
- {Owner, get_status, Res} ->
- monitor_node(Node, false),
- Res;
- {nodedown, Node} ->
- error
- end;
-get_status(_, _, _) ->
- {ok, 0, 0}.
-
get_node_info(Node, Key) ->
case get_node_info(Node) of
- {ok, Info} ->
- case lists:keysearch(Key, 1, Info) of
- {value, {Key, Value}} -> {ok, Value};
- _ -> {error, invalid_key}
- end;
- Error ->
- Error
+ {ok, Info} ->
+ case lists:keyfind(Key, 1, Info) of
+ {Key, Value} ->
+ {ok, Value};
+ false ->
+ {error, invalid_key}
+ end;
+ {error, bad_node} ->
+ {error, bad_node}
end.
+
get_nodes_info() ->
- Nodes = ets:select(sys_dist, [{#connection{node = '$1', _ = '_'}, [], ['$1']}]),
- {ok, lists:filtermap(
- fun(Node) ->
- case get_node_info(Node) of
- {ok, Info} -> {true, {Node, Info}};
- _ -> false
- end
- end, Nodes)}.
+ Conns = ets:select(sys_dist, [{#connection{_ = '_'}, [], ['$_']}]),
+ Info = multi_info(Conns, {self(), get_status}, #{}, []),
+ {ok, Info}.
+
+multi_info([], _Msg, PidToRef, NodeInfos) ->
+ multi_receive(PidToRef, NodeInfos);
+multi_info([#connection{owner = Owner, state = up} = Conn | Conns], Msg, PidToRef, NodeInfos) ->
+ % connection is up, try to figure out in/out bytes
+ MRef = erlang:monitor(process, Owner),
+ Owner ! Msg,
+ multi_info(Conns, Msg, maps:put(Owner, {MRef, Conn}, PidToRef), NodeInfos);
+multi_info([#connection{node = Node, owner = Owner, type = Type,
+ state = State, address = Addr} | Conns], Msg, PidToRef, NodeInfos) ->
+ % connection is not up: in/out bytes are zero
+ multi_info(Conns, Msg, PidToRef, [
+ {Node, [{owner, Owner}, {state, State}, {address, Addr}, {type, Type}, {in, 0}, {out, 0}]}
+ | NodeInfos]).
+
+multi_receive(PidToRef, NodeInfos) when map_size(PidToRef) =:= 0 ->
+ NodeInfos;
+multi_receive(PidToRef, NodeInfos) ->
+ receive
+ {DistProc, get_status, {ok, Read, Write}} ->
+ {{MRef, #connection{node = Node, owner = Owner, type = Type,
+ state = State, address = Addr}}, NewRefs} = maps:take(DistProc, PidToRef),
+ erlang:demonitor(MRef, [flush]),
+ multi_receive(NewRefs, [
+ {Node, [{owner, Owner}, {state, State}, {address, Addr}, {type, Type}, {in, Read}, {out, Write}]}
+ | NodeInfos]);
+ {'DOWN', _MRef, process, Pid, _Info} ->
+ % connection went down: reproducing compatible behaviour with
+ % not showing any information about this connection
+ multi_receive(maps:remove(Pid, PidToRef), NodeInfos)
+ end.
%% ------------------------------------------------------------
%% Misc. functions