summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-11-19 16:31:13 +0000
committerMatthias Radestock <matthias@lshift.net>2008-11-19 16:31:13 +0000
commit6fb1988c83c1a140ad1856963615860e1a58a6e9 (patch)
tree5c71f576167cd15f2b5c4badcd8d13e78740cfab /src
parent8c0969ec73efbffd31a4d4196447d545ad1da2ac (diff)
downloadrabbitmq-server-6fb1988c83c1a140ad1856963615860e1a58a6e9.tar.gz
add connection_info functions
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_networking.erl23
-rw-r--r--src/rabbit_reader.erl65
2 files changed, 85 insertions, 3 deletions
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9fc34503..df9b1b69 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -27,7 +27,8 @@
-export([start/0, start_tcp_listener/2, stop_tcp_listener/2,
on_node_down/1, active_listeners/0, node_listeners/1,
- connections/0]).
+ connections/0, connection_info/1, connection_info/2,
+ connection_info_all/0, connection_info_all/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
@@ -41,13 +42,18 @@
-ifdef(use_specs).
-type(host() :: ip_address() | string() | atom()).
+-type(connection() :: pid()).
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(active_listeners/0 :: () -> [listener()]).
-spec(node_listeners/1 :: (node()) -> [listener()]).
--spec(connections/0 :: () -> [pid()]).
+-spec(connections/0 :: () -> [connection()]).
+-spec(connection_info/1 :: (connection()) -> [info()]).
+-spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]).
+-spec(connection_info_all/0 :: () -> [[info()]]).
+-spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]).
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) ->
{ip_address(), atom()}).
@@ -142,6 +148,12 @@ connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
rabbit_tcp_client_sup)].
+connection_info(Pid) -> rabbit_reader:info(Pid).
+connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
+
+connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
+connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
+
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
@@ -155,3 +167,10 @@ tcp_host(IPAddress) ->
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> inet_parse:ntoa(IPAddress)
end.
+
+cmap(F) ->
+ Ref = make_ref(),
+ lists:filter(fun (R) -> R =/= Ref end,
+ [rabbit_misc:with_exit_handler(
+ fun () -> Ref end,
+ fun () -> F(Q) end) || Q <- connections()]).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ce26c11a..cbadaff2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,7 +27,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0]).
+-export([start_link/0, info/1, info/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -50,6 +50,10 @@
-record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-define(INFO_KEYS,
+ [pid, address, port, peer_address, peer_port,
+ state, channels, user, vhost, timeout, frame_max]).
+
%% connection lifecycle
%%
%% all state transitions and terminations are marked with *...*
@@ -120,6 +124,15 @@
%%
%% TODO: refactor the code so that the above is obvious
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(info/1 :: (pid()) -> [info()]).
+-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+
+-endif.
+
%%--------------------------------------------------------------------------
start_link() ->
@@ -140,6 +153,15 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+info(Pid) ->
+ gen_server:call(Pid, info).
+
+info(Pid, Items) ->
+ case gen_server:call(Pid, {info, Items}) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
@@ -270,6 +292,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, info} ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ mainloop(Parent, Deb, State);
+ {'$gen_call', From, {info, Items}} ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ mainloop(Parent, Deb, State);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -617,6 +647,39 @@ compute_redirects(false) ->
%%--------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, #v1{}) ->
+ self();
+i(address, #v1{sock = Sock}) ->
+ {ok, {A, _}} = inet:sockname(Sock),
+ A;
+i(port, #v1{sock = Sock}) ->
+ {ok, {_, P}} = inet:sockname(Sock),
+ P;
+i(peer_address, #v1{sock = Sock}) ->
+ {ok, {A, _}} = inet:peername(Sock),
+ A;
+i(peer_port, #v1{sock = Sock}) ->
+ {ok, {_, P}} = inet:peername(Sock),
+ P;
+i(state, #v1{connection_state = S}) ->
+ S;
+i(channels, #v1{}) ->
+ length(all_channels());
+i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
+ Username;
+i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
+ VHost;
+i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
+ Timeout;
+i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
+ FrameMax;
+i(Item, #v1{}) ->
+ throw({bad_argument, Item}).
+
+%%--------------------------------------------------------------------------
+
send_to_new_channel(Channel, AnalyzedFrame, State) ->
case get({closing_channel, Channel}) of
undefined ->