summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2008-12-09 17:52:10 +0000
committerTony Garnock-Jones <tonyg@lshift.net>2008-12-09 17:52:10 +0000
commit0de13ab0f1da5dec85b691a570218e0cc72f6cef (patch)
tree37fc91be7259ffd5599773865fccefe85e37c579
parent68afd00b8bc9e4b8d29243a5ed47b9b1ab3962bc (diff)
parent84030fb5c52543c9fa700c46cb18f7d6bce5f2a2 (diff)
downloadrabbitmq-server-bug18381.tar.gz
merged default into bug18381bug18381
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_access_control.erl4
-rw-r--r--src/rabbit_amqqueue.erl27
-rw-r--r--src/rabbit_amqqueue_process.erl60
-rw-r--r--src/rabbit_exchange.erl51
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_networking.erl22
-rw-r--r--src/rabbit_reader.erl76
8 files changed, 240 insertions, 12 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 2af8691c..d07aeaf8 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -72,6 +72,8 @@
-type(erlang_node() :: atom()).
-type(socket() :: port()).
-type(thunk(T) :: fun(() -> T)).
+-type(info_key() :: atom()).
+-type(info() :: {info_key(), any()}).
%% this is really an abstract type, but dialyzer does not support them
-type(guid() :: any()).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 5b0202e4..b73090fc 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -203,7 +203,7 @@ delete_vhost(VHostPath) ->
lists:foreach(fun (Q) ->
{ok,_} = rabbit_amqqueue:delete(Q, false, false)
end,
- rabbit_amqqueue:list_vhost_queues(VHostPath)),
+ rabbit_amqqueue:list(VHostPath)),
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_vhost(
VHostPath,
@@ -217,7 +217,7 @@ internal_delete_vhost(VHostPath) ->
lists:foreach(fun (#exchange{name=Name}) ->
ok = rabbit_exchange:delete(Name, false)
end,
- rabbit_exchange:list_vhost_exchanges(VHostPath)),
+ rabbit_exchange:list(VHostPath)),
lists:foreach(fun (Username) ->
ok = unmap_user_vhost(Username, VHostPath)
end,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b3418a1d..2b9abb29 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,8 +33,9 @@
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
+-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
+-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2]).
@@ -60,6 +61,7 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
@@ -67,7 +69,11 @@
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
--spec(list_vhost_queues/1 :: (vhost()) -> [amqqueue()]).
+-spec(list/1 :: (vhost()) -> [amqqueue()]).
+-spec(info/1 :: (amqqueue()) -> [info()]).
+-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
+-spec(info_all/1 :: (vhost()) -> [[info()]]).
+-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -184,10 +190,25 @@ with_or_die(Name, F) ->
not_found, "no ~s", [rabbit_misc:rs(Name)])
end).
-list_vhost_queues(VHostPath) ->
+list(VHostPath) ->
mnesia:dirty_match_object(
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
+map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
+
+info(#amqqueue{ pid = QPid }) ->
+ gen_server:call(QPid, info).
+
+info(#amqqueue{ pid = QPid }, Items) ->
+ case gen_server:call(QPid, {info, Items}) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
+
+info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+
stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat).
stat_all() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1588c4be..709e355e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -67,6 +67,21 @@
is_overload_protection_active,
unsent_message_count}).
+-define(INFO_KEYS,
+ [name,
+ durable,
+ auto_delete,
+ arguments,
+ pid,
+ messages_ready,
+ messages_unacknowledged,
+ messages_uncommitted,
+ messages,
+ acks_uncommitted,
+ consumers,
+ transactions,
+ memory]).
+
%%----------------------------------------------------------------------------
start_link(Q) ->
@@ -413,6 +428,9 @@ store_tx(Txn, Tx) ->
erase_tx(Txn) ->
erase({txn, Txn}).
+all_tx_record() ->
+ [T || {{txn, _}, T} <- get()].
+
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
@@ -464,8 +482,50 @@ purge_message_buffer(QName, MessageBuffer) ->
%% artifically ack them.
persist_acks(none, QName, lists:append(Messages)).
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(name, #q{q = #amqqueue{name = Name}}) -> Name;
+i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
+i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
+i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
+i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid;
+i(messages_ready, #q{message_buffer = MessageBuffer}) ->
+ queue:len(MessageBuffer);
+i(messages_unacknowledged, _) ->
+ lists:sum([dict:size(UAM) ||
+ #cr{unacked_messages = UAM} <- all_ch_record()]);
+i(messages_uncommitted, _) ->
+ lists:sum([length(Pending) ||
+ #tx{pending_messages = Pending} <- all_tx_record()]);
+i(messages, State) ->
+ lists:sum([i(Item, State) || Item <- [messages_ready,
+ messages_unacknowledged,
+ messages_uncommitted]]);
+i(acks_uncommitted, _) ->
+ lists:sum([length(Pending) ||
+ #tx{pending_acks = Pending} <- all_tx_record()]);
+i(consumers, _) ->
+ lists:sum([length(Consumers) ||
+ #cr{consumers = Consumers} <- all_ch_record()]);
+i(transactions, _) ->
+ length(all_tx_record());
+i(memory, _) ->
+ {memory, M} = process_info(self(), memory),
+ M;
+i(Item, _) ->
+ throw({bad_argument, Item}).
+
%---------------------------------------------------------------------------
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
+
+handle_call({info, Items}, _From, State) ->
+ try
+ reply({ok, infos(Items, State)}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
+
handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 64de1b52..299747d1 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -35,10 +35,10 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
- list_vhost_exchanges/1,
+ list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
route/2]).
--export([add_binding/4, delete_binding/4]).
+-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_bindings_for_queue/1]).
-export([check_type/1, assert_type/2, topic_matches/2]).
@@ -68,7 +68,11 @@
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
--spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]).
+-spec(list/1 :: (vhost()) -> [exchange()]).
+-spec(info/1 :: (exchange()) -> [info()]).
+-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
+-spec(info_all/1 :: (vhost()) -> [[info()]]).
+-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(simple_publish/6 ::
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
@@ -80,6 +84,8 @@
-spec(delete_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'binding_not_found'}).
+-spec(list_bindings/1 :: (vhost()) ->
+ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
@@ -93,6 +99,8 @@
%%----------------------------------------------------------------------------
+-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+
recover() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -160,10 +168,32 @@ lookup_or_die(Name) ->
not_found, "no ~s", [rabbit_misc:rs(Name)])
end.
-list_vhost_exchanges(VHostPath) ->
+list(VHostPath) ->
mnesia:dirty_match_object(
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
+map(VHostPath, F) ->
+ %% TODO: there is scope for optimisation here, e.g. using a
+ %% cursor, parallelising the function invocation
+ lists:map(F, list(VHostPath)).
+
+infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
+
+i(name, #exchange{name = Name}) -> Name;
+i(type, #exchange{type = Type}) -> Type;
+i(durable, #exchange{durable = Durable}) -> Durable;
+i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
+i(arguments, #exchange{arguments = Arguments}) -> Arguments;
+i(Item, _) -> throw({bad_argument, Item}).
+
+info(X = #exchange{}) -> infos(?INFO_KEYS, X).
+
+info(X = #exchange{}, Items) -> infos(Items, X).
+
+info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
+
+info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
+
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
ContentTypeBin, BodyBin) ->
@@ -342,6 +372,19 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
R <- tuple_to_list(route_with_reverse(Binding))],
ok.
+list_bindings(VHostPath) ->
+ [{ExchangeName, QueueName, RoutingKey, Arguments} ||
+ #route{binding = #binding{
+ exchange_name = ExchangeName,
+ key = RoutingKey,
+ queue_name = QueueName,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(
+ #route{binding = #binding{
+ exchange_name = rabbit_misc:r(VHostPath, exchange),
+ _ = '_'},
+ _ = '_'})].
+
route_with_reverse(#route{binding = Binding}) ->
route_with_reverse(Binding);
route_with_reverse(Binding = #binding{}) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d0d8fe8b..973e163b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -40,7 +40,7 @@
-export([dirty_read/1]).
-export([r/3, r/2, rs/1]).
-export([enable_cover/0, report_cover/0]).
--export([throw_on_error/2, with_exit_handler/2]).
+-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
@@ -87,6 +87,7 @@
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
+-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(with_user/2 :: (username(), thunk(A)) -> A).
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
@@ -224,6 +225,13 @@ with_exit_handler(Handler, Thunk) ->
Handler()
end.
+filter_exit_map(F, L) ->
+ Ref = make_ref(),
+ lists:filter(fun (R) -> R =/= Ref end,
+ [with_exit_handler(
+ fun () -> Ref end,
+ fun () -> F(I) end) || I <- L]).
+
with_user(Username, Thunk) ->
fun () ->
case mnesia:read({user, Username}) of
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 727f4f67..99ea37d8 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -32,7 +32,9 @@
-module(rabbit_networking).
-export([start/0, start_tcp_listener/2, stop_tcp_listener/2,
- on_node_down/1, active_listeners/0, node_listeners/1]).
+ on_node_down/1, active_listeners/0, node_listeners/1,
+ connections/0, connection_info/1, connection_info/2,
+ connection_info_all/0, connection_info_all/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
@@ -46,12 +48,18 @@
-ifdef(use_specs).
-type(host() :: ip_address() | string() | atom()).
+-type(connection() :: pid()).
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(active_listeners/0 :: () -> [listener()]).
-spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
+-spec(connections/0 :: () -> [connection()]).
+-spec(connection_info/1 :: (connection()) -> [info()]).
+-spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]).
+-spec(connection_info_all/0 :: () -> [[info()]]).
+-spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) ->
{ip_address(), atom()}).
@@ -142,6 +150,16 @@ start_client(Sock) ->
Child ! {go, Sock},
Child.
+connections() ->
+ [Pid || {_, Pid, _, _} <- supervisor:which_children(
+ rabbit_tcp_client_sup)].
+
+connection_info(Pid) -> rabbit_reader:info(Pid).
+connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
+
+connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
+connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
+
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
@@ -155,3 +173,5 @@ tcp_host(IPAddress) ->
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> inet_parse:ntoa(IPAddress)
end.
+
+cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b985029d..10c6e0ca 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,7 +33,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0]).
+-export([start_link/0, info/1, info/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -56,6 +56,11 @@
-record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-define(INFO_KEYS,
+ [pid, address, port, peer_address, peer_port,
+ recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
+ state, channels, user, vhost, timeout, frame_max]).
+
%% connection lifecycle
%%
%% all state transitions and terminations are marked with *...*
@@ -126,6 +131,15 @@
%%
%% TODO: refactor the code so that the above is obvious
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(info/1 :: (pid()) -> [info()]).
+-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+
+-endif.
+
%%--------------------------------------------------------------------------
start_link() ->
@@ -146,6 +160,15 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+info(Pid) ->
+ gen_server:call(Pid, info).
+
+info(Pid, Items) ->
+ case gen_server:call(Pid, {info, Items}) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
@@ -276,6 +299,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, info} ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ mainloop(Parent, Deb, State);
+ {'$gen_call', From, {info, Items}} ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ mainloop(Parent, Deb, State);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -623,6 +654,49 @@ compute_redirects(false) ->
%%--------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, #v1{}) ->
+ self();
+i(address, #v1{sock = Sock}) ->
+ {ok, {A, _}} = inet:sockname(Sock),
+ A;
+i(port, #v1{sock = Sock}) ->
+ {ok, {_, P}} = inet:sockname(Sock),
+ P;
+i(peer_address, #v1{sock = Sock}) ->
+ {ok, {A, _}} = inet:peername(Sock),
+ A;
+i(peer_port, #v1{sock = Sock}) ->
+ {ok, {_, P}} = inet:peername(Sock),
+ P;
+i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
+ SockStat =:= recv_cnt;
+ SockStat =:= send_oct;
+ SockStat =:= send_cnt;
+ SockStat =:= send_pend ->
+ case inet:getstat(Sock, [SockStat]) of
+ {ok, [{SockStat, StatVal}]} -> StatVal;
+ {error, einval} -> undefined;
+ {error, Error} -> throw({cannot_get_socket_stats, Error})
+ end;
+i(state, #v1{connection_state = S}) ->
+ S;
+i(channels, #v1{}) ->
+ length(all_channels());
+i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
+ Username;
+i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
+ VHost;
+i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
+ Timeout;
+i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
+ FrameMax;
+i(Item, #v1{}) ->
+ throw({bad_argument, Item}).
+
+%%--------------------------------------------------------------------------
+
send_to_new_channel(Channel, AnalyzedFrame, State) ->
case get({closing_channel, Channel}) of
undefined ->