summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_queue_collector.erl41
3 files changed, 32 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 24320f51..e8730b03 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
+-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -115,9 +115,7 @@
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
--spec(delete_exclusive/1 :: (rabbit_types:amqqueue())
- -> rabbit_types:ok_or_error2(qlen(),
- 'not_exclusive')).
+-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -362,8 +360,8 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
-delete_exclusive(#amqqueue{ pid = QPid }) ->
- gen_server2:call(QPid, delete_exclusive, infinity).
+delete_immediately(#amqqueue{ pid = QPid }) ->
+ gen_server2:cast(QPid, delete_immediately).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 61204deb..19db731a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -594,7 +594,6 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
- delete_exclusive -> 8;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
_ -> 0
end.
@@ -602,6 +601,7 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
update_ram_duration -> 8;
+ delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
maybe_expire -> 8;
@@ -787,16 +787,6 @@ handle_call(stat, _From, State = #q{backing_queue = BQ,
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)},
ensure_expiry_timer(State));
-handle_call(delete_exclusive, _From,
- State = #q{ backing_queue_state = BQS,
- backing_queue = BQ,
- q = #amqqueue{exclusive_owner = Owner}
- }) when Owner =/= none ->
- {stop, normal, {ok, BQ:len(BQS)}, State};
-
-handle_call(delete_exclusive, _From, State) ->
- reply({error, not_exclusive}, State);
-
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
@@ -868,6 +858,9 @@ handle_cast({reject, AckTags, Requeue, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
+handle_cast(delete_immediately, State) ->
+ {stop, normal, State};
+
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 0b8efc8f..6ac402c8 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {queues}).
+-record(state, {queues, delete_from}).
-include("rabbit.hrl").
@@ -66,32 +66,39 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{queues = dict:new()}}.
+ {ok, #state{queues = dict:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
handle_call({register, Q}, _From,
- State = #state{queues = Queues}) ->
+ State = #state{queues = Queues, delete_from = Deleting}) ->
MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
- {reply, ok,
- State#state{queues = dict:store(MonitorRef, Q, Queues)}};
-
-handle_call(delete_all, _From, State = #state{queues = Queues}) ->
- [rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () ->
- erlang:demonitor(MonitorRef),
- rabbit_amqqueue:delete_exclusive(Q)
- end)
- || {MonitorRef, Q} <- dict:to_list(Queues)],
- {reply, ok, State}.
+ case Deleting of
+ undefined -> ok;
+ _ -> rabbit_amqqueue:delete_immediately(Q)
+ end,
+ {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}};
+
+handle_call(delete_all, From, State = #state{queues = Queues,
+ delete_from = undefined}) ->
+ case dict:size(Queues) of
+ 0 -> {reply, ok, State#state{delete_from = From}};
+ _ -> [rabbit_amqqueue:delete_immediately(Q)
+ || {_MRef, Q} <- dict:to_list(Queues)],
+ {noreply, State#state{delete_from = From}}
+ end.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{queues = Queues}) ->
- {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}.
+ State = #state{queues = Queues, delete_from = Deleting}) ->
+ Queues1 = dict:erase(MonitorRef, Queues),
+ case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of
+ true -> gen_server:reply(Deleting, ok);
+ false -> ok
+ end,
+ {noreply, State#state{queues = Queues1}}.
terminate(_Reason, _State) ->
ok.