diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2021-04-16 14:47:34 +0200 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-05-12 17:12:09 +0100 |
commit | 733f5fb36752ab20481d789be8ebbf803b74d963 (patch) | |
tree | 5ea9d6d3cf868563b85d622e389c62dbdac4ddcc | |
parent | f965cf8dde4c1a9df1a281799a12a1469411f38f (diff) | |
download | rabbitmq-server-git-733f5fb36752ab20481d789be8ebbf803b74d963.tar.gz |
Report stream coordinator unavailable as an amqp error
Uses code 506: resource_error
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 50 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 11 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 25 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 35 | ||||
-rw-r--r-- | deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl | 3 |
5 files changed, 90 insertions, 34 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 065df1718b..fb6cad5189 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2123,27 +2123,35 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QRef = amqqueue:get_name(Q), [QRef | Acc] end, [], Qs), - {ok, QueueStates, Actions} = - rabbit_queue_type:deliver(Qs, Delivery, QueueStates0), - %% NB: the order here is important since basic.returns must be - %% sent before confirms. - ok = process_routing_mandatory(Mandatory, Qs, Message, State0), - State1 = process_routing_confirm(Confirm, AllQueueNames, - MsgSeqNo, XName, State0), - %% Actions must be processed after registering confirms as actions may - %% contain rejections of publishes - State = handle_queue_actions(Actions, - State1#ch{queue_states = QueueStates}), - case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> - ?INCR_STATS(exchange_stats, XName, 1, publish), - [?INCR_STATS(queue_exchange_stats, - {amqqueue:get_name(Q), XName}, 1, publish) - || Q <- Qs]; - _ -> - ok - end, - State. + try + {ok, QueueStates, Actions} = + rabbit_queue_type:deliver(Qs, Delivery, QueueStates0), + %% NB: the order here is important since basic.returns must be + %% sent before confirms. + ok = process_routing_mandatory(Mandatory, Qs, Message, State0), + State1 = process_routing_confirm(Confirm, AllQueueNames, + MsgSeqNo, XName, State0), + %% Actions must be processed after registering confirms as actions may + %% contain rejections of publishes + State = handle_queue_actions(Actions, + State1#ch{queue_states = QueueStates}), + case rabbit_event:stats_level(State, #ch.stats_timer) of + fine -> + ?INCR_STATS(exchange_stats, XName, 1, publish), + [?INCR_STATS(queue_exchange_stats, + {amqqueue:get_name(Q), XName}, 1, publish) + || Q <- Qs]; + _ -> + ok + end, + State + catch + exit:{coordinator_unavailable, Resource} -> + rabbit_misc:protocol_error( + resource_error, + "Stream coordinator unavailable for ~s", + [rabbit_misc:rs(Resource)]) + end. process_routing_mandatory(_Mandatory = true, _RoutedToQs = [], diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index c3b9d0c710..43f5e97a46 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -533,9 +533,14 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) %% not found and no initial state passed - initialize new state Mod = amqqueue:get_type(Q), Name = amqqueue:get_name(Q), - #ctx{module = Mod, - name = Name, - state = Mod:init(Q)}; + case Mod:init(Q) of + {error, Reason} -> + exit({Reason, Ref}); + QState -> + #ctx{module = Mod, + name = Name, + state = QState} + end; _ -> %% not found - initialize with supplied initial state Mod = amqqueue:get_type(Q), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 85fc1539c8..b9f0bbb328 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -628,17 +628,24 @@ readers(QName) -> init(Q) when ?is_amqqueue(Q) -> Leader = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), #{name := StreamId} = amqqueue:get_type_state(Q), %% tell us about leader changes so we can fail over - {ok, ok, _} = rabbit_stream_coordinator:register_listener(Q), - Prefix = erlang:pid_to_list(self()) ++ "_", - WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix), - {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit), - #stream_client{stream_id = StreamId, - name = amqqueue:get_name(Q), - leader = Leader, - writer_id = WriterId, - soft_limit = SoftLimit}. + case rabbit_stream_coordinator:register_listener(Q) of + {ok, ok, _} -> + Prefix = erlang:pid_to_list(self()) ++ "_", + WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix), + {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit), + #stream_client{stream_id = StreamId, + name = amqqueue:get_name(Q), + leader = Leader, + writer_id = WriterId, + soft_limit = SoftLimit}; + {error, coordinator_unavailable} = E -> + rabbit_log:warning("Failed to start stream queue ~p: coordinator unavailable", + [rabbit_misc:rs(QName)]), + E + end. close(#stream_client{readers = Readers}) -> _ = maps:map(fun (_, #stream{log = Log}) -> diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 806a5706a8..4cd0110b9d 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -51,7 +51,8 @@ groups() -> replica_recovery, leader_failover, leader_failover_dedupe, - add_replicas]}, + add_replicas, + publish_coordinator_unavailable]}, {cluster_size_3_parallel, [parallel], [delete_replica, delete_last_replica, delete_classic_replica, @@ -573,6 +574,38 @@ delete_down_replica(Config) -> end), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +publish_coordinator_unavailable(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + check_leader_and_replicas(Config, [Server0, Server1, Server2]), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + rabbit_ct_helpers:await_condition( + fun () -> + N = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_mnesia, cluster_nodes, [running]), + length(N) == 1 + end), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 506, _}}}, _}, + amqp_channel:wait_for_confirms(Ch, 60)), + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + rabbit_ct_helpers:await_condition( + fun () -> + Info = find_queue_info(Config, 0, [online]), + length(proplists:get_value(online, Info)) == 3 + end), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0), + publish(Ch1, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + publish(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl index d72aade7f0..b5fbf44e8a 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_exchange_publish.erl @@ -93,6 +93,9 @@ bad({shutdown, {server_initiated_close, Code, Reason}}, ReqData, Context) -> rabbit_mgmt_util:bad_request_exception(Code, Reason, ReqData, Context); bad(rejected, ReqData, Context) -> Msg = "Unable to publish message. Check queue limits.", + rabbit_mgmt_util:bad_request_exception(rejected, Msg, ReqData, Context); +bad({{coordinator_unavailable, _}, _}, ReqData, Context) -> + Msg = "Unable to publish message. Coordinator unavailable.", rabbit_mgmt_util:bad_request_exception(rejected, Msg, ReqData, Context). is_authorized(ReqData, Context) -> |