diff options
author | Simon MacMullen <simon@lshift.net> | 2010-04-16 15:16:15 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-04-16 15:16:15 +0100 |
commit | 2bfed7413f3678c46d7f01524f596d8098b2f3b5 (patch) | |
tree | 97cc23b58d0d20795a423b987cca301bf8716d1e | |
parent | 629d5a84d5b0586a18e9747c643fe7d435e62e06 (diff) | |
download | rabbitmq-server-2bfed7413f3678c46d7f01524f596d8098b2f3b5.tar.gz |
Reimplement the local optimistion, refactor quite a bit, fix delegate to actually do fanout properly (oops).
-rw-r--r-- | src/delegate.erl | 64 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 3 |
2 files changed, 33 insertions, 34 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 76fd9d72..a7020d9b 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -68,34 +68,29 @@ gs2_pcast(Pid, Pri, Msg) -> cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). -% TODO reimplement the single-node optimisation - -call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); - call(Pid, FPid) when is_pid(Pid) -> - [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun call/2, FPid)), + [[{Status, Res, _}]] = call_per_node([{node(Pid), [Pid]}], FPid), {Status, Res}; call(Pids, FPid) when is_list(Pids) -> lists:flatten( - delegate_per_node(split_delegate_per_node(Pids), - f_pid_node(fun call/2, FPid))). + call_per_node(split_delegate_per_node(Pids), FPid)). + +internal_call(Node, Thunk) when is_atom(Node) -> + gen_server2:call({server(), Node}, {thunk, Thunk}, infinity). -cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({server(), Node}, {thunk, Thunk}); cast(Pid, FPid) when is_pid(Pid) -> - delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun cast/2, FPid)), + cast_per_node([{node(Pid), [Pid]}], FPid), ok; cast(Pids, FPid) when is_list(Pids) -> - delegate_per_node(split_delegate_per_node(Pids), - f_pid_node(fun cast/2, FPid)), + cast_per_node(split_delegate_per_node(Pids), FPid), ok. +internal_cast(Node, Thunk) when is_atom(Node) -> + gen_server2:cast({server(), Node}, {thunk, Thunk}). + %%---------------------------------------------------------------------------- split_delegate_per_node(Pids) -> @@ -108,17 +103,22 @@ split_delegate_per_node(Pids) -> end, dict:new(), Pids)). -f_pid_node(DelegateFun, FPid) -> - fun(Pid, Node) -> - DelegateFun(Node, fun() -> FPid(Pid) end) - end. +call_per_node([{Node, Pids}], FPid) when Node == node() -> + local_delegate(Pids, FPid); +call_per_node(NodePids, FPid) -> + delegate_per_node(NodePids, FPid, fun internal_call/2). + +cast_per_node([{Node, Pids}], FPid) when Node == node() -> + local_delegate(Pids, FPid); +cast_per_node(NodePids, FPid) -> + delegate_per_node(NodePids, FPid, fun internal_cast/2). -delegate_per_node(NodePids, FPidNode) -> - [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids] || - {Node, Pids} <- NodePids]. +local_delegate(Pids, FPid) -> + [[safe_invoke(FPid, Pid) || Pid <- Pids]]. -add_pid({Status, Result}, Pid) -> {Status, Result, Pid}; -add_pid(Status, Pid) -> {Status, Pid}. +delegate_per_node(NodePids, FPid, DelegateFun) -> + [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) || + {Node, Pids} <- NodePids]. server() -> server(erlang:phash(self(), ?DELEGATE_PROCESSES)). @@ -126,19 +126,21 @@ server() -> server(Hash) -> list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))). +safe_invoke(FPid, Pid) -> + case catch FPid(Pid) of + {'EXIT', Reason} -> + {error, {'EXIT', Reason}, Pid}; + Result -> + {ok, Result, Pid} + end. + %%-------------------------------------------------------------------- init([]) -> {ok, no_state}. handle_call({thunk, Thunk}, _From, State) -> - Res = case catch Thunk() of - {'EXIT', Reason} -> - {error, {'EXIT', Reason}}; - Result -> - {ok, Result} - end, - {reply, Res, State}. + {reply, Thunk(), State}. handle_cast({thunk, Thunk}, State) -> catch Thunk(), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e5e57a68..5ed7d64c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -883,9 +883,6 @@ await_response(Count) -> end. test_delegates_sync(SecondaryNode) -> - {ok, "foo"} = delegate:call(node(), fun() -> "foo" end), - {ok, "bar"} = delegate:call(SecondaryNode, fun() -> "bar" end), - Sender = fun(Pid) -> gen_server2:call(Pid, invoked) end, |