summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl45
1 files changed, 21 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 24320f51..2389ec86 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
+-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -115,9 +115,7 @@
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
--spec(delete_exclusive/1 :: (rabbit_types:amqqueue())
- -> rabbit_types:ok_or_error2(qlen(),
- 'not_exclusive')).
+-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -254,10 +252,10 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_binding:add(#binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = []}).
+ rabbit_binding:add(#binding{source = ExchangeName,
+ destination = QueueName,
+ key = RoutingKey,
+ args = []}).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -362,8 +360,8 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
-delete_exclusive(#amqqueue{ pid = QPid }) ->
- gen_server2:call(QPid, delete_exclusive, infinity).
+delete_immediately(#amqqueue{ pid = QPid }) ->
+ gen_server2:cast(QPid, delete_immediately).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -439,7 +437,7 @@ internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
- rabbit_binding:remove_for_queue(QueueName).
+ rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
case rabbit_misc:execute_mnesia_transaction(
@@ -450,8 +448,7 @@ internal_delete(QueueName) ->
end
end) of
{error, _} = Err -> Err;
- PostHook -> PostHook(),
- ok
+ Deletions -> ok = rabbit_binding:process_deletions(Deletions)
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
@@ -470,19 +467,20 @@ maybe_expire(QPid) ->
gen_server2:cast(QPid, maybe_expire).
on_node_down(Node) ->
- [Hook() ||
- Hook <- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end)],
- ok.
+ rabbit_binding:process_deletions(
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end))).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- rabbit_binding:remove_transient_for_queue(QueueName).
+ rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
@@ -508,4 +506,3 @@ delegate_call(Pid, Msg, Timeout) ->
delegate_cast(Pid, Msg) ->
delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
-