summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoïc Hoguin <lhoguin@vmware.com>2021-08-10 17:10:00 +0200
committermergify-bot <noreply@mergify.io>2021-08-11 12:29:23 +0000
commit6b55f75b74befd06e53a087b9f41056de9a2007a (patch)
tree57b0386a8969c3f20311a9243ea16cf285d42af5
parent5c62c23490f38eabdb0a606f64c1287533c6d1f1 (diff)
downloadrabbitmq-server-git-6b55f75b74befd06e53a087b9f41056de9a2007a.tar.gz
Add tests for the regression introduced in #3041
(cherry picked from commit 24c25ab3ccc44e855ce50b0d25377cc7bbbcf3d3)
-rw-r--r--deps/rabbit/test/backing_queue_SUITE.erl95
1 files changed, 94 insertions, 1 deletions
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl
index 16f2740a10..f54e469aeb 100644
--- a/deps/rabbit/test/backing_queue_SUITE.erl
+++ b/deps/rabbit/test/backing_queue_SUITE.erl
@@ -28,6 +28,8 @@
variable_queue_fold_msg_on_disk,
variable_queue_dropfetchwhile,
variable_queue_dropwhile_varying_ram_duration,
+ variable_queue_dropwhile_restart,
+ variable_queue_dropwhile_sync_restart,
variable_queue_fetchwhile_varying_ram_duration,
variable_queue_ack_limiting,
variable_queue_purge,
@@ -983,6 +985,87 @@ variable_queue_dropfetchwhile2(VQ0, _QName) ->
VQ5.
+variable_queue_dropwhile_restart(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, variable_queue_dropwhile_restart1, [Config]).
+
+variable_queue_dropwhile_restart1(Config) ->
+ with_fresh_variable_queue(
+ fun variable_queue_dropwhile_restart2/2,
+ ?config(variable_queue_type, Config)).
+
+variable_queue_dropwhile_restart2(VQ0, QName) ->
+ Count = 10000,
+
+ %% add messages with sequential expiry
+ VQ1 = variable_queue_publish(
+ true, 1, Count,
+ fun (N, Props) -> Props#message_properties{expiry = N} end,
+ fun erlang:term_to_binary/1, VQ0),
+
+ %% drop the first 5 messages
+ {#message_properties{expiry = 6}, VQ2} =
+ rabbit_variable_queue:dropwhile(
+ fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ1),
+
+ _VQ3 = rabbit_variable_queue:terminate(shutdown, VQ2),
+ Terms = variable_queue_read_terms(QName),
+ VQ4 = variable_queue_init(test_amqqueue(QName, true), Terms),
+
+ %% fetch 5
+ VQ5 = lists:foldl(fun (_, VQN) ->
+ {{_, _, _}, VQM} =
+ rabbit_variable_queue:fetch(false, VQN),
+ VQM
+ end, VQ4, lists:seq(6, Count)),
+
+ %% should be empty now
+ true = rabbit_variable_queue:is_empty(VQ5),
+
+ VQ5.
+
+variable_queue_dropwhile_sync_restart(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, variable_queue_dropwhile_sync_restart1, [Config]).
+
+variable_queue_dropwhile_sync_restart1(Config) ->
+ with_fresh_variable_queue(
+ fun variable_queue_dropwhile_sync_restart2/2,
+ ?config(variable_queue_type, Config)).
+
+variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
+ Count = 10000,
+
+ %% add messages with sequential expiry
+ VQ1 = variable_queue_publish(
+ true, 1, Count,
+ fun (N, Props) -> Props#message_properties{expiry = N} end,
+ fun erlang:term_to_binary/1, VQ0),
+
+ %% drop the first 5 messages
+ {#message_properties{expiry = 6}, VQ2} =
+ rabbit_variable_queue:dropwhile(
+ fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ1),
+
+ %% Queue index sync.
+ VQ2b = rabbit_variable_queue:handle_pre_hibernate(VQ2),
+
+ _VQ3 = rabbit_variable_queue:terminate(shutdown, VQ2b),
+ Terms = variable_queue_read_terms(QName),
+ VQ4 = variable_queue_init(test_amqqueue(QName, true), Terms),
+
+ %% fetch 5
+ VQ5 = lists:foldl(fun (_, VQN) ->
+ {{_, _, _}, VQM} =
+ rabbit_variable_queue:fetch(false, VQN),
+ VQM
+ end, VQ4, lists:seq(6, Count)),
+
+ %% should be empty now
+ true = rabbit_variable_queue:is_empty(VQ5),
+
+ VQ5.
+
variable_queue_dropwhile_varying_ram_duration(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_dropwhile_varying_ram_duration1, [Config]).
@@ -1370,9 +1453,19 @@ variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
Q, case Recover of
true -> non_clean_shutdown;
- false -> new
+ false -> new;
+ Terms -> Terms
end, fun nop/2, fun nop/2, fun nop/1, fun nop/1).
+variable_queue_read_terms(QName) ->
+ #resource { kind = queue,
+ virtual_host = VHost,
+ name = Name } = QName,
+ <<Num:128>> = erlang:md5(<<"queue", VHost/binary, Name/binary>>),
+ DirName = rabbit_misc:format("~.36B", [Num]),
+ {ok, Terms} = rabbit_recovery_terms:read(VHost, DirName),
+ Terms.
+
publish_and_confirm(Q, Payload, Count) ->
Seqs = lists:seq(1, Count),
[begin