diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-16 16:06:24 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-16 16:06:24 +0100 |
commit | 71ca05e3c59a5b24463e87ea29f4e83a726a27a5 (patch) | |
tree | 03d7e677c6151a792232bc533626e396b82725fb | |
parent | 7d1aa1dde7bb937bbecdcca71870fc7c90a9caff (diff) | |
parent | 01dbbdb6c072d65714bbc3ee6722741fec1384bc (diff) | |
download | rabbitmq-server-71ca05e3c59a5b24463e87ea29f4e83a726a27a5.tar.gz |
merge bug25202 into default
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 16 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 20 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 15 |
6 files changed, 52 insertions, 17 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 41cce0a3..3db2b68a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,7 +47,8 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, sync_slave_pids, policy}). + arguments, pid, slave_pids, sync_slave_pids, policy, + gm_pids}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a8b0ea24..6ad85b24 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -219,7 +219,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = [], - sync_slave_pids = []}), + sync_slave_pids = [], + gm_pids = []}), {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 6dac2808..377d5186 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -97,10 +97,18 @@ init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) -> ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), State. -init_with_existing_bq(Q, BQ, BQS) -> +init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), depth_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + Self = self(), + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue{gm_pids = GMPids}] + = mnesia:read({rabbit_queue, QName}), + ok = rabbit_amqqueue:store_queue( + Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) + end), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -157,7 +165,7 @@ stop_all_slaves(Reason, #state{gm = GM}) -> fun () -> [Q] = mnesia:read({rabbit_queue, QName}), rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { slave_pids = [] }) + Q #amqqueue { gm_pids = [], slave_pids = [] }) end), ok = gm:forget_group(QName). diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b6c229aa..901f33b1 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -59,7 +59,6 @@ %% Returns {ok, NewMPid, DeadPids} remove_from_queue(QueueName, DeadGMPids) -> - DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -67,12 +66,18 @@ remove_from_queue(QueueName, DeadGMPids) -> case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> - Alive = [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), DeadNodes)], + slave_pids = SPids, + gm_pids = GMPids }] -> + {Dead, GMPids1} = lists:partition( + fun ({GM, _}) -> + lists:member(GM, DeadGMPids) + end, GMPids), + DeadPids = [Pid || {_GM, Pid} <- Dead], + Alive = [QPid | SPids] -- DeadPids, {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> + GMPids = GMPids1, %% ASSERTION {ok, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so @@ -80,7 +85,8 @@ remove_from_queue(QueueName, DeadGMPids) -> %% become the master. store_updated_slaves( Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), + slave_pids = SPids1, + gm_pids = GMPids1 }), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f72ed03d..afb85738 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -104,7 +104,7 @@ init(Q = #amqqueue { name = QName }) -> Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( - fun() -> init_it(Self, Node, QName) end) of + fun() -> init_it(Self, GM, Node, QName) end) of {new, QPid} -> erlang:monitor(process, QPid), ok = file_handle_cache:register_callback( @@ -140,14 +140,15 @@ init(Q = #amqqueue { name = QName }) -> duplicate_live_master -> {stop, {duplicate_live_master, Node}}; existing -> + gm:leave(GM), ignore end. -init_it(Self, Node, QName) -> - [Q = #amqqueue { pid = QPid, slave_pids = SPids }] = +init_it(Self, GM, Node, QName) -> + [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] = mnesia:read({rabbit_queue, QName}), case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of - [] -> add_slave(Q, Self), + [] -> add_slave(Q, Self, GM), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; @@ -155,17 +156,20 @@ init_it(Self, Node, QName) -> end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> existing; - false -> Q1 = Q#amqqueue { slave_pids = SPids -- [SPid] }, - add_slave(Q1, Self), + false -> Q1 = Q#amqqueue { + slave_pids = SPids -- [SPid], + gm_pids = [T || T = {_, S} <- GMPids, + S =/= SPid] }, + add_slave(Q1, Self, GM), {new, QPid} end end. %% Add to the end, so they are in descending order of age, see %% rabbit_mirror_queue_misc:promote_slave/1 -add_slave(Q = #amqqueue { slave_pids = SPids }, New) -> +add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> rabbit_mirror_queue_misc:store_updated_slaves( - Q#amqqueue{slave_pids = SPids ++ [New]}). + Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). handle_call({deliver, Delivery, true}, From, State) -> %% Synchronous, "mandatory" deliver mode. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index ddc9c565..21fdcd66 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -42,6 +42,7 @@ [exchange_scratches, ha_mirrors]}). -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). +-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). %% ------------------------------------------------------------------- @@ -66,6 +67,7 @@ -spec(policy/0 :: () -> 'ok'). -spec(sync_slave_pids/0 :: () -> 'ok'). -spec(no_mirror_nodes/0 :: () -> 'ok'). +-spec(gm_pids/0 :: () -> 'ok'). -endif. @@ -268,6 +270,19 @@ no_mirror_nodes() -> || T <- Tables], ok. +gm_pids() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddGMPidsFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []} + end, + [ok = transform(T, AddGMPidsFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy, gm_pids]) + || T <- Tables], + ok. + + %%-------------------------------------------------------------------- |