summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2021-04-29 16:49:10 +0200
committerKarl Nilsson <kjnilsson@gmail.com>2021-05-12 17:12:09 +0100
commitacddc0ea9d4595cede41f2835b1ac0a41980a6e6 (patch)
treebe7c2f23b5783172e254f3ac7b843a9f6da2cc98
parent733f5fb36752ab20481d789be8ebbf803b74d963 (diff)
downloadrabbitmq-server-git-acddc0ea9d4595cede41f2835b1ac0a41980a6e6.tar.gz
Catch deliver errors in rabbit_queue_type
-rw-r--r--deps/rabbit/src/rabbit_channel.erl46
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl14
2 files changed, 33 insertions, 27 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index fb6cad5189..916eee60cb 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2123,30 +2123,28 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QRef = amqqueue:get_name(Q),
[QRef | Acc]
end, [], Qs),
- try
- {ok, QueueStates, Actions} =
- rabbit_queue_type:deliver(Qs, Delivery, QueueStates0),
- %% NB: the order here is important since basic.returns must be
- %% sent before confirms.
- ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
- State1 = process_routing_confirm(Confirm, AllQueueNames,
- MsgSeqNo, XName, State0),
- %% Actions must be processed after registering confirms as actions may
- %% contain rejections of publishes
- State = handle_queue_actions(Actions,
- State1#ch{queue_states = QueueStates}),
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine ->
- ?INCR_STATS(exchange_stats, XName, 1, publish),
- [?INCR_STATS(queue_exchange_stats,
- {amqqueue:get_name(Q), XName}, 1, publish)
- || Q <- Qs];
- _ ->
- ok
- end,
- State
- catch
- exit:{coordinator_unavailable, Resource} ->
+ case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
+ {ok, QueueStates, Actions} ->
+ %% NB: the order here is important since basic.returns must be
+ %% sent before confirms.
+ ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
+ State1 = process_routing_confirm(Confirm, AllQueueNames,
+ MsgSeqNo, XName, State0),
+ %% Actions must be processed after registering confirms as actions may
+ %% contain rejections of publishes
+ State = handle_queue_actions(Actions,
+ State1#ch{queue_states = QueueStates}),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
+ fine ->
+ ?INCR_STATS(exchange_stats, XName, 1, publish),
+ [?INCR_STATS(queue_exchange_stats,
+ {amqqueue:get_name(Q), XName}, 1, publish)
+ || Q <- Qs];
+ _ ->
+ ok
+ end,
+ State;
+ {error, {coordinator_unavailable, Resource}} ->
rabbit_misc:protocol_error(
resource_error,
"Stream coordinator unavailable for ~s",
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index 43f5e97a46..a532893be2 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -439,14 +439,22 @@ module(QRef, Ctxs) ->
-spec deliver([amqqueue:amqqueue()], Delivery :: term(),
stateless | state()) ->
- {ok, state(), actions()}.
-deliver(Qs, Delivery, stateless) ->
+ {ok, state(), actions()} | {error, Reason :: term()}.
+deliver(Qs, Delivery, State) ->
+ try
+ deliver0(Qs, Delivery, State)
+ catch
+ exit:Reason ->
+ {error, Reason}
+ end.
+
+deliver0(Qs, Delivery, stateless) ->
_ = lists:map(fun(Q) ->
Mod = amqqueue:get_type(Q),
_ = Mod:deliver([{Q, stateless}], Delivery)
end, Qs),
{ok, stateless, []};
-deliver(Qs, Delivery, #?STATE{} = State0) ->
+deliver0(Qs, Delivery, #?STATE{} = State0) ->
%% TODO: optimise single queue case?
%% sort by queue type - then dispatch each group
ByType = lists:foldl(