summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-03-15 16:56:44 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-03-15 16:56:44 +0000
commit720da3dd2375d0e31db68269e92aeafa144f7621 (patch)
tree5530ffd0d133cb984da374991e0ba13a3c509dd6
parentc47f036b0744dd704cef1a446c16dc938835ecc1 (diff)
parentcfe297ce81d7fee179316880d7605b9271715a80 (diff)
downloadrabbitmq-server-720da3dd2375d0e31db68269e92aeafa144f7621.tar.gz
Merged bug24794 into defaultrabbitmq_v2_8_0
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_misc.erl28
-rw-r--r--src/rabbit_tests.erl21
4 files changed, 82 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c95efa14..9b6f14ca 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,6 +42,8 @@
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+-define(FAILOVER_WAIT_MILLIS, 100).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -421,9 +423,26 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+%% We need to account for the idea that queues may be mid-promotion
+%% during force_event_refresh (since it's likely we're doing this in
+%% the first place since a node failed). Therefore we keep poking at
+%% the list of queues until we were able to talk to a live process or
+%% the queue no longer exists.
force_event_refresh() ->
- [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()],
- ok.
+ force_event_refresh([Q#amqqueue.name || Q <- list()]).
+
+force_event_refresh(QNames) ->
+ Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
+ {_, Bad} = rabbit_misc:multi_call(
+ [Q#amqqueue.pid || Q <- Qs], force_event_refresh),
+ FailedPids = [Pid || {Pid, _Reason} <- Bad],
+ Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
+ lists:member(Pid, FailedPids)],
+ case Failed of
+ [] -> ok;
+ _ -> timer:sleep(?FAILOVER_WAIT_MILLIS),
+ force_event_refresh(Failed)
+ end.
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b8b27443..106a9960 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1231,7 +1231,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(subtract_acks(
ChPid, AckTags, State,
- fun (State1) -> requeue_and_run(AckTags, State1) end)).
+ fun (State1) -> requeue_and_run(AckTags, State1) end));
+
+handle_call(force_event_refresh, _From,
+ State = #q{exclusive_consumer = Exclusive}) ->
+ rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ case Exclusive of
+ none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State)];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
+ emit_consumer_created(Ch, CTag, true, AckRequired)
+ end,
+ reply(ok, State).
handle_cast({confirm, MsgSeqNos, QPid}, State) ->
handle_confirm(MsgSeqNos, QPid, State);
@@ -1328,16 +1339,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
- rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
- case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
- {Ch, CTag, AckRequired} <- consumers(State)];
- {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
- end,
- noreply(State);
-
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
dead_letter_msg(Msg, AckTag, Reason, State).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index dca3bead..ddf7f574 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -58,6 +58,7 @@
-export([pget/2, pget/3, pget_or_die/2]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
+-export([multi_call/2]).
-export([quit/1]).
%%----------------------------------------------------------------------------
@@ -200,6 +201,8 @@
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
+-spec(multi_call/2 ::
+ ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
-endif.
@@ -880,6 +883,31 @@ append_rpc_all_nodes(Nodes, M, F, A) ->
_ -> Res
end || Res <- ResL]).
+%% A simplified version of gen_server:multi_call/2 with a sane
+%% API. This is not in gen_server2 as there is no useful
+%% infrastructure there to share.
+multi_call(Pids, Req) ->
+ MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids],
+ receive_multi_call(MonitorPids, [], []).
+
+start_multi_call(Pid, Req) when is_pid(Pid) ->
+ Mref = erlang:monitor(process, Pid),
+ Pid ! {'$gen_call', {self(), Mref}, Req},
+ {Mref, Pid}.
+
+receive_multi_call([], Good, Bad) ->
+ {lists:reverse(Good), lists:reverse(Bad)};
+receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
+ receive
+ {Mref, Reply} ->
+ erlang:demonitor(Mref, [flush]),
+ receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad);
+ {'DOWN', Mref, _, _, noconnection} ->
+ receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]);
+ {'DOWN', Mref, _, _, Reason} ->
+ receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
+ end.
+
%% the slower shutdown on windows required to flush stdout
quit(Status) ->
case os:type() of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f7e3baa7..85fe5426 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,6 +35,7 @@ all_tests() ->
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
+ passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_priority_queue(),
@@ -107,6 +108,26 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed.
+test_multi_call() ->
+ Fun = fun() ->
+ receive
+ {'$gen_call', {From, Mref}, request} ->
+ From ! {Mref, response}
+ end,
+ receive
+ never -> ok
+ end
+ end,
+ Pid1 = spawn(Fun),
+ Pid2 = spawn(Fun),
+ Pid3 = spawn(Fun),
+ exit(Pid2, bang),
+ {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} =
+ rabbit_misc:multi_call([Pid1, Pid2, Pid3], request),
+ exit(Pid1, bang),
+ exit(Pid3, bang),
+ passed.
+
test_priority_queue() ->
false = priority_queue:is_queue(not_a_queue),