summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl76
1 files changed, 75 insertions, 1 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b985029d..10c6e0ca 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,7 +33,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0]).
+-export([start_link/0, info/1, info/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -56,6 +56,11 @@
-record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-define(INFO_KEYS,
+ [pid, address, port, peer_address, peer_port,
+ recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
+ state, channels, user, vhost, timeout, frame_max]).
+
%% connection lifecycle
%%
%% all state transitions and terminations are marked with *...*
@@ -126,6 +131,15 @@
%%
%% TODO: refactor the code so that the above is obvious
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(info/1 :: (pid()) -> [info()]).
+-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+
+-endif.
+
%%--------------------------------------------------------------------------
start_link() ->
@@ -146,6 +160,15 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+info(Pid) ->
+ gen_server:call(Pid, info).
+
+info(Pid, Items) ->
+ case gen_server:call(Pid, {info, Items}) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
@@ -276,6 +299,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, info} ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ mainloop(Parent, Deb, State);
+ {'$gen_call', From, {info, Items}} ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ mainloop(Parent, Deb, State);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -623,6 +654,49 @@ compute_redirects(false) ->
%%--------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, #v1{}) ->
+ self();
+i(address, #v1{sock = Sock}) ->
+ {ok, {A, _}} = inet:sockname(Sock),
+ A;
+i(port, #v1{sock = Sock}) ->
+ {ok, {_, P}} = inet:sockname(Sock),
+ P;
+i(peer_address, #v1{sock = Sock}) ->
+ {ok, {A, _}} = inet:peername(Sock),
+ A;
+i(peer_port, #v1{sock = Sock}) ->
+ {ok, {_, P}} = inet:peername(Sock),
+ P;
+i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
+ SockStat =:= recv_cnt;
+ SockStat =:= send_oct;
+ SockStat =:= send_cnt;
+ SockStat =:= send_pend ->
+ case inet:getstat(Sock, [SockStat]) of
+ {ok, [{SockStat, StatVal}]} -> StatVal;
+ {error, einval} -> undefined;
+ {error, Error} -> throw({cannot_get_socket_stats, Error})
+ end;
+i(state, #v1{connection_state = S}) ->
+ S;
+i(channels, #v1{}) ->
+ length(all_channels());
+i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
+ Username;
+i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
+ VHost;
+i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
+ Timeout;
+i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
+ FrameMax;
+i(Item, #v1{}) ->
+ throw({bad_argument, Item}).
+
+%%--------------------------------------------------------------------------
+
send_to_new_channel(Channel, AnalyzedFrame, State) ->
case get({closing_channel, Channel}) of
undefined ->