diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-27 15:43:55 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-27 15:43:55 +0100 |
commit | fe98ed220be0865aa791b1364a492993fd83ea94 (patch) | |
tree | 7ab2a0bbcc044d2298fb621115e0fe71730578a5 | |
parent | aa89b97aaa40a9a495e5979373f6f02537380d44 (diff) | |
parent | 20a7e435105443224c1f292838127ebd133ef7c7 (diff) | |
download | rabbitmq-server-fe98ed220be0865aa791b1364a492993fd83ea94.tar.gz |
Merging bug 22812 into default
-rw-r--r-- | src/delegate.erl | 45 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 17 |
2 files changed, 36 insertions, 26 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 12eb814f..98353453 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -63,7 +63,7 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun), + [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), case Res of {ok, Result, _} -> Result; @@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) -> invoke_per_node(split_delegate_per_node(Pids), Fun)). invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node([{node(Pid), [Pid]}], Fun), + invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), ok; invoke_no_result(Pids, Fun) when is_list(Pids) -> @@ -99,32 +99,37 @@ internal_cast(Node, Thunk) when is_atom(Node) -> gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). split_delegate_per_node(Pids) -> - orddict:to_list( - lists:foldl( - fun (Pid, D) -> - orddict:update(node(Pid), - fun (Pids1) -> [Pid | Pids1] end, - [Pid], D) - end, - orddict:new(), Pids)). + LocalNode = node(), + {Local, Remote} = + lists:foldl( + fun (Pid, {L, D}) -> + Node = node(Pid), + case Node of + LocalNode -> {[Pid|L], D}; + _ -> {L, orddict:append(Node, Pid, D)} + end + end, + {[], orddict:new()}, Pids), + {Local, orddict:to_list(Remote)}. -invoke_per_node([{Node, Pids}], Fun) when Node == node() -> - safe_invoke(Pids, Fun); invoke_per_node(NodePids, Fun) -> lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). -invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() -> - %% This is not actually async! However, in practice Fun will - %% always be something that does a gen_server:cast or similar, so - %% I don't think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - safe_invoke(Pids, Fun); invoke_no_result_per_node(NodePids, Fun) -> delegate_per_node(NodePids, Fun, fun internal_cast/2), ok. -delegate_per_node(NodePids, Fun, DelegateFun) -> +delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> + %% In the case where DelegateFun is internal_cast, the safe_invoke + %% is not actually async! However, in practice Fun will always be + %% something that does a gen_server:cast or similar, so I don't + %% think it's a problem unless someone misuses this + %% function. Making this *actually* async would be painful as we + %% can't spawn at this point or we break effect ordering. + [safe_invoke(LocalPids, Fun)| + delegate_per_remote_node(NodePids, Fun, DelegateFun)]. + +delegate_per_remote_node(NodePids, Fun, DelegateFun) -> Self = self(), %% Note that this is unsafe if the Fun requires reentrancy to the %% local_server. I.e. if self() == local_server(Node) then we'll diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7afa7316..7ae37d73 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -855,10 +855,11 @@ test_delegates_async(SecondaryNode) -> passed. -make_responder(FMsg) -> +make_responder(FMsg) -> make_responder(FMsg, timeout). +make_responder(FMsg, Throw) -> fun() -> receive Msg -> FMsg(Msg) - after 1000 -> throw(timeout) + after 1000 -> throw(Throw) end end. @@ -892,17 +893,21 @@ test_delegates_sync(SecondaryNode) -> gen_server:reply(From, response) end), + BadResponder = make_responder(fun({'$gen_call', From, invoked}) -> + gen_server:reply(From, response) + end, bad_responder_died), + response = delegate:invoke(spawn(Responder), Sender), response = delegate:invoke(spawn(SecondaryNode, Responder), Sender), - must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end), + must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end), must_exit(fun() -> - delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end), + delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end), LocalGoodPids = spawn_responders(node(), Responder, 2), RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2), - LocalBadPids = spawn_responders(node(), Responder, 2), - RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2), + LocalBadPids = spawn_responders(node(), BadResponder, 2), + RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2), {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender), true = lists:all(fun ({_, response}) -> true end, GoodRes), |