diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-07-10 14:25:17 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-09-07 09:42:10 +0100 |
commit | ce230316d4a5fb623b7a3567f35c3d0c033a6dbf (patch) | |
tree | d77d164ffb3b804d75fccfadead5ff3bfd5e778d | |
parent | a5469e186c2f2d49876da49b9bee98df64ea6c73 (diff) | |
download | rabbitmq-server-git-ce230316d4a5fb623b7a3567f35c3d0c033a6dbf.tar.gz |
test fixes
-rw-r--r-- | src/rabbit_fifo_client.erl | 9 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 13 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 3 |
3 files changed, 22 insertions, 3 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index a698f93e9e..83207d7bf9 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -161,7 +161,10 @@ enqueue(Correlation, Msg, State0#state{queue_status = go}; Err -> exit(Err) - end + end; + {badrpc, nodedown} -> + rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]), + State0#state{queue_status = go} end, enqueue(Correlation, Msg, State); enqueue(_Correlation, _Msg, @@ -685,8 +688,10 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> error -> State end. + resend_all_pending(#state{pending = Pend} = State) -> Seqs = lists:sort(maps:keys(Pend)), + rabbit_log:info("rabbit_fifo_client resend all pending ~w", [Seqs]), lists:foldl(fun resend/2, State, Seqs). handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, @@ -762,7 +767,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> end. pick_server(#state{leader = undefined, - cfg = #cfg{servers = [N | _]}}) -> + cfg = #cfg{servers = [N | _]}}) -> %% TODO: pick random rather that first? N; pick_server(#state{leader = Leader}) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index a8b2f1f673..7cc7b1d516 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -102,6 +102,7 @@ handle_event({ra_event, From, Evt}, QState) -> {new | existing, amqqueue:amqqueue()} | rabbit_types:channel_exit(). declare(Q) when ?amqqueue_is_quorum(Q) -> + rabbit_log:info("quorum_queue declaring ~w", [Q]), QName = amqqueue:get_name(Q), Durable = amqqueue:is_durable(Q), AutoDelete = amqqueue:is_auto_delete(Q), @@ -123,6 +124,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout) || ServerId <- members(NewQ)], + rabbit_log:info("quorum_queue starting cluster ~w", [RaConfs]), case ra:start_cluster(RaConfs) of {ok, _, _} -> %% TODO: handle error - what should be done if the @@ -1359,6 +1361,17 @@ check_invalid_arguments(QueueName, Args) -> "invalid arg '~s' for ~s", [Key, rabbit_misc:rs(QueueName)]) end || Key <- Keys], + + case rabbit_misc:table_lookup(Args, <<"x-overflow">>) of + undefined -> ok; + {_, <<"reject-publish-dlx">>} -> + rabbit_misc:protocol_error( + precondition_failed, + "invalid arg 'x-overflow' with value 'reject-publish-dlx' for ~s", + [rabbit_misc:rs(QueueName)]); + _ -> + ok + end, ok. check_auto_delete(Q) when ?amqqueue_is_auto_delete(Q) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index ccb1e1f4d8..5f14cdef25 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -346,7 +346,7 @@ declare_invalid_args(Config) -> declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-overflow">>, longstr, XOverflow}])) - || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]], + || XOverflow <- [<<"reject-publish-dlx">>]], ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, @@ -1038,6 +1038,7 @@ publishing_to_unavailable_queue(Config) -> ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + ct:pal("opening channel to ~w", [Server]), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), |