diff options
author | Simon MacMullen <simon@lshift.net> | 2010-05-27 15:04:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-05-27 15:04:31 +0100 |
commit | a730ac41827344677507fa235298b9b996cd7327 (patch) | |
tree | 60e170cf0efc364a455b1541539540cdbf413d35 | |
parent | e7dc78714cda3805f65e73dd09886ba76bec6ae1 (diff) | |
download | rabbitmq-server-a730ac41827344677507fa235298b9b996cd7327.tar.gz |
We were only applying the local shortcut in the case when we were *only*
talking to a local node. This is wrong because:
1) We could then happen to pick a local delegate and a remote delegate that
were the same process, and deadlock.
2) There's still a possibility of messages overtaking if sometimes they go
via delegates locally and sometimes not.
So fix that to always avoid the delegates when communicating locally, even if
we're communicating remotely at the same time.
-rw-r--r-- | src/delegate.erl | 44 |
1 files changed, 24 insertions, 20 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 12eb814f..2f7bc299 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -63,7 +63,7 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun), + [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), case Res of {ok, Result, _} -> Result; @@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) -> invoke_per_node(split_delegate_per_node(Pids), Fun)). invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node([{node(Pid), [Pid]}], Fun), + invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), ok; invoke_no_result(Pids, Fun) when is_list(Pids) -> @@ -99,32 +99,36 @@ internal_cast(Node, Thunk) when is_atom(Node) -> gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). split_delegate_per_node(Pids) -> - orddict:to_list( - lists:foldl( - fun (Pid, D) -> - orddict:update(node(Pid), - fun (Pids1) -> [Pid | Pids1] end, - [Pid], D) - end, - orddict:new(), Pids)). + {Local, Remote} = + lists:foldl( + fun (Pid, {L, D}) -> + Node = node(Pid), + case node() of + Node -> {[Pid|L], D}; + _ -> {L, orddict:append(node(Pid), Pid, D)} + end + end, + {[], orddict:new()}, Pids), + {Local, orddict:to_list(Remote)}. -invoke_per_node([{Node, Pids}], Fun) when Node == node() -> - safe_invoke(Pids, Fun); invoke_per_node(NodePids, Fun) -> lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). -invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() -> - %% This is not actually async! However, in practice Fun will - %% always be something that does a gen_server:cast or similar, so - %% I don't think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - safe_invoke(Pids, Fun); invoke_no_result_per_node(NodePids, Fun) -> delegate_per_node(NodePids, Fun, fun internal_cast/2), ok. -delegate_per_node(NodePids, Fun, DelegateFun) -> +delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> + %% In the case where DelegateFun is internal_cast, the safe_invoke + %% is not actually async! However, in practice Fun will always be + %% something that does a gen_server:cast or similar, so I don't + %% think it's a problem unless someone misuses this + %% function. Making this *actually* async would be painful as we + %% can't spawn at this point or we break effect ordering. + [safe_invoke(LocalPids, Fun)| + delegate_per_remote_node(NodePids, Fun, DelegateFun)]. + +delegate_per_remote_node(NodePids, Fun, DelegateFun) -> Self = self(), %% Note that this is unsafe if the Fun requires reentrancy to the %% local_server. I.e. if self() == local_server(Node) then we'll |