diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2021-04-29 16:49:10 +0200 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-05-12 17:12:09 +0100 |
commit | acddc0ea9d4595cede41f2835b1ac0a41980a6e6 (patch) | |
tree | be7c2f23b5783172e254f3ac7b843a9f6da2cc98 | |
parent | 733f5fb36752ab20481d789be8ebbf803b74d963 (diff) | |
download | rabbitmq-server-git-acddc0ea9d4595cede41f2835b1ac0a41980a6e6.tar.gz |
Catch deliver errors in rabbit_queue_type
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 46 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 14 |
2 files changed, 33 insertions, 27 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index fb6cad5189..916eee60cb 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2123,30 +2123,28 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QRef = amqqueue:get_name(Q), [QRef | Acc] end, [], Qs), - try - {ok, QueueStates, Actions} = - rabbit_queue_type:deliver(Qs, Delivery, QueueStates0), - %% NB: the order here is important since basic.returns must be - %% sent before confirms. - ok = process_routing_mandatory(Mandatory, Qs, Message, State0), - State1 = process_routing_confirm(Confirm, AllQueueNames, - MsgSeqNo, XName, State0), - %% Actions must be processed after registering confirms as actions may - %% contain rejections of publishes - State = handle_queue_actions(Actions, - State1#ch{queue_states = QueueStates}), - case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> - ?INCR_STATS(exchange_stats, XName, 1, publish), - [?INCR_STATS(queue_exchange_stats, - {amqqueue:get_name(Q), XName}, 1, publish) - || Q <- Qs]; - _ -> - ok - end, - State - catch - exit:{coordinator_unavailable, Resource} -> + case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of + {ok, QueueStates, Actions} -> + %% NB: the order here is important since basic.returns must be + %% sent before confirms. + ok = process_routing_mandatory(Mandatory, Qs, Message, State0), + State1 = process_routing_confirm(Confirm, AllQueueNames, + MsgSeqNo, XName, State0), + %% Actions must be processed after registering confirms as actions may + %% contain rejections of publishes + State = handle_queue_actions(Actions, + State1#ch{queue_states = QueueStates}), + case rabbit_event:stats_level(State, #ch.stats_timer) of + fine -> + ?INCR_STATS(exchange_stats, XName, 1, publish), + [?INCR_STATS(queue_exchange_stats, + {amqqueue:get_name(Q), XName}, 1, publish) + || Q <- Qs]; + _ -> + ok + end, + State; + {error, {coordinator_unavailable, Resource}} -> rabbit_misc:protocol_error( resource_error, "Stream coordinator unavailable for ~s", diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 43f5e97a46..a532893be2 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -439,14 +439,22 @@ module(QRef, Ctxs) -> -spec deliver([amqqueue:amqqueue()], Delivery :: term(), stateless | state()) -> - {ok, state(), actions()}. -deliver(Qs, Delivery, stateless) -> + {ok, state(), actions()} | {error, Reason :: term()}. +deliver(Qs, Delivery, State) -> + try + deliver0(Qs, Delivery, State) + catch + exit:Reason -> + {error, Reason} + end. + +deliver0(Qs, Delivery, stateless) -> _ = lists:map(fun(Q) -> Mod = amqqueue:get_type(Q), _ = Mod:deliver([{Q, stateless}], Delivery) end, Qs), {ok, stateless, []}; -deliver(Qs, Delivery, #?STATE{} = State0) -> +deliver0(Qs, Delivery, #?STATE{} = State0) -> %% TODO: optimise single queue case? %% sort by queue type - then dispatch each group ByType = lists:foldl( |