summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-07-14 12:13:55 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-07 09:42:10 +0100
commit0b81094345ca926e56cd3207abfe1bf2e5ae2965 (patch)
tree735c80ef09d071ccc86de3c476b02efb4348d193
parentb5ec2249f653d6b3be2c8d0332666bf1cf3b020f (diff)
downloadrabbitmq-server-git-0b81094345ca926e56cd3207abfe1bf2e5ae2965.tar.gz
Test fixes
-rw-r--r--src/rabbit_fifo_client.erl19
-rw-r--r--src/rabbit_fifo_v0.erl1
-rw-r--r--test/quorum_queue_SUITE.erl21
3 files changed, 26 insertions, 15 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 9a6cd32a7b..8700b1e6af 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -106,22 +106,23 @@ init(ClusterName, Servers) ->
%% @param MaxPending size defining the max number of pending commands.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
#state{cfg = #cfg{cluster_name = ClusterName,
servers = Servers,
soft_limit = SoftLimit,
- timeout = Timeout}}.
+ timeout = Timeout * 1000}}.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
+ %% net ticktime is in seconds
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
#state{cfg = #cfg{cluster_name = ClusterName,
servers = Servers,
block_handler = BlockFun,
unblock_handler = UnblockFun,
soft_limit = SoftLimit,
- timeout = Timeout}}.
+ timeout = Timeout * 1000}}.
%% @doc Enqueues a message.
@@ -141,7 +142,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
enqueue(Correlation, Msg,
#state{queue_status = undefined,
next_enqueue_seq = 1,
- cfg = #cfg{}} = State0) ->
+ cfg = #cfg{timeout = Timeout}} = State0) ->
%% it is the first enqueue, check the version
{_, Node} = Server = pick_server(State0),
case rpc:call(Node, rabbit_fifo, version, []) of
@@ -154,7 +155,7 @@ enqueue(Correlation, Msg,
%% were running the new version on the leader do sync initialisation
%% of enqueuer session
Reg = rabbit_fifo:make_register_enqueuer(self()),
- case ra:process_command(Server, Reg) of
+ case ra:process_command(Server, Reg, Timeout) of
{ok, reject_publish, _} ->
{reject_publish, State0#state{queue_status = reject_publish}};
{ok, ok, _} ->
@@ -167,8 +168,7 @@ enqueue(Correlation, Msg,
exit(Err)
end;
{badrpc, nodedown} ->
- rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
- State0#state{queue_status = go}
+ {reject_publish, State0}
end;
enqueue(_Correlation, _Msg,
#state{queue_status = reject_publish,
@@ -176,6 +176,7 @@ enqueue(_Correlation, _Msg,
{reject_publish, State};
enqueue(Correlation, Msg,
#state{slow = Slow,
+ queue_status = go,
cfg = #cfg{block_handler = BlockFun}} = State0) ->
Node = pick_server(State0),
{Next, State1} = next_enqueue_seq(State0),
@@ -694,7 +695,7 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
- rabbit_log:info("rabbit_fifo_client resend all pending ~w", [Seqs]),
+ rabbit_log:info("rabbit_fifo_client: resend all pending ~w", [Seqs]),
lists:foldl(fun resend/2, State, Seqs).
handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl
index 95f665d0a9..51f6bd133e 100644
--- a/src/rabbit_fifo_v0.erl
+++ b/src/rabbit_fifo_v0.erl
@@ -128,7 +128,6 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]),
update_config(Conf, #?STATE{cfg = #cfg{name = Name,
resource = Resource}}).
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 5f14cdef25..d438978010 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -1028,9 +1028,11 @@ recover_from_multiple_failures(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0).
publishing_to_unavailable_queue(Config) ->
+ %% publishing to an unavialable queue but with a reachable member should result
+ %% in the initial enqueuer session timing out and the message being nacked
[Server, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- TCh = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ TCh = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
@@ -1042,16 +1044,25 @@ publishing_to_unavailable_queue(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
- publish_many(Ch, QQ, 300),
- timer:sleep(1000),
+ publish_many(Ch, QQ, 1),
+ %% this should result in a nack
+ ok = receive
+ #'basic.ack'{} -> fail;
+ #'basic.nack'{} -> ok
+ after 90000 ->
+ exit(confirm_timeout)
+ end,
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
- %% check we get at least on ack
+ timer:sleep(2000),
+ publish_many(Ch, QQ, 1),
+ %% this should now be acked
ok = receive
#'basic.ack'{} -> ok;
#'basic.nack'{} -> fail
- after 30000 ->
+ after 90000 ->
exit(confirm_timeout)
end,
+ %% check we get at least on ack
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
ok.