summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-03-14 16:24:25 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-03-14 16:24:25 +0000
commit8dd3a08a840c85e80891f2c46d486ac26d348d95 (patch)
treef4860993a676291d222120bb70612aec692f1bdb
parentc47f036b0744dd704cef1a446c16dc938835ecc1 (diff)
downloadrabbitmq-server-8dd3a08a840c85e80891f2c46d486ac26d348d95.tar.gz
This fixes the bug, but it does not scatter-gather, which would certainly be nice...
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl23
2 files changed, 29 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c95efa14..43d65aea 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -421,8 +421,24 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+%% We need to account for the idea that queues may be mid-promotion
+%% during force_event_refresh (since it's likely we're doing this in
+%% the first place since a node failed). Therefore we keep poking at
+%% the list of queues until we were able to talk to a live process.
force_event_refresh() ->
- [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()],
+ force_event_refresh([Q#amqqueue.name || Q <- list()]).
+
+force_event_refresh([]) ->
+ ok;
+
+force_event_refresh(QNames) ->
+ Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
+ Results = [catch gen_server2:call(Q#amqqueue.pid, force_event_refresh) ||
+ Q <- Qs],
+ Failed = [QName || {QName, {'EXIT', _}} <- lists:zip(QNames, Results)],
+ io:format("Failed: ~p~n", [Failed]),
+ timer:sleep(100),
+ force_event_refresh(Failed),
ok.
consumers(#amqqueue{ pid = QPid }) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b8b27443..106a9960 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1231,7 +1231,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(subtract_acks(
ChPid, AckTags, State,
- fun (State1) -> requeue_and_run(AckTags, State1) end)).
+ fun (State1) -> requeue_and_run(AckTags, State1) end));
+
+handle_call(force_event_refresh, _From,
+ State = #q{exclusive_consumer = Exclusive}) ->
+ rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ case Exclusive of
+ none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State)];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
+ emit_consumer_created(Ch, CTag, true, AckRequired)
+ end,
+ reply(ok, State).
handle_cast({confirm, MsgSeqNos, QPid}, State) ->
handle_confirm(MsgSeqNos, QPid, State);
@@ -1328,16 +1339,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
- rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
- case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
- {Ch, CTag, AckRequired} <- consumers(State)];
- {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
- end,
- noreply(State);
-
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
dead_letter_msg(Msg, AckTag, Reason, State).