summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-05 15:12:40 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-05 15:12:40 +0000
commitb4dba4367d50d5b46a08255d486d05f0ede0a1c1 (patch)
tree36d5a50caa4167691925d30265e77b738ee043fe
parent1da40fe4d737f507f38c000e85bcd7f067bef4eb (diff)
parent946178bfa19a1b87d8ee946aefd32ad29341acc1 (diff)
downloadrabbitmq-server-b4dba4367d50d5b46a08255d486d05f0ede0a1c1.tar.gz
Merge bug25979
-rw-r--r--docs/rabbitmqctl.1.xml17
-rw-r--r--src/rabbit_control_main.erl5
-rw-r--r--src/rabbit_mnesia.erl1
-rw-r--r--src/rabbit_nodes.erl15
-rw-r--r--src/rabbit_reader.erl16
-rw-r--r--src/rabbit_runtime_parameters.erl64
-rw-r--r--src/rabbit_upgrade_functions.erl27
7 files changed, 113 insertions, 32 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d19acd00..a7e42503 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -502,6 +502,23 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_cluster_name</command> <arg choice="req">name</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets the cluster name. The cluster name is announced to
+ clients on connection, and used by the federation and
+ shovel plugins to record where a message has been. The
+ cluster name is by default derived from the hostname of
+ the first node in the cluster, but can be changed.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_cluster_name london</screen>
+ <para role="example">
+ This sets the cluster name to "london".
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index f3463286..746f2bdb 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -90,6 +90,7 @@
status,
environment,
report,
+ set_cluster_name,
eval,
close_connection,
@@ -527,6 +528,10 @@ action(report, Node, _Args, _Opts, Inform) ->
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
ok;
+action(set_cluster_name, Node, [Name], _Opts, Inform) ->
+ Inform("Setting cluster name to ~s", [Name]),
+ rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]);
+
action(eval, Node, [Expr], _Opts, _Inform) ->
case erl_scan:string(Expr) of
{ok, Scanned, _} ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index f27f77c6..59873ffc 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -327,6 +327,7 @@ status() ->
case is_running() of
true -> RunningNodes = cluster_nodes(running),
[{running_nodes, RunningNodes},
+ {cluster_name, rabbit_nodes:cluster_name()},
{partitions, mnesia_partitions(RunningNodes)}];
false -> []
end.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 5a1613a7..c5aa8473 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -17,7 +17,8 @@
-module(rabbit_nodes).
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
- is_running/2, is_process_running/2, fqdn_nodename/0]).
+ is_running/2, is_process_running/2,
+ cluster_name/0, set_cluster_name/1]).
-include_lib("kernel/include/inet.hrl").
@@ -37,7 +38,8 @@
-spec(cookie_hash/0 :: () -> string()).
-spec(is_running/2 :: (node(), atom()) -> boolean()).
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
--spec(fqdn_nodename/0 :: () -> binary()).
+-spec(cluster_name/0 :: () -> binary()).
+-spec(set_cluster_name/1 :: (binary()) -> 'ok').
-endif.
@@ -111,8 +113,15 @@ is_process_running(Node, Process) ->
P when is_pid(P) -> true
end.
-fqdn_nodename() ->
+cluster_name() ->
+ rabbit_runtime_parameters:value_global(
+ cluster_name, cluster_name_default()).
+
+cluster_name_default() ->
{ID, _} = rabbit_nodes:parts(node()),
{ok, Host} = inet:gethostname(),
{ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
+
+set_cluster_name(Name) ->
+ rabbit_runtime_parameters:set_global(cluster_name, Name).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 122eb305..47bc99d8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -156,19 +156,23 @@ server_properties(Protocol) ->
[case X of
{KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
longstr,
- list_to_binary(Value)};
+ maybe_list_to_binary(Value)};
{BinKey, Type, Value} -> {BinKey, Type, Value}
end || X <- RawConfigServerProps ++
- [{product, Product},
- {version, Version},
- {platform, "Erlang/OTP"},
- {copyright, ?COPYRIGHT_MESSAGE},
- {information, ?INFORMATION_MESSAGE}]]],
+ [{product, Product},
+ {version, Version},
+ {cluster_name, rabbit_nodes:cluster_name()},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
%% Filter duplicated properties in favour of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
+maybe_list_to_binary(V) when is_binary(V) -> V;
+maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V).
+
server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index bcde0078..18b9fbb8 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -22,6 +22,8 @@
list_component/1, list/2, list_formatted/1, lookup/3,
value/3, value/4, info_keys/0]).
+-export([set_global/2, value_global/1, value_global/2]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -34,6 +36,7 @@
-> ok_or_error_string()).
-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term())
-> ok_or_error_string()).
+-spec(set_global/2 :: (atom(), term()) -> 'ok').
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
-> ok_or_error_string()).
-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary())
@@ -48,6 +51,8 @@
-> rabbit_types:infos() | 'not_found').
-spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()).
-spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()).
+-spec(value_global/1 :: (atom()) -> term() | 'not_found').
+-spec(value_global/2 :: (atom(), term()) -> term()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-endif.
@@ -74,6 +79,10 @@ set(_, <<"policy">>, _, _) ->
set(VHost, Component, Name, Term) ->
set_any(VHost, Component, Name, Term).
+set_global(Name, Term) ->
+ mnesia_update(Name, Term),
+ ok.
+
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
@@ -100,16 +109,22 @@ set_any0(VHost, Component, Name, Term) ->
E
end.
+mnesia_update(Key, Term) ->
+ rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)).
+
mnesia_update(VHost, Comp, Name, Term) ->
- F = fun () ->
- Res = case mnesia:read(?TABLE, {VHost, Comp, Name}, read) of
- [] -> new;
- [Params] -> {old, Params#runtime_parameters.value}
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_vhost:with(VHost, mnesia_update_fun({VHost, Comp, Name}, Term))).
+
+mnesia_update_fun(Key, Term) ->
+ fun () ->
+ Res = case mnesia:read(?TABLE, Key, read) of
+ [] -> new;
+ [Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(VHost, Comp, Name, Term), write),
- Res
- end,
- rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)).
+ ok = mnesia:write(?TABLE, c(Key, Term), write),
+ Res
+ end.
clear(_, <<"policy">> , _) ->
{error_string, "policies may not be cleared using this method"};
@@ -159,43 +174,46 @@ list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
lookup(VHost, Component, Name) ->
- case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
+ case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(VHost, Component, Name) ->
- case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
+value(VHost, Comp, Name) -> value0({VHost, Comp, Name}).
+value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def).
+
+value_global(Key) -> value0(Key).
+value_global(Key, Default) -> value0(Key, Default).
+
+value0(Key) ->
+ case lookup0(Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(VHost, Component, Name, Default) ->
- Params = lookup0(VHost, Component, Name,
- fun () ->
- lookup_missing(VHost, Component, Name, Default)
- end),
+value0(Key, Default) ->
+ Params = lookup0(Key, fun () -> lookup_missing(Key, Default) end),
Params#runtime_parameters.value.
-lookup0(VHost, Component, Name, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of
+lookup0(Key, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, Key) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(VHost, Component, Name, Default) ->
+lookup_missing(Key, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
- [] -> Record = c(VHost, Component, Name, Default),
+ case mnesia:read(?TABLE, Key, read) of
+ [] -> Record = c(Key, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(VHost, Component, Name, Default) ->
- #runtime_parameters{key = {VHost, Component, Name},
+c(Key, Default) ->
+ #runtime_parameters{key = Key,
value = Default}.
p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) ->
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 90372461..4cb3cacc 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -47,6 +47,7 @@
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
+-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
%% -------------------------------------------------------------------
@@ -355,6 +356,32 @@ internal_system_x() ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+cluster_name() ->
+ {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0),
+ ok.
+
+cluster_name_tx() ->
+ %% mnesia:transform_table/4 does not let us delete records
+ T = rabbit_runtime_parameters,
+ mnesia:write_lock_table(T),
+ Ks = [K || {_VHost, <<"federation">>, <<"local-nodename">>} = K
+ <- mnesia:all_keys(T)],
+ case Ks of
+ [] -> ok;
+ [K|Tl] -> [{runtime_parameters, _K, Name}] = mnesia:read(T, K, write),
+ R = {runtime_parameters, cluster_name, Name},
+ mnesia:write(T, R, write),
+ case Tl of
+ [] -> ok;
+ _ -> {VHost, _, _} = K,
+ error_logger:warning_msg(
+ "Multiple local-nodenames found, picking '~s' "
+ "from '~s' for cluster name~n", [Name, VHost])
+ end
+ end,
+ [mnesia:delete(T, K, write) || K <- Ks],
+ ok.
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->