diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 45 |
1 files changed, 21 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 24320f51..2389ec86 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]). +-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, @@ -115,9 +115,7 @@ (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). --spec(delete_exclusive/1 :: (rabbit_types:amqqueue()) - -> rabbit_types:ok_or_error2(qlen(), - 'not_exclusive')). +-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -254,10 +252,10 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = []}). + rabbit_binding:add(#binding{source = ExchangeName, + destination = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -362,8 +360,8 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). -delete_exclusive(#amqqueue{ pid = QPid }) -> - gen_server2:call(QPid, delete_exclusive, infinity). +delete_immediately(#amqqueue{ pid = QPid }) -> + gen_server2:cast(QPid, delete_immediately). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -439,7 +437,7 @@ internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_durable_queue, QueueName}), %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. - rabbit_binding:remove_for_queue(QueueName). + rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> case rabbit_misc:execute_mnesia_transaction( @@ -450,8 +448,7 @@ internal_delete(QueueName) -> end end) of {error, _} = Err -> Err; - PostHook -> PostHook(), - ok + Deletions -> ok = rabbit_binding:process_deletions(Deletions) end. maybe_run_queue_via_backing_queue(QPid, Fun) -> @@ -470,19 +467,20 @@ maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> - [Hook() || - Hook <- rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end)], - ok. + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + rabbit_misc:execute_mnesia_transaction( + fun () -> qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end))). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), - rabbit_binding:remove_transient_for_queue(QueueName). + rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, @@ -508,4 +506,3 @@ delegate_call(Pid, Msg, Timeout) -> delegate_cast(Pid, Msg) -> delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). - |