diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-03-15 16:56:44 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-03-15 16:56:44 +0000 |
commit | 720da3dd2375d0e31db68269e92aeafa144f7621 (patch) | |
tree | 5530ffd0d133cb984da374991e0ba13a3c509dd6 | |
parent | c47f036b0744dd704cef1a446c16dc938835ecc1 (diff) | |
parent | cfe297ce81d7fee179316880d7605b9271715a80 (diff) | |
download | rabbitmq-server-720da3dd2375d0e31db68269e92aeafa144f7621.tar.gz |
Merged bug24794 into defaultrabbitmq_v2_8_0
-rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 28 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 21 |
4 files changed, 82 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c95efa14..9b6f14ca 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,6 +42,8 @@ -define(MORE_CONSUMER_CREDIT_AFTER, 50). +-define(FAILOVER_WAIT_MILLIS, 100). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -421,9 +423,26 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +%% We need to account for the idea that queues may be mid-promotion +%% during force_event_refresh (since it's likely we're doing this in +%% the first place since a node failed). Therefore we keep poking at +%% the list of queues until we were able to talk to a live process or +%% the queue no longer exists. force_event_refresh() -> - [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()], - ok. + force_event_refresh([Q#amqqueue.name || Q <- list()]). + +force_event_refresh(QNames) -> + Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], + {_, Bad} = rabbit_misc:multi_call( + [Q#amqqueue.pid || Q <- Qs], force_event_refresh), + FailedPids = [Pid || {Pid, _Reason} <- Bad], + Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, + lists:member(Pid, FailedPids)], + case Failed of + [] -> ok; + _ -> timer:sleep(?FAILOVER_WAIT_MILLIS), + force_event_refresh(Failed) + end. consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b8b27443..106a9960 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1231,7 +1231,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(subtract_acks( ChPid, AckTags, State, - fun (State1) -> requeue_and_run(AckTags, State1) end)). + fun (State1) -> requeue_and_run(AckTags, State1) end)); + +handle_call(force_event_refresh, _From, + State = #q{exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + case Exclusive of + none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State)]; + {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), + emit_consumer_created(Ch, CTag, true, AckRequired) + end, + reply(ok, State). handle_cast({confirm, MsgSeqNos, QPid}, State) -> handle_confirm(MsgSeqNos, QPid, State); @@ -1328,16 +1339,6 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), - case Exclusive of - none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || - {Ch, CTag, AckRequired} <- consumers(State)]; - {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired) - end, - noreply(State); - handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> dead_letter_msg(Msg, AckTag, Reason, State). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index dca3bead..ddf7f574 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -58,6 +58,7 @@ -export([pget/2, pget/3, pget_or_die/2]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). +-export([multi_call/2]). -export([quit/1]). %%---------------------------------------------------------------------------- @@ -200,6 +201,8 @@ -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). +-spec(multi_call/2 :: + ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(quit/1 :: (integer() | string()) -> no_return()). -endif. @@ -880,6 +883,31 @@ append_rpc_all_nodes(Nodes, M, F, A) -> _ -> Res end || Res <- ResL]). +%% A simplified version of gen_server:multi_call/2 with a sane +%% API. This is not in gen_server2 as there is no useful +%% infrastructure there to share. +multi_call(Pids, Req) -> + MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids], + receive_multi_call(MonitorPids, [], []). + +start_multi_call(Pid, Req) when is_pid(Pid) -> + Mref = erlang:monitor(process, Pid), + Pid ! {'$gen_call', {self(), Mref}, Req}, + {Mref, Pid}. + +receive_multi_call([], Good, Bad) -> + {lists:reverse(Good), lists:reverse(Bad)}; +receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) -> + receive + {Mref, Reply} -> + erlang:demonitor(Mref, [flush]), + receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad); + {'DOWN', Mref, _, _, noconnection} -> + receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]); + {'DOWN', Mref, _, _, Reason} -> + receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad]) + end. + %% the slower shutdown on windows required to flush stdout quit(Status) -> case os:type() of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f7e3baa7..85fe5426 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,6 +35,7 @@ all_tests() -> passed = mirrored_supervisor_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), + passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), @@ -107,6 +108,26 @@ run_cluster_dependent_tests(SecondaryNode) -> passed. +test_multi_call() -> + Fun = fun() -> + receive + {'$gen_call', {From, Mref}, request} -> + From ! {Mref, response} + end, + receive + never -> ok + end + end, + Pid1 = spawn(Fun), + Pid2 = spawn(Fun), + Pid3 = spawn(Fun), + exit(Pid2, bang), + {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} = + rabbit_misc:multi_call([Pid1, Pid2, Pid3], request), + exit(Pid1, bang), + exit(Pid3, bang), + passed. + test_priority_queue() -> false = priority_queue:is_queue(not_a_queue), |