diff options
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 33 |
1 files changed, 15 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ac357663..af634dc3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -788,17 +788,16 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC1}) end. -stop_later(Reason, State) -> - stop_later(Reason, undefined, noreply, State). +stop(State) -> stop(undefined, noreply, State). -stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) -> +stop(From, Reply, State = #q{unconfirmed = UC}) -> case {dtree:is_empty(UC), Reply} of {true, noreply} -> - {stop, Reason, State}; + {stop, normal, State}; {true, _} -> - {stop, Reason, Reply, State}; + {stop, normal, Reply, State}; {false, _} -> - noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) + noreply(State#q{delayed_stop = {From, Reply}}) end. cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, @@ -809,11 +808,10 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, State1 = State#q{backing_queue_state = BQS1}, case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of - {_, {_, noreply}} -> ok; - {_, {From, Reply}} -> gen_server2:reply(From, Reply) + {_, noreply} -> ok; + {From, Reply} -> gen_server2:reply(From, Reply) end, - {Reason, _} = DS, - {stop, Reason, State1}; + {stop, normal, State1}; false -> noreply(State1) end. @@ -1048,7 +1046,7 @@ handle_call({notify_down, ChPid}, From, State) -> %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of {ok, State1} -> reply(ok, State1); - {stop, State1} -> stop_later(normal, From, ok, State1) + {stop, State1} -> stop(From, ok, State1) end; handle_call({basic_get, ChPid, NoAck}, _From, @@ -1123,7 +1121,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> stop_later(normal, From, ok, State1) + true -> stop(From, ok, State1) end end; @@ -1139,8 +1137,7 @@ handle_call({delete, IfUnused, IfEmpty}, From, if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> stop_later(normal, From, - {ok, BQ:len(BQS)}, State) + true -> stop(From, {ok, BQ:len(BQS)}, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1230,7 +1227,7 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> end)); handle_cast(delete_immediately, State) -> - stop_later(normal, State); + stop(State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1306,7 +1303,7 @@ handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> stop_later(normal, State); + true -> stop(State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1328,12 +1325,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% match what people expect (see bug 21824). However we need this %% monitor-and-async- delete in case the connection goes away %% unexpectedly. - stop_later(normal, State); + stop(State); handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, State1} -> handle_queue_down(DownPid, Reason, State1); - {stop, State1} -> stop_later(normal, State1) + {stop, State1} -> stop(State1) end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, |