diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2e8509..80b7a92c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -281,13 +281,13 @@ possibly_unblock(State, ChPid, Update) -> store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; - unblock -> {NewBlockedeConsumers, NewActiveConsumers} = + unblock -> {NewBlockedConsumers, NewActiveConsumers} = move_consumers(ChPid, State#q.blocked_consumers, State#q.active_consumers), run_poke_burst( State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedeConsumers}) + blocked_consumers = NewBlockedConsumers}) end end. @@ -297,7 +297,7 @@ should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of - not_found -> noreply(State); + not_found -> {ok, State}; #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, unacked_messages = UAM} -> erlang:demonitor(MonitorRef), @@ -321,8 +321,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> blocked_consumers = remove_consumers( ChPid, State#q.blocked_consumers)}), case should_auto_delete(NewState) of - false -> noreply(NewState); - true -> {stop, normal, NewState} + false -> {ok, NewState}; + true -> {stop, NewState} end end. @@ -576,10 +576,16 @@ handle_call({commit, Txn}, From, State) -> erase_tx(Txn), noreply(NewState); -handle_call({notify_down, ChPid}, From, State) -> - %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), - handle_ch_down(ChPid, State); +handle_call({notify_down, ChPid}, _From, State) -> + %% we want to do this synchronously, so that auto_deleted queues + %% are no longer visible by the time we send a response to the + %% client. The queue is ultimately deleted in terminate/2; if we + %% return stop with a reply, terminate/2 will be called by + %% gen_server2 *before* the reply is sent. + case handle_ch_down(ChPid, State) of + {ok, NewState} -> reply(ok, NewState); + {stop, NewState} -> {stop, normal, ok, NewState} + end; handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, @@ -813,7 +819,10 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, NewState = State#q{owner = none}, {stop, normal, NewState}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> - handle_ch_down(DownPid, State); + case handle_ch_down(DownPid, State) of + {ok, NewState} -> noreply(NewState); + {stop, NewState} -> {stop, normal, NewState} + end; handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), |