summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-07-05 16:00:28 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-07-05 16:00:28 +0100
commitf4e20306ddd516c40e26f3c8faf1e2316d1cc604 (patch)
treef930b063fc9c466f3857591e6654e2f4b1052b6d
parenta47b7dba6201972a293dd3563466ad29f834e799 (diff)
downloadrabbitmq-server-f4e20306ddd516c40e26f3c8faf1e2316d1cc604.tar.gz
Rough sketch of dynamic queue HA-ness:
* Remove #amqqueue.mirror_nodes, we will always derive this from policy * Remove everything to do with x-ha-* arguments * Abstract a bit more stuff into rabbit_mirror_queue_misc * Add a new "at-least" mode This works! Sort of. Changing policies on the fly, changing HAness altogether, having "at-least" set up a new mirror when one disappears, and probably some other things do not work.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl54
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_mirror_queue_master.erl10
-rw-r--r--src/rabbit_mirror_queue_misc.erl69
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_upgrade_functions.erl14
7 files changed, 89 insertions, 92 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index e8b4a623..7be82aef 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -47,7 +47,7 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, mirror_nodes, policy}).
+ arguments, pid, slave_pids, policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index afbaea65..0565b1a5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -207,15 +207,15 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- {Node, MNodes} = determine_queue_nodes(Args),
- Q = start_queue_process(Node, #amqqueue{name = QueueName,
+ Node = node(), %% TODO utter rubbish
+ Q = start_queue_process(
+ Node, rabbit_policy:set(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
exclusive_owner = Owner,
pid = none,
- slave_pids = [],
- mirror_nodes = MNodes}),
+ slave_pids = []})),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -270,22 +270,6 @@ store_queue(Q = #amqqueue{durable = false}) ->
policy_changed(_Q1, _Q2) ->
ok.
-determine_queue_nodes(Args) ->
- Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
- PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
- case {Policy, PolicyParams} of
- {{_Type, <<"nodes">>}, {array, Nodes}} ->
- case [list_to_atom(binary_to_list(Node)) ||
- {longstr, Node} <- Nodes] of
- [Node] -> {Node, undefined};
- [First | Rest] -> {First, [First | Rest]}
- end;
- {{_Type, <<"all">>}, _} ->
- {node(), all};
- _ ->
- {node(), undefined}
- end.
-
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
Q#amqqueue{pid = Pid}.
@@ -351,13 +335,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
rabbit_misc:assert_args_equivalence(
- Args, RequiredArgs, QueueName,
- [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
+ Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]).
check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
{<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
- {<<"x-ha-policy">>, fun check_ha_policy_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
@@ -406,29 +388,6 @@ check_dlxrk_arg({longstr, _}, Args) ->
check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-check_ha_policy_arg({longstr, <<"all">>}, _Args) ->
- ok;
-check_ha_policy_arg({longstr, <<"nodes">>}, Args) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
- undefined ->
- {error, {require, 'x-ha-policy-params'}};
- {array, []} ->
- {error, {require_non_empty_list_of_nodes_for_ha}};
- {array, Ary} ->
- case lists:all(fun ({longstr, _Node}) -> true;
- (_ ) -> false
- end, Ary) of
- true -> ok;
- false -> {error, {require_node_list_as_longstrs_for_ha, Ary}}
- end;
- {Type, _} ->
- {error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
- end;
-check_ha_policy_arg({longstr, Policy}, _Args) ->
- {error, {invalid_ha_policy, Policy}};
-check_ha_policy_arg({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
-
list() ->
mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
@@ -625,8 +584,7 @@ pseudo_queue(QueueName, Pid) ->
auto_delete = false,
arguments = [],
pid = Pid,
- slave_pids = [],
- mirror_nodes = undefined}.
+ slave_pids = []}.
deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
%% /dev/null optimisation
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8933de87..71f8aacd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -230,8 +230,7 @@ matches(false, Q1, Q2) ->
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso
- Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes.
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids.
bq_init(BQ, Q, Recover) ->
Self = self(),
@@ -296,11 +295,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
-backing_queue_module(#amqqueue{arguments = Args}) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
- undefined -> {ok, BQM} = application:get_env(backing_queue_module),
- BQM;
- _Policy -> rabbit_mirror_queue_master
+backing_queue_module(Q) ->
+ case rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> {ok, BQM} = application:get_env(backing_queue_module),
+ BQM;
+ true -> rabbit_mirror_queue_master
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
@@ -906,11 +905,11 @@ infos(Items, State) ->
Prefix ++ [{Item, i(Item, State)}
|| Item <- (Items1 -- [synchronised_slave_pids])].
-slaves_status(#q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} ->
+slaves_status(#q{q = Q}) ->
+ case rabbit_mirror_queue_misc:slave_pids(Q) of
+ not_mirrored -> %% TODO do we need this branch?
[{slave_pids, ''}, {synchronised_slave_pids, ''}];
- {ok, #amqqueue{slave_pids = SPids}} ->
+ SPids ->
{Results, _Bad} =
delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
{SPids1, SSPids} =
@@ -955,10 +954,10 @@ i(consumers, _) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
-i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} -> [];
- {ok, #amqqueue{slave_pids = SPids}} -> SPids
+i(slave_pids, #q{q = Q}) ->
+ case rabbit_mirror_queue_misc:slave_pids(Q) of
+ not_mirrored -> [];
+ SPids -> SPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 750bcd56..899c31b7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -82,17 +82,13 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
+init(#amqqueue { name = QName } = Q, Recover,
AsyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- MNodes1 =
- (case MNodes of
- all -> rabbit_mnesia:all_clustered_nodes();
- undefined -> [];
- _ -> MNodes
- end) -- [node()],
+ {_MNode, MNodes} = rabbit_mirror_queue_misc:determine_queue_nodes(Q),
+ MNodes1 = MNodes -- [node()],
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 180677fe..876257b8 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -20,6 +20,10 @@
drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
report_deaths/4]).
+%% temp
+-export([determine_queue_nodes/1, is_mirrored/1, slave_pids/1]).
+
+
-include("rabbit.hrl").
%%----------------------------------------------------------------------------
@@ -85,24 +89,19 @@ remove_from_queue(QueueName, DeadPids) ->
end).
on_node_up() ->
- Qs =
+ QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (#amqqueue { mirror_nodes = undefined }, QsN) ->
- QsN;
- (#amqqueue { name = QName,
- mirror_nodes = all }, QsN) ->
- [QName | QsN];
- (#amqqueue { name = QName,
- mirror_nodes = MNodes }, QsN) ->
+ fun (Q = #amqqueue{name = QName}, QNames0) ->
+ {_MNode, MNodes} = determine_queue_nodes(Q),
case lists:member(node(), MNodes) of
- true -> [QName | QsN];
- false -> QsN
+ true -> [QName | QNames0];
+ false -> QNames0
end
end, [], rabbit_queue)
end),
- [add_mirror(Q, node()) || Q <- Qs],
+ [add_mirror(QName, node()) || QName <- QNames],
ok.
drop_mirror(VHostPath, QueueName, MirrorNode) ->
@@ -150,14 +149,13 @@ add_mirror(Queue, MirrorNode) ->
end
end).
-if_mirrored_queue(Queue, Fun) ->
- rabbit_amqqueue:with(
- Queue, fun (#amqqueue { arguments = Args } = Q) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
- undefined -> ok;
- _ -> Fun(Q)
- end
- end).
+if_mirrored_queue(QName, Fun) ->
+ rabbit_amqqueue:with(QName, fun (Q) ->
+ case is_mirrored(Q) of
+ false -> ok;
+ true -> Fun(Q)
+ end
+ end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
@@ -172,3 +170,36 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
end,
rabbit_misc:pid_to_string(MirrorPid),
[[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
+
+%%----------------------------------------------------------------------------
+
+determine_queue_nodes(Q) ->
+ case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of
+ [{ok, <<"all">>}, _] ->
+ {node(), rabbit_mnesia:all_clustered_nodes()};
+ [{ok, <<"nodes">>}, {ok, Nodes}] ->
+ case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of
+ [Node] -> {Node, []};
+ [First | Rest] -> {First, [First | Rest]}
+ end;
+ [{ok, <<"at-least">>}, {ok, Count}] ->
+ {node(), lists:sublist(rabbit_mnesia:all_clustered_nodes(), Count)};
+ _ ->
+ {node(), []}
+ end.
+
+is_mirrored(Q) ->
+ case rabbit_policy:get(<<"ha-mode">>, Q) of
+ {ok, <<"all">>} -> true;
+ {ok, <<"nodes">>} -> true;
+ {ok, <<"at-least">>} -> true;
+ _ -> false
+ end.
+
+slave_pids(Q = #amqqueue{name = Name}) ->
+ case is_mirrored(Q) of
+ false -> not_mirrored;
+ true -> {ok, #amqqueue{slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ SPids
+ end.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 732c29b6..adfb19a0 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -118,8 +118,7 @@
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
pid :: rabbit_types:maybe(pid()),
- slave_pids :: [pid()],
- mirror_nodes :: [node()] | 'undefined' | 'all'}).
+ slave_pids :: [pid()]}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 18704807..d024ff63 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -40,6 +40,7 @@
-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}).
-rabbit_upgrade({policy, mnesia,
[exchange_scratches, ha_mirrors]}).
+-rabbit_upgrade({no_mirror_nodes, mnesia, [policy]}).
%% -------------------------------------------------------------------
@@ -62,6 +63,7 @@
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
-spec(policy/0 :: () -> 'ok').
+-spec(no_mirror_nodes/0 :: () -> 'ok').
-endif.
@@ -240,6 +242,18 @@ queue_policy(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid,
slave_pids, mirror_nodes, policy]).
+no_mirror_nodes() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ RemoveMirrorNodesFun =
+ fun ({amqqueue, Name, D, AD, O, Args, Pid, SPids, _MNodes, Policy}) ->
+ {amqqueue, Name, D, AD, O, Args, Pid, SPids, Policy}
+ end,
+ [ok = transform(T, RemoveMirrorNodesFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, policy])
+ || T <- Tables],
+ ok.
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->