diff options
author | Tony Garnock-Jones <tonyg@lshift.net> | 2008-12-09 17:52:10 +0000 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@lshift.net> | 2008-12-09 17:52:10 +0000 |
commit | 0de13ab0f1da5dec85b691a570218e0cc72f6cef (patch) | |
tree | 37fc91be7259ffd5599773865fccefe85e37c579 /src | |
parent | 68afd00b8bc9e4b8d29243a5ed47b9b1ab3962bc (diff) | |
parent | 84030fb5c52543c9fa700c46cb18f7d6bce5f2a2 (diff) | |
download | rabbitmq-server-0de13ab0f1da5dec85b691a570218e0cc72f6cef.tar.gz |
merged default into bug18381bug18381
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_access_control.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 27 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 60 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 51 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 10 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 22 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 76 |
7 files changed, 238 insertions, 12 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 5b0202e4..b73090fc 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -203,7 +203,7 @@ delete_vhost(VHostPath) -> lists:foreach(fun (Q) -> {ok,_} = rabbit_amqqueue:delete(Q, false, false) end, - rabbit_amqqueue:list_vhost_queues(VHostPath)), + rabbit_amqqueue:list(VHostPath)), R = rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_vhost( VHostPath, @@ -217,7 +217,7 @@ internal_delete_vhost(VHostPath) -> lists:foreach(fun (#exchange{name=Name}) -> ok = rabbit_exchange:delete(Name, false) end, - rabbit_exchange:list_vhost_exchanges(VHostPath)), + rabbit_exchange:list(VHostPath)), lists:foreach(fun (Username) -> ok = unmap_user_vhost(Username, VHostPath) end, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b3418a1d..2b9abb29 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,8 +33,9 @@ -export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, +-export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). +-export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2]). @@ -60,6 +61,7 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). + -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> @@ -67,7 +69,11 @@ -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). --spec(list_vhost_queues/1 :: (vhost()) -> [amqqueue()]). +-spec(list/1 :: (vhost()) -> [amqqueue()]). +-spec(info/1 :: (amqqueue()) -> [info()]). +-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]). +-spec(info_all/1 :: (vhost()) -> [[info()]]). +-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(stat/1 :: (amqqueue()) -> qstats()). -spec(stat_all/0 :: () -> [qstats()]). -spec(delete/3 :: @@ -184,10 +190,25 @@ with_or_die(Name, F) -> not_found, "no ~s", [rabbit_misc:rs(Name)]) end). -list_vhost_queues(VHostPath) -> +list(VHostPath) -> mnesia:dirty_match_object( #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). +map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). + +info(#amqqueue{ pid = QPid }) -> + gen_server:call(QPid, info). + +info(#amqqueue{ pid = QPid }, Items) -> + case gen_server:call(QPid, {info, Items}) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + +info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). + stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). stat_all() -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1588c4be..709e355e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -67,6 +67,21 @@ is_overload_protection_active, unsent_message_count}). +-define(INFO_KEYS, + [name, + durable, + auto_delete, + arguments, + pid, + messages_ready, + messages_unacknowledged, + messages_uncommitted, + messages, + acks_uncommitted, + consumers, + transactions, + memory]). + %%---------------------------------------------------------------------------- start_link(Q) -> @@ -413,6 +428,9 @@ store_tx(Txn, Tx) -> erase_tx(Txn) -> erase({txn, Txn}). +all_tx_record() -> + [T || {{txn, _}, T} <- get()]. + all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. @@ -464,8 +482,50 @@ purge_message_buffer(QName, MessageBuffer) -> %% artifically ack them. persist_acks(none, QName, lists:append(Messages)). +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(name, #q{q = #amqqueue{name = Name}}) -> Name; +i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; +i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; +i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; +i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid; +i(messages_ready, #q{message_buffer = MessageBuffer}) -> + queue:len(MessageBuffer); +i(messages_unacknowledged, _) -> + lists:sum([dict:size(UAM) || + #cr{unacked_messages = UAM} <- all_ch_record()]); +i(messages_uncommitted, _) -> + lists:sum([length(Pending) || + #tx{pending_messages = Pending} <- all_tx_record()]); +i(messages, State) -> + lists:sum([i(Item, State) || Item <- [messages_ready, + messages_unacknowledged, + messages_uncommitted]]); +i(acks_uncommitted, _) -> + lists:sum([length(Pending) || + #tx{pending_acks = Pending} <- all_tx_record()]); +i(consumers, _) -> + lists:sum([length(Consumers) || + #cr{consumers = Consumers} <- all_ch_record()]); +i(transactions, _) -> + length(all_tx_record()); +i(memory, _) -> + {memory, M} = process_info(self(), memory), + M; +i(Item, _) -> + throw({bad_argument, Item}). + %--------------------------------------------------------------------------- +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); + +handle_call({info, Items}, _From, State) -> + try + reply({ok, infos(Items, State)}, State) + catch Error -> reply({error, Error}, State) + end; + handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% Synchronous, "immediate" delivery mode %% diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 64de1b52..299747d1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -35,10 +35,10 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, - list_vhost_exchanges/1, + list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/4, delete_binding/4]). +-export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_bindings_for_queue/1]). -export([check_type/1, assert_type/2, topic_matches/2]). @@ -68,7 +68,11 @@ -spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). --spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]). +-spec(list/1 :: (vhost()) -> [exchange()]). +-spec(info/1 :: (exchange()) -> [info()]). +-spec(info/2 :: (exchange(), [info_key()]) -> [info()]). +-spec(info_all/1 :: (vhost()) -> [[info()]]). +-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). @@ -80,6 +84,8 @@ -spec(delete_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'binding_not_found'}). +-spec(list_bindings/1 :: (vhost()) -> + [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> @@ -93,6 +99,8 @@ %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. + recover() -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -160,10 +168,32 @@ lookup_or_die(Name) -> not_found, "no ~s", [rabbit_misc:rs(Name)]) end. -list_vhost_exchanges(VHostPath) -> +list(VHostPath) -> mnesia:dirty_match_object( #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). +map(VHostPath, F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list(VHostPath)). + +infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. + +i(name, #exchange{name = Name}) -> Name; +i(type, #exchange{type = Type}) -> Type; +i(durable, #exchange{durable = Durable}) -> Durable; +i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; +i(arguments, #exchange{arguments = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(X = #exchange{}) -> infos(?INFO_KEYS, X). + +info(X = #exchange{}, Items) -> infos(Items, X). + +info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). + %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> @@ -342,6 +372,19 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> R <- tuple_to_list(route_with_reverse(Binding))], ok. +list_bindings(VHostPath) -> + [{ExchangeName, QueueName, RoutingKey, Arguments} || + #route{binding = #binding{ + exchange_name = ExchangeName, + key = RoutingKey, + queue_name = QueueName, + args = Arguments}} + <- mnesia:dirty_match_object( + #route{binding = #binding{ + exchange_name = rabbit_misc:r(VHostPath, exchange), + _ = '_'}, + _ = '_'})]. + route_with_reverse(#route{binding = Binding}) -> route_with_reverse(Binding); route_with_reverse(Binding = #binding{}) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d0d8fe8b..973e163b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -40,7 +40,7 @@ -export([dirty_read/1]). -export([r/3, r/2, rs/1]). -export([enable_cover/0, report_cover/0]). --export([throw_on_error/2, with_exit_handler/2]). +-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). @@ -87,6 +87,7 @@ -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). +-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). @@ -224,6 +225,13 @@ with_exit_handler(Handler, Thunk) -> Handler() end. +filter_exit_map(F, L) -> + Ref = make_ref(), + lists:filter(fun (R) -> R =/= Ref end, + [with_exit_handler( + fun () -> Ref end, + fun () -> F(I) end) || I <- L]). + with_user(Username, Thunk) -> fun () -> case mnesia:read({user, Username}) of diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 727f4f67..99ea37d8 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -32,7 +32,9 @@ -module(rabbit_networking). -export([start/0, start_tcp_listener/2, stop_tcp_listener/2, - on_node_down/1, active_listeners/0, node_listeners/1]). + on_node_down/1, active_listeners/0, node_listeners/1, + connections/0, connection_info/1, connection_info/2, + connection_info_all/0, connection_info_all/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). @@ -46,12 +48,18 @@ -ifdef(use_specs). -type(host() :: ip_address() | string() | atom()). +-type(connection() :: pid()). -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). -spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). -spec(active_listeners/0 :: () -> [listener()]). -spec(node_listeners/1 :: (erlang_node()) -> [listener()]). +-spec(connections/0 :: () -> [connection()]). +-spec(connection_info/1 :: (connection()) -> [info()]). +-spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]). +-spec(connection_info_all/0 :: () -> [[info()]]). +-spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) -> {ip_address(), atom()}). @@ -142,6 +150,16 @@ start_client(Sock) -> Child ! {go, Sock}, Child. +connections() -> + [Pid || {_, Pid, _, _} <- supervisor:which_children( + rabbit_tcp_client_sup)]. + +connection_info(Pid) -> rabbit_reader:info(Pid). +connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items). + +connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). +connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). + %%-------------------------------------------------------------------- tcp_host({0,0,0,0}) -> @@ -155,3 +173,5 @@ tcp_host(IPAddress) -> {ok, #hostent{h_name = Name}} -> Name; {error, _Reason} -> inet_parse:ntoa(IPAddress) end. + +cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b985029d..10c6e0ca 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,7 +33,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0]). +-export([start_link/0, info/1, info/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -56,6 +56,11 @@ -record(v1, {sock, connection, callback, recv_ref, connection_state}). +-define(INFO_KEYS, + [pid, address, port, peer_address, peer_port, + recv_oct, recv_cnt, send_oct, send_cnt, send_pend, + state, channels, user, vhost, timeout, frame_max]). + %% connection lifecycle %% %% all state transitions and terminations are marked with *...* @@ -126,6 +131,15 @@ %% %% TODO: refactor the code so that the above is obvious +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(info/1 :: (pid()) -> [info()]). +-spec(info/2 :: (pid(), [info_key()]) -> [info()]). + +-endif. + %%-------------------------------------------------------------------------- start_link() -> @@ -146,6 +160,15 @@ system_terminate(Reason, _Parent, _Deb, _State) -> system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. +info(Pid) -> + gen_server:call(Pid, info). + +info(Pid, Items) -> + case gen_server:call(Pid, {info, Items}) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of @@ -276,6 +299,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; timeout -> throw({timeout, State#v1.connection_state}); + {'$gen_call', From, info} -> + gen_server:reply(From, infos(?INFO_KEYS, State)), + mainloop(Parent, Deb, State); + {'$gen_call', From, {info, Items}} -> + gen_server:reply(From, try {ok, infos(Items, State)} + catch Error -> {error, Error} + end), + mainloop(Parent, Deb, State); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -623,6 +654,49 @@ compute_redirects(false) -> %%-------------------------------------------------------------------------- +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, #v1{}) -> + self(); +i(address, #v1{sock = Sock}) -> + {ok, {A, _}} = inet:sockname(Sock), + A; +i(port, #v1{sock = Sock}) -> + {ok, {_, P}} = inet:sockname(Sock), + P; +i(peer_address, #v1{sock = Sock}) -> + {ok, {A, _}} = inet:peername(Sock), + A; +i(peer_port, #v1{sock = Sock}) -> + {ok, {_, P}} = inet:peername(Sock), + P; +i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; + SockStat =:= recv_cnt; + SockStat =:= send_oct; + SockStat =:= send_cnt; + SockStat =:= send_pend -> + case inet:getstat(Sock, [SockStat]) of + {ok, [{SockStat, StatVal}]} -> StatVal; + {error, einval} -> undefined; + {error, Error} -> throw({cannot_get_socket_stats, Error}) + end; +i(state, #v1{connection_state = S}) -> + S; +i(channels, #v1{}) -> + length(all_channels()); +i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> + Username; +i(vhost, #v1{connection = #connection{vhost = VHost}}) -> + VHost; +i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> + Timeout; +i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> + FrameMax; +i(Item, #v1{}) -> + throw({bad_argument, Item}). + +%%-------------------------------------------------------------------------- + send_to_new_channel(Channel, AnalyzedFrame, State) -> case get({closing_channel, Channel}) of undefined -> |