diff options
author | Loïc Hoguin <79089961+lhoguin@users.noreply.github.com> | 2021-08-11 14:59:29 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-11 14:59:29 +0200 |
commit | 9ea36a295efe142a49e45f5a64986b1415c90a2d (patch) | |
tree | 57b0386a8969c3f20311a9243ea16cf285d42af5 | |
parent | 5c62c23490f38eabdb0a606f64c1287533c6d1f1 (diff) | |
parent | 6b55f75b74befd06e53a087b9f41056de9a2007a (diff) | |
download | rabbitmq-server-git-9ea36a295efe142a49e45f5a64986b1415c90a2d.tar.gz |
Merge pull request #3297 from rabbitmq/mergify/bp/v3.8.x/pr-3295
Add tests for the regression introduced in #3041 (backport #3295)
-rw-r--r-- | deps/rabbit/test/backing_queue_SUITE.erl | 95 |
1 files changed, 94 insertions, 1 deletions
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 16f2740a10..f54e469aeb 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -28,6 +28,8 @@ variable_queue_fold_msg_on_disk, variable_queue_dropfetchwhile, variable_queue_dropwhile_varying_ram_duration, + variable_queue_dropwhile_restart, + variable_queue_dropwhile_sync_restart, variable_queue_fetchwhile_varying_ram_duration, variable_queue_ack_limiting, variable_queue_purge, @@ -983,6 +985,87 @@ variable_queue_dropfetchwhile2(VQ0, _QName) -> VQ5. +variable_queue_dropwhile_restart(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_dropwhile_restart1, [Config]). + +variable_queue_dropwhile_restart1(Config) -> + with_fresh_variable_queue( + fun variable_queue_dropwhile_restart2/2, + ?config(variable_queue_type, Config)). + +variable_queue_dropwhile_restart2(VQ0, QName) -> + Count = 10000, + + %% add messages with sequential expiry + VQ1 = variable_queue_publish( + true, 1, Count, + fun (N, Props) -> Props#message_properties{expiry = N} end, + fun erlang:term_to_binary/1, VQ0), + + %% drop the first 5 messages + {#message_properties{expiry = 6}, VQ2} = + rabbit_variable_queue:dropwhile( + fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ1), + + _VQ3 = rabbit_variable_queue:terminate(shutdown, VQ2), + Terms = variable_queue_read_terms(QName), + VQ4 = variable_queue_init(test_amqqueue(QName, true), Terms), + + %% fetch 5 + VQ5 = lists:foldl(fun (_, VQN) -> + {{_, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + VQM + end, VQ4, lists:seq(6, Count)), + + %% should be empty now + true = rabbit_variable_queue:is_empty(VQ5), + + VQ5. + +variable_queue_dropwhile_sync_restart(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_dropwhile_sync_restart1, [Config]). + +variable_queue_dropwhile_sync_restart1(Config) -> + with_fresh_variable_queue( + fun variable_queue_dropwhile_sync_restart2/2, + ?config(variable_queue_type, Config)). + +variable_queue_dropwhile_sync_restart2(VQ0, QName) -> + Count = 10000, + + %% add messages with sequential expiry + VQ1 = variable_queue_publish( + true, 1, Count, + fun (N, Props) -> Props#message_properties{expiry = N} end, + fun erlang:term_to_binary/1, VQ0), + + %% drop the first 5 messages + {#message_properties{expiry = 6}, VQ2} = + rabbit_variable_queue:dropwhile( + fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ1), + + %% Queue index sync. + VQ2b = rabbit_variable_queue:handle_pre_hibernate(VQ2), + + _VQ3 = rabbit_variable_queue:terminate(shutdown, VQ2b), + Terms = variable_queue_read_terms(QName), + VQ4 = variable_queue_init(test_amqqueue(QName, true), Terms), + + %% fetch 5 + VQ5 = lists:foldl(fun (_, VQN) -> + {{_, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + VQM + end, VQ4, lists:seq(6, Count)), + + %% should be empty now + true = rabbit_variable_queue:is_empty(VQ5), + + VQ5. + variable_queue_dropwhile_varying_ram_duration(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, variable_queue_dropwhile_varying_ram_duration1, [Config]). @@ -1370,9 +1453,19 @@ variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( Q, case Recover of true -> non_clean_shutdown; - false -> new + false -> new; + Terms -> Terms end, fun nop/2, fun nop/2, fun nop/1, fun nop/1). +variable_queue_read_terms(QName) -> + #resource { kind = queue, + virtual_host = VHost, + name = Name } = QName, + <<Num:128>> = erlang:md5(<<"queue", VHost/binary, Name/binary>>), + DirName = rabbit_misc:format("~.36B", [Num]), + {ok, Terms} = rabbit_recovery_terms:read(VHost, DirName), + Terms. + publish_and_confirm(Q, Payload, Count) -> Seqs = lists:seq(1, Count), [begin |