summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-08-09 07:50:02 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2013-08-09 07:50:02 +0100
commitcfd2381c8133957ce66db1761096c983994d43d1 (patch)
treefb89e167d1abd1e9bbcd0ac6bd9812be2336864a
parent22c34ea12a1a5af08012b05e17873dc812de93ff (diff)
parentf721a7fa14b23c391fed908695491e526c0f2391 (diff)
downloadrabbitmq-server-cfd2381c8133957ce66db1761096c983994d43d1.tar.gz
merge bug25704 into default
-rw-r--r--src/delegate.erl75
-rw-r--r--src/rabbit_mnesia.erl52
2 files changed, 57 insertions, 70 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 7a06c1e4..5277e59f 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -33,15 +33,14 @@
-export_type([monitor_ref/0]).
-type(monitor_ref() :: reference() | {atom(), pid()}).
+-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}).
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
--spec(invoke/2 ::
- ( pid(), fun ((pid()) -> A)) -> A;
- ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
- [{pid(), term()}]}).
--spec(invoke_no_result/2 ::
- (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A;
+ ([pid()], fun_or_mfa(A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok').
-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
-spec(demonitor/1 :: (monitor_ref()) -> 'true').
-spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true').
@@ -64,24 +63,24 @@ start_link(Num) ->
Name = delegate_name(Num),
gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
-invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
- Fun(Pid);
-invoke(Pid, Fun) when is_pid(Pid) ->
- case invoke([Pid], Fun) of
+invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ apply1(FunOrMFA, Pid);
+invoke(Pid, FunOrMFA) when is_pid(Pid) ->
+ case invoke([Pid], FunOrMFA) of
{[{Pid, Result}], []} ->
Result;
{[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
-invoke([], _Fun) -> %% optimisation
+invoke([], _FunOrMFA) -> %% optimisation
{[], []};
-invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
- case safe_invoke(Pid, Fun) of
+invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
+ case safe_invoke(Pid, FunOrMFA) of
{ok, _, Result} -> {[{Pid, Result}], []};
{error, _, Error} -> {[], [{Pid, Error}]}
end;
-invoke(Pids, Fun) when is_list(Pids) ->
+invoke(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
%% The use of multi_call is only safe because the timeout is
%% infinity, and thus there is no process spawned in order to do
@@ -91,38 +90,38 @@ invoke(Pids, Fun) when is_list(Pids) ->
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
RemoteNodes, delegate(self(), RemoteNodes),
- {invoke, Fun, Grouped}, infinity)
+ {invoke, FunOrMFA, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
Pid <- orddict:fetch(BadNode, Grouped)],
- ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) |
[Results || {_Node, Results} <- Replies]]),
lists:foldl(
fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
- safe_invoke(Pid, Fun), %% we don't care about any error
+invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, FunOrMFA), %% we don't care about any error
ok;
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result([Pid], Fun);
+invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) ->
+ invoke_no_result([Pid], FunOrMFA);
-invoke_no_result([], _Fun) -> %% optimisation
+invoke_no_result([], _FunOrMFA) -> %% optimisation
ok;
-invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
- safe_invoke(Pid, Fun), %% must not die
+invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
+ safe_invoke(Pid, FunOrMFA), %% must not die
ok;
-invoke_no_result(Pids, Fun) when is_list(Pids) ->
+invoke_no_result(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
RemoteNodes -> gen_server2:abcast(
RemoteNodes, delegate(self(), RemoteNodes),
- {invoke, Fun, Grouped})
+ {invoke, FunOrMFA, Grouped})
end,
- safe_invoke(LocalPids, Fun), %% must not die
+ safe_invoke(LocalPids, FunOrMFA), %% must not die
ok.
monitor(Type, Pid) when node(Pid) =:= node() ->
@@ -140,10 +139,10 @@ demonitor({Name, Pid}, Options) ->
gen_server2:cast(Name, {demonitor, Pid, Options}).
call(PidOrPids, Msg) ->
- invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
+ invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}).
cast(PidOrPids, Msg) ->
- invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end).
+ invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}).
%%----------------------------------------------------------------------------
@@ -171,23 +170,27 @@ delegate(Pid, RemoteNodes) ->
Name -> Name
end.
-safe_invoke(Pids, Fun) when is_list(Pids) ->
- [safe_invoke(Pid, Fun) || Pid <- Pids];
-safe_invoke(Pid, Fun) when is_pid(Pid) ->
+safe_invoke(Pids, FunOrMFA) when is_list(Pids) ->
+ [safe_invoke(Pid, FunOrMFA) || Pid <- Pids];
+safe_invoke(Pid, FunOrMFA) when is_pid(Pid) ->
try
- {ok, Pid, Fun(Pid)}
+ {ok, Pid, apply1(FunOrMFA, Pid)}
catch Class:Reason ->
{error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
+apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]);
+apply1(Fun, Arg) -> Fun(Arg).
+
%%----------------------------------------------------------------------------
init([Name]) ->
{ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.
+handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State,
+ hibernate}.
handle_cast({monitor, Type, WantsMonitor, Pid},
State = #state{monitors = Monitors}) ->
@@ -205,8 +208,8 @@ handle_cast({demonitor, Pid, Options},
State
end, hibernate};
-handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
- safe_invoke(orddict:fetch(Node, Grouped), Fun),
+handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) ->
+ safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA),
{noreply, State, hibernate}.
handle_info({'DOWN', Ref, process, Pid, Info},
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6b956818..85958400 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -396,7 +396,7 @@ cluster_status(WhichNodes) ->
node_info() ->
{erlang:system_info(otp_release), rabbit_misc:version(),
- delegate_beam_hash(), cluster_status_from_mnesia()}.
+ cluster_status_from_mnesia()}.
node_type() ->
DiscNodes = cluster_nodes(disc),
@@ -570,16 +570,16 @@ check_cluster_consistency(Node) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
{badrpc, _Reason} ->
{error, not_found};
- {_OTP, _Rabbit, _Hash, {error, _}} ->
+ {_OTP, _Rabbit, {error, _}} ->
{error, not_found};
- {_OTP, Rabbit, _Status} ->
- %% pre-2013/04 format implies version mismatch
- version_error("Rabbit", rabbit_misc:version(), Rabbit);
- {OTP, Rabbit, Hash, {ok, Status}} ->
- case check_consistency(OTP, Rabbit, Hash, Node, Status) of
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit, Node, Status) of
{error, _} = E -> E;
{ok, Res} -> {ok, Res}
- end
+ end;
+ {_OTP, Rabbit, _Hash, _Status} ->
+ %% delegate hash checking implies version mismatch
+ version_error("Rabbit", rabbit_misc:version(), Rabbit)
end.
%%--------------------------------------------------------------------
@@ -743,17 +743,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-check_consistency(OTP, Rabbit, Hash) ->
+check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP),
- check_rabbit_consistency(Rabbit),
- check_beam_compatibility(Hash)]).
+ check_rabbit_consistency(Rabbit)]).
-check_consistency(OTP, Rabbit, Hash, Node, Status) ->
+check_consistency(OTP, Rabbit, Node, Status) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP),
check_rabbit_consistency(Rabbit),
- check_beam_compatibility(Hash),
check_nodes_consistency(Node, Status)]).
check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
@@ -789,21 +787,6 @@ check_rabbit_consistency(Remote) ->
rabbit_misc:version(), Remote, "Rabbit",
fun rabbit_misc:version_minor_equivalent/2).
-check_beam_compatibility(RemoteHash) ->
- case RemoteHash == delegate_beam_hash() of
- true -> ok;
- false -> {error, {incompatible_bytecode,
- "Incompatible Erlang bytecode found on nodes"}}
- end.
-
-%% The delegate module sends functions across the cluster; if it is
-%% out of sync (say due to mixed compilers), we will get badfun
-%% exceptions when trying to do so. Let's detect that at startup.
-delegate_beam_hash() ->
- {delegate, Obj, _} = code:get_object_code(delegate),
- {ok, {delegate, Hash}} = beam_lib:md5(Obj),
- Hash.
-
%% This is fairly tricky. We want to know if the node is in the state
%% that a `reset' would leave it in. We cannot simply check if the
%% mnesia tables aren't there because restarted RAM nodes won't have
@@ -829,12 +812,13 @@ find_good_node([]) ->
none;
find_good_node([Node | Nodes]) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _Reason} -> find_good_node(Nodes);
- {_OTP, _Rabbit, _} -> find_good_node(Nodes);
- {OTP, Rabbit, Hash, _} -> case check_consistency(OTP, Rabbit, Hash) of
- {error, _} -> find_good_node(Nodes);
- ok -> {ok, Node}
- end
+ {badrpc, _Reason} -> find_good_node(Nodes);
+ %% old delegate hash check
+ {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->