summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-04-16 15:16:15 +0100
committerSimon MacMullen <simon@lshift.net>2010-04-16 15:16:15 +0100
commit2bfed7413f3678c46d7f01524f596d8098b2f3b5 (patch)
tree97cc23b58d0d20795a423b987cca301bf8716d1e
parent629d5a84d5b0586a18e9747c643fe7d435e62e06 (diff)
downloadrabbitmq-server-2bfed7413f3678c46d7f01524f596d8098b2f3b5.tar.gz
Reimplement the local optimistion, refactor quite a bit, fix delegate to actually do fanout properly (oops).
-rw-r--r--src/delegate.erl64
-rw-r--r--src/rabbit_tests.erl3
2 files changed, 33 insertions, 34 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 76fd9d72..a7020d9b 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -68,34 +68,29 @@ gs2_pcast(Pid, Pri, Msg) ->
cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
-% TODO reimplement the single-node optimisation
-
-call(Node, Thunk) when is_atom(Node) ->
- gen_server2:call({server(), Node}, {thunk, Thunk}, infinity);
-
call(Pid, FPid) when is_pid(Pid) ->
- [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}],
- f_pid_node(fun call/2, FPid)),
+ [[{Status, Res, _}]] = call_per_node([{node(Pid), [Pid]}], FPid),
{Status, Res};
call(Pids, FPid) when is_list(Pids) ->
lists:flatten(
- delegate_per_node(split_delegate_per_node(Pids),
- f_pid_node(fun call/2, FPid))).
+ call_per_node(split_delegate_per_node(Pids), FPid)).
+
+internal_call(Node, Thunk) when is_atom(Node) ->
+ gen_server2:call({server(), Node}, {thunk, Thunk}, infinity).
-cast(Node, Thunk) when is_atom(Node) ->
- gen_server2:cast({server(), Node}, {thunk, Thunk});
cast(Pid, FPid) when is_pid(Pid) ->
- delegate_per_node([{node(Pid), [Pid]}],
- f_pid_node(fun cast/2, FPid)),
+ cast_per_node([{node(Pid), [Pid]}], FPid),
ok;
cast(Pids, FPid) when is_list(Pids) ->
- delegate_per_node(split_delegate_per_node(Pids),
- f_pid_node(fun cast/2, FPid)),
+ cast_per_node(split_delegate_per_node(Pids), FPid),
ok.
+internal_cast(Node, Thunk) when is_atom(Node) ->
+ gen_server2:cast({server(), Node}, {thunk, Thunk}).
+
%%----------------------------------------------------------------------------
split_delegate_per_node(Pids) ->
@@ -108,17 +103,22 @@ split_delegate_per_node(Pids) ->
end,
dict:new(), Pids)).
-f_pid_node(DelegateFun, FPid) ->
- fun(Pid, Node) ->
- DelegateFun(Node, fun() -> FPid(Pid) end)
- end.
+call_per_node([{Node, Pids}], FPid) when Node == node() ->
+ local_delegate(Pids, FPid);
+call_per_node(NodePids, FPid) ->
+ delegate_per_node(NodePids, FPid, fun internal_call/2).
+
+cast_per_node([{Node, Pids}], FPid) when Node == node() ->
+ local_delegate(Pids, FPid);
+cast_per_node(NodePids, FPid) ->
+ delegate_per_node(NodePids, FPid, fun internal_cast/2).
-delegate_per_node(NodePids, FPidNode) ->
- [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids] ||
- {Node, Pids} <- NodePids].
+local_delegate(Pids, FPid) ->
+ [[safe_invoke(FPid, Pid) || Pid <- Pids]].
-add_pid({Status, Result}, Pid) -> {Status, Result, Pid};
-add_pid(Status, Pid) -> {Status, Pid}.
+delegate_per_node(NodePids, FPid, DelegateFun) ->
+ [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) ||
+ {Node, Pids} <- NodePids].
server() ->
server(erlang:phash(self(), ?DELEGATE_PROCESSES)).
@@ -126,19 +126,21 @@ server() ->
server(Hash) ->
list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))).
+safe_invoke(FPid, Pid) ->
+ case catch FPid(Pid) of
+ {'EXIT', Reason} ->
+ {error, {'EXIT', Reason}, Pid};
+ Result ->
+ {ok, Result, Pid}
+ end.
+
%%--------------------------------------------------------------------
init([]) ->
{ok, no_state}.
handle_call({thunk, Thunk}, _From, State) ->
- Res = case catch Thunk() of
- {'EXIT', Reason} ->
- {error, {'EXIT', Reason}};
- Result ->
- {ok, Result}
- end,
- {reply, Res, State}.
+ {reply, Thunk(), State}.
handle_cast({thunk, Thunk}, State) ->
catch Thunk(),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e5e57a68..5ed7d64c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -883,9 +883,6 @@ await_response(Count) ->
end.
test_delegates_sync(SecondaryNode) ->
- {ok, "foo"} = delegate:call(node(), fun() -> "foo" end),
- {ok, "bar"} = delegate:call(SecondaryNode, fun() -> "bar" end),
-
Sender = fun(Pid) ->
gen_server2:call(Pid, invoked)
end,