diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-08-09 07:50:02 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-08-09 07:50:02 +0100 |
commit | cfd2381c8133957ce66db1761096c983994d43d1 (patch) | |
tree | fb89e167d1abd1e9bbcd0ac6bd9812be2336864a | |
parent | 22c34ea12a1a5af08012b05e17873dc812de93ff (diff) | |
parent | f721a7fa14b23c391fed908695491e526c0f2391 (diff) | |
download | rabbitmq-server-cfd2381c8133957ce66db1761096c983994d43d1.tar.gz |
merge bug25704 into default
-rw-r--r-- | src/delegate.erl | 75 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 52 |
2 files changed, 57 insertions, 70 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 7a06c1e4..5277e59f 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -33,15 +33,14 @@ -export_type([monitor_ref/0]). -type(monitor_ref() :: reference() | {atom(), pid()}). +-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}). -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). --spec(invoke/2 :: - ( pid(), fun ((pid()) -> A)) -> A; - ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], - [{pid(), term()}]}). --spec(invoke_no_result/2 :: - (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A; + ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], + [{pid(), term()}]}). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok'). -spec(monitor/2 :: ('process', pid()) -> monitor_ref()). -spec(demonitor/1 :: (monitor_ref()) -> 'true'). -spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true'). @@ -64,24 +63,24 @@ start_link(Num) -> Name = delegate_name(Num), gen_server2:start_link({local, Name}, ?MODULE, [Name], []). -invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - Fun(Pid); -invoke(Pid, Fun) when is_pid(Pid) -> - case invoke([Pid], Fun) of +invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + apply1(FunOrMFA, Pid); +invoke(Pid, FunOrMFA) when is_pid(Pid) -> + case invoke([Pid], FunOrMFA) of {[{Pid, Result}], []} -> Result; {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; -invoke([], _Fun) -> %% optimisation +invoke([], _FunOrMFA) -> %% optimisation {[], []}; -invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - case safe_invoke(Pid, Fun) of +invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of {ok, _, Result} -> {[{Pid, Result}], []}; {error, _, Error} -> {[], [{Pid, Error}]} end; -invoke(Pids, Fun) when is_list(Pids) -> +invoke(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), %% The use of multi_call is only safe because the timeout is %% infinity, and thus there is no process spawned in order to do @@ -91,38 +90,38 @@ invoke(Pids, Fun) when is_list(Pids) -> [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}, infinity) + {invoke, FunOrMFA, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, Pid <- orddict:fetch(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | [Results || {_Node, Results} <- Replies]]), lists:foldl( fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - safe_invoke(Pid, Fun), %% we don't care about any error +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, FunOrMFA), %% we don't care about any error ok; -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result([Pid], Fun); +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> + invoke_no_result([Pid], FunOrMFA); -invoke_no_result([], _Fun) -> %% optimisation +invoke_no_result([], _FunOrMFA) -> %% optimisation ok; -invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - safe_invoke(Pid, Fun), %% must not die +invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + safe_invoke(Pid, FunOrMFA), %% must not die ok; -invoke_no_result(Pids, Fun) when is_list(Pids) -> +invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; RemoteNodes -> gen_server2:abcast( RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}) + {invoke, FunOrMFA, Grouped}) end, - safe_invoke(LocalPids, Fun), %% must not die + safe_invoke(LocalPids, FunOrMFA), %% must not die ok. monitor(Type, Pid) when node(Pid) =:= node() -> @@ -140,10 +139,10 @@ demonitor({Name, Pid}, Options) -> gen_server2:cast(Name, {demonitor, Pid, Options}). call(PidOrPids, Msg) -> - invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). + invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}). cast(PidOrPids, Msg) -> - invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end). + invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}). %%---------------------------------------------------------------------------- @@ -171,23 +170,27 @@ delegate(Pid, RemoteNodes) -> Name -> Name end. -safe_invoke(Pids, Fun) when is_list(Pids) -> - [safe_invoke(Pid, Fun) || Pid <- Pids]; -safe_invoke(Pid, Fun) when is_pid(Pid) -> +safe_invoke(Pids, FunOrMFA) when is_list(Pids) -> + [safe_invoke(Pid, FunOrMFA) || Pid <- Pids]; +safe_invoke(Pid, FunOrMFA) when is_pid(Pid) -> try - {ok, Pid, Fun(Pid)} + {ok, Pid, apply1(FunOrMFA, Pid)} catch Class:Reason -> {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. +apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]); +apply1(Fun, Arg) -> Fun(Arg). + %%---------------------------------------------------------------------------- init([Name]) -> {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}. +handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State, + hibernate}. handle_cast({monitor, Type, WantsMonitor, Pid}, State = #state{monitors = Monitors}) -> @@ -205,8 +208,8 @@ handle_cast({demonitor, Pid, Options}, State end, hibernate}; -handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> - safe_invoke(orddict:fetch(Node, Grouped), Fun), +handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> + safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), {noreply, State, hibernate}. handle_info({'DOWN', Ref, process, Pid, Info}, diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6b956818..85958400 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -396,7 +396,7 @@ cluster_status(WhichNodes) -> node_info() -> {erlang:system_info(otp_release), rabbit_misc:version(), - delegate_beam_hash(), cluster_status_from_mnesia()}. + cluster_status_from_mnesia()}. node_type() -> DiscNodes = cluster_nodes(disc), @@ -570,16 +570,16 @@ check_cluster_consistency(Node) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of {badrpc, _Reason} -> {error, not_found}; - {_OTP, _Rabbit, _Hash, {error, _}} -> + {_OTP, _Rabbit, {error, _}} -> {error, not_found}; - {_OTP, Rabbit, _Status} -> - %% pre-2013/04 format implies version mismatch - version_error("Rabbit", rabbit_misc:version(), Rabbit); - {OTP, Rabbit, Hash, {ok, Status}} -> - case check_consistency(OTP, Rabbit, Hash, Node, Status) of + {OTP, Rabbit, {ok, Status}} -> + case check_consistency(OTP, Rabbit, Node, Status) of {error, _} = E -> E; {ok, Res} -> {ok, Res} - end + end; + {_OTP, Rabbit, _Hash, _Status} -> + %% delegate hash checking implies version mismatch + version_error("Rabbit", rabbit_misc:version(), Rabbit) end. %%-------------------------------------------------------------------- @@ -743,17 +743,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -check_consistency(OTP, Rabbit, Hash) -> +check_consistency(OTP, Rabbit) -> rabbit_misc:sequence_error( [check_otp_consistency(OTP), - check_rabbit_consistency(Rabbit), - check_beam_compatibility(Hash)]). + check_rabbit_consistency(Rabbit)]). -check_consistency(OTP, Rabbit, Hash, Node, Status) -> +check_consistency(OTP, Rabbit, Node, Status) -> rabbit_misc:sequence_error( [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit), - check_beam_compatibility(Hash), check_nodes_consistency(Node, Status)]). check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> @@ -789,21 +787,6 @@ check_rabbit_consistency(Remote) -> rabbit_misc:version(), Remote, "Rabbit", fun rabbit_misc:version_minor_equivalent/2). -check_beam_compatibility(RemoteHash) -> - case RemoteHash == delegate_beam_hash() of - true -> ok; - false -> {error, {incompatible_bytecode, - "Incompatible Erlang bytecode found on nodes"}} - end. - -%% The delegate module sends functions across the cluster; if it is -%% out of sync (say due to mixed compilers), we will get badfun -%% exceptions when trying to do so. Let's detect that at startup. -delegate_beam_hash() -> - {delegate, Obj, _} = code:get_object_code(delegate), - {ok, {delegate, Hash}} = beam_lib:md5(Obj), - Hash. - %% This is fairly tricky. We want to know if the node is in the state %% that a `reset' would leave it in. We cannot simply check if the %% mnesia tables aren't there because restarted RAM nodes won't have @@ -829,12 +812,13 @@ find_good_node([]) -> none; find_good_node([Node | Nodes]) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _Reason} -> find_good_node(Nodes); - {_OTP, _Rabbit, _} -> find_good_node(Nodes); - {OTP, Rabbit, Hash, _} -> case check_consistency(OTP, Rabbit, Hash) of - {error, _} -> find_good_node(Nodes); - ok -> {ok, Node} - end + {badrpc, _Reason} -> find_good_node(Nodes); + %% old delegate hash check + {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes); + {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of + {error, _} -> find_good_node(Nodes); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> |