summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl29
1 files changed, 19 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2e8509..80b7a92c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -281,13 +281,13 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
- unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
+ unblock -> {NewBlockedConsumers, NewActiveConsumers} =
move_consumers(ChPid,
State#q.blocked_consumers,
State#q.active_consumers),
run_poke_burst(
State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedeConsumers})
+ blocked_consumers = NewBlockedConsumers})
end
end.
@@ -297,7 +297,7 @@ should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
- not_found -> noreply(State);
+ not_found -> {ok, State};
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
unacked_messages = UAM} ->
erlang:demonitor(MonitorRef),
@@ -321,8 +321,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
blocked_consumers = remove_consumers(
ChPid, State#q.blocked_consumers)}),
case should_auto_delete(NewState) of
- false -> noreply(NewState);
- true -> {stop, normal, NewState}
+ false -> {ok, NewState};
+ true -> {stop, NewState}
end
end.
@@ -576,10 +576,16 @@ handle_call({commit, Txn}, From, State) ->
erase_tx(Txn),
noreply(NewState);
-handle_call({notify_down, ChPid}, From, State) ->
- %% optimisation: we reply straight away so the sender can continue
- gen_server2:reply(From, ok),
- handle_ch_down(ChPid, State);
+handle_call({notify_down, ChPid}, _From, State) ->
+ %% we want to do this synchronously, so that auto_deleted queues
+ %% are no longer visible by the time we send a response to the
+ %% client. The queue is ultimately deleted in terminate/2; if we
+ %% return stop with a reply, terminate/2 will be called by
+ %% gen_server2 *before* the reply is sent.
+ case handle_ch_down(ChPid, State) of
+ {ok, NewState} -> reply(ok, NewState);
+ {stop, NewState} -> {stop, normal, ok, NewState}
+ end;
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
@@ -813,7 +819,10 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
NewState = State#q{owner = none},
{stop, normal, NewState};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
- handle_ch_down(DownPid, State);
+ case handle_ch_down(DownPid, State) of
+ {ok, NewState} -> noreply(NewState);
+ {stop, NewState} -> {stop, normal, NewState}
+ end;
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),