summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-05 10:32:13 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-05 10:32:13 +0100
commitb13fede8cbcab7290d0f07c813c62c1281c56c5a (patch)
tree6d9bafe1543f149e5344f70d7bf2bef9ce6d5532
parent33a66243733fbf36ec32d48d9f290aece290579b (diff)
downloadrabbitmq-server-b13fede8cbcab7290d0f07c813c62c1281c56c5a.tar.gz
Retain all exception information if thrown. Use that to rethrow in the single pid case, and mimic gen_server:multi_call in the multiple pid case.
-rw-r--r--src/delegate.erl36
-rw-r--r--src/rabbit_amqqueue.erl26
-rw-r--r--src/rabbit_router.erl9
-rw-r--r--src/rabbit_tests.erl39
4 files changed, 63 insertions, 47 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 104315e0..a264d60c 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -62,11 +62,24 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
- [{Status, Res, _}] = invoke_per_node([{node(Pid), [Pid]}], Fun),
- {Status, Res};
+ [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ case Res of
+ {ok, Result, _} ->
+ Result;
+ {error, {Class, Reason, StackTrace}, _} ->
+ erlang:raise(Class, Reason, StackTrace)
+ end;
invoke(Pids, Fun) when is_list(Pids) ->
- invoke_per_node(split_delegate_per_node(Pids), Fun).
+ lists:foldl(
+ fun({Status, Result, Pid}, {Good, Bad}) ->
+ case Status of
+ ok -> {[{Pid, Result}|Good], Bad};
+ error -> {Good, [{Pid, Result}|Bad]}
+ end
+ end,
+ {[], []},
+ invoke_per_node(split_delegate_per_node(Pids), Fun)).
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
@@ -156,12 +169,11 @@ server(Hash) ->
list_to_atom("delegate_process_" ++ integer_to_list(Hash)).
safe_invoke(Fun, Pid) ->
- %% We need the catch here for the local case. In the remote case
- %% there will already have been a catch in handle_ca{ll,st} below,
- %% but that's OK, catch is idempotent.
- case catch Fun(Pid) of
- {'EXIT', Reason} -> {error, {'EXIT', Reason}, Pid};
- Result -> {ok, Result, Pid}
+ try
+ {ok, Fun(Pid), Pid}
+ catch
+ Class:Reason ->
+ {error, {Class, Reason, erlang:get_stacktrace()}, Pid}
end.
process_count() ->
@@ -173,11 +185,13 @@ init([]) ->
{ok, no_state, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+%% We don't need a catch here; we always go via safe_invoke. A catch here would
+%% be the wrong thing anyway since the Thunk can throw multiple errors.
handle_call({thunk, Thunk}, _From, State) ->
- {reply, catch Thunk(), State, hibernate}.
+ {reply, Thunk(), State, hibernate}.
handle_cast({thunk, Thunk}, State) ->
- catch Thunk(),
+ Thunk(),
{noreply, State, hibernate}.
handle_info(_Info, State) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b5592c0..4a9cf9b4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -371,31 +371,27 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid}.
safe_delegate_call_ok(H, F, Pids) ->
- case [R || R = {error, _, _} <- delegate:invoke(
- Pids,
- fun (Pid) ->
- rabbit_misc:with_exit_handler(
- fun () -> H(Pid) end,
- fun () -> F(Pid) end)
- end)] of
+ {_, Bad} = delegate:invoke(Pids,
+ fun (Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> H(Pid) end,
+ fun () -> F(Pid) end)
+ end),
+ case Bad of
[] -> ok;
Errors -> {error, Errors}
end.
delegate_call(Pid, Msg, Timeout) ->
- {_Status, Res} =
- delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end),
- Res.
+ delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_pcall(Pid, Pri, Msg, Timeout) ->
- {_Status, Res} =
- delegate:invoke(Pid,
- fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end),
- Res.
+ delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
delegate_cast(Pid, Msg) ->
delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
delegate_pcast(Pid, Pri, Msg) ->
- delegate:invoke_no_result(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+ delegate:invoke_no_result(Pid,
+ fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 7bd916b3..747c9de1 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -61,9 +61,9 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
{routed, QPids};
deliver(QPids, Delivery) ->
- Res = delegate:invoke(
+ {Success, _} = delegate:invoke(
QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Res),
+ {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
{Routed, Handled}).
@@ -109,9 +109,8 @@ lookup_qpids(Queues) ->
%%--------------------------------------------------------------------
-fold_deliveries({ok, true, Pid},{_, Handled}) -> {true, [Pid|Handled]};
-fold_deliveries({ok, false, _ },{_, Handled}) -> {true, Handled};
-fold_deliveries({error, _ , _ },{Routed, Handled}) -> {Routed, Handled}.
+fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
+fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
check_delivery(true, _ , {false, []}) -> {unroutable, []};
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 374ea07d..5451e7d1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -879,40 +879,47 @@ await_response(Count) ->
throw(timeout)
end.
+must_exit(Fun) ->
+ try
+ Fun(),
+ throw(exit_not_thrown)
+ catch
+ exit:_ -> ok
+ end.
+
test_delegates_sync(SecondaryNode) ->
Sender = fun(Pid) -> gen_server:call(Pid, invoked) end,
+ BadSender = fun(_Pid) -> exit(exception) end,
Responder = make_responder(fun({'$gen_call', From, invoked}) ->
gen_server:reply(From, response)
end),
- BadResponder = make_responder(fun({'$gen_call', _From, invoked}) ->
- throw(exception)
- end),
+ response = delegate:invoke(spawn(Responder), Sender),
+ response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
- {ok, response} = delegate:invoke(spawn(Responder), Sender),
- {ok, response} = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
-
- {error, _} = delegate:invoke(spawn(BadResponder), Sender),
- {error, _} = delegate:invoke(spawn(SecondaryNode, BadResponder), Sender),
+ must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end),
+ must_exit(fun() ->
+ delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
- LocalBadPids = spawn_responders(node(), BadResponder, 2),
- RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
+ LocalBadPids = spawn_responders(node(), Responder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2),
- GoodRes = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
- BadRes = delegate:invoke(LocalBadPids ++ RemoteBadPids, Sender),
+ {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
- true = lists:all(fun ({ok, response, _}) -> true end, GoodRes),
- true = lists:all(fun ({error, _, _}) -> true end, BadRes),
+ true = lists:all(fun ({_, response}) -> true end, GoodRes),
- GoodResPids = [Pid || {_, _, Pid} <- GoodRes],
- BadResPids = [Pid || {_, _, Pid} <- BadRes],
+ GoodResPids = [Pid || {Pid, _} <- GoodRes],
Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
Good = ordsets:from_list(GoodResPids),
+ {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
+ true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
+ BadResPids = [Pid || {Pid, _} <- BadRes],
+
Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
Bad = ordsets:from_list(BadResPids),