diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-07-27 14:29:31 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-07-27 14:29:31 +0100 |
commit | 4006546389f82cb24296089da6e3fd601bda339e (patch) | |
tree | 75bf9dbe5b246d0a2845b15be8a1df344c98ca20 | |
parent | d2a7370986d7cc62b775d7bb48067821cd3b63d8 (diff) | |
download | rabbitmq-server-4006546389f82cb24296089da6e3fd601bda339e.tar.gz |
Backport f35d1734d176 (Merge of bug25023; rabbit_amqqueue:notify_down_all doesn't handle node failure)
-rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 27 |
4 files changed, 28 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 12cb543f..c336aec3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -650,13 +650,18 @@ qpids(Qs) -> lists:append([[QPid | SPids] || #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). safe_delegate_call_ok(F, Pids) -> - case delegate:invoke(Pids, fun (Pid) -> - rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> F(Pid) end) - end) of - {_, []} -> ok; - {_, Bad} -> {error, Bad} + {_, Bads} = delegate:invoke(Pids, fun (Pid) -> + rabbit_misc:with_exit_handler( + fun () -> ok end, + fun () -> F(Pid) end) + end), + case lists:filter(fun ({_Pid, {exit, {R, _}, _}}) -> + rabbit_misc:is_abnormal_exit(R); + ({_Pid, _}) -> + false + end, Bads) of + [] -> ok; + Bads1 -> {error, Bads1} end. delegate_call(Pid, Msg) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f2833c26..988fa2ad 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -770,7 +770,7 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> case pmon:is_monitored(QPid, QMons) of false -> noreply(State); - true -> case rabbit_misc:is_abnormal_termination(Reason) of + true -> case rabbit_misc:is_abnormal_exit(Reason) of true -> {Lost, _UC1} = dtree:take_all(QPid, UC), QNameS = rabbit_misc:rs(qname(State)), rabbit_log:warning("DLQ ~p for ~s died with " diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 22c6a223..73b461cd 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1119,7 +1119,7 @@ monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons, delivering_queues = sets:add_element(QPid, DQ)}. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> - case rabbit_misc:is_abnormal_termination(Reason) of + case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), send_nacks(MXs, State#ch{unconfirmed = UC1}); false -> {MXs, UC1} = dtree:take(QPid, UC), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0aacd654..88b110af 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -29,8 +29,8 @@ -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). -export([confirm_to_sender/2]). --export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). --export([is_abnormal_termination/1]). +-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1, + filter_exit_map/2]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([execute_mnesia_transaction/2]). @@ -62,6 +62,11 @@ -export([os_cmd/1]). -export([gb_sets_difference/2]). +%% Horrible macro to use in guards +-define(IS_BENIGN_EXIT(R), + R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal; + R =:= shutdown). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -134,8 +139,8 @@ -spec(throw_on_error/2 :: (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). +-spec(is_abnormal_exit/1 :: (any()) -> boolean()). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(is_abnormal_termination/1 :: (any()) -> boolean()). -spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) @@ -395,13 +400,14 @@ with_exit_handler(Handler, Thunk) -> try Thunk() catch - exit:{R, _} when R =:= noproc; R =:= nodedown; - R =:= normal; R =:= shutdown -> - Handler(); - exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> - Handler() + exit:{R, _} when ?IS_BENIGN_EXIT(R) -> Handler(); + exit:{{R, _}, _} when ?IS_BENIGN_EXIT(R) -> Handler() end. +is_abnormal_exit(R) when ?IS_BENIGN_EXIT(R) -> false; +is_abnormal_exit({R, _}) when ?IS_BENIGN_EXIT(R) -> false; +is_abnormal_exit(_) -> true. + filter_exit_map(F, L) -> Ref = make_ref(), lists:filter(fun (R) -> R =/= Ref end, @@ -409,11 +415,6 @@ filter_exit_map(F, L) -> fun () -> Ref end, fun () -> F(I) end) || I <- L]). -is_abnormal_termination(Reason) - when Reason =:= noproc; Reason =:= noconnection; - Reason =:= normal; Reason =:= shutdown -> false; -is_abnormal_termination({shutdown, _}) -> false; -is_abnormal_termination(_) -> true. with_user(Username, Thunk) -> fun () -> |