summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-08-01 15:01:06 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-08-01 15:01:06 +0100
commit0798dfad95793219c5ee53b81decbc451252e636 (patch)
tree9b5992eab7404cacc1dc853c5e1b552465f31bff
parente2c57c78fcc0281eeb78dd1914287e539265244c (diff)
downloadrabbitmq-server-0798dfad95793219c5ee53b81decbc451252e636.tar.gz
Introduce rabbit:force_event_refresh which will:
* Emit a foo_exists for every foo that can emit foo_created * As a side effect, wake everything up and thus send out foo_stats events too. Currently this doesn't work for direct connections (rather more fiddly), but it does for everything else.
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_networking.erl7
-rw-r--r--src/rabbit_reader.erl11
6 files changed, 67 insertions, 5 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e067607d..5e9c84ef 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -19,7 +19,7 @@
-behaviour(application).
-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0,
- rotate_logs/1]).
+ rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -189,6 +189,7 @@
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> 'ok').
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
+-spec(force_event_refresh/0 :: () -> 'ok').
-spec(status/0 ::
() -> [{pid, integer()} |
{running_applications, [{atom(), string(), string()}]} |
@@ -512,6 +513,11 @@ log_rotation_result(ok, {error, SaslLogError}) ->
log_rotation_result(ok, ok) ->
ok.
+force_event_refresh() ->
+ rabbit_networking:force_connection_event_refresh(),
+ rabbit_channel:force_event_refresh(),
+ rabbit_amqqueue:force_event_refresh().
+
%%---------------------------------------------------------------------------
%% misc
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e9d01d12..c5e2f908 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,6 +22,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
@@ -90,6 +91,7 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
+-spec(force_event_refresh/0 :: () -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -387,6 +389,11 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+force_event_refresh() ->
+ [map(VHost, fun(Q) -> delegate_cast(Q#amqqueue.pid,
+ force_event_refresh) end) ||
+ VHost <- rabbit_vhost:list()].
+
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c6019413..c7e36283 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -767,7 +767,15 @@ emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
- rabbit_event:notify(consumer_created,
+ emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired,
+ consumer_created).
+
+emit_consumer_exists(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired,
+ consumer_exists).
+
+emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, Type) ->
+ rabbit_event:notify(Type,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
@@ -1118,7 +1126,18 @@ handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
emit_stats(State),
State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
assert_invariant(State1),
- {noreply, State1, hibernate}.
+ {noreply, State1, hibernate};
+
+handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
+ rabbit_event:notify(queue_exists, infos(?CREATION_EVENT_KEYS, State)),
+ case Exclusive of
+ none -> [emit_consumer_exists(Ch, CTag, false, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State)];
+ _ -> [emit_consumer_exists(Ch, CTag, true, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State),
+ Exclusive = {Ch, CTag}]
+ end,
+ noreply(State).
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 13fb7ce1..7c8a07b9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -24,6 +24,7 @@
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_all/0, emit_stats/1, ready_for_close/1]).
+-export([force_event_refresh/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -93,6 +94,7 @@
-spec(refresh_config_all/0 :: () -> 'ok').
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(ready_for_close/1 :: (pid()) -> 'ok').
+-spec(force_event_refresh/0 :: () -> 'ok').
-endif.
@@ -159,6 +161,16 @@ emit_stats(Pid) ->
ready_for_close(Pid) ->
gen_server2:cast(Pid, ready_for_close).
+force_event_refresh() ->
+ %% TODO roll in bug 23897?
+ All = [Pid ||
+ Node <- rabbit_mnesia:running_clustered_nodes(),
+ Pid <- rpc:call(Node, rabbit_channel, list, [])],
+ rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, All).
+
+force_event_refresh(Pid) ->
+ gen_server2:cast(Pid, force_event_refresh).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
@@ -300,6 +312,10 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
noreply([ensure_stats_timer],
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
+handle_cast(force_event_refresh, State) ->
+ rabbit_event:notify(channel_exists, infos(?CREATION_EVENT_KEYS, State)),
+ noreply(State);
+
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 451e56e8..a75a5fc0 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -21,7 +21,7 @@
node_listeners/1, connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
- close_connection/2]).
+ close_connection/2, force_connection_event_refresh/0]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/2,
@@ -69,6 +69,8 @@
-spec(connection_info_all/1 ::
(rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
+-spec(force_connection_event_refresh/0 :: () -> 'ok').
+
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
-> [{inet:ip_address(), ip_port(), family(), atom()}]).
@@ -289,6 +291,9 @@ close_connection(Pid, Explanation) ->
false -> throw({error, {not_a_connection_pid, Pid}})
end.
+force_connection_event_refresh() ->
+ cmap(fun (C) -> rabbit_reader:force_event_refresh(C) end).
+
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index dffabf85..b322af73 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -28,7 +28,7 @@
-export([process_channel_frame/5]). %% used by erlang-client
--export([emit_stats/1]).
+-export([emit_stats/1, force_event_refresh/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -71,6 +71,7 @@
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(emit_stats/1 :: (pid()) -> 'ok').
+-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
@@ -129,6 +130,9 @@ info(Pid, Items) ->
emit_stats(Pid) ->
gen_server:cast(Pid, emit_stats).
+force_event_refresh(Pid) ->
+ gen_server:cast(Pid, force_event_refresh).
+
conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
@@ -325,6 +329,11 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
mainloop(Deb, State);
handle_other({'$gen_cast', emit_stats}, Deb, State) ->
mainloop(Deb, internal_emit_stats(State));
+handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ rabbit_event:notify(connection_exists,
+ [{type, network} |
+ infos(?CREATION_EVENT_KEYS, State)]),
+ mainloop(Deb, State);
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
handle_other(Other, _Deb, _State) ->