diff options
author | Emile Joubert <emile@lshift.net> | 2008-11-23 20:41:50 +0000 |
---|---|---|
committer | Emile Joubert <emile@lshift.net> | 2008-11-23 20:41:50 +0000 |
commit | 27d131adf4419d486ee4fc20b80f40178fc4af4d (patch) | |
tree | 0ef87018f8908d8ca1dd904ef062aa9bbaf9fe61 | |
parent | 46a0d8c35e8801f8d4e9630846cf5bd4128a1c47 (diff) | |
parent | a54bfb42f0501c82c16cd9a984b67929c29e85ed (diff) | |
download | rabbitmq-server-27d131adf4419d486ee4fc20b80f40178fc4af4d.tar.gz |
Merged bug18381 into bug19684
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 10 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 18 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 65 |
4 files changed, 90 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c93ae89a..e2174e11 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -187,14 +187,7 @@ with_or_die(Name, F) -> list() -> rabbit_misc:dirty_read_all(amqqueue). -map(F) -> - %% TODO: there is scope for optimisation here, e.g. using a - %% cursor, parallelising the function invocation - Ref = make_ref(), - lists:filter(fun (R) -> R =/= Ref end, - [rabbit_misc:with_exit_handler( - fun () -> Ref end, - fun () -> F(Q) end) || Q <- list()]). +map(F) -> rabbit_misc:filter_exit_map(F, list()). list_vhost_queues(VHostPath) -> mnesia:dirty_match_object( diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 7638af58..3c8da720 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -34,7 +34,7 @@ -export([dirty_read/1]). -export([r/3, r/2, rs/1]). -export([enable_cover/0, report_cover/0]). --export([throw_on_error/2, with_exit_handler/2]). +-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). @@ -80,6 +80,7 @@ -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). +-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). @@ -215,6 +216,13 @@ with_exit_handler(Handler, Thunk) -> Handler() end. +filter_exit_map(F, L) -> + Ref = make_ref(), + lists:filter(fun (R) -> R =/= Ref end, + [with_exit_handler( + fun () -> Ref end, + fun () -> F(I) end) || I <- L]). + with_user(Username, Thunk) -> fun () -> case mnesia:read({user, Username}) of diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9fc34503..2ed57d77 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -27,7 +27,8 @@ -export([start/0, start_tcp_listener/2, stop_tcp_listener/2, on_node_down/1, active_listeners/0, node_listeners/1, - connections/0]). + connections/0, connection_info/1, connection_info/2, + connection_info_all/0, connection_info_all/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). @@ -41,13 +42,18 @@ -ifdef(use_specs). -type(host() :: ip_address() | string() | atom()). +-type(connection() :: pid()). -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). -spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). -spec(active_listeners/0 :: () -> [listener()]). -spec(node_listeners/1 :: (node()) -> [listener()]). --spec(connections/0 :: () -> [pid()]). +-spec(connections/0 :: () -> [connection()]). +-spec(connection_info/1 :: (connection()) -> [info()]). +-spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]). +-spec(connection_info_all/0 :: () -> [[info()]]). +-spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) -> {ip_address(), atom()}). @@ -142,6 +148,12 @@ connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( rabbit_tcp_client_sup)]. +connection_info(Pid) -> rabbit_reader:info(Pid). +connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items). + +connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). +connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). + %%-------------------------------------------------------------------- tcp_host({0,0,0,0}) -> @@ -155,3 +167,5 @@ tcp_host(IPAddress) -> {ok, #hostent{h_name = Name}} -> Name; {error, _Reason} -> inet_parse:ntoa(IPAddress) end. + +cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ce26c11a..cbadaff2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -27,7 +27,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0]). +-export([start_link/0, info/1, info/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -50,6 +50,10 @@ -record(v1, {sock, connection, callback, recv_ref, connection_state}). +-define(INFO_KEYS, + [pid, address, port, peer_address, peer_port, + state, channels, user, vhost, timeout, frame_max]). + %% connection lifecycle %% %% all state transitions and terminations are marked with *...* @@ -120,6 +124,15 @@ %% %% TODO: refactor the code so that the above is obvious +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(info/1 :: (pid()) -> [info()]). +-spec(info/2 :: (pid(), [info_key()]) -> [info()]). + +-endif. + %%-------------------------------------------------------------------------- start_link() -> @@ -140,6 +153,15 @@ system_terminate(Reason, _Parent, _Deb, _State) -> system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. +info(Pid) -> + gen_server:call(Pid, info). + +info(Pid, Items) -> + case gen_server:call(Pid, {info, Items}) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of @@ -270,6 +292,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; timeout -> throw({timeout, State#v1.connection_state}); + {'$gen_call', From, info} -> + gen_server:reply(From, infos(?INFO_KEYS, State)), + mainloop(Parent, Deb, State); + {'$gen_call', From, {info, Items}} -> + gen_server:reply(From, try {ok, infos(Items, State)} + catch Error -> {error, Error} + end), + mainloop(Parent, Deb, State); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -617,6 +647,39 @@ compute_redirects(false) -> %%-------------------------------------------------------------------------- +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, #v1{}) -> + self(); +i(address, #v1{sock = Sock}) -> + {ok, {A, _}} = inet:sockname(Sock), + A; +i(port, #v1{sock = Sock}) -> + {ok, {_, P}} = inet:sockname(Sock), + P; +i(peer_address, #v1{sock = Sock}) -> + {ok, {A, _}} = inet:peername(Sock), + A; +i(peer_port, #v1{sock = Sock}) -> + {ok, {_, P}} = inet:peername(Sock), + P; +i(state, #v1{connection_state = S}) -> + S; +i(channels, #v1{}) -> + length(all_channels()); +i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> + Username; +i(vhost, #v1{connection = #connection{vhost = VHost}}) -> + VHost; +i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> + Timeout; +i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> + FrameMax; +i(Item, #v1{}) -> + throw({bad_argument, Item}). + +%%-------------------------------------------------------------------------- + send_to_new_channel(Channel, AnalyzedFrame, State) -> case get({closing_channel, Channel}) of undefined -> |