summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@lshift.net>2008-11-23 20:41:50 +0000
committerEmile Joubert <emile@lshift.net>2008-11-23 20:41:50 +0000
commit27d131adf4419d486ee4fc20b80f40178fc4af4d (patch)
tree0ef87018f8908d8ca1dd904ef062aa9bbaf9fe61
parent46a0d8c35e8801f8d4e9630846cf5bd4128a1c47 (diff)
parenta54bfb42f0501c82c16cd9a984b67929c29e85ed (diff)
downloadrabbitmq-server-27d131adf4419d486ee4fc20b80f40178fc4af4d.tar.gz
Merged bug18381 into bug19684
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_networking.erl18
-rw-r--r--src/rabbit_reader.erl65
4 files changed, 90 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c93ae89a..e2174e11 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -187,14 +187,7 @@ with_or_die(Name, F) ->
list() -> rabbit_misc:dirty_read_all(amqqueue).
-map(F) ->
- %% TODO: there is scope for optimisation here, e.g. using a
- %% cursor, parallelising the function invocation
- Ref = make_ref(),
- lists:filter(fun (R) -> R =/= Ref end,
- [rabbit_misc:with_exit_handler(
- fun () -> Ref end,
- fun () -> F(Q) end) || Q <- list()]).
+map(F) -> rabbit_misc:filter_exit_map(F, list()).
list_vhost_queues(VHostPath) ->
mnesia:dirty_match_object(
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 7638af58..3c8da720 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -34,7 +34,7 @@
-export([dirty_read/1]).
-export([r/3, r/2, rs/1]).
-export([enable_cover/0, report_cover/0]).
--export([throw_on_error/2, with_exit_handler/2]).
+-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
@@ -80,6 +80,7 @@
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
+-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(with_user/2 :: (username(), thunk(A)) -> A).
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
@@ -215,6 +216,13 @@ with_exit_handler(Handler, Thunk) ->
Handler()
end.
+filter_exit_map(F, L) ->
+ Ref = make_ref(),
+ lists:filter(fun (R) -> R =/= Ref end,
+ [with_exit_handler(
+ fun () -> Ref end,
+ fun () -> F(I) end) || I <- L]).
+
with_user(Username, Thunk) ->
fun () ->
case mnesia:read({user, Username}) of
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9fc34503..2ed57d77 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -27,7 +27,8 @@
-export([start/0, start_tcp_listener/2, stop_tcp_listener/2,
on_node_down/1, active_listeners/0, node_listeners/1,
- connections/0]).
+ connections/0, connection_info/1, connection_info/2,
+ connection_info_all/0, connection_info_all/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
@@ -41,13 +42,18 @@
-ifdef(use_specs).
-type(host() :: ip_address() | string() | atom()).
+-type(connection() :: pid()).
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(active_listeners/0 :: () -> [listener()]).
-spec(node_listeners/1 :: (node()) -> [listener()]).
--spec(connections/0 :: () -> [pid()]).
+-spec(connections/0 :: () -> [connection()]).
+-spec(connection_info/1 :: (connection()) -> [info()]).
+-spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]).
+-spec(connection_info_all/0 :: () -> [[info()]]).
+-spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]).
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) ->
{ip_address(), atom()}).
@@ -142,6 +148,12 @@ connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
rabbit_tcp_client_sup)].
+connection_info(Pid) -> rabbit_reader:info(Pid).
+connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
+
+connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
+connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
+
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
@@ -155,3 +167,5 @@ tcp_host(IPAddress) ->
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> inet_parse:ntoa(IPAddress)
end.
+
+cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ce26c11a..cbadaff2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,7 +27,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0]).
+-export([start_link/0, info/1, info/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -50,6 +50,10 @@
-record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-define(INFO_KEYS,
+ [pid, address, port, peer_address, peer_port,
+ state, channels, user, vhost, timeout, frame_max]).
+
%% connection lifecycle
%%
%% all state transitions and terminations are marked with *...*
@@ -120,6 +124,15 @@
%%
%% TODO: refactor the code so that the above is obvious
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(info/1 :: (pid()) -> [info()]).
+-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+
+-endif.
+
%%--------------------------------------------------------------------------
start_link() ->
@@ -140,6 +153,15 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+info(Pid) ->
+ gen_server:call(Pid, info).
+
+info(Pid, Items) ->
+ case gen_server:call(Pid, {info, Items}) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
@@ -270,6 +292,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, info} ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ mainloop(Parent, Deb, State);
+ {'$gen_call', From, {info, Items}} ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ mainloop(Parent, Deb, State);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -617,6 +647,39 @@ compute_redirects(false) ->
%%--------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, #v1{}) ->
+ self();
+i(address, #v1{sock = Sock}) ->
+ {ok, {A, _}} = inet:sockname(Sock),
+ A;
+i(port, #v1{sock = Sock}) ->
+ {ok, {_, P}} = inet:sockname(Sock),
+ P;
+i(peer_address, #v1{sock = Sock}) ->
+ {ok, {A, _}} = inet:peername(Sock),
+ A;
+i(peer_port, #v1{sock = Sock}) ->
+ {ok, {_, P}} = inet:peername(Sock),
+ P;
+i(state, #v1{connection_state = S}) ->
+ S;
+i(channels, #v1{}) ->
+ length(all_channels());
+i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
+ Username;
+i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
+ VHost;
+i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
+ Timeout;
+i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
+ FrameMax;
+i(Item, #v1{}) ->
+ throw({bad_argument, Item}).
+
+%%--------------------------------------------------------------------------
+
send_to_new_channel(Channel, AnalyzedFrame, State) ->
case get({closing_channel, Channel}) of
undefined ->