From 0798dfad95793219c5ee53b81decbc451252e636 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 1 Aug 2011 15:01:06 +0100 Subject: Introduce rabbit:force_event_refresh which will: * Emit a foo_exists for every foo that can emit foo_created * As a side effect, wake everything up and thus send out foo_stats events too. Currently this doesn't work for direct connections (rather more fiddly), but it does for everything else. --- src/rabbit.erl | 8 +++++++- src/rabbit_amqqueue.erl | 7 +++++++ src/rabbit_amqqueue_process.erl | 23 +++++++++++++++++++++-- src/rabbit_channel.erl | 16 ++++++++++++++++ src/rabbit_networking.erl | 7 ++++++- src/rabbit_reader.erl | 11 ++++++++++- 6 files changed, 67 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/rabbit.erl b/src/rabbit.erl index e067607d..5e9c84ef 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -19,7 +19,7 @@ -behaviour(application). -export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0, - rotate_logs/1]). + rotate_logs/1, force_event_refresh/0]). -export([start/2, stop/1]). @@ -189,6 +189,7 @@ -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> 'ok'). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). +-spec(force_event_refresh/0 :: () -> 'ok'). -spec(status/0 :: () -> [{pid, integer()} | {running_applications, [{atom(), string(), string()}]} | @@ -512,6 +513,11 @@ log_rotation_result(ok, {error, SaslLogError}) -> log_rotation_result(ok, ok) -> ok. +force_event_refresh() -> + rabbit_networking:force_connection_event_refresh(), + rabbit_channel:force_event_refresh(), + rabbit_amqqueue:force_event_refresh(). + %%--------------------------------------------------------------------------- %% misc diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e9d01d12..c5e2f908 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,6 +22,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). @@ -90,6 +91,7 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). +-spec(force_event_refresh/0 :: () -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -387,6 +389,11 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +force_event_refresh() -> + [map(VHost, fun(Q) -> delegate_cast(Q#amqqueue.pid, + force_event_refresh) end) || + VHost <- rabbit_vhost:list()]. + consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c6019413..c7e36283 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -767,7 +767,15 @@ emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> - rabbit_event:notify(consumer_created, + emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, + consumer_created). + +emit_consumer_exists(ChPid, ConsumerTag, Exclusive, AckRequired) -> + emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, + consumer_exists). + +emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, Type) -> + rabbit_event:notify(Type, [{consumer_tag, ConsumerTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, @@ -1118,7 +1126,18 @@ handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1, hibernate}. + {noreply, State1, hibernate}; + +handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_exists, infos(?CREATION_EVENT_KEYS, State)), + case Exclusive of + none -> [emit_consumer_exists(Ch, CTag, false, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State)]; + _ -> [emit_consumer_exists(Ch, CTag, true, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State), + Exclusive = {Ch, CTag}] + end, + noreply(State). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 13fb7ce1..7c8a07b9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -24,6 +24,7 @@ -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_all/0, emit_stats/1, ready_for_close/1]). +-export([force_event_refresh/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -93,6 +94,7 @@ -spec(refresh_config_all/0 :: () -> 'ok'). -spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/0 :: () -> 'ok'). -endif. @@ -159,6 +161,16 @@ emit_stats(Pid) -> ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). +force_event_refresh() -> + %% TODO roll in bug 23897? + All = [Pid || + Node <- rabbit_mnesia:running_clustered_nodes(), + Pid <- rpc:call(Node, rabbit_channel, list, [])], + rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, All). + +force_event_refresh(Pid) -> + gen_server2:cast(Pid, force_event_refresh). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, @@ -300,6 +312,10 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> noreply([ensure_stats_timer], State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); +handle_cast(force_event_refresh, State) -> + rabbit_event:notify(channel_exists, infos(?CREATION_EVENT_KEYS, State)), + noreply(State); + handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 451e56e8..a75a5fc0 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -21,7 +21,7 @@ node_listeners/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, - close_connection/2]). + close_connection/2, force_connection_event_refresh/0]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/2, @@ -69,6 +69,8 @@ -spec(connection_info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). +-spec(force_connection_event_refresh/0 :: () -> 'ok'). + -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/2 :: (atom(), listener_config()) -> [{inet:ip_address(), ip_port(), family(), atom()}]). @@ -289,6 +291,9 @@ close_connection(Pid, Explanation) -> false -> throw({error, {not_a_connection_pid, Pid}}) end. +force_connection_event_refresh() -> + cmap(fun (C) -> rabbit_reader:force_event_refresh(C) end). + %%-------------------------------------------------------------------- tcp_host({0,0,0,0}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index dffabf85..b322af73 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -28,7 +28,7 @@ -export([process_channel_frame/5]). %% used by erlang-client --export([emit_stats/1]). +-export([emit_stats/1, force_event_refresh/1]). -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). @@ -71,6 +71,7 @@ -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -129,6 +130,9 @@ info(Pid, Items) -> emit_stats(Pid) -> gen_server:cast(Pid, emit_stats). +force_event_refresh(Pid) -> + gen_server:cast(Pid, force_event_refresh). + conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -325,6 +329,11 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> mainloop(Deb, State); handle_other({'$gen_cast', emit_stats}, Deb, State) -> mainloop(Deb, internal_emit_stats(State)); +handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + rabbit_event:notify(connection_exists, + [{type, network} | + infos(?CREATION_EVENT_KEYS, State)]), + mainloop(Deb, State); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other(Other, _Deb, _State) -> -- cgit v1.2.1 From 55c360d7b4fad37b5fd80b50b2352083c9f9d5c4 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 2 Aug 2011 16:33:41 +0100 Subject: Tidy up slightly and make rabbitmqctl list_channels cluster aware. --- src/rabbit_channel.erl | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7c8a07b9..9dbef43a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -133,6 +133,11 @@ confirm(Pid, MsgSeqNos) -> list() -> pg_local:get_members(rabbit_channels). +list_all_nodes() -> + [Pid || + Node <- rabbit_mnesia:running_clustered_nodes(), + Pid <- rpc:call(Node, rabbit_channel, list, [])]. + info_keys() -> ?INFO_KEYS. info(Pid) -> @@ -145,10 +150,11 @@ info(Pid, Items) -> end. info_all() -> - rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()). + rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list_all_nodes()). info_all(Items) -> - rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, + list_all_nodes()). refresh_config_all() -> rabbit_misc:upmap( @@ -162,11 +168,8 @@ ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). force_event_refresh() -> - %% TODO roll in bug 23897? - All = [Pid || - Node <- rabbit_mnesia:running_clustered_nodes(), - Pid <- rpc:call(Node, rabbit_channel, list, [])], - rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, All). + rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, + list_all_nodes()). force_event_refresh(Pid) -> gen_server2:cast(Pid, force_event_refresh). -- cgit v1.2.1 From b18de45aaea54480827f593e1caced01c89d7e73 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 2 Aug 2011 17:13:33 +0100 Subject: Fix list function names, introduce an abstraction. --- src/rabbit_channel.erl | 22 +++++++++------------- src/rabbit_misc.erl | 8 ++++++++ 2 files changed, 17 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9dbef43a..7e40ae57 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -22,7 +22,7 @@ -export([start_link/10, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). --export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([list_local/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_all/0, emit_stats/1, ready_for_close/1]). -export([force_event_refresh/0]). @@ -85,7 +85,7 @@ -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). --spec(list/0 :: () -> [pid()]). +-spec(list_local/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). @@ -130,13 +130,11 @@ flushed(Pid, QPid) -> confirm(Pid, MsgSeqNos) -> gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). -list() -> +list_local() -> pg_local:get_members(rabbit_channels). -list_all_nodes() -> - [Pid || - Node <- rabbit_mnesia:running_clustered_nodes(), - Pid <- rpc:call(Node, rabbit_channel, list, [])]. +list() -> + rabbit_misc:rpc_list_all_nodes(rabbit_channel, list_local, []). info_keys() -> ?INFO_KEYS. @@ -150,15 +148,14 @@ info(Pid, Items) -> end. info_all() -> - rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list_all_nodes()). + rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()). info_all(Items) -> - rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, - list_all_nodes()). + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). refresh_config_all() -> rabbit_misc:upmap( - fun (C) -> gen_server2:call(C, refresh_config) end, list()), + fun (C) -> gen_server2:call(C, refresh_config) end, list_local()), ok. emit_stats(Pid) -> @@ -168,8 +165,7 @@ ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). force_event_refresh() -> - rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, - list_all_nodes()). + rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, list()). force_event_refresh(Pid) -> gen_server2:cast(Pid, force_event_refresh). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3bbfb1d7..e9f2cc15 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -58,6 +58,7 @@ -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2]). -export([format_message_queue/2]). +-export([rpc_list_all_nodes/3]). %%---------------------------------------------------------------------------- @@ -207,6 +208,7 @@ -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). +-spec(rpc_list_all_nodes/3 :: (atom(), atom(), [any()]) -> [any()]). -endif. @@ -942,3 +944,9 @@ format_message_queue_entry(V) when is_tuple(V) -> list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]); format_message_queue_entry(_V) -> '_'. + +rpc_list_all_nodes(M, F, A) -> + [Res || Node <- [node() | nodes()], Res <- case rpc:call(Node, M, F, A) of + {badrpc, _} -> []; + R -> R + end]. -- cgit v1.2.1