summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-08-05 17:15:08 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-08-05 17:15:08 +0100
commitb73d562111cd764097cde55ff761e3c2a2d82ec0 (patch)
tree3108af27d17c0f07ab7c0ace5370eecf48f67f0e /src
parent51c8e02239abcb2f8a9a80db8ed2385e0ce67d10 (diff)
parentb18de45aaea54480827f593e1caced01c89d7e73 (diff)
downloadrabbitmq-server-b73d562111cd764097cde55ff761e3c2a2d82ec0.tar.gz
Merge in default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_channel.erl23
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_networking.erl7
-rw-r--r--src/rabbit_reader.erl10
7 files changed, 77 insertions, 7 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e067607d..5e9c84ef 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -19,7 +19,7 @@
-behaviour(application).
-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0,
- rotate_logs/1]).
+ rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -189,6 +189,7 @@
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> 'ok').
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
+-spec(force_event_refresh/0 :: () -> 'ok').
-spec(status/0 ::
() -> [{pid, integer()} |
{running_applications, [{atom(), string(), string()}]} |
@@ -512,6 +513,11 @@ log_rotation_result(ok, {error, SaslLogError}) ->
log_rotation_result(ok, ok) ->
ok.
+force_event_refresh() ->
+ rabbit_networking:force_connection_event_refresh(),
+ rabbit_channel:force_event_refresh(),
+ rabbit_amqqueue:force_event_refresh().
+
%%---------------------------------------------------------------------------
%% misc
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a7c92e51..0aa8921f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,6 +22,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
@@ -88,6 +89,7 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
+-spec(force_event_refresh/0 :: () -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -381,6 +383,11 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+force_event_refresh() ->
+ [map(VHost, fun(Q) -> delegate_cast(Q#amqqueue.pid,
+ force_event_refresh) end) ||
+ VHost <- rabbit_vhost:list()].
+
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 05de48d6..e1fa7006 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -761,7 +761,15 @@ emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
- rabbit_event:notify(consumer_created,
+ emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired,
+ consumer_created).
+
+emit_consumer_exists(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired,
+ consumer_exists).
+
+emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, Type) ->
+ rabbit_event:notify(Type,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
@@ -1085,6 +1093,17 @@ handle_cast({set_ram_duration_target, Duration},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+
+handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
+ rabbit_event:notify(queue_exists, infos(?CREATION_EVENT_KEYS, State)),
+ case Exclusive of
+ none -> [emit_consumer_exists(Ch, CTag, false, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State)];
+ _ -> [emit_consumer_exists(Ch, CTag, true, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State),
+ Exclusive = {Ch, CTag}]
+ end,
noreply(State).
handle_info(maybe_expire, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 45f0032d..f49f2e20 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -22,8 +22,9 @@
-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
--export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([list_local/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_all/0, ready_for_close/1]).
+-export([force_event_refresh/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -84,7 +85,7 @@
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
--spec(list/0 :: () -> [pid()]).
+-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
@@ -92,6 +93,7 @@
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(refresh_config_all/0 :: () -> 'ok').
-spec(ready_for_close/1 :: (pid()) -> 'ok').
+-spec(force_event_refresh/0 :: () -> 'ok').
-endif.
@@ -127,9 +129,12 @@ flushed(Pid, QPid) ->
confirm(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
-list() ->
+list_local() ->
pg_local:get_members(rabbit_channels).
+list() ->
+ rabbit_misc:rpc_list_all_nodes(rabbit_channel, list_local, []).
+
info_keys() -> ?INFO_KEYS.
info(Pid) ->
@@ -149,12 +154,18 @@ info_all(Items) ->
refresh_config_all() ->
rabbit_misc:upmap(
- fun (C) -> gen_server2:call(C, refresh_config) end, list()),
+ fun (C) -> gen_server2:call(C, refresh_config) end, list_local()),
ok.
ready_for_close(Pid) ->
gen_server2:cast(Pid, ready_for_close).
+force_event_refresh() ->
+ rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, list()).
+
+force_event_refresh(Pid) ->
+ gen_server2:cast(Pid, force_event_refresh).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
@@ -296,6 +307,10 @@ handle_cast({deliver, ConsumerTag, AckRequired,
rabbit_trace:tap_trace_out(Msg, TraceState),
noreply(State1#ch{next_tag = DeliveryTag + 1});
+
+handle_cast(force_event_refresh, State) ->
+ rabbit_event:notify(channel_exists, infos(?CREATION_EVENT_KEYS, State)),
+ noreply(State);
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index b98dbd46..fa1eea66 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -58,6 +58,7 @@
-export([is_process_alive/1]).
-export([pget/2, pget/3, pget_or_die/2]).
-export([format_message_queue/2]).
+-export([rpc_list_all_nodes/3]).
%%----------------------------------------------------------------------------
@@ -208,6 +209,7 @@
-spec(pget/3 :: (term(), [term()], term()) -> term()).
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
+-spec(rpc_list_all_nodes/3 :: (atom(), atom(), [any()]) -> [any()]).
-endif.
@@ -954,3 +956,9 @@ format_message_queue_entry(V) when is_tuple(V) ->
list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]);
format_message_queue_entry(_V) ->
'_'.
+
+rpc_list_all_nodes(M, F, A) ->
+ [Res || Node <- [node() | nodes()], Res <- case rpc:call(Node, M, F, A) of
+ {badrpc, _} -> [];
+ R -> R
+ end].
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 451e56e8..a75a5fc0 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -21,7 +21,7 @@
node_listeners/1, connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
- close_connection/2]).
+ close_connection/2, force_connection_event_refresh/0]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/2,
@@ -69,6 +69,8 @@
-spec(connection_info_all/1 ::
(rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
+-spec(force_connection_event_refresh/0 :: () -> 'ok').
+
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
-> [{inet:ip_address(), ip_port(), family(), atom()}]).
@@ -289,6 +291,9 @@ close_connection(Pid, Explanation) ->
false -> throw({error, {not_a_connection_pid, Pid}})
end.
+force_connection_event_refresh() ->
+ cmap(fun (C) -> rabbit_reader:force_event_refresh(C) end).
+
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2dccc748..2fc0b9cd 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,6 +27,7 @@
-export([conserve_memory/2, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
+-export([force_event_refresh/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -68,6 +69,7 @@
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
+-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
@@ -123,6 +125,9 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
+force_event_refresh(Pid) ->
+ gen_server:cast(Pid, force_event_refresh).
+
conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
@@ -319,6 +324,11 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
mainloop(Deb, State);
handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
+handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ rabbit_event:notify(connection_exists,
+ [{type, network} |
+ infos(?CREATION_EVENT_KEYS, State)]),
+ mainloop(Deb, State);
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
handle_other(Other, _Deb, _State) ->