diff options
author | Simon MacMullen <simon@lshift.net> | 2010-05-05 10:32:13 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-05-05 10:32:13 +0100 |
commit | b13fede8cbcab7290d0f07c813c62c1281c56c5a (patch) | |
tree | 6d9bafe1543f149e5344f70d7bf2bef9ce6d5532 | |
parent | 33a66243733fbf36ec32d48d9f290aece290579b (diff) | |
download | rabbitmq-server-b13fede8cbcab7290d0f07c813c62c1281c56c5a.tar.gz |
Retain all exception information if thrown. Use that to rethrow in the single pid case, and mimic gen_server:multi_call in the multiple pid case.
-rw-r--r-- | src/delegate.erl | 36 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 26 | ||||
-rw-r--r-- | src/rabbit_router.erl | 9 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 39 |
4 files changed, 63 insertions, 47 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 104315e0..a264d60c 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -62,11 +62,24 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> - [{Status, Res, _}] = invoke_per_node([{node(Pid), [Pid]}], Fun), - {Status, Res}; + [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun), + case Res of + {ok, Result, _} -> + Result; + {error, {Class, Reason, StackTrace}, _} -> + erlang:raise(Class, Reason, StackTrace) + end; invoke(Pids, Fun) when is_list(Pids) -> - invoke_per_node(split_delegate_per_node(Pids), Fun). + lists:foldl( + fun({Status, Result, Pid}, {Good, Bad}) -> + case Status of + ok -> {[{Pid, Result}|Good], Bad}; + error -> {Good, [{Pid, Result}|Bad]} + end + end, + {[], []}, + invoke_per_node(split_delegate_per_node(Pids), Fun)). invoke_no_result(Pid, Fun) when is_pid(Pid) -> invoke_no_result_per_node([{node(Pid), [Pid]}], Fun), @@ -156,12 +169,11 @@ server(Hash) -> list_to_atom("delegate_process_" ++ integer_to_list(Hash)). safe_invoke(Fun, Pid) -> - %% We need the catch here for the local case. In the remote case - %% there will already have been a catch in handle_ca{ll,st} below, - %% but that's OK, catch is idempotent. - case catch Fun(Pid) of - {'EXIT', Reason} -> {error, {'EXIT', Reason}, Pid}; - Result -> {ok, Result, Pid} + try + {ok, Fun(Pid), Pid} + catch + Class:Reason -> + {error, {Class, Reason, erlang:get_stacktrace()}, Pid} end. process_count() -> @@ -173,11 +185,13 @@ init([]) -> {ok, no_state, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +%% We don't need a catch here; we always go via safe_invoke. A catch here would +%% be the wrong thing anyway since the Thunk can throw multiple errors. handle_call({thunk, Thunk}, _From, State) -> - {reply, catch Thunk(), State, hibernate}. + {reply, Thunk(), State, hibernate}. handle_cast({thunk, Thunk}, State) -> - catch Thunk(), + Thunk(), {noreply, State, hibernate}. handle_info(_Info, State) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b5592c0..4a9cf9b4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -371,31 +371,27 @@ pseudo_queue(QueueName, Pid) -> pid = Pid}. safe_delegate_call_ok(H, F, Pids) -> - case [R || R = {error, _, _} <- delegate:invoke( - Pids, - fun (Pid) -> - rabbit_misc:with_exit_handler( - fun () -> H(Pid) end, - fun () -> F(Pid) end) - end)] of + {_, Bad} = delegate:invoke(Pids, + fun (Pid) -> + rabbit_misc:with_exit_handler( + fun () -> H(Pid) end, + fun () -> F(Pid) end) + end), + case Bad of [] -> ok; Errors -> {error, Errors} end. delegate_call(Pid, Msg, Timeout) -> - {_Status, Res} = - delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), - Res. + delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - {_Status, Res} = - delegate:invoke(Pid, - fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), - Res. + delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_cast(Pid, Msg) -> delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end). delegate_pcast(Pid, Pri, Msg) -> - delegate:invoke_no_result(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + delegate:invoke_no_result(Pid, + fun(P) -> gen_server2:pcast(P, Pri, Msg) end). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 7bd916b3..747c9de1 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -61,9 +61,9 @@ deliver(QPids, Delivery = #delivery{mandatory = false, {routed, QPids}; deliver(QPids, Delivery) -> - Res = delegate:invoke( + {Success, _} = delegate:invoke( QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Res), + {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, {Routed, Handled}). @@ -109,9 +109,8 @@ lookup_qpids(Queues) -> %%-------------------------------------------------------------------- -fold_deliveries({ok, true, Pid},{_, Handled}) -> {true, [Pid|Handled]}; -fold_deliveries({ok, false, _ },{_, Handled}) -> {true, Handled}; -fold_deliveries({error, _ , _ },{Routed, Handled}) -> {Routed, Handled}. +fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]}; +fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) check_delivery(true, _ , {false, []}) -> {unroutable, []}; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 374ea07d..5451e7d1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -879,40 +879,47 @@ await_response(Count) -> throw(timeout) end. +must_exit(Fun) -> + try + Fun(), + throw(exit_not_thrown) + catch + exit:_ -> ok + end. + test_delegates_sync(SecondaryNode) -> Sender = fun(Pid) -> gen_server:call(Pid, invoked) end, + BadSender = fun(_Pid) -> exit(exception) end, Responder = make_responder(fun({'$gen_call', From, invoked}) -> gen_server:reply(From, response) end), - BadResponder = make_responder(fun({'$gen_call', _From, invoked}) -> - throw(exception) - end), + response = delegate:invoke(spawn(Responder), Sender), + response = delegate:invoke(spawn(SecondaryNode, Responder), Sender), - {ok, response} = delegate:invoke(spawn(Responder), Sender), - {ok, response} = delegate:invoke(spawn(SecondaryNode, Responder), Sender), - - {error, _} = delegate:invoke(spawn(BadResponder), Sender), - {error, _} = delegate:invoke(spawn(SecondaryNode, BadResponder), Sender), + must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end), + must_exit(fun() -> + delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end), LocalGoodPids = spawn_responders(node(), Responder, 2), RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2), - LocalBadPids = spawn_responders(node(), BadResponder, 2), - RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2), + LocalBadPids = spawn_responders(node(), Responder, 2), + RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2), - GoodRes = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender), - BadRes = delegate:invoke(LocalBadPids ++ RemoteBadPids, Sender), + {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender), - true = lists:all(fun ({ok, response, _}) -> true end, GoodRes), - true = lists:all(fun ({error, _, _}) -> true end, BadRes), + true = lists:all(fun ({_, response}) -> true end, GoodRes), - GoodResPids = [Pid || {_, _, Pid} <- GoodRes], - BadResPids = [Pid || {_, _, Pid} <- BadRes], + GoodResPids = [Pid || {Pid, _} <- GoodRes], Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), Good = ordsets:from_list(GoodResPids), + {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender), + true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes), + BadResPids = [Pid || {Pid, _} <- BadRes], + Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), Bad = ordsets:from_list(BadResPids), |