summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-03-26 16:59:00 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-03-26 16:59:00 +0100
commita8497c025c50278ddae3c4c13ef3717c156147d8 (patch)
tree8bd3119d58a38534eea1e3dfc8393a4e7e1f26f1
parentdbecbebefad7e5c4401820e80dd966fa990db39a (diff)
parentc63f2bf395054787bb855cebfd0f3158acdf84f3 (diff)
downloadrabbitmq-server-a8497c025c50278ddae3c4c13ef3717c156147d8.tar.gz
Merge bug23610
-rw-r--r--src/rabbit_amqqueue.erl51
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_tests.erl2
3 files changed, 42 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9b6f14ca..c2724a12 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,7 +32,7 @@
%% internal
--export([internal_declare/2, internal_delete/1, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -144,11 +144,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/1 ::
- (name()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/2 ::
+ (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -231,7 +231,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName),
+ false -> TailFun = internal_delete(QueueName, QPid),
fun () -> TailFun(), ExistingQ end
end
end
@@ -534,13 +534,19 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName) ->
+internal_delete(QueueName, QPid) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- rabbit_binding:process_deletions(Deletions)
+ T = rabbit_binding:process_deletions(Deletions),
+ fun() ->
+ ok = T(),
+ ok = rabbit_event:notify(queue_deleted,
+ [{pid, QPid},
+ {name, QueueName}])
+ end
end
end).
@@ -555,14 +561,25 @@ set_maximum_since_use(QPid, Age) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid,
- slave_pids = []}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node])),
- rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), Dels))
+ fun () -> QsDels =
+ qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ #amqqueue{name = QName, pid = Pid,
+ slave_pids = []}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node])),
+ {Qs, Dels} = lists:unzip(QsDels),
+ T = rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels)),
+ fun () ->
+ T(),
+ lists:foreach(
+ fun({QName, QPid}) ->
+ ok = rabbit_event:notify(queue_deleted,
+ [{pid, QPid},
+ {name, QName}])
+ end, Qs)
+ end
end).
delete_queue(QueueName) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 83d9ae22..e1fd9bbc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -180,16 +180,13 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate(Reason, State = #q{q = #amqqueue{name = QName},
backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
- terminate_shutdown(fun (BQS) ->
- rabbit_event:notify(
- queue_deleted, [{pid, self()},
- {name, QName}]),
- BQS1 = BQ:delete_and_terminate(Reason, BQS),
- %% don't care if the internal delete
- %% doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(qname(State)),
- BQS1
- end, State).
+ terminate_shutdown(
+ fun (BQS) ->
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
+ %% don't care if the internal delete doesn't return 'ok'.
+ rabbit_amqqueue:internal_delete(QName, self()),
+ BQS1
+ end, State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 85fe5426..55e4a6f8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2551,7 +2551,7 @@ test_queue_recover() ->
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName)
+ rabbit_amqqueue:internal_delete(QName, QPid1)
end),
passed.