summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2021-04-16 14:47:34 +0200
committerKarl Nilsson <kjnilsson@gmail.com>2021-05-12 17:12:09 +0100
commit733f5fb36752ab20481d789be8ebbf803b74d963 (patch)
tree5ea9d6d3cf868563b85d622e389c62dbdac4ddcc
parentf965cf8dde4c1a9df1a281799a12a1469411f38f (diff)
downloadrabbitmq-server-git-733f5fb36752ab20481d789be8ebbf803b74d963.tar.gz
Report stream coordinator unavailable as an amqp error
Uses code 506: resource_error
-rw-r--r--deps/rabbit/src/rabbit_channel.erl50
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl11
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl25
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl35
-rw-r--r--deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl3
5 files changed, 90 insertions, 34 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 065df1718b..fb6cad5189 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2123,27 +2123,35 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QRef = amqqueue:get_name(Q),
[QRef | Acc]
end, [], Qs),
- {ok, QueueStates, Actions} =
- rabbit_queue_type:deliver(Qs, Delivery, QueueStates0),
- %% NB: the order here is important since basic.returns must be
- %% sent before confirms.
- ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
- State1 = process_routing_confirm(Confirm, AllQueueNames,
- MsgSeqNo, XName, State0),
- %% Actions must be processed after registering confirms as actions may
- %% contain rejections of publishes
- State = handle_queue_actions(Actions,
- State1#ch{queue_states = QueueStates}),
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine ->
- ?INCR_STATS(exchange_stats, XName, 1, publish),
- [?INCR_STATS(queue_exchange_stats,
- {amqqueue:get_name(Q), XName}, 1, publish)
- || Q <- Qs];
- _ ->
- ok
- end,
- State.
+ try
+ {ok, QueueStates, Actions} =
+ rabbit_queue_type:deliver(Qs, Delivery, QueueStates0),
+ %% NB: the order here is important since basic.returns must be
+ %% sent before confirms.
+ ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
+ State1 = process_routing_confirm(Confirm, AllQueueNames,
+ MsgSeqNo, XName, State0),
+ %% Actions must be processed after registering confirms as actions may
+ %% contain rejections of publishes
+ State = handle_queue_actions(Actions,
+ State1#ch{queue_states = QueueStates}),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
+ fine ->
+ ?INCR_STATS(exchange_stats, XName, 1, publish),
+ [?INCR_STATS(queue_exchange_stats,
+ {amqqueue:get_name(Q), XName}, 1, publish)
+ || Q <- Qs];
+ _ ->
+ ok
+ end,
+ State
+ catch
+ exit:{coordinator_unavailable, Resource} ->
+ rabbit_misc:protocol_error(
+ resource_error,
+ "Stream coordinator unavailable for ~s",
+ [rabbit_misc:rs(Resource)])
+ end.
process_routing_mandatory(_Mandatory = true,
_RoutedToQs = [],
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index c3b9d0c710..43f5e97a46 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -533,9 +533,14 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState)
%% not found and no initial state passed - initialize new state
Mod = amqqueue:get_type(Q),
Name = amqqueue:get_name(Q),
- #ctx{module = Mod,
- name = Name,
- state = Mod:init(Q)};
+ case Mod:init(Q) of
+ {error, Reason} ->
+ exit({Reason, Ref});
+ QState ->
+ #ctx{module = Mod,
+ name = Name,
+ state = QState}
+ end;
_ ->
%% not found - initialize with supplied initial state
Mod = amqqueue:get_type(Q),
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 85fc1539c8..b9f0bbb328 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -628,17 +628,24 @@ readers(QName) ->
init(Q) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
#{name := StreamId} = amqqueue:get_type_state(Q),
%% tell us about leader changes so we can fail over
- {ok, ok, _} = rabbit_stream_coordinator:register_listener(Q),
- Prefix = erlang:pid_to_list(self()) ++ "_",
- WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix),
- {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit),
- #stream_client{stream_id = StreamId,
- name = amqqueue:get_name(Q),
- leader = Leader,
- writer_id = WriterId,
- soft_limit = SoftLimit}.
+ case rabbit_stream_coordinator:register_listener(Q) of
+ {ok, ok, _} ->
+ Prefix = erlang:pid_to_list(self()) ++ "_",
+ WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix),
+ {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit),
+ #stream_client{stream_id = StreamId,
+ name = amqqueue:get_name(Q),
+ leader = Leader,
+ writer_id = WriterId,
+ soft_limit = SoftLimit};
+ {error, coordinator_unavailable} = E ->
+ rabbit_log:warning("Failed to start stream queue ~p: coordinator unavailable",
+ [rabbit_misc:rs(QName)]),
+ E
+ end.
close(#stream_client{readers = Readers}) ->
_ = maps:map(fun (_, #stream{log = Log}) ->
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 806a5706a8..4cd0110b9d 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -51,7 +51,8 @@ groups() ->
replica_recovery,
leader_failover,
leader_failover_dedupe,
- add_replicas]},
+ add_replicas,
+ publish_coordinator_unavailable]},
{cluster_size_3_parallel, [parallel], [delete_replica,
delete_last_replica,
delete_classic_replica,
@@ -573,6 +574,38 @@ delete_down_replica(Config) ->
end),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+publish_coordinator_unavailable(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ check_leader_and_replicas(Config, [Server0, Server1, Server2]),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ N = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_mnesia, cluster_nodes, [running]),
+ length(N) == 1
+ end),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 506, _}}}, _},
+ amqp_channel:wait_for_confirms(Ch, 60)),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ Info = find_queue_info(Config, 0, [online]),
+ length(proplists:get_value(online, Info)) == 3
+ end),
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ publish(Ch1, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+
publish(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl
index d72aade7f0..b5fbf44e8a 100644
--- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl
+++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl
@@ -93,6 +93,9 @@ bad({shutdown, {server_initiated_close, Code, Reason}}, ReqData, Context) ->
rabbit_mgmt_util:bad_request_exception(Code, Reason, ReqData, Context);
bad(rejected, ReqData, Context) ->
Msg = "Unable to publish message. Check queue limits.",
+ rabbit_mgmt_util:bad_request_exception(rejected, Msg, ReqData, Context);
+bad({{coordinator_unavailable, _}, _}, ReqData, Context) ->
+ Msg = "Unable to publish message. Coordinator unavailable.",
rabbit_mgmt_util:bad_request_exception(rejected, Msg, ReqData, Context).
is_authorized(ReqData, Context) ->