diff options
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 41 |
3 files changed, 32 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 24320f51..e8730b03 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]). +-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, @@ -115,9 +115,7 @@ (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). --spec(delete_exclusive/1 :: (rabbit_types:amqqueue()) - -> rabbit_types:ok_or_error2(qlen(), - 'not_exclusive')). +-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -362,8 +360,8 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). -delete_exclusive(#amqqueue{ pid = QPid }) -> - gen_server2:call(QPid, delete_exclusive, infinity). +delete_immediately(#amqqueue{ pid = QPid }) -> + gen_server2:cast(QPid, delete_immediately). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 61204deb..19db731a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -594,7 +594,6 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; - delete_exclusive -> 8; {maybe_run_queue_via_backing_queue, _Fun} -> 6; _ -> 0 end. @@ -602,6 +601,7 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of update_ram_duration -> 8; + delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; maybe_expire -> 8; @@ -787,16 +787,6 @@ handle_call(stat, _From, State = #q{backing_queue = BQ, reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, ensure_expiry_timer(State)); -handle_call(delete_exclusive, _From, - State = #q{ backing_queue_state = BQS, - backing_queue = BQ, - q = #amqqueue{exclusive_owner = Owner} - }) when Owner =/= none -> - {stop, normal, {ok, BQ:len(BQS)}, State}; - -handle_call(delete_exclusive, _From, State) -> - reply({error, not_exclusive}, State); - handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), @@ -868,6 +858,9 @@ handle_cast({reject, AckTags, Requeue, ChPid}, handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); +handle_cast(delete_immediately, State) -> + {stop, normal, State}; + handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 0b8efc8f..6ac402c8 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {queues}). +-record(state, {queues, delete_from}). -include("rabbit.hrl"). @@ -66,32 +66,39 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #state{queues = dict:new()}}. + {ok, #state{queues = dict:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- handle_call({register, Q}, _From, - State = #state{queues = Queues}) -> + State = #state{queues = Queues, delete_from = Deleting}) -> MonitorRef = erlang:monitor(process, Q#amqqueue.pid), - {reply, ok, - State#state{queues = dict:store(MonitorRef, Q, Queues)}}; - -handle_call(delete_all, _From, State = #state{queues = Queues}) -> - [rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> - erlang:demonitor(MonitorRef), - rabbit_amqqueue:delete_exclusive(Q) - end) - || {MonitorRef, Q} <- dict:to_list(Queues)], - {reply, ok, State}. + case Deleting of + undefined -> ok; + _ -> rabbit_amqqueue:delete_immediately(Q) + end, + {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + +handle_call(delete_all, From, State = #state{queues = Queues, + delete_from = undefined}) -> + case dict:size(Queues) of + 0 -> {reply, ok, State#state{delete_from = From}}; + _ -> [rabbit_amqqueue:delete_immediately(Q) + || {_MRef, Q} <- dict:to_list(Queues)], + {noreply, State#state{delete_from = From}} + end. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{queues = Queues}) -> - {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}. + State = #state{queues = Queues, delete_from = Deleting}) -> + Queues1 = dict:erase(MonitorRef, Queues), + case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of + true -> gen_server:reply(Deleting, ok); + false -> ok + end, + {noreply, State#state{queues = Queues1}}. terminate(_Reason, _State) -> ok. |