summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-07-27 14:29:31 +0100
committerTim Watson <tim@rabbitmq.com>2012-07-27 14:29:31 +0100
commit4006546389f82cb24296089da6e3fd601bda339e (patch)
tree75bf9dbe5b246d0a2845b15be8a1df344c98ca20
parentd2a7370986d7cc62b775d7bb48067821cd3b63d8 (diff)
downloadrabbitmq-server-4006546389f82cb24296089da6e3fd601bda339e.tar.gz
Backport f35d1734d176 (Merge of bug25023; rabbit_amqqueue:notify_down_all doesn't handle node failure)
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_misc.erl27
4 files changed, 28 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 12cb543f..c336aec3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -650,13 +650,18 @@ qpids(Qs) -> lists:append([[QPid | SPids] ||
#amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
safe_delegate_call_ok(F, Pids) ->
- case delegate:invoke(Pids, fun (Pid) ->
- rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () -> F(Pid) end)
- end) of
- {_, []} -> ok;
- {_, Bad} -> {error, Bad}
+ {_, Bads} = delegate:invoke(Pids, fun (Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> ok end,
+ fun () -> F(Pid) end)
+ end),
+ case lists:filter(fun ({_Pid, {exit, {R, _}, _}}) ->
+ rabbit_misc:is_abnormal_exit(R);
+ ({_Pid, _}) ->
+ false
+ end, Bads) of
+ [] -> ok;
+ Bads1 -> {error, Bads1}
end.
delegate_call(Pid, Msg) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f2833c26..988fa2ad 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -770,7 +770,7 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
case pmon:is_monitored(QPid, QMons) of
false -> noreply(State);
- true -> case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> case rabbit_misc:is_abnormal_exit(Reason) of
true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
QNameS = rabbit_misc:rs(qname(State)),
rabbit_log:warning("DLQ ~p for ~s died with "
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 22c6a223..73b461cd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1119,7 +1119,7 @@ monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
delivering_queues = sets:add_element(QPid, DQ)}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
- case rabbit_misc:is_abnormal_termination(Reason) of
+ case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
send_nacks(MXs, State#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0aacd654..88b110af 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -29,8 +29,8 @@
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
-export([confirm_to_sender/2]).
--export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
--export([is_abnormal_termination/1]).
+-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1,
+ filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([execute_mnesia_transaction/2]).
@@ -62,6 +62,11 @@
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
+%% Horrible macro to use in guards
+-define(IS_BENIGN_EXIT(R),
+ R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
+ R =:= shutdown).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -134,8 +139,8 @@
-spec(throw_on_error/2 ::
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
+-spec(is_abnormal_exit/1 :: (any()) -> boolean()).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(is_abnormal_termination/1 :: (any()) -> boolean()).
-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
(rabbit_types:username(), rabbit_types:vhost(), thunk(A))
@@ -395,13 +400,14 @@ with_exit_handler(Handler, Thunk) ->
try
Thunk()
catch
- exit:{R, _} when R =:= noproc; R =:= nodedown;
- R =:= normal; R =:= shutdown ->
- Handler();
- exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
- Handler()
+ exit:{R, _} when ?IS_BENIGN_EXIT(R) -> Handler();
+ exit:{{R, _}, _} when ?IS_BENIGN_EXIT(R) -> Handler()
end.
+is_abnormal_exit(R) when ?IS_BENIGN_EXIT(R) -> false;
+is_abnormal_exit({R, _}) when ?IS_BENIGN_EXIT(R) -> false;
+is_abnormal_exit(_) -> true.
+
filter_exit_map(F, L) ->
Ref = make_ref(),
lists:filter(fun (R) -> R =/= Ref end,
@@ -409,11 +415,6 @@ filter_exit_map(F, L) ->
fun () -> Ref end,
fun () -> F(I) end) || I <- L]).
-is_abnormal_termination(Reason)
- when Reason =:= noproc; Reason =:= noconnection;
- Reason =:= normal; Reason =:= shutdown -> false;
-is_abnormal_termination({shutdown, _}) -> false;
-is_abnormal_termination(_) -> true.
with_user(Username, Thunk) ->
fun () ->