summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-01-05 10:19:34 +0000
committerRob Harrop <rob@rabbitmq.com>2011-01-05 10:19:34 +0000
commit45808e83b3a9414ebed30a3d0ebc0eb185143e72 (patch)
treeb951425bcb28c2a09045b7ffe14a99eeee7a8110
parentf7552846659f8a571242a2d1acd1040a10ccf815 (diff)
parent02396f8d21a641b4640657e1203b7c2a343a1c73 (diff)
downloadrabbitmq-server-45808e83b3a9414ebed30a3d0ebc0eb185143e72.tar.gz
Merge with default
-rw-r--r--Makefile2
-rw-r--r--docs/rabbitmqctl.1.xml2
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--src/delegate.erl217
-rw-r--r--src/delegate_sup.erl13
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_misc.erl18
-rw-r--r--src/rabbit_mnesia.erl24
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_networking.erl3
-rw-r--r--src/rabbit_tests.erl35
11 files changed, 152 insertions, 169 deletions
diff --git a/Makefile b/Makefile
index e0d5744c..00bfd629 100644
--- a/Makefile
+++ b/Makefile
@@ -170,7 +170,7 @@ start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
- ./scripts/rabbitmq-server ; sleep 1
+ ./scripts/rabbitmq-server; sleep 1
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 2419a54b..01ddd4c1 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1081,7 +1081,7 @@
<para role="example-prefix">
For example:
</para>
- <screen role="example">rabbitmqctl list_connections send_pend server_port</screen>
+ <screen role="example">rabbitmqctl list_connections send_pend port</screen>
<para role="example">
This command displays the send queue size and server port for each
connection.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 6fb7bbc5..5ed872b6 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -32,4 +32,5 @@
{server_properties, []},
{collect_statistics, none},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
- {auth_backends, [rabbit_auth_backend_internal]} ]} ]}.
+ {auth_backends, [rabbit_auth_backend_internal]},
+ {delegate_count, 16}]}]}.
diff --git a/src/delegate.erl b/src/delegate.erl
index 8e64f3d0..10054e57 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -31,19 +31,9 @@
-module(delegate).
-%% The reason we have local delegate processes is because we want to
-%% be able to issue calls to remote nodes in parallel. This requires
-%% segmenting the destination Pids by node, and then getting local
-%% delegates to issue calls/casts to the remote delegates in
-%% parallel. In order to ensure consistent ordering, even casts to
-%% remote Pids have to go through the local delegates rather than be
-%% sent directly.
-
--define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
-
-behaviour(gen_server2).
--export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -52,13 +42,16 @@
-ifdef(use_specs).
--spec(start_link/2 ::
- (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 ::
+ (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
+-spec(invoke/2 ::
+ ( pid(), fun ((pid()) -> A)) -> A;
+ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
--spec(process_count/0 :: () -> non_neg_integer()).
+-spec(delegate_count/0 :: () -> non_neg_integer()).
-endif.
@@ -69,157 +62,113 @@
%%----------------------------------------------------------------------------
-start_link(Prefix, Hash) ->
- gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []).
+start_link(Num) ->
+ gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ Fun(Pid);
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
- case Res of
- {ok, Result, _} ->
+ case invoke([Pid], Fun) of
+ {[{Pid, Result}], []} ->
Result;
- {error, {Class, Reason, StackTrace}, _} ->
+ {[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
invoke(Pids, Fun) when is_list(Pids) ->
- lists:foldl(
- fun ({Status, Result, Pid}, {Good, Bad}) ->
- case Status of
- ok -> {[{Pid, Result}|Good], Bad};
- error -> {Good, [{Pid, Result}|Bad]}
- end
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ %% The use of multi_call is only safe because the timeout is
+ %% infinity, and thus there is no process spawned in order to do
+ %% the sending. Thus calls can't overtake preceding calls/casts.
+ {Replies, BadNodes} =
+ case orddict:fetch_keys(Grouped) of
+ [] -> {[], []};
+ RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped},
+ infinity)
end,
- {[], []},
- invoke_per_node(split_delegate_per_node(Pids), Fun)).
+ BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
+ BadNode <- BadNodes,
+ Pid <- orddict:fetch(BadNode, Grouped)],
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ [Results || {_Node, Results} <- Replies]]),
+ lists:foldl(
+ fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
+ ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
+ end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
+invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, Fun), %% we don't care about any error
ok;
+invoke_no_result(Pid, Fun) when is_pid(Pid) ->
+ invoke_no_result([Pid], Fun);
invoke_no_result(Pids, Fun) when is_list(Pids) ->
- invoke_no_result_per_node(split_delegate_per_node(Pids), Fun),
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ case orddict:fetch_keys(Grouped) of
+ [] -> ok;
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped})
+ end,
+ safe_invoke(LocalPids, Fun), %% must not die
ok.
%%----------------------------------------------------------------------------
-internal_call(Node, Thunk) when is_atom(Node) ->
- gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity).
-
-internal_cast(Node, Thunk) when is_atom(Node) ->
- gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
-
-split_delegate_per_node(Pids) ->
+group_pids_by_node(Pids) ->
LocalNode = node(),
- {Local, Remote} =
- lists:foldl(
- fun (Pid, {L, D}) ->
- Node = node(Pid),
- case Node of
- LocalNode -> {[Pid|L], D};
- _ -> {L, orddict:append(Node, Pid, D)}
- end
- end,
- {[], orddict:new()}, Pids),
- {Local, orddict:to_list(Remote)}.
-
-invoke_per_node(NodePids, Fun) ->
- lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-
-invoke_no_result_per_node(NodePids, Fun) ->
- delegate_per_node(NodePids, Fun, fun internal_cast/2),
- ok.
-
-delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
- %% In the case where DelegateFun is internal_cast, the safe_invoke
- %% is not actually async! However, in practice Fun will always be
- %% something that does a gen_server:cast or similar, so I don't
- %% think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- [safe_invoke(LocalPids, Fun)|
- delegate_per_remote_node(NodePids, Fun, DelegateFun)].
-
-delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
- Self = self(),
- %% Note that this is unsafe if the Fun requires reentrancy to the
- %% local_server. I.e. if self() == local_server(Node) then we'll
- %% block forever.
- [gen_server2:cast(
- local_server(Node),
- {thunk, fun () ->
- Self ! {result,
- DelegateFun(
- Node, fun () -> safe_invoke(Pids, Fun) end)}
- end}) || {Node, Pids} <- NodePids],
- [receive {result, Result} -> Result end || _ <- NodePids].
-
-local_server(Node) ->
- case get({delegate_local_server_name, Node}) of
- undefined ->
- Name = server(outgoing,
- erlang:phash2({self(), Node}, process_count())),
- put({delegate_local_server_name, Node}, Name),
- Name;
- Name -> Name
- end.
-
-remote_server(Node) ->
- case get({delegate_remote_server_name, Node}) of
- undefined ->
- case rpc:call(Node, delegate, process_count, []) of
- {badrpc, _} ->
- %% Have to return something, if we're just casting
- %% then we don't want to blow up
- server(incoming, 1);
- Count ->
- Name = server(incoming,
- erlang:phash2({self(), Node}, Count)),
- put({delegate_remote_server_name, Node}, Name),
- Name
- end;
- Name -> Name
+ lists:foldl(
+ fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode ->
+ {[Pid | Local], Remote};
+ (Pid, {Local, Remote}) ->
+ {Local,
+ orddict:update(
+ node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
+ end, {[], orddict:new()}, Pids).
+
+delegate_count() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ Count.
+
+delegate_name(Hash) ->
+ list_to_atom("delegate_" ++ integer_to_list(Hash)).
+
+delegate() ->
+ case get(delegate) of
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(), delegate_count())),
+ put(delegate, Name),
+ Name;
+ Name -> Name
end.
-server(Prefix, Hash) ->
- list_to_atom("delegate_" ++
- atom_to_list(Prefix) ++ "_" ++
- integer_to_list(Hash)).
-
safe_invoke(Pids, Fun) when is_list(Pids) ->
[safe_invoke(Pid, Fun) || Pid <- Pids];
safe_invoke(Pid, Fun) when is_pid(Pid) ->
try
- {ok, Fun(Pid), Pid}
- catch
- Class:Reason ->
- {error, {Class, Reason, erlang:get_stacktrace()}, Pid}
+ {ok, Pid, Fun(Pid)}
+ catch Class:Reason ->
+ {error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
-process_count() ->
- ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
-
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
init([]) ->
- {ok, no_state, hibernate,
+ {ok, node(), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-%% We don't need a catch here; we always go via safe_invoke. A catch here would
-%% be the wrong thing anyway since the Thunk can throw multiple errors.
-handle_call({thunk, Thunk}, _From, State) ->
- {reply, Thunk(), State, hibernate}.
+handle_call({invoke, Fun, Grouped}, _From, Node) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-handle_cast({thunk, Thunk}, State) ->
- Thunk(),
- {noreply, State, hibernate}.
+handle_cast({invoke, Fun, Grouped}, Node) ->
+ safe_invoke(orddict:fetch(Node, Grouped), Fun),
+ {noreply, Node, hibernate}.
-handle_info(_Info, State) ->
- {noreply, State, hibernate}.
+handle_info(_Info, Node) ->
+ {noreply, Node, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
+code_change(_OldVsn, Node, _Extra) ->
+ {ok, Node}.
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 544546f1..d2af72af 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -55,11 +55,8 @@ start_link() ->
%%----------------------------------------------------------------------------
init(_Args) ->
- {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}.
-
-specs(Prefix) ->
- [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]},
- transient, 16#ffffffff, worker, [delegate]} ||
- Hash <- lists:seq(0, delegate:process_count() - 1)].
-
-%%----------------------------------------------------------------------------
+ DCount = delegate:delegate_count(),
+ {ok, {{one_for_one, 10, 10},
+ [{Num, {delegate, start_link, [Num]},
+ transient, 16#ffffffff, worker, [delegate]} ||
+ Num <- lists:seq(0, DCount - 1)]}}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e3e89211..35ed1c94 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -516,4 +516,4 @@ delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_cast(Pid, Msg) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
+ delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 96bcef91..15ba787a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -343,8 +343,8 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
try
Thunk()
- catch
- exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown ->
+ catch exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
Handler()
end.
@@ -577,19 +577,19 @@ sort_field_table(Arguments) ->
pid_to_string(Pid) when is_pid(Pid) ->
%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
%% 8.7)
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
+ lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])).
%% inverse of above
string_to_pid(Str) ->
Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
- case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
+ case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
[{capture,all_but_first,list}]) of
- {match, [NodeStr, IdStr, SerStr]} ->
+ {match, [NodeStr, CreStr, IdStr, SerStr]} ->
%% the NodeStr atom might be quoted, so we have to parse
%% it rather than doing a simple list_to_atom
NodeAtom = case erl_scan:string(NodeStr) of
@@ -597,9 +597,9 @@ string_to_pid(Str) ->
{error, _, _} -> throw(Err)
end,
<<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
- Id = list_to_integer(IdStr),
- Ser = list_to_integer(SerStr),
- binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
+ [Cre, Id, Ser] = lists:map(fun list_to_integer/1,
+ [CreStr, IdStr, SerStr]),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>);
nomatch ->
throw(Err)
end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 1f2d4d7d..38cc82a6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -34,7 +34,8 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
- is_clustered/0, empty_ram_only_tables/0, copy_db/1]).
+ is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
+ empty_ram_only_tables/0, copy_db/1]).
-export([table_names/0]).
@@ -63,6 +64,8 @@
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(is_clustered/0 :: () -> boolean()).
+-spec(running_clustered_nodes/0 :: () -> [node()]).
+-spec(all_clustered_nodes/0 :: () -> [node()]).
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
@@ -81,12 +84,12 @@ status() ->
Nodes = nodes_of_type(CopyType),
Nodes =/= []
end];
- no -> case mnesia:system_info(db_nodes) of
+ no -> case all_clustered_nodes() of
[] -> [];
Nodes -> [{unknown, Nodes}]
end
end},
- {running_nodes, mnesia:system_info(running_db_nodes)}].
+ {running_nodes, running_clustered_nodes()}].
init() ->
ok = ensure_mnesia_running(),
@@ -127,9 +130,15 @@ reset() -> reset(false).
force_reset() -> reset(true).
is_clustered() ->
- RunningNodes = mnesia:system_info(running_db_nodes),
+ RunningNodes = running_clustered_nodes(),
[node()] /= RunningNodes andalso [] /= RunningNodes.
+all_clustered_nodes() ->
+ mnesia:system_info(db_nodes).
+
+running_clustered_nodes() ->
+ mnesia:system_info(running_db_nodes).
+
empty_ram_only_tables() ->
Node = node(),
lists:foreach(
@@ -372,8 +381,7 @@ init_db(ClusterNodes, Force) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir),
- mnesia:system_info(db_nodes)} of
+ case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
{[], true, [_]} ->
%% True single disc node, attempt upgrade
ok = wait_for_tables(),
@@ -566,8 +574,8 @@ reset(Force) ->
{Nodes, RunningNodes} =
try
ok = init(),
- {mnesia:system_info(db_nodes) -- [Node],
- mnesia:system_info(running_db_nodes) -- [Node]}
+ {all_clustered_nodes() -- [Node],
+ running_clustered_nodes() -- [Node]}
after
mnesia:stop()
end,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e8b4e8e2..2e1834c7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -701,7 +701,7 @@ handle_cast({write, CRef, Guid},
{ok, _} -> dict:update(CRef, fun(Guids) ->
gb_sets:add(Guid, Guids)
end,
- gb_sets:empty(), CTG);
+ gb_sets:singleton(Guid), CTG);
error -> CTG
end,
State1 = State #msstate { cref_to_guids = CTG1 },
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1c542ffe..d5a9d73c 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -246,8 +246,9 @@ start_ssl_client(SslOpts, Sock) ->
connections() ->
[rabbit_connection_sup:reader(ConnSup) ||
+ Node <- rabbit_mnesia:running_clustered_nodes(),
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children({rabbit_tcp_client_sup, Node})].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8d56784a..8ceb4410 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -96,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ %% we now run the tests remotely, so that code coverage on the
+ %% local node picks up more of the delegate
+ Node = node(),
+ Self = self(),
+ Remote = spawn(SecondaryNode,
+ fun () -> A = test_delegates_async(Node),
+ B = test_delegates_sync(Node),
+ Self ! {self(), {A, B}}
+ end),
+ receive
+ {Remote, Result} ->
+ Result = {passed, passed}
+ after 2000 ->
+ throw(timeout)
+ end,
+
passed.
test_priority_queue() ->
@@ -1254,15 +1270,26 @@ test_delegates_sync(SecondaryNode) ->
true = lists:all(fun ({_, response}) -> true end, GoodRes),
GoodResPids = [Pid || {Pid, _} <- GoodRes],
- Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
- Good = ordsets:from_list(GoodResPids),
+ Good = lists:usort(LocalGoodPids ++ RemoteGoodPids),
+ Good = lists:usort(GoodResPids),
{[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
BadResPids = [Pid || {Pid, _} <- BadRes],
- Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
- Bad = ordsets:from_list(BadResPids),
+ Bad = lists:usort(LocalBadPids ++ RemoteBadPids),
+ Bad = lists:usort(BadResPids),
+
+ MagicalPids = [rabbit_misc:string_to_pid(Str) ||
+ Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]],
+ {[], BadNodes} = delegate:invoke(MagicalPids, Sender),
+ true = lists:all(
+ fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end,
+ BadNodes),
+ BadNodesPids = [Pid || {Pid, _} <- BadNodes],
+
+ Magical = lists:usort(MagicalPids),
+ Magical = lists:usort(BadNodesPids),
passed.