summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-06-21 16:43:25 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-06-21 16:43:25 +0100
commit6db76bf64b8a941f5ac7bb76f042e38088d7404a (patch)
tree85d05e8998ba6bc0fad9811db93bfcf4c3b6ce41
parenta210d0695e455ad87ae28efcfd66a900123031f2 (diff)
downloadrabbitmq-server-6db76bf64b8a941f5ac7bb76f042e38088d7404a.tar.gz
Sort out misc. (dnc)
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl60
3 files changed, 31 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0b25a4e0..e279b055 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -253,8 +253,8 @@ determine_queue_nodes(Args) ->
case [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes] of
[] -> {node(), undefined};
- [Node] -> {Node, undefined};
- [First | Rest] -> {First, Rest}
+ [Node] -> {Node, undefined};
+ [First | Rest] -> {First, Rest}
end;
{{_Type, <<"all">>}, _} ->
{node(), all};
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index da12ea82..dd2e76a1 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -82,7 +82,7 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
_ -> [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes]
end) -- [node()],
- [rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1],
+ [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- Nodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback),
#state { gm = GM,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 94402d28..633af6cb 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_misc).
-export([remove_from_queue/2, on_node_up/0,
- drop_slave/2, drop_slave/3, add_slave/2, add_slave/3]).
+ drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]).
-include("rabbit.hrl").
@@ -25,7 +25,9 @@
%% then only remove that if we are about to be promoted. Otherwise we
%% can have the situation where a slave updates the mnesia record for
%% a queue, promoting another slave before that slave realises it has
-%% become the new master.
+%% become the new master, which is bad because it could then mean the
+%% slave (now master) receives messages it's not ready for (for
+%% example, new consumers).
remove_from_queue(QueueName, DeadPids) ->
DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
rabbit_misc:execute_mnesia_transaction(
@@ -34,8 +36,8 @@ remove_from_queue(QueueName, DeadPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids }] ->
[QPid1 | SPids1] =
[Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid), DeadNodes)],
@@ -44,8 +46,8 @@ remove_from_queue(QueueName, DeadPids) ->
ok;
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
- %% we're ok to update mnesia; or master
- %% has changed to become us!
+ %% we're ok to update mnesia; or we have
+ %% become the master.
Q1 = Q #amqqueue { pid = QPid1,
slave_pids = SPids1 },
ok = rabbit_amqqueue:store_queue(Q1);
@@ -65,50 +67,44 @@ on_node_up() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (#amqqueue{ arguments = Args, name = QName }, QsN) ->
- case rabbit_misc:table_lookup(
- Args, <<"x-mirror">>) of
- {_Type, []} ->
- [QName | QsN];
- {_Type, Nodes} ->
- Nodes1 = [list_to_atom(binary_to_list(Node))
- || {longstr, Node} <- Nodes],
- case lists:member(node(), Nodes1) of
- true -> [QName | QsN];
- false -> QsN
- end;
- _ ->
- QsN
+ fun (#amqqueue { mirror_nodes = [] }, QsN) ->
+ QsN;
+ (#amqqueue { name = QName,
+ mirror_nodes = MNodes }, QsN) ->
+ case lists:member(node(), MNodes) of
+ true -> [QName | QsN];
+ false -> QsN
end
end, [], rabbit_queue)
end),
- [add_slave(Q, node()) || Q <- Qs],
+ [add_mirror(Q, node()) || Q <- Qs],
ok.
-drop_slave(VHostPath, QueueName, MirrorNode) ->
- drop_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+drop_mirror(VHostPath, QueueName, MirrorNode) ->
+ drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
-drop_slave(Queue, MirrorNode) ->
+drop_mirror(Queue, MirrorNode) ->
if_mirrored_queue(
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
{error, {queue_not_mirrored_on_node, MirrorNode}};
- [QPid | SPids] ->
+ [QPid] when SPids =:= [] ->
{error, cannot_drop_only_mirror};
[Pid] ->
- rabbit_log:info("Dropping slave node on ~p for ~s~n",
- [MirrorNode, rabbit_misc:rs(Name)]),
+ rabbit_log:info(
+ "Dropping queue mirror on node ~p for ~s~n",
+ [MirrorNode, rabbit_misc:rs(Name)]),
exit(Pid, {shutdown, dropped}),
ok
end
end).
-add_slave(VHostPath, QueueName, MirrorNode) ->
- add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+add_mirror(VHostPath, QueueName, MirrorNode) ->
+ add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
-add_slave(Queue, MirrorNode) ->
+add_mirror(Queue, MirrorNode) ->
if_mirrored_queue(
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
@@ -116,7 +112,7 @@ add_slave(Queue, MirrorNode) ->
[] -> Result = rabbit_mirror_queue_slave_sup:start_child(
MirrorNode, [Q]),
rabbit_log:info(
- "Adding slave node for ~s on node ~p: ~p~n",
+ "Adding mirror of queue ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, Result]),
case Result of
{ok, _Pid} -> ok;
@@ -129,7 +125,7 @@ add_slave(Queue, MirrorNode) ->
if_mirrored_queue(Queue, Fun) ->
rabbit_amqqueue:with(
Queue, fun (#amqqueue { arguments = Args } = Q) ->
- case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
undefined -> ok;
_ -> Fun(Q)
end