diff options
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 7 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 11 |
6 files changed, 67 insertions, 5 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index e067607d..5e9c84ef 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -19,7 +19,7 @@ -behaviour(application). -export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0, - rotate_logs/1]). + rotate_logs/1, force_event_refresh/0]). -export([start/2, stop/1]). @@ -189,6 +189,7 @@ -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> 'ok'). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). +-spec(force_event_refresh/0 :: () -> 'ok'). -spec(status/0 :: () -> [{pid, integer()} | {running_applications, [{atom(), string(), string()}]} | @@ -512,6 +513,11 @@ log_rotation_result(ok, {error, SaslLogError}) -> log_rotation_result(ok, ok) -> ok. +force_event_refresh() -> + rabbit_networking:force_connection_event_refresh(), + rabbit_channel:force_event_refresh(), + rabbit_amqqueue:force_event_refresh(). + %%--------------------------------------------------------------------------- %% misc diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e9d01d12..c5e2f908 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,6 +22,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). @@ -90,6 +91,7 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). +-spec(force_event_refresh/0 :: () -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -387,6 +389,11 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +force_event_refresh() -> + [map(VHost, fun(Q) -> delegate_cast(Q#amqqueue.pid, + force_event_refresh) end) || + VHost <- rabbit_vhost:list()]. + consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c6019413..c7e36283 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -767,7 +767,15 @@ emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> - rabbit_event:notify(consumer_created, + emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, + consumer_created). + +emit_consumer_exists(ChPid, ConsumerTag, Exclusive, AckRequired) -> + emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, + consumer_exists). + +emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, Type) -> + rabbit_event:notify(Type, [{consumer_tag, ConsumerTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, @@ -1118,7 +1126,18 @@ handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1, hibernate}. + {noreply, State1, hibernate}; + +handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_exists, infos(?CREATION_EVENT_KEYS, State)), + case Exclusive of + none -> [emit_consumer_exists(Ch, CTag, false, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State)]; + _ -> [emit_consumer_exists(Ch, CTag, true, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State), + Exclusive = {Ch, CTag}] + end, + noreply(State). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 13fb7ce1..7c8a07b9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -24,6 +24,7 @@ -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_all/0, emit_stats/1, ready_for_close/1]). +-export([force_event_refresh/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -93,6 +94,7 @@ -spec(refresh_config_all/0 :: () -> 'ok'). -spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/0 :: () -> 'ok'). -endif. @@ -159,6 +161,16 @@ emit_stats(Pid) -> ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). +force_event_refresh() -> + %% TODO roll in bug 23897? + All = [Pid || + Node <- rabbit_mnesia:running_clustered_nodes(), + Pid <- rpc:call(Node, rabbit_channel, list, [])], + rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, All). + +force_event_refresh(Pid) -> + gen_server2:cast(Pid, force_event_refresh). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, @@ -300,6 +312,10 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> noreply([ensure_stats_timer], State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); +handle_cast(force_event_refresh, State) -> + rabbit_event:notify(channel_exists, infos(?CREATION_EVENT_KEYS, State)), + noreply(State); + handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 451e56e8..a75a5fc0 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -21,7 +21,7 @@ node_listeners/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, - close_connection/2]). + close_connection/2, force_connection_event_refresh/0]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/2, @@ -69,6 +69,8 @@ -spec(connection_info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). +-spec(force_connection_event_refresh/0 :: () -> 'ok'). + -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/2 :: (atom(), listener_config()) -> [{inet:ip_address(), ip_port(), family(), atom()}]). @@ -289,6 +291,9 @@ close_connection(Pid, Explanation) -> false -> throw({error, {not_a_connection_pid, Pid}}) end. +force_connection_event_refresh() -> + cmap(fun (C) -> rabbit_reader:force_event_refresh(C) end). + %%-------------------------------------------------------------------- tcp_host({0,0,0,0}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index dffabf85..b322af73 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -28,7 +28,7 @@ -export([process_channel_frame/5]). %% used by erlang-client --export([emit_stats/1]). +-export([emit_stats/1, force_event_refresh/1]). -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). @@ -71,6 +71,7 @@ -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -129,6 +130,9 @@ info(Pid, Items) -> emit_stats(Pid) -> gen_server:cast(Pid, emit_stats). +force_event_refresh(Pid) -> + gen_server:cast(Pid, force_event_refresh). + conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -325,6 +329,11 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> mainloop(Deb, State); handle_other({'$gen_cast', emit_stats}, Deb, State) -> mainloop(Deb, internal_emit_stats(State)); +handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + rabbit_event:notify(connection_exists, + [{type, network} | + infos(?CREATION_EVENT_KEYS, State)]), + mainloop(Deb, State); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other(Other, _Deb, _State) -> |