diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-03-14 16:24:25 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-03-14 16:24:25 +0000 |
commit | 8dd3a08a840c85e80891f2c46d486ac26d348d95 (patch) | |
tree | f4860993a676291d222120bb70612aec692f1bdb | |
parent | c47f036b0744dd704cef1a446c16dc938835ecc1 (diff) | |
download | rabbitmq-server-8dd3a08a840c85e80891f2c46d486ac26d348d95.tar.gz |
This fixes the bug, but it does not scatter-gather, which would certainly be nice...
-rw-r--r-- | src/rabbit_amqqueue.erl | 18 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 |
2 files changed, 29 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c95efa14..43d65aea 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -421,8 +421,24 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +%% We need to account for the idea that queues may be mid-promotion +%% during force_event_refresh (since it's likely we're doing this in +%% the first place since a node failed). Therefore we keep poking at +%% the list of queues until we were able to talk to a live process. force_event_refresh() -> - [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()], + force_event_refresh([Q#amqqueue.name || Q <- list()]). + +force_event_refresh([]) -> + ok; + +force_event_refresh(QNames) -> + Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], + Results = [catch gen_server2:call(Q#amqqueue.pid, force_event_refresh) || + Q <- Qs], + Failed = [QName || {QName, {'EXIT', _}} <- lists:zip(QNames, Results)], + io:format("Failed: ~p~n", [Failed]), + timer:sleep(100), + force_event_refresh(Failed), ok. consumers(#amqqueue{ pid = QPid }) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b8b27443..106a9960 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1231,7 +1231,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(subtract_acks( ChPid, AckTags, State, - fun (State1) -> requeue_and_run(AckTags, State1) end)). + fun (State1) -> requeue_and_run(AckTags, State1) end)); + +handle_call(force_event_refresh, _From, + State = #q{exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + case Exclusive of + none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State)]; + {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), + emit_consumer_created(Ch, CTag, true, AckRequired) + end, + reply(ok, State). handle_cast({confirm, MsgSeqNos, QPid}, State) -> handle_confirm(MsgSeqNos, QPid, State); @@ -1328,16 +1339,6 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), - case Exclusive of - none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || - {Ch, CTag, AckRequired} <- consumers(State)]; - {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired) - end, - noreply(State); - handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> dead_letter_msg(Msg, AckTag, Reason, State). |