summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl37
1 files changed, 23 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7a10d239..4e23dbd2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -258,15 +258,16 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
Q = rabbit_queue_decorator:set(
- rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []})),
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ down_slave_nodes = [],
+ gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
@@ -666,15 +667,23 @@ forget_all_durable(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
- [rabbit_binding:process_deletions(
- internal_delete1(Name)) ||
- #amqqueue{name = Name, pid = Pid} = Q <- Qs,
- node(Pid) =:= Node,
- rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined],
+ [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node],
ok
end),
ok.
+forget_node_for_queue(#amqqueue{name = Name,
+ down_slave_nodes = []}) ->
+ %% No slaves to recover from, queue is gone
+ rabbit_binding:process_deletions(internal_delete1(Name));
+
+forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) ->
+ %% Promote a slave while down - it'll happily recover as a master
+ Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H),
+ down_slave_nodes = T},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write).
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).