summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-16 16:06:24 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-16 16:06:24 +0100
commit71ca05e3c59a5b24463e87ea29f4e83a726a27a5 (patch)
tree03d7e677c6151a792232bc533626e396b82725fb
parent7d1aa1dde7bb937bbecdcca71870fc7c90a9caff (diff)
parent01dbbdb6c072d65714bbc3ee6722741fec1384bc (diff)
downloadrabbitmq-server-71ca05e3c59a5b24463e87ea29f4e83a726a27a5.tar.gz
merge bug25202 into default
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl12
-rw-r--r--src/rabbit_mirror_queue_misc.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl20
-rw-r--r--src/rabbit_upgrade_functions.erl15
6 files changed, 52 insertions, 17 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 41cce0a3..3db2b68a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -47,7 +47,8 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, sync_slave_pids, policy}).
+ arguments, pid, slave_pids, sync_slave_pids, policy,
+ gm_pids}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a8b0ea24..6ad85b24 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -219,7 +219,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
slave_pids = [],
- sync_slave_pids = []}),
+ sync_slave_pids = [],
+ gm_pids = []}),
{Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 6dac2808..377d5186 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -97,10 +97,18 @@ init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) ->
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State.
-init_with_existing_bq(Q, BQ, BQS) ->
+init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), depth_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue{gm_pids = GMPids}]
+ = mnesia:read({rabbit_queue, QName}),
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
+ end),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -157,7 +165,7 @@ stop_all_slaves(Reason, #state{gm = GM}) ->
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { slave_pids = [] })
+ Q #amqqueue { gm_pids = [], slave_pids = [] })
end),
ok = gm:forget_group(QName).
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b6c229aa..901f33b1 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -59,7 +59,6 @@
%% Returns {ok, NewMPid, DeadPids}
remove_from_queue(QueueName, DeadGMPids) ->
- DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -67,12 +66,18 @@ remove_from_queue(QueueName, DeadGMPids) ->
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
- Alive = [Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ slave_pids = SPids,
+ gm_pids = GMPids }] ->
+ {Dead, GMPids1} = lists:partition(
+ fun ({GM, _}) ->
+ lists:member(GM, DeadGMPids)
+ end, GMPids),
+ DeadPids = [Pid || {_GM, Pid} <- Dead],
+ Alive = [QPid | SPids] -- DeadPids,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
+ GMPids = GMPids1, %% ASSERTION
{ok, QPid1, []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
@@ -80,7 +85,8 @@ remove_from_queue(QueueName, DeadGMPids) ->
%% become the master.
store_updated_slaves(
Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
+ slave_pids = SPids1,
+ gm_pids = GMPids1 }),
{ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f72ed03d..afb85738 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -104,7 +104,7 @@ init(Q = #amqqueue { name = QName }) ->
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun() -> init_it(Self, Node, QName) end) of
+ fun() -> init_it(Self, GM, Node, QName) end) of
{new, QPid} ->
erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
@@ -140,14 +140,15 @@ init(Q = #amqqueue { name = QName }) ->
duplicate_live_master ->
{stop, {duplicate_live_master, Node}};
existing ->
+ gm:leave(GM),
ignore
end.
-init_it(Self, Node, QName) ->
- [Q = #amqqueue { pid = QPid, slave_pids = SPids }] =
+init_it(Self, GM, Node, QName) ->
+ [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] =
mnesia:read({rabbit_queue, QName}),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
- [] -> add_slave(Q, Self),
+ [] -> add_slave(Q, Self, GM),
{new, QPid};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
@@ -155,17 +156,20 @@ init_it(Self, Node, QName) ->
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
true -> existing;
- false -> Q1 = Q#amqqueue { slave_pids = SPids -- [SPid] },
- add_slave(Q1, Self),
+ false -> Q1 = Q#amqqueue {
+ slave_pids = SPids -- [SPid],
+ gm_pids = [T || T = {_, S} <- GMPids,
+ S =/= SPid] },
+ add_slave(Q1, Self, GM),
{new, QPid}
end
end.
%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
-add_slave(Q = #amqqueue { slave_pids = SPids }, New) ->
+add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
rabbit_mirror_queue_misc:store_updated_slaves(
- Q#amqqueue{slave_pids = SPids ++ [New]}).
+ Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index ddc9c565..21fdcd66 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -42,6 +42,7 @@
[exchange_scratches, ha_mirrors]}).
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
+-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
%% -------------------------------------------------------------------
@@ -66,6 +67,7 @@
-spec(policy/0 :: () -> 'ok').
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
+-spec(gm_pids/0 :: () -> 'ok').
-endif.
@@ -268,6 +270,19 @@ no_mirror_nodes() ->
|| T <- Tables],
ok.
+gm_pids() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ AddGMPidsFun =
+ fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) ->
+ {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []}
+ end,
+ [ok = transform(T, AddGMPidsFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, policy, gm_pids])
+ || T <- Tables],
+ ok.
+
+
%%--------------------------------------------------------------------