summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-27 15:43:55 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-27 15:43:55 +0100
commitfe98ed220be0865aa791b1364a492993fd83ea94 (patch)
tree7ab2a0bbcc044d2298fb621115e0fe71730578a5
parentaa89b97aaa40a9a495e5979373f6f02537380d44 (diff)
parent20a7e435105443224c1f292838127ebd133ef7c7 (diff)
downloadrabbitmq-server-fe98ed220be0865aa791b1364a492993fd83ea94.tar.gz
Merging bug 22812 into default
-rw-r--r--src/delegate.erl45
-rw-r--r--src/rabbit_tests.erl17
2 files changed, 36 insertions, 26 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 12eb814f..98353453 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -63,7 +63,7 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
case Res of
{ok, Result, _} ->
Result;
@@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
invoke_per_node(split_delegate_per_node(Pids), Fun)).
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
+ invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
@@ -99,32 +99,37 @@ internal_cast(Node, Thunk) when is_atom(Node) ->
gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
split_delegate_per_node(Pids) ->
- orddict:to_list(
- lists:foldl(
- fun (Pid, D) ->
- orddict:update(node(Pid),
- fun (Pids1) -> [Pid | Pids1] end,
- [Pid], D)
- end,
- orddict:new(), Pids)).
+ LocalNode = node(),
+ {Local, Remote} =
+ lists:foldl(
+ fun (Pid, {L, D}) ->
+ Node = node(Pid),
+ case Node of
+ LocalNode -> {[Pid|L], D};
+ _ -> {L, orddict:append(Node, Pid, D)}
+ end
+ end,
+ {[], orddict:new()}, Pids),
+ {Local, orddict:to_list(Remote)}.
-invoke_per_node([{Node, Pids}], Fun) when Node == node() ->
- safe_invoke(Pids, Fun);
invoke_per_node(NodePids, Fun) ->
lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() ->
- %% This is not actually async! However, in practice Fun will
- %% always be something that does a gen_server:cast or similar, so
- %% I don't think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- safe_invoke(Pids, Fun);
invoke_no_result_per_node(NodePids, Fun) ->
delegate_per_node(NodePids, Fun, fun internal_cast/2),
ok.
-delegate_per_node(NodePids, Fun, DelegateFun) ->
+delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
+ %% In the case where DelegateFun is internal_cast, the safe_invoke
+ %% is not actually async! However, in practice Fun will always be
+ %% something that does a gen_server:cast or similar, so I don't
+ %% think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ [safe_invoke(LocalPids, Fun)|
+ delegate_per_remote_node(NodePids, Fun, DelegateFun)].
+
+delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
Self = self(),
%% Note that this is unsafe if the Fun requires reentrancy to the
%% local_server. I.e. if self() == local_server(Node) then we'll
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7afa7316..7ae37d73 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -855,10 +855,11 @@ test_delegates_async(SecondaryNode) ->
passed.
-make_responder(FMsg) ->
+make_responder(FMsg) -> make_responder(FMsg, timeout).
+make_responder(FMsg, Throw) ->
fun() ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(timeout)
+ after 1000 -> throw(Throw)
end
end.
@@ -892,17 +893,21 @@ test_delegates_sync(SecondaryNode) ->
gen_server:reply(From, response)
end),
+ BadResponder = make_responder(fun({'$gen_call', From, invoked}) ->
+ gen_server:reply(From, response)
+ end, bad_responder_died),
+
response = delegate:invoke(spawn(Responder), Sender),
response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
- must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end),
+ must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end),
must_exit(fun() ->
- delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end),
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
- LocalBadPids = spawn_responders(node(), Responder, 2),
- RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2),
+ LocalBadPids = spawn_responders(node(), BadResponder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
{GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
true = lists:all(fun ({_, response}) -> true end, GoodRes),