diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-21 16:43:25 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-21 16:43:25 +0100 |
commit | 6db76bf64b8a941f5ac7bb76f042e38088d7404a (patch) | |
tree | 85d05e8998ba6bc0fad9811db93bfcf4c3b6ce41 /src | |
parent | a210d0695e455ad87ae28efcfd66a900123031f2 (diff) | |
download | rabbitmq-server-6db76bf64b8a941f5ac7bb76f042e38088d7404a.tar.gz |
Sort out misc. (dnc)
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 60 |
3 files changed, 31 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0b25a4e0..e279b055 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -253,8 +253,8 @@ determine_queue_nodes(Args) -> case [list_to_atom(binary_to_list(Node)) || {longstr, Node} <- Nodes] of [] -> {node(), undefined}; - [Node] -> {Node, undefined}; - [First | Rest] -> {First, Rest} + [Node] -> {Node, undefined}; + [First | Rest] -> {First, Rest} end; {{_Type, <<"all">>}, _} -> {node(), all}; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index da12ea82..dd2e76a1 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -82,7 +82,7 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover, _ -> [list_to_atom(binary_to_list(Node)) || {longstr, Node} <- Nodes] end) -- [node()], - [rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1], + [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- Nodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), #state { gm = GM, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 94402d28..633af6cb 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_misc). -export([remove_from_queue/2, on_node_up/0, - drop_slave/2, drop_slave/3, add_slave/2, add_slave/3]). + drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]). -include("rabbit.hrl"). @@ -25,7 +25,9 @@ %% then only remove that if we are about to be promoted. Otherwise we %% can have the situation where a slave updates the mnesia record for %% a queue, promoting another slave before that slave realises it has -%% become the new master. +%% become the new master, which is bad because it could then mean the +%% slave (now master) receives messages it's not ready for (for +%% example, new consumers). remove_from_queue(QueueName, DeadPids) -> DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], rabbit_misc:execute_mnesia_transaction( @@ -34,8 +36,8 @@ remove_from_queue(QueueName, DeadPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids }] -> [QPid1 | SPids1] = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], @@ -44,8 +46,8 @@ remove_from_queue(QueueName, DeadPids) -> ok; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so - %% we're ok to update mnesia; or master - %% has changed to become us! + %% we're ok to update mnesia; or we have + %% become the master. Q1 = Q #amqqueue { pid = QPid1, slave_pids = SPids1 }, ok = rabbit_amqqueue:store_queue(Q1); @@ -65,50 +67,44 @@ on_node_up() -> rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (#amqqueue{ arguments = Args, name = QName }, QsN) -> - case rabbit_misc:table_lookup( - Args, <<"x-mirror">>) of - {_Type, []} -> - [QName | QsN]; - {_Type, Nodes} -> - Nodes1 = [list_to_atom(binary_to_list(Node)) - || {longstr, Node} <- Nodes], - case lists:member(node(), Nodes1) of - true -> [QName | QsN]; - false -> QsN - end; - _ -> - QsN + fun (#amqqueue { mirror_nodes = [] }, QsN) -> + QsN; + (#amqqueue { name = QName, + mirror_nodes = MNodes }, QsN) -> + case lists:member(node(), MNodes) of + true -> [QName | QsN]; + false -> QsN end end, [], rabbit_queue) end), - [add_slave(Q, node()) || Q <- Qs], + [add_mirror(Q, node()) || Q <- Qs], ok. -drop_slave(VHostPath, QueueName, MirrorNode) -> - drop_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +drop_mirror(VHostPath, QueueName, MirrorNode) -> + drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -drop_slave(Queue, MirrorNode) -> +drop_mirror(Queue, MirrorNode) -> if_mirrored_queue( Queue, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> {error, {queue_not_mirrored_on_node, MirrorNode}}; - [QPid | SPids] -> + [QPid] when SPids =:= [] -> {error, cannot_drop_only_mirror}; [Pid] -> - rabbit_log:info("Dropping slave node on ~p for ~s~n", - [MirrorNode, rabbit_misc:rs(Name)]), + rabbit_log:info( + "Dropping queue mirror on node ~p for ~s~n", + [MirrorNode, rabbit_misc:rs(Name)]), exit(Pid, {shutdown, dropped}), ok end end). -add_slave(VHostPath, QueueName, MirrorNode) -> - add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +add_mirror(VHostPath, QueueName, MirrorNode) -> + add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). -add_slave(Queue, MirrorNode) -> +add_mirror(Queue, MirrorNode) -> if_mirrored_queue( Queue, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> @@ -116,7 +112,7 @@ add_slave(Queue, MirrorNode) -> [] -> Result = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), rabbit_log:info( - "Adding slave node for ~s on node ~p: ~p~n", + "Adding mirror of queue ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, Result]), case Result of {ok, _Pid} -> ok; @@ -129,7 +125,7 @@ add_slave(Queue, MirrorNode) -> if_mirrored_queue(Queue, Fun) -> rabbit_amqqueue:with( Queue, fun (#amqqueue { arguments = Args } = Q) -> - case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of undefined -> ok; _ -> Fun(Q) end |