diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-07-14 12:13:55 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-09-07 09:42:10 +0100 |
commit | 0b81094345ca926e56cd3207abfe1bf2e5ae2965 (patch) | |
tree | 735c80ef09d071ccc86de3c476b02efb4348d193 | |
parent | b5ec2249f653d6b3be2c8d0332666bf1cf3b020f (diff) | |
download | rabbitmq-server-git-0b81094345ca926e56cd3207abfe1bf2e5ae2965.tar.gz |
Test fixes
-rw-r--r-- | src/rabbit_fifo_client.erl | 19 | ||||
-rw-r--r-- | src/rabbit_fifo_v0.erl | 1 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 21 |
3 files changed, 26 insertions, 15 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 9a6cd32a7b..8700b1e6af 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -106,22 +106,23 @@ init(ClusterName, Servers) -> %% @param MaxPending size defining the max number of pending commands. -spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit) -> - Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, + Timeout = application:get_env(kernel, net_ticktime, 60) + 5, #state{cfg = #cfg{cluster_name = ClusterName, servers = Servers, soft_limit = SoftLimit, - timeout = Timeout}}. + timeout = Timeout * 1000}}. -spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> - Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, + %% net ticktime is in seconds + Timeout = application:get_env(kernel, net_ticktime, 60) + 5, #state{cfg = #cfg{cluster_name = ClusterName, servers = Servers, block_handler = BlockFun, unblock_handler = UnblockFun, soft_limit = SoftLimit, - timeout = Timeout}}. + timeout = Timeout * 1000}}. %% @doc Enqueues a message. @@ -141,7 +142,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> enqueue(Correlation, Msg, #state{queue_status = undefined, next_enqueue_seq = 1, - cfg = #cfg{}} = State0) -> + cfg = #cfg{timeout = Timeout}} = State0) -> %% it is the first enqueue, check the version {_, Node} = Server = pick_server(State0), case rpc:call(Node, rabbit_fifo, version, []) of @@ -154,7 +155,7 @@ enqueue(Correlation, Msg, %% were running the new version on the leader do sync initialisation %% of enqueuer session Reg = rabbit_fifo:make_register_enqueuer(self()), - case ra:process_command(Server, Reg) of + case ra:process_command(Server, Reg, Timeout) of {ok, reject_publish, _} -> {reject_publish, State0#state{queue_status = reject_publish}}; {ok, ok, _} -> @@ -167,8 +168,7 @@ enqueue(Correlation, Msg, exit(Err) end; {badrpc, nodedown} -> - rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]), - State0#state{queue_status = go} + {reject_publish, State0} end; enqueue(_Correlation, _Msg, #state{queue_status = reject_publish, @@ -176,6 +176,7 @@ enqueue(_Correlation, _Msg, {reject_publish, State}; enqueue(Correlation, Msg, #state{slow = Slow, + queue_status = go, cfg = #cfg{block_handler = BlockFun}} = State0) -> Node = pick_server(State0), {Next, State1} = next_enqueue_seq(State0), @@ -694,7 +695,7 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> resend_all_pending(#state{pending = Pend} = State) -> Seqs = lists:sort(maps:keys(Pend)), - rabbit_log:info("rabbit_fifo_client resend all pending ~w", [Seqs]), + rabbit_log:info("rabbit_fifo_client: resend all pending ~w", [Seqs]), lists:foldl(fun resend/2, State, Seqs). handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl index 95f665d0a9..51f6bd133e 100644 --- a/src/rabbit_fifo_v0.erl +++ b/src/rabbit_fifo_v0.erl @@ -128,7 +128,6 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]), update_config(Conf, #?STATE{cfg = #cfg{name = Name, resource = Resource}}). diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 5f14cdef25..d438978010 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -1028,9 +1028,11 @@ recover_from_multiple_failures(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0). publishing_to_unavailable_queue(Config) -> + %% publishing to an unavialable queue but with a reachable member should result + %% in the initial enqueuer session timing out and the message being nacked [Server, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - TCh = rabbit_ct_client_helpers:open_channel(Config, Server1), + TCh = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), @@ -1042,16 +1044,25 @@ publishing_to_unavailable_queue(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), - publish_many(Ch, QQ, 300), - timer:sleep(1000), + publish_many(Ch, QQ, 1), + %% this should result in a nack + ok = receive + #'basic.ack'{} -> fail; + #'basic.nack'{} -> ok + after 90000 -> + exit(confirm_timeout) + end, ok = rabbit_ct_broker_helpers:start_node(Config, Server1), - %% check we get at least on ack + timer:sleep(2000), + publish_many(Ch, QQ, 1), + %% this should now be acked ok = receive #'basic.ack'{} -> ok; #'basic.nack'{} -> fail - after 30000 -> + after 90000 -> exit(confirm_timeout) end, + %% check we get at least on ack ok = rabbit_ct_broker_helpers:start_node(Config, Server2), ok. |