diff options
Diffstat (limited to 'deps/rabbit/test/backing_queue_SUITE.erl')
-rw-r--r-- | deps/rabbit/test/backing_queue_SUITE.erl | 1632 |
1 files changed, 1632 insertions, 0 deletions
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl new file mode 100644 index 0000000000..be6004c8b9 --- /dev/null +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -0,0 +1,1632 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(backing_queue_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). + +-compile(export_all). + +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). + +-define(TIMEOUT, 30000). +-define(VHOST, <<"/">>). + +-define(VARIABLE_QUEUE_TESTCASES, [ + variable_queue_dynamic_duration_change, + variable_queue_partial_segments_delta_thing, + variable_queue_all_the_bits_not_covered_elsewhere_A, + variable_queue_all_the_bits_not_covered_elsewhere_B, + variable_queue_drop, + variable_queue_fold_msg_on_disk, + variable_queue_dropfetchwhile, + variable_queue_dropwhile_varying_ram_duration, + variable_queue_fetchwhile_varying_ram_duration, + variable_queue_ack_limiting, + variable_queue_purge, + variable_queue_requeue, + variable_queue_requeue_ram_beta, + variable_queue_fold, + variable_queue_batch_publish, + variable_queue_batch_publish_delivered + ]). + +-define(BACKING_QUEUE_TESTCASES, [ + bq_queue_index, + bq_queue_index_props, + {variable_queue_default, [parallel], ?VARIABLE_QUEUE_TESTCASES}, + {variable_queue_lazy, [parallel], ?VARIABLE_QUEUE_TESTCASES ++ + [variable_queue_mode_change]}, + bq_variable_queue_delete_msg_store_files_callback, + bq_queue_recover + ]). + +all() -> + [ + {group, backing_queue_tests} + ]. + +groups() -> + [ + {backing_queue_tests, [], [ + msg_store, + {backing_queue_embed_limit_0, [], ?BACKING_QUEUE_TESTCASES}, + {backing_queue_embed_limit_1024, [], ?BACKING_QUEUE_TESTCASES} + ]} + ]. + +group(backing_queue_tests) -> + [ + %% Several tests based on lazy queues may take more than 30 minutes. + {timetrap, {hours, 1}} + ]; +group(_) -> + []. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ [ + fun(C) -> init_per_group1(Group, C) end, + fun setup_file_handle_cache/1 + ]); + false -> + rabbit_ct_helpers:run_steps(Config, [ + fun(C) -> init_per_group1(Group, C) end + ]) + end. + +init_per_group1(backing_queue_tests, Config) -> + Module = rabbit_ct_broker_helpers:rpc(Config, 0, + application, get_env, [rabbit, backing_queue_module]), + case Module of + {ok, rabbit_priority_queue} -> + rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, setup_backing_queue_test_group, [Config]); + _ -> + {skip, rabbit_misc:format( + "Backing queue module not supported by this test group: ~p~n", + [Module])} + end; +init_per_group1(backing_queue_embed_limit_0, Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, queue_index_embed_msgs_below, 0]), + Config; +init_per_group1(backing_queue_embed_limit_1024, Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, queue_index_embed_msgs_below, 1024]), + Config; +init_per_group1(variable_queue_default, Config) -> + rabbit_ct_helpers:set_config(Config, {variable_queue_type, default}); +init_per_group1(variable_queue_lazy, Config) -> + rabbit_ct_helpers:set_config(Config, {variable_queue_type, lazy}); +init_per_group1(from_cluster_node1, Config) -> + rabbit_ct_helpers:set_config(Config, {test_direction, {0, 1}}); +init_per_group1(from_cluster_node2, Config) -> + rabbit_ct_helpers:set_config(Config, {test_direction, {1, 0}}); +init_per_group1(_, Config) -> + Config. + +setup_file_handle_cache(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, setup_file_handle_cache1, []), + Config. + +setup_file_handle_cache1() -> + %% FIXME: Why are we doing this? + application:set_env(rabbit, file_handles_high_watermark, 10), + ok = file_handle_cache:set_limit(10), + ok. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + [fun(C) -> end_per_group1(Group, C) end] ++ + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +end_per_group1(backing_queue_tests, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, teardown_backing_queue_test_group, [Config]); +end_per_group1(Group, Config) +when Group =:= backing_queue_embed_limit_0 +orelse Group =:= backing_queue_embed_limit_1024 -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, queue_index_embed_msgs_below, + ?config(rmq_queue_index_embed_msgs_below, Config)]), + Config; +end_per_group1(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue; + Testcase == variable_queue_fold -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, + [rabbit, queue_explicit_gc_run_operation_threshold, 0]), + rabbit_ct_helpers:testcase_started(Config, Testcase); +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue; + Testcase == variable_queue_fold -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, application, set_env, + [rabbit, queue_explicit_gc_run_operation_threshold, 1000]), + rabbit_ct_helpers:testcase_finished(Config, Testcase); +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Message store. +%% ------------------------------------------------------------------- + +msg_store(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, msg_store1, [Config]). + +msg_store1(_Config) -> + restart_msg_store_empty(), + MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], + {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds), + Ref = rabbit_guid:gen(), + {Cap, MSCState} = msg_store_client_init_capture( + ?PERSISTENT_MSG_STORE, Ref), + Ref2 = rabbit_guid:gen(), + {Cap2, MSC2State} = msg_store_client_init_capture( + ?PERSISTENT_MSG_STORE, Ref2), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, MsgIds, MSCState), + %% test confirm logic + passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, MsgIds, MSCState), + %% publish the first half + ok = msg_store_write(MsgIds1stHalf, MSCState), + %% sync on the first half + ok = on_disk_await(Cap, MsgIds1stHalf), + %% publish the second half + ok = msg_store_write(MsgIds2ndHalf, MSCState), + %% check they're all in there + true = msg_store_contains(true, MsgIds, MSCState), + %% publish the latter half twice so we hit the caching and ref + %% count code. We need to do this through a 2nd client since a + %% single client is not supposed to write the same message more + %% than once without first removing it. + ok = msg_store_write(MsgIds2ndHalf, MSC2State), + %% check they're still all in there + true = msg_store_contains(true, MsgIds, MSCState), + %% sync on the 2nd half + ok = on_disk_await(Cap2, MsgIds2ndHalf), + %% cleanup + ok = on_disk_stop(Cap2), + ok = rabbit_msg_store:client_delete_and_terminate(MSC2State), + ok = on_disk_stop(Cap), + %% read them all + MSCState1 = msg_store_read(MsgIds, MSCState), + %% read them all again - this will hit the cache, not disk + MSCState2 = msg_store_read(MsgIds, MSCState1), + %% remove them all + ok = msg_store_remove(MsgIds, MSCState2), + %% check first half doesn't exist + false = msg_store_contains(false, MsgIds1stHalf, MSCState2), + %% check second half does exist + true = msg_store_contains(true, MsgIds2ndHalf, MSCState2), + %% read the second half again + MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2), + %% read the second half again, just for fun (aka code coverage) + MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), + ok = rabbit_msg_store:client_terminate(MSCState4), + %% stop and restart, preserving every other msg in 2nd half + ok = rabbit_variable_queue:stop_msg_store(?VHOST), + ok = rabbit_variable_queue:start_msg_store(?VHOST, + [], {fun ([]) -> finished; + ([MsgId|MsgIdsTail]) + when length(MsgIdsTail) rem 2 == 0 -> + {MsgId, 1, MsgIdsTail}; + ([MsgId|MsgIdsTail]) -> + {MsgId, 0, MsgIdsTail} + end, MsgIds2ndHalf}), + MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + %% check we have the right msgs left + lists:foldl( + fun (MsgId, Bool) -> + not(Bool = rabbit_msg_store:contains(MsgId, MSCState5)) + end, false, MsgIds2ndHalf), + ok = rabbit_msg_store:client_terminate(MSCState5), + %% restart empty + restart_msg_store_empty(), + MSCState6 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + %% check we don't contain any of the msgs + false = msg_store_contains(false, MsgIds, MSCState6), + %% publish the first half again + ok = msg_store_write(MsgIds1stHalf, MSCState6), + %% this should force some sort of sync internally otherwise misread + ok = rabbit_msg_store:client_terminate( + msg_store_read(MsgIds1stHalf, MSCState6)), + MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + ok = msg_store_remove(MsgIds1stHalf, MSCState7), + ok = rabbit_msg_store:client_terminate(MSCState7), + %% restart empty + restart_msg_store_empty(), %% now safe to reuse msg_ids + %% push a lot of msgs in... at least 100 files worth + {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit), + PayloadSizeBits = 65536, + BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)), + MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)], + Payload = << 0:PayloadSizeBits >>, + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (MSCStateM) -> + [ok = rabbit_msg_store:write(MsgId, Payload, MSCStateM) || + MsgId <- MsgIdsBig], + MSCStateM + end), + %% now read them to ensure we hit the fast client-side reading + ok = foreach_with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (MsgId, MSCStateM) -> + {{ok, Payload}, MSCStateN} = rabbit_msg_store:read( + MsgId, MSCStateM), + MSCStateN + end, MsgIdsBig), + %% .., then 3s by 1... + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [msg_id_bin(X) || X <- lists:seq(BigCount, 1, -3)]), + %% .., then remove 3s by 2, from the young end first. This hits + %% GC (under 50% good data left, but no empty files. Must GC). + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [msg_id_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), + %% .., then remove 3s by 3, from the young end first. This hits + %% GC... + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [msg_id_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), + %% ensure empty + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (MSCStateM) -> + false = msg_store_contains(false, MsgIdsBig, MSCStateM), + MSCStateM + end), + %% + passed = test_msg_store_client_delete_and_terminate(), + %% restart empty + restart_msg_store_empty(), + passed. + +restart_msg_store_empty() -> + ok = rabbit_variable_queue:stop_msg_store(?VHOST), + ok = rabbit_variable_queue:start_msg_store(?VHOST, + undefined, {fun (ok) -> finished end, ok}). + +msg_id_bin(X) -> + erlang:md5(term_to_binary(X)). + +on_disk_capture() -> + receive + {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid); + stop -> done + end. + +on_disk_capture([_|_], _Awaiting, Pid) -> + Pid ! {self(), surplus}; +on_disk_capture(OnDisk, Awaiting, Pid) -> + receive + {on_disk, MsgIdsS} -> + MsgIds = gb_sets:to_list(MsgIdsS), + on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds, + Pid); + stop -> + done + after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) -> + case Awaiting of + [] -> Pid ! {self(), arrived}, on_disk_capture(); + _ -> Pid ! {self(), timeout} + end + end. + +on_disk_await(Pid, MsgIds) when is_list(MsgIds) -> + Pid ! {await, MsgIds, self()}, + receive + {Pid, arrived} -> ok; + {Pid, Error} -> Error + end. + +on_disk_stop(Pid) -> + MRef = erlang:monitor(process, Pid), + Pid ! stop, + receive {'DOWN', MRef, process, Pid, _Reason} -> + ok + end. + +msg_store_client_init_capture(MsgStore, Ref) -> + Pid = spawn(fun on_disk_capture/0), + {Pid, rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, + fun (MsgIds, _ActionTaken) -> + Pid ! {on_disk, MsgIds} + end, undefined)}. + +msg_store_contains(Atom, MsgIds, MSCState) -> + Atom = lists:foldl( + fun (MsgId, Atom1) when Atom1 =:= Atom -> + rabbit_msg_store:contains(MsgId, MSCState) end, + Atom, MsgIds). + +msg_store_read(MsgIds, MSCState) -> + lists:foldl(fun (MsgId, MSCStateM) -> + {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read( + MsgId, MSCStateM), + MSCStateN + end, MSCState, MsgIds). + +msg_store_write(MsgIds, MSCState) -> + ok = lists:foldl(fun (MsgId, ok) -> + rabbit_msg_store:write(MsgId, MsgId, MSCState) + end, ok, MsgIds). + +msg_store_write_flow(MsgIds, MSCState) -> + ok = lists:foldl(fun (MsgId, ok) -> + rabbit_msg_store:write_flow(MsgId, MsgId, MSCState) + end, ok, MsgIds). + +msg_store_remove(MsgIds, MSCState) -> + rabbit_msg_store:remove(MsgIds, MSCState). + +msg_store_remove(MsgStore, Ref, MsgIds) -> + with_msg_store_client(MsgStore, Ref, + fun (MSCStateM) -> + ok = msg_store_remove(MsgIds, MSCStateM), + MSCStateM + end). + +with_msg_store_client(MsgStore, Ref, Fun) -> + rabbit_msg_store:client_terminate( + Fun(msg_store_client_init(MsgStore, Ref))). + +foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> + rabbit_msg_store:client_terminate( + lists:foldl(fun (MsgId, MSCState) -> Fun(MsgId, MSCState) end, + msg_store_client_init(MsgStore, Ref), L)). + +test_msg_store_confirms(MsgIds, Cap, MSCState) -> + %% write -> confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% remove -> _ + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, []), + %% write, remove -> confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% write, remove, write -> confirmed, confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds ++ MsgIds), + %% remove, write -> confirmed + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% remove, write, remove -> confirmed + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% confirmation on timer-based sync + passed = test_msg_store_confirm_timer(), + passed. + +test_msg_store_confirm_timer() -> + Ref = rabbit_guid:gen(), + MsgId = msg_id_bin(1), + Self = self(), + MSCState = rabbit_vhost_msg_store:client_init( + ?VHOST, + ?PERSISTENT_MSG_STORE, + Ref, + fun (MsgIds, _ActionTaken) -> + case gb_sets:is_member(MsgId, MsgIds) of + true -> Self ! on_disk; + false -> ok + end + end, undefined), + ok = msg_store_write([MsgId], MSCState), + ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), + ok = msg_store_remove([MsgId], MSCState), + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), + passed. + +msg_store_keep_busy_until_confirm(MsgIds, MSCState, Blocked) -> + After = case Blocked of + false -> 0; + true -> ?MAX_WAIT + end, + Recurse = fun () -> msg_store_keep_busy_until_confirm( + MsgIds, MSCState, credit_flow:blocked()) end, + receive + on_disk -> ok; + {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), + Recurse() + after After -> + ok = msg_store_write_flow(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + Recurse() + end. + +test_msg_store_client_delete_and_terminate() -> + restart_msg_store_empty(), + MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)], + Ref = rabbit_guid:gen(), + MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + ok = msg_store_write(MsgIds, MSCState), + %% test the 'dying client' fast path for writes + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), + passed. + +%% ------------------------------------------------------------------- +%% Backing queue. +%% ------------------------------------------------------------------- + +setup_backing_queue_test_group(Config) -> + {ok, FileSizeLimit} = + application:get_env(rabbit, msg_store_file_size_limit), + application:set_env(rabbit, msg_store_file_size_limit, 512), + {ok, MaxJournal} = + application:get_env(rabbit, queue_index_max_journal_entries), + application:set_env(rabbit, queue_index_max_journal_entries, 128), + application:set_env(rabbit, msg_store_file_size_limit, + FileSizeLimit), + {ok, Bytes} = + application:get_env(rabbit, queue_index_embed_msgs_below), + rabbit_ct_helpers:set_config(Config, [ + {rmq_queue_index_max_journal_entries, MaxJournal}, + {rmq_queue_index_embed_msgs_below, Bytes} + ]). + +teardown_backing_queue_test_group(Config) -> + %% FIXME: Undo all the setup function did. + application:set_env(rabbit, queue_index_max_journal_entries, + ?config(rmq_queue_index_max_journal_entries, Config)), + %% We will have restarted the message store, and thus changed + %% the order of the children of rabbit_sup. This will cause + %% problems if there are subsequent failures - see bug 24262. + ok = restart_app(), + Config. + +bq_queue_index(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, bq_queue_index1, [Config]). + +bq_queue_index1(_Config) -> + SegmentSize = rabbit_queue_index:next_segment_boundary(0), + TwoSegs = SegmentSize + SegmentSize, + MostOfASegment = trunc(SegmentSize*0.75), + SeqIdsA = lists:seq(0, MostOfASegment-1), + SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment), + SeqIdsC = lists:seq(0, trunc(SegmentSize/2)), + SeqIdsD = lists:seq(0, SegmentSize*4), + + with_empty_test_queue( + fun (Qi0, QName) -> + {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), + {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), + {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2), + {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), + ok = verify_read_with_published(false, false, ReadA, + lists:reverse(SeqIdsMsgIdsA)), + %% should get length back as 0, as all the msgs were transient + {0, 0, Qi6} = restart_test_queue(Qi4, QName), + {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), + {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), + {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), + {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), + ok = verify_read_with_published(false, true, ReadB, + lists:reverse(SeqIdsMsgIdsB)), + %% should get length back as MostOfASegment + LenB = length(SeqIdsB), + BytesB = LenB * 10, + {LenB, BytesB, Qi12} = restart_test_queue(Qi10, QName), + {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), + Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), + {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), + ok = verify_read_with_published(true, true, ReadC, + lists:reverse(SeqIdsMsgIdsB)), + Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), + Qi17 = rabbit_queue_index:flush(Qi16), + %% Everything will have gone now because #pubs == #acks + {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), + %% should get length back as 0 because all persistent + %% msgs have been acked + {0, 0, Qi19} = restart_test_queue(Qi18, QName), + Qi19 + end), + + %% These next bits are just to hit the auto deletion of segment files. + %% First, partials: + %% a) partial pub+del+ack, then move to new segment + with_empty_test_queue( + fun (Qi0, _QName) -> + {Qi1, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, + false, Qi0), + Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1), + Qi3 = rabbit_queue_index:ack(SeqIdsC, Qi2), + Qi4 = rabbit_queue_index:flush(Qi3), + {Qi5, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize], + false, Qi4), + Qi5 + end), + + %% b) partial pub+del, then move to new segment, then ack all in old segment + with_empty_test_queue( + fun (Qi0, _QName) -> + {Qi1, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, + false, Qi0), + Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1), + {Qi3, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], + false, Qi2), + Qi4 = rabbit_queue_index:ack(SeqIdsC, Qi3), + rabbit_queue_index:flush(Qi4) + end), + + %% c) just fill up several segments of all pubs, then +dels, then +acks + with_empty_test_queue( + fun (Qi0, _QName) -> + {Qi1, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, + false, Qi0), + Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1), + Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2), + rabbit_queue_index:flush(Qi3) + end), + + %% d) get messages in all states to a segment, then flush, then do + %% the same again, don't flush and read. This will hit all + %% possibilities in combining the segment with the journal. + with_empty_test_queue( + fun (Qi0, _QName) -> + {Qi1, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7], + false, Qi0), + Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), + Qi3 = rabbit_queue_index:ack([0], Qi2), + Qi4 = rabbit_queue_index:flush(Qi3), + {Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4), + Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), + Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), + {[], Qi8} = rabbit_queue_index:read(0, 4, Qi7), + {ReadD, Qi9} = rabbit_queue_index:read(4, 7, Qi8), + ok = verify_read_with_published(true, false, ReadD, + [Four, Five, Six]), + {ReadE, Qi10} = rabbit_queue_index:read(7, 9, Qi9), + ok = verify_read_with_published(false, false, ReadE, + [Seven, Eight]), + Qi10 + end), + + %% e) as for (d), but use terminate instead of read, which will + %% exercise journal_minus_segment, not segment_plus_journal. + with_empty_test_queue( + fun (Qi0, QName) -> + {Qi1, _SeqIdsMsgIdsE} = queue_index_publish([0,1,2,4,5,7], + true, Qi0), + Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), + Qi3 = rabbit_queue_index:ack([0], Qi2), + {5, 50, Qi4} = restart_test_queue(Qi3, QName), + {Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4), + Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), + Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), + {5, 50, Qi8} = restart_test_queue(Qi7, QName), + Qi8 + end), + + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), + + passed. + +bq_queue_index_props(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, bq_queue_index_props1, [Config]). + +bq_queue_index_props1(_Config) -> + with_empty_test_queue( + fun(Qi0, _QName) -> + MsgId = rabbit_guid:gen(), + Props = #message_properties{expiry=12345, size = 10}, + Qi1 = rabbit_queue_index:publish( + MsgId, 1, Props, true, infinity, Qi0), + {[{MsgId, 1, Props, _, _}], Qi2} = + rabbit_queue_index:read(1, 2, Qi1), + Qi2 + end), + + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), + + passed. + +bq_variable_queue_delete_msg_store_files_callback(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, bq_variable_queue_delete_msg_store_files_callback1, [Config]). + +bq_variable_queue_delete_msg_store_files_callback1(Config) -> + ok = restart_msg_store_empty(), + QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + Payload = <<0:8388608>>, %% 1MB + Count = 30, + QTState = publish_and_confirm(Q, Payload, Count), + + rabbit_amqqueue:set_ram_duration_target(QPid, 0), + + {ok, Limiter} = rabbit_limiter:start_link(no_id), + + CountMinusOne = Count - 1, + {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}, _} = + rabbit_amqqueue:basic_get(Q, true, Limiter, + <<"bq_variable_queue_delete_msg_store_files_callback1">>, + QTState), + {ok, CountMinusOne} = rabbit_amqqueue:purge(Q), + + %% give the queue a second to receive the close_fds callback msg + timer:sleep(1000), + + rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>), + passed. + +bq_queue_recover(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, bq_queue_recover1, [Config]). + +bq_queue_recover1(Config) -> + Count = 2 * rabbit_queue_index:next_segment_boundary(0), + QName0 = queue_name(Config, <<"bq_queue_recover-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + QT = publish_and_confirm(Q, <<>>, Count), + SupPid = get_queue_sup_pid(Q), + true = is_pid(SupPid), + exit(SupPid, kill), + exit(QPid, kill), + MRef = erlang:monitor(process, QPid), + receive {'DOWN', MRef, process, QPid, _Info} -> ok + after 10000 -> exit(timeout_waiting_for_queue_death) + end, + rabbit_amqqueue:stop(?VHOST), + {Recovered, []} = rabbit_amqqueue:recover(?VHOST), + rabbit_amqqueue:start(Recovered), + {ok, Limiter} = rabbit_limiter:start_link(no_id), + rabbit_amqqueue:with_or_die( + QName, + fun (Q1) when ?is_amqqueue(Q1) -> + QPid1 = amqqueue:get_pid(Q1), + CountMinusOne = Count - 1, + {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}, _} = + rabbit_amqqueue:basic_get(Q1, false, Limiter, + <<"bq_queue_recover1">>, QT), + exit(QPid1, shutdown), + VQ1 = variable_queue_init(Q, true), + {{_Msg1, true, _AckTag1}, VQ2} = + rabbit_variable_queue:fetch(true, VQ1), + CountMinusOne = rabbit_variable_queue:len(VQ2), + _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), + ok = rabbit_amqqueue:internal_delete(QName, <<"acting-user">>) + end), + passed. + +%% Return the PID of the given queue's supervisor. +get_queue_sup_pid(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + VHost = QName#resource.virtual_host, + {ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)), + Sups = supervisor:which_children(AmqSup), + get_queue_sup_pid(Sups, QPid). + +get_queue_sup_pid([{_, SupPid, _, _} | Rest], QueuePid) -> + WorkerPids = [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)], + case lists:member(QueuePid, WorkerPids) of + true -> SupPid; + false -> get_queue_sup_pid(Rest, QueuePid) + end; +get_queue_sup_pid([], _QueuePid) -> + undefined. + +variable_queue_dynamic_duration_change(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_dynamic_duration_change1, [Config]). + +variable_queue_dynamic_duration_change1(Config) -> + with_fresh_variable_queue( + fun variable_queue_dynamic_duration_change2/2, + ?config(variable_queue_type, Config)). + +variable_queue_dynamic_duration_change2(VQ0, _QName) -> + SegmentSize = rabbit_queue_index:next_segment_boundary(0), + + %% start by sending in a couple of segments worth + Len = 2*SegmentSize, + VQ1 = variable_queue_publish(false, Len, VQ0), + %% squeeze and relax queue + Churn = Len div 32, + VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + VQ7 = lists:foldl( + fun (Duration1, VQ4) -> + {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), + VQ6 = variable_queue_set_ram_duration_target( + Duration1, VQ5), + publish_fetch_and_ack(Churn, Len, VQ6) + end, VQ3, [Duration / 4, 0, Duration / 4, infinity]), + + %% drain + {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), + {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), + + VQ10. + +variable_queue_partial_segments_delta_thing(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_partial_segments_delta_thing1, [Config]). + +variable_queue_partial_segments_delta_thing1(Config) -> + with_fresh_variable_queue( + fun variable_queue_partial_segments_delta_thing2/2, + ?config(variable_queue_type, Config)). + +variable_queue_partial_segments_delta_thing2(VQ0, _QName) -> + SegmentSize = rabbit_queue_index:next_segment_boundary(0), + HalfSegment = SegmentSize div 2, + OneAndAHalfSegment = SegmentSize + HalfSegment, + VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0), + {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), + VQ3 = check_variable_queue_status( + variable_queue_set_ram_duration_target(0, VQ2), + %% one segment in q3, and half a segment in delta + [{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, + {q3, SegmentSize}, + {len, SegmentSize + HalfSegment}]), + VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3), + VQ5 = check_variable_queue_status( + variable_queue_publish(true, 1, VQ4), + %% one alpha, but it's in the same segment as the deltas + [{q1, 1}, + {delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, + {q3, SegmentSize}, + {len, SegmentSize + HalfSegment + 1}]), + {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false, + SegmentSize + HalfSegment + 1, VQ5), + VQ7 = check_variable_queue_status( + VQ6, + %% the half segment should now be in q3 + [{q1, 1}, + {delta, {delta, undefined, 0, undefined}}, + {q3, HalfSegment}, + {len, HalfSegment + 1}]), + {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, + HalfSegment + 1, VQ7), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + %% should be empty now + {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), + VQ10. + +variable_queue_all_the_bits_not_covered_elsewhere_A(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, [Config]). + +variable_queue_all_the_bits_not_covered_elsewhere_A1(Config) -> + with_fresh_variable_queue( + fun variable_queue_all_the_bits_not_covered_elsewhere_A2/2, + ?config(variable_queue_type, Config)). + +variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) -> + Count = 2 * rabbit_queue_index:next_segment_boundary(0), + VQ1 = variable_queue_publish(true, Count, VQ0), + VQ2 = variable_queue_publish(false, Count, VQ1), + VQ3 = variable_queue_set_ram_duration_target(0, VQ2), + {VQ4, _AckTags} = variable_queue_fetch(Count, true, false, + Count + Count, VQ3), + {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, + Count, VQ4), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), + VQ7 = variable_queue_init(test_amqqueue(QName, true), true), + {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), + Count1 = rabbit_variable_queue:len(VQ8), + VQ9 = variable_queue_publish(false, 1, VQ8), + VQ10 = variable_queue_set_ram_duration_target(0, VQ9), + {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), + {VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11), + VQ12. + +variable_queue_all_the_bits_not_covered_elsewhere_B(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, [Config]). + +variable_queue_all_the_bits_not_covered_elsewhere_B1(Config) -> + with_fresh_variable_queue( + fun variable_queue_all_the_bits_not_covered_elsewhere_B2/2, + ?config(variable_queue_type, Config)). + +variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ0, QName) -> + VQ1 = variable_queue_set_ram_duration_target(0, VQ0), + VQ2 = variable_queue_publish(false, 4, VQ1), + {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), + {_Guids, VQ4} = + rabbit_variable_queue:requeue(AckTags, VQ3), + VQ5 = rabbit_variable_queue:timeout(VQ4), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), + VQ7 = variable_queue_init(test_amqqueue(QName, true), true), + {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), + VQ8. + +variable_queue_drop(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_drop1, [Config]). + +variable_queue_drop1(Config) -> + with_fresh_variable_queue( + fun variable_queue_drop2/2, + ?config(variable_queue_type, Config)). + +variable_queue_drop2(VQ0, _QName) -> + %% start by sending a messages + VQ1 = variable_queue_publish(false, 1, VQ0), + %% drop message with AckRequired = true + {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + true = rabbit_variable_queue:is_empty(VQ2), + true = AckTag =/= undefinded, + %% drop again -> empty + {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2), + %% requeue + {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3), + %% drop message with AckRequired = false + {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + true = rabbit_variable_queue:is_empty(VQ5), + VQ5. + +variable_queue_fold_msg_on_disk(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_fold_msg_on_disk1, [Config]). + +variable_queue_fold_msg_on_disk1(Config) -> + with_fresh_variable_queue( + fun variable_queue_fold_msg_on_disk2/2, + ?config(variable_queue_type, Config)). + +variable_queue_fold_msg_on_disk2(VQ0, _QName) -> + VQ1 = variable_queue_publish(true, 1, VQ0), + {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), + {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end, + ok, VQ2, AckTags), + VQ3. + +variable_queue_dropfetchwhile(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_dropfetchwhile1, [Config]). + +variable_queue_dropfetchwhile1(Config) -> + with_fresh_variable_queue( + fun variable_queue_dropfetchwhile2/2, + ?config(variable_queue_type, Config)). + +variable_queue_dropfetchwhile2(VQ0, _QName) -> + Count = 10, + + %% add messages with sequential expiry + VQ1 = variable_queue_publish( + false, 1, Count, + fun (N, Props) -> Props#message_properties{expiry = N} end, + fun erlang:term_to_binary/1, VQ0), + + %% fetch the first 5 messages + {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} = + rabbit_variable_queue:fetchwhile( + fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end, + fun (Msg, AckTag, {MsgAcc, AckAcc}) -> + {[Msg | MsgAcc], [AckTag | AckAcc]} + end, {[], []}, VQ1), + true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)], + + %% requeue them + {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2), + + %% drop the first 5 messages + {#message_properties{expiry = 6}, VQ4} = + rabbit_variable_queue:dropwhile( + fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3), + + %% fetch 5 + VQ5 = lists:foldl(fun (N, VQN) -> + {{Msg, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + true = msg2int(Msg) == N, + VQM + end, VQ4, lists:seq(6, Count)), + + %% should be empty now + true = rabbit_variable_queue:is_empty(VQ5), + + VQ5. + +variable_queue_dropwhile_varying_ram_duration(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_dropwhile_varying_ram_duration1, [Config]). + +variable_queue_dropwhile_varying_ram_duration1(Config) -> + with_fresh_variable_queue( + fun variable_queue_dropwhile_varying_ram_duration2/2, + ?config(variable_queue_type, Config)). + +variable_queue_dropwhile_varying_ram_duration2(VQ0, _QName) -> + test_dropfetchwhile_varying_ram_duration( + fun (VQ1) -> + {_, VQ2} = rabbit_variable_queue:dropwhile( + fun (_) -> false end, VQ1), + VQ2 + end, VQ0). + +variable_queue_fetchwhile_varying_ram_duration(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_fetchwhile_varying_ram_duration1, [Config]). + +variable_queue_fetchwhile_varying_ram_duration1(Config) -> + with_fresh_variable_queue( + fun variable_queue_fetchwhile_varying_ram_duration2/2, + ?config(variable_queue_type, Config)). + +variable_queue_fetchwhile_varying_ram_duration2(VQ0, _QName) -> + test_dropfetchwhile_varying_ram_duration( + fun (VQ1) -> + {_, ok, VQ2} = rabbit_variable_queue:fetchwhile( + fun (_) -> false end, + fun (_, _, A) -> A end, + ok, VQ1), + VQ2 + end, VQ0). + +test_dropfetchwhile_varying_ram_duration(Fun, VQ0) -> + VQ1 = variable_queue_publish(false, 1, VQ0), + VQ2 = variable_queue_set_ram_duration_target(0, VQ1), + VQ3 = Fun(VQ2), + VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3), + VQ5 = variable_queue_publish(false, 1, VQ4), + VQ6 = Fun(VQ5), + VQ6. + +variable_queue_ack_limiting(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_ack_limiting1, [Config]). + +variable_queue_ack_limiting1(Config) -> + with_fresh_variable_queue( + fun variable_queue_ack_limiting2/2, + ?config(variable_queue_type, Config)). + +variable_queue_ack_limiting2(VQ0, _Config) -> + %% start by sending in a bunch of messages + Len = 1024, + VQ1 = variable_queue_publish(false, Len, VQ0), + + %% squeeze and relax queue + Churn = Len div 32, + VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + + %% update stats for duration + {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + + %% fetch half the messages + {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), + + VQ5 = check_variable_queue_status( + VQ4, [{len, Len div 2}, + {messages_unacknowledged_ram, Len div 2}, + {messages_ready_ram, Len div 2}, + {messages_ram, Len}]), + + %% ensure all acks go to disk on 0 duration target + VQ6 = check_variable_queue_status( + variable_queue_set_ram_duration_target(0, VQ5), + [{len, Len div 2}, + {target_ram_count, 0}, + {messages_unacknowledged_ram, 0}, + {messages_ready_ram, 0}, + {messages_ram, 0}]), + + VQ6. + +variable_queue_purge(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_purge1, [Config]). + +variable_queue_purge1(Config) -> + with_fresh_variable_queue( + fun variable_queue_purge2/2, + ?config(variable_queue_type, Config)). + +variable_queue_purge2(VQ0, _Config) -> + LenDepth = fun (VQ) -> + {rabbit_variable_queue:len(VQ), + rabbit_variable_queue:depth(VQ)} + end, + VQ1 = variable_queue_publish(false, 10, VQ0), + {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1), + {4, VQ3} = rabbit_variable_queue:purge(VQ2), + {0, 6} = LenDepth(VQ3), + {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3), + {2, 6} = LenDepth(VQ4), + VQ5 = rabbit_variable_queue:purge_acks(VQ4), + {2, 2} = LenDepth(VQ5), + VQ5. + +variable_queue_requeue(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_requeue1, [Config]). + +variable_queue_requeue1(Config) -> + with_fresh_variable_queue( + fun variable_queue_requeue2/2, + ?config(variable_queue_type, Config)). + +variable_queue_requeue2(VQ0, _Config) -> + {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Msgs = + lists:zip(RequeuedMsgs, + lists:duplicate(length(RequeuedMsgs), true)) ++ + lists:zip(FreshMsgs, + lists:duplicate(length(FreshMsgs), false)), + VQ2 = lists:foldl(fun ({I, Requeued}, VQa) -> + {{M, MRequeued, _}, VQb} = + rabbit_variable_queue:fetch(true, VQa), + Requeued = MRequeued, %% assertion + I = msg2int(M), %% assertion + VQb + end, VQ1, Msgs), + {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), + VQ3. + +%% requeue from ram_pending_ack into q3, move to delta and then empty queue +variable_queue_requeue_ram_beta(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_requeue_ram_beta1, [Config]). + +variable_queue_requeue_ram_beta1(Config) -> + with_fresh_variable_queue( + fun variable_queue_requeue_ram_beta2/2, + ?config(variable_queue_type, Config)). + +variable_queue_requeue_ram_beta2(VQ0, _Config) -> + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2, + VQ1 = variable_queue_publish(false, Count, VQ0), + {VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1), + {Back, Front} = lists:split(Count div 2, AcksR), + {_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2), + VQ4 = variable_queue_set_ram_duration_target(0, VQ3), + {_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4), + VQ6 = requeue_one_by_one(Front, VQ5), + {VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6), + {_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7), + VQ8. + +variable_queue_fold(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_fold1, [Config]). + +variable_queue_fold1(Config) -> + with_fresh_variable_queue( + fun variable_queue_fold2/2, + ?config(variable_queue_type, Config)). + +variable_queue_fold2(VQ0, _Config) -> + {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Count = rabbit_variable_queue:depth(VQ1), + Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), + lists:foldl(fun (Cut, VQ2) -> + test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2) + end, VQ1, [0, 1, 2, Count div 2, + Count - 1, Count, Count + 1, Count * 2]). + +test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) -> + {Acc, VQ1} = rabbit_variable_queue:fold( + fun (M, _, Pending, A) -> + MInt = msg2int(M), + Pending = lists:member(MInt, PendingMsgs), %% assert + case MInt =< Cut of + true -> {cont, [MInt | A]}; + false -> {stop, A} + end + end, [], VQ0), + Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs), + Expected = lists:reverse(Acc), %% assertion + VQ1. + +variable_queue_batch_publish(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_batch_publish1, [Config]). + +variable_queue_batch_publish1(Config) -> + with_fresh_variable_queue( + fun variable_queue_batch_publish2/2, + ?config(variable_queue_type, Config)). + +variable_queue_batch_publish2(VQ, _Config) -> + Count = 10, + VQ1 = variable_queue_batch_publish(true, Count, VQ), + Count = rabbit_variable_queue:len(VQ1), + VQ1. + +variable_queue_batch_publish_delivered(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_batch_publish_delivered1, [Config]). + +variable_queue_batch_publish_delivered1(Config) -> + with_fresh_variable_queue( + fun variable_queue_batch_publish_delivered2/2, + ?config(variable_queue_type, Config)). + +variable_queue_batch_publish_delivered2(VQ, _Config) -> + Count = 10, + VQ1 = variable_queue_batch_publish_delivered(true, Count, VQ), + Count = rabbit_variable_queue:depth(VQ1), + VQ1. + +%% same as test_variable_queue_requeue_ram_beta but randomly changing +%% the queue mode after every step. +variable_queue_mode_change(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_mode_change1, [Config]). + +variable_queue_mode_change1(Config) -> + with_fresh_variable_queue( + fun variable_queue_mode_change2/2, + ?config(variable_queue_type, Config)). + +variable_queue_mode_change2(VQ0, _Config) -> + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2, + VQ1 = variable_queue_publish(false, Count, VQ0), + VQ2 = maybe_switch_queue_mode(VQ1), + {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2), + VQ4 = maybe_switch_queue_mode(VQ3), + {Back, Front} = lists:split(Count div 2, AcksR), + {_, VQ5} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ4), + VQ6 = maybe_switch_queue_mode(VQ5), + VQ7 = variable_queue_set_ram_duration_target(0, VQ6), + VQ8 = maybe_switch_queue_mode(VQ7), + {_, VQ9} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ8), + VQ10 = maybe_switch_queue_mode(VQ9), + VQ11 = requeue_one_by_one(Front, VQ10), + VQ12 = maybe_switch_queue_mode(VQ11), + {VQ13, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ12), + VQ14 = maybe_switch_queue_mode(VQ13), + {_, VQ15} = rabbit_variable_queue:ack(AcksAll, VQ14), + VQ16 = maybe_switch_queue_mode(VQ15), + VQ16. + +maybe_switch_queue_mode(VQ) -> + Mode = random_queue_mode(), + set_queue_mode(Mode, VQ). + +random_queue_mode() -> + Modes = [lazy, default], + lists:nth(rand:uniform(length(Modes)), Modes). + +pub_res({_, VQS}) -> + VQS; +pub_res(VQS) -> + VQS. + +make_publish(IsPersistent, PayloadFun, PropFun, N) -> + {rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = case IsPersistent of + true -> 2; + false -> 1 + end}, + PayloadFun(N)), + PropFun(N, #message_properties{size = 10}), + false}. + +make_publish_delivered(IsPersistent, PayloadFun, PropFun, N) -> + {rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = case IsPersistent of + true -> 2; + false -> 1 + end}, + PayloadFun(N)), + PropFun(N, #message_properties{size = 10})}. + +queue_name(Config, Name) -> + Name1 = iolist_to_binary(rabbit_ct_helpers:config_to_testcase_name(Config, Name)), + queue_name(Name1). + +queue_name(Name) -> + rabbit_misc:r(<<"/">>, queue, Name). + +test_queue() -> + queue_name(rabbit_guid:gen()). + +init_test_queue(QName) -> + PRef = rabbit_guid:gen(), + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), + Res = rabbit_queue_index:recover( + QName, [], false, + fun (MsgId) -> + rabbit_msg_store:contains(MsgId, PersistentClient) + end, + fun nop/1, fun nop/1), + ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), + Res. + +restart_test_queue(Qi, QName) -> + _ = rabbit_queue_index:terminate(?VHOST, [], Qi), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, [QName]), + init_test_queue(QName). + +empty_test_queue(QName) -> + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), + {0, 0, Qi} = init_test_queue(QName), + _ = rabbit_queue_index:delete_and_terminate(Qi), + ok. + +unin_empty_test_queue(QName) -> + {0, 0, Qi} = init_test_queue(QName), + _ = rabbit_queue_index:delete_and_terminate(Qi), + ok. + +with_empty_test_queue(Fun) -> + QName = test_queue(), + ok = empty_test_queue(QName), + {0, 0, Qi} = init_test_queue(QName), + rabbit_queue_index:delete_and_terminate(Fun(Qi, QName)). + +restart_app() -> + rabbit:stop(), + rabbit:start(). + +queue_index_publish(SeqIds, Persistent, Qi) -> + Ref = rabbit_guid:gen(), + MsgStore = case Persistent of + true -> ?PERSISTENT_MSG_STORE; + false -> ?TRANSIENT_MSG_STORE + end, + MSCState = msg_store_client_init(MsgStore, Ref), + {A, B = [{_SeqId, LastMsgIdWritten} | _]} = + lists:foldl( + fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> + MsgId = rabbit_guid:gen(), + QiM = rabbit_queue_index:publish( + MsgId, SeqId, #message_properties{size = 10}, + Persistent, infinity, QiN), + ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), + {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} + end, {Qi, []}, SeqIds), + %% do this just to force all of the publishes through to the msg_store: + true = rabbit_msg_store:contains(LastMsgIdWritten, MSCState), + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), + {A, B}. + +verify_read_with_published(_Delivered, _Persistent, [], _) -> + ok; +verify_read_with_published(Delivered, Persistent, + [{MsgId, SeqId, _Props, Persistent, Delivered}|Read], + [{SeqId, MsgId}|Published]) -> + verify_read_with_published(Delivered, Persistent, Read, Published); +verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> + ko. + +nop(_) -> ok. +nop(_, _) -> ok. + +msg_store_client_init(MsgStore, Ref) -> + rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, undefined, undefined). + +variable_queue_init(Q, Recover) -> + rabbit_variable_queue:init( + Q, case Recover of + true -> non_clean_shutdown; + false -> new + end, fun nop/2, fun nop/2, fun nop/1, fun nop/1). + +publish_and_confirm(Q, Payload, Count) -> + Seqs = lists:seq(1, Count), + QTState0 = rabbit_queue_type:new(Q, rabbit_queue_type:init()), + QTState = + lists:foldl( + fun (Seq, Acc0) -> + Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, + Payload), + Delivery = #delivery{mandatory = false, sender = self(), + confirm = true, message = Msg, msg_seq_no = Seq, + flow = noflow}, + {ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Delivery, Acc0), + Acc + end, QTState0, Seqs), + wait_for_confirms(gb_sets:from_list(Seqs)), + QTState. + +wait_for_confirms(Unconfirmed) -> + case gb_sets:is_empty(Unconfirmed) of + true -> ok; + false -> + receive + {'$gen_cast', + {queue_event, _QName, {confirm, Confirmed, _}}} -> + wait_for_confirms( + rabbit_misc:gb_sets_difference( + Unconfirmed, gb_sets:from_list(Confirmed))); + {'$gen_cast', {confirm, Confirmed, _}} -> + wait_for_confirms( + rabbit_misc:gb_sets_difference( + Unconfirmed, gb_sets:from_list(Confirmed))) + after ?TIMEOUT -> + flush(), + exit(timeout_waiting_for_confirm) + end + end. + +with_fresh_variable_queue(Fun, Mode) -> + Ref = make_ref(), + Me = self(), + %% Run in a separate process since rabbit_msg_store will send + %% bump_credit messages and we want to ignore them + spawn_link(fun() -> + QName = test_queue(), + ok = unin_empty_test_queue(QName), + VQ = variable_queue_init(test_amqqueue(QName, true), false), + S0 = variable_queue_status(VQ), + assert_props(S0, [{q1, 0}, {q2, 0}, + {delta, + {delta, undefined, 0, undefined}}, + {q3, 0}, {q4, 0}, + {len, 0}]), + VQ1 = set_queue_mode(Mode, VQ), + try + _ = rabbit_variable_queue:delete_and_terminate( + shutdown, Fun(VQ1, QName)), + Me ! Ref + catch + Type:Error:Stacktrace -> + Me ! {Ref, Type, Error, Stacktrace} + end + end), + receive + Ref -> ok; + {Ref, Type, Error, ST} -> exit({Type, Error, ST}) + end, + passed. + +set_queue_mode(Mode, VQ) -> + VQ1 = rabbit_variable_queue:set_queue_mode(Mode, VQ), + S1 = variable_queue_status(VQ1), + assert_props(S1, [{mode, Mode}]), + VQ1. + +variable_queue_publish(IsPersistent, Count, VQ) -> + variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). + +variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> + variable_queue_publish(IsPersistent, 1, Count, PropFun, + fun (_N) -> <<>> end, VQ). + +variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> + variable_queue_wait_for_shuffling_end( + lists:foldl( + fun (N, VQN) -> + + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = case IsPersistent of + true -> 2; + false -> 1 + end}, + PayloadFun(N)), + PropFun(N, #message_properties{size = 10}), + false, self(), noflow, VQN) + end, VQ, lists:seq(Start, Start + Count - 1))). + +variable_queue_batch_publish(IsPersistent, Count, VQ) -> + variable_queue_batch_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). + +variable_queue_batch_publish(IsPersistent, Count, PropFun, VQ) -> + variable_queue_batch_publish(IsPersistent, 1, Count, PropFun, + fun (_N) -> <<>> end, VQ). + +variable_queue_batch_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> + variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, + PayloadFun, fun make_publish/4, + fun rabbit_variable_queue:batch_publish/4, + VQ). + +variable_queue_batch_publish_delivered(IsPersistent, Count, VQ) -> + variable_queue_batch_publish_delivered(IsPersistent, Count, fun (_N, P) -> P end, VQ). + +variable_queue_batch_publish_delivered(IsPersistent, Count, PropFun, VQ) -> + variable_queue_batch_publish_delivered(IsPersistent, 1, Count, PropFun, + fun (_N) -> <<>> end, VQ). + +variable_queue_batch_publish_delivered(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> + variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, + PayloadFun, fun make_publish_delivered/4, + fun rabbit_variable_queue:batch_publish_delivered/4, + VQ). + +variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, PayloadFun, + MakePubFun, PubFun, VQ) -> + Publishes = + [MakePubFun(IsPersistent, PayloadFun, PropFun, N) + || N <- lists:seq(Start, Start + Count - 1)], + Res = PubFun(Publishes, self(), noflow, VQ), + VQ1 = pub_res(Res), + variable_queue_wait_for_shuffling_end(VQ1). + +variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> + lists:foldl(fun (N, {VQN, AckTagsAcc}) -> + Rem = Len - N, + {{#basic_message { is_persistent = IsPersistent }, + IsDelivered, AckTagN}, VQM} = + rabbit_variable_queue:fetch(true, VQN), + Rem = rabbit_variable_queue:len(VQM), + {VQM, [AckTagN | AckTagsAcc]} + end, {VQ, []}, lists:seq(1, Count)). + +test_amqqueue(QName, Durable) -> + rabbit_amqqueue:pseudo_queue(QName, self(), Durable). + +assert_prop(List, Prop, Value) -> + case proplists:get_value(Prop, List)of + Value -> ok; + _ -> {exit, Prop, exp, Value, List} + end. + +assert_props(List, PropVals) -> + [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. + +variable_queue_set_ram_duration_target(Duration, VQ) -> + variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:set_ram_duration_target(Duration, VQ)). + +publish_fetch_and_ack(0, _Len, VQ0) -> + VQ0; +publish_fetch_and_ack(N, Len, VQ0) -> + VQ1 = variable_queue_publish(false, 1, VQ0), + {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + Len = rabbit_variable_queue:len(VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + publish_fetch_and_ack(N-1, Len, VQ3). + +variable_queue_status(VQ) -> + Keys = rabbit_backing_queue:info_keys() -- [backing_queue_status], + [{K, rabbit_variable_queue:info(K, VQ)} || K <- Keys] ++ + rabbit_variable_queue:info(backing_queue_status, VQ). + +variable_queue_wait_for_shuffling_end(VQ) -> + case credit_flow:blocked() of + false -> VQ; + true -> + receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:resume(VQ)) + end + end. + +msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> + binary_to_term(list_to_binary(lists:reverse(P))). + +ack_subset(AckSeqs, Interval, Rem) -> + lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs). + +requeue_one_by_one(Acks, VQ) -> + lists:foldl(fun (AckTag, VQN) -> + {_MsgId, VQM} = rabbit_variable_queue:requeue( + [AckTag], VQN), + VQM + end, VQ, Acks). + +%% Create a vq with messages in q1, delta, and q3, and holes (in the +%% form of pending acks) in the latter two. +variable_queue_with_holes(VQ0) -> + Interval = 2048, %% should match vq:IO_BATCH_SIZE + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval, + Seq = lists:seq(1, Count), + VQ1 = variable_queue_set_ram_duration_target(0, VQ0), + VQ2 = variable_queue_publish( + false, 1, Count, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2), + Acks = lists:reverse(AcksR), + AckSeqs = lists:zip(Acks, Seq), + [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] = + [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]], + %% we requeue in three phases in order to exercise requeuing logic + %% in various vq states + {_MsgIds, VQ4} = rabbit_variable_queue:requeue( + Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3), + VQ5 = requeue_one_by_one(Subset1, VQ4), + %% by now we have some messages (and holes) in delta + VQ6 = requeue_one_by_one(Subset2, VQ5), + VQ7 = variable_queue_set_ram_duration_target(infinity, VQ6), + %% add the q1 tail + VQ8 = variable_queue_publish( + true, Count + 1, Interval, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), + %% assertions + Status = variable_queue_status(VQ8), + + vq_with_holes_assertions(VQ8, proplists:get_value(mode, Status)), + Depth = Count + Interval, + Depth = rabbit_variable_queue:depth(VQ8), + Len = Depth - length(Subset3), + Len = rabbit_variable_queue:len(VQ8), + + {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}. + +vq_with_holes_assertions(VQ, default) -> + [false = + case V of + {delta, _, 0, _} -> true; + 0 -> true; + _ -> false + end || {K, V} <- variable_queue_status(VQ), + lists:member(K, [q1, delta, q3])]; +vq_with_holes_assertions(VQ, lazy) -> + [false = + case V of + {delta, _, 0, _} -> true; + _ -> false + end || {K, V} <- variable_queue_status(VQ), + lists:member(K, [delta])]. + +check_variable_queue_status(VQ0, Props) -> + VQ1 = variable_queue_wait_for_shuffling_end(VQ0), + S = variable_queue_status(VQ1), + assert_props(S, Props), + VQ1. + +flush() -> + receive + Any -> + ct:pal("flush ~p", [Any]), + flush() + after 0 -> + ok + end. |