summaryrefslogtreecommitdiff
path: root/deps/rabbit/test/channel_operation_timeout_test_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/test/channel_operation_timeout_test_queue.erl')
-rw-r--r--deps/rabbit/test/channel_operation_timeout_test_queue.erl323
1 files changed, 323 insertions, 0 deletions
diff --git a/deps/rabbit/test/channel_operation_timeout_test_queue.erl b/deps/rabbit/test/channel_operation_timeout_test_queue.erl
new file mode 100644
index 0000000000..3190dad7a8
--- /dev/null
+++ b/deps/rabbit/test/channel_operation_timeout_test_queue.erl
@@ -0,0 +1,323 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(channel_operation_timeout_test_queue).
+
+-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
+ purge/1, purge_acks/1,
+ publish/6, publish_delivered/5,
+ batch_publish/4, batch_publish_delivered/4,
+ discard/4, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
+ ackfold/4, fold/3, len/1, is_empty/1, depth/1,
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
+ handle_pre_hibernate/1, resume/1, msg_rates/1,
+ info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
+ start/2, stop/1, zip_msgs_and_acks/4, handle_info/2]).
+
+%%----------------------------------------------------------------------------
+%% This test backing queue follows the variable queue implementation, with
+%% the exception that it will introduce infinite delays on some operations if
+%% the test message has been published, and is awaiting acknowledgement in the
+%% queue index. Test message is "timeout_test_msg!".
+%%
+%%----------------------------------------------------------------------------
+
+-behaviour(rabbit_backing_queue).
+
+-record(vqstate,
+ { q1,
+ q2,
+ delta,
+ q3,
+ q4,
+ next_seq_id,
+ ram_pending_ack, %% msgs using store, still in RAM
+ disk_pending_ack, %% msgs in store, paged out
+ qi_pending_ack, %% msgs using qi, *can't* be paged out
+ index_state,
+ msg_store_clients,
+ durable,
+ transient_threshold,
+ qi_embed_msgs_below,
+
+ len, %% w/o unacked
+ bytes, %% w/o unacked
+ unacked_bytes,
+ persistent_count, %% w unacked
+ persistent_bytes, %% w unacked
+ delta_transient_bytes, %%
+
+ target_ram_count,
+ ram_msg_count, %% w/o unacked
+ ram_msg_count_prev,
+ ram_ack_count_prev,
+ ram_bytes, %% w unacked
+ out_counter,
+ in_counter,
+ rates,
+ msgs_on_disk,
+ msg_indices_on_disk,
+ unconfirmed,
+ confirmed,
+ ack_out_counter,
+ ack_in_counter,
+ %% Unlike the other counters these two do not feed into
+ %% #rates{} and get reset
+ disk_read_count,
+ disk_write_count,
+
+ io_batch_size,
+
+ %% default queue or lazy queue
+ mode,
+ %% number of reduce_memory_usage executions, once it
+ %% reaches a threshold the queue will manually trigger a runtime GC
+ %% see: maybe_execute_gc/1
+ memory_reduction_run_count,
+ %% Queue data is grouped by VHost. We need to store it
+ %% to work with queue index.
+ virtual_host,
+ waiting_bump = false
+ }).
+
+-record(rates, { in, out, ack_in, ack_out, timestamp }).
+
+-record(msg_status,
+ { seq_id,
+ msg_id,
+ msg,
+ is_persistent,
+ is_delivered,
+ msg_in_store,
+ index_on_disk,
+ persist_to,
+ msg_props
+ }).
+
+-record(delta,
+ { start_seq_id, %% start_seq_id is inclusive
+ count,
+ transient,
+ end_seq_id %% end_seq_id is exclusive
+ }).
+
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-define(QUEUE, lqueue).
+-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
+
+%%----------------------------------------------------------------------------
+
+-type seq_id() :: non_neg_integer().
+
+-type rates() :: #rates { in :: float(),
+ out :: float(),
+ ack_in :: float(),
+ ack_out :: float(),
+ timestamp :: rabbit_types:timestamp()}.
+
+-type delta() :: #delta { start_seq_id :: non_neg_integer(),
+ count :: non_neg_integer(),
+ end_seq_id :: non_neg_integer() }.
+
+%% The compiler (rightfully) complains that ack() and state() are
+%% unused. For this reason we duplicate a -spec from
+%% rabbit_backing_queue with the only intent being to remove
+%% warnings. The problem here is that we can't parameterise the BQ
+%% behaviour by these two types as we would like to. We still leave
+%% these here for documentation purposes.
+-type ack() :: seq_id().
+-type state() :: #vqstate {
+ q1 :: ?QUEUE:?QUEUE(),
+ q2 :: ?QUEUE:?QUEUE(),
+ delta :: delta(),
+ q3 :: ?QUEUE:?QUEUE(),
+ q4 :: ?QUEUE:?QUEUE(),
+ next_seq_id :: seq_id(),
+ ram_pending_ack :: gb_trees:tree(),
+ disk_pending_ack :: gb_trees:tree(),
+ qi_pending_ack :: gb_trees:tree(),
+ index_state :: any(),
+ msg_store_clients :: 'undefined' | {{any(), binary()},
+ {any(), binary()}},
+ durable :: boolean(),
+ transient_threshold :: non_neg_integer(),
+ qi_embed_msgs_below :: non_neg_integer(),
+
+ len :: non_neg_integer(),
+ bytes :: non_neg_integer(),
+ unacked_bytes :: non_neg_integer(),
+
+ persistent_count :: non_neg_integer(),
+ persistent_bytes :: non_neg_integer(),
+
+ target_ram_count :: non_neg_integer() | 'infinity',
+ ram_msg_count :: non_neg_integer(),
+ ram_msg_count_prev :: non_neg_integer(),
+ ram_ack_count_prev :: non_neg_integer(),
+ ram_bytes :: non_neg_integer(),
+ out_counter :: non_neg_integer(),
+ in_counter :: non_neg_integer(),
+ rates :: rates(),
+ msgs_on_disk :: gb_sets:set(),
+ msg_indices_on_disk :: gb_sets:set(),
+ unconfirmed :: gb_sets:set(),
+ confirmed :: gb_sets:set(),
+ ack_out_counter :: non_neg_integer(),
+ ack_in_counter :: non_neg_integer(),
+ disk_read_count :: non_neg_integer(),
+ disk_write_count :: non_neg_integer(),
+
+ io_batch_size :: pos_integer(),
+ mode :: 'default' | 'lazy',
+ virtual_host :: rabbit_types:vhost() }.
+%% Duplicated from rabbit_backing_queue
+-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+start(VHost, DurableQueues) ->
+ rabbit_variable_queue:start(VHost, DurableQueues).
+
+stop(VHost) ->
+ rabbit_variable_queue:stop(VHost).
+
+init(Queue, Recover, Callback) ->
+ rabbit_variable_queue:init(Queue, Recover, Callback).
+
+terminate(Reason, State) ->
+ rabbit_variable_queue:terminate(Reason, State).
+
+delete_and_terminate(Reason, State) ->
+ rabbit_variable_queue:delete_and_terminate(Reason, State).
+
+delete_crashed(Q) ->
+ rabbit_variable_queue:delete_crashed(Q).
+
+purge(State = #vqstate { qi_pending_ack= QPA }) ->
+ maybe_delay(QPA),
+ rabbit_variable_queue:purge(State).
+
+purge_acks(State) ->
+ rabbit_variable_queue:purge_acks(State).
+
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
+ rabbit_variable_queue:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State).
+
+batch_publish(Publishes, ChPid, Flow, State) ->
+ rabbit_variable_queue:batch_publish(Publishes, ChPid, Flow, State).
+
+publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
+ rabbit_variable_queue:publish_delivered(Msg, MsgProps, ChPid, Flow, State).
+
+batch_publish_delivered(Publishes, ChPid, Flow, State) ->
+ rabbit_variable_queue:batch_publish_delivered(Publishes, ChPid, Flow, State).
+
+discard(_MsgId, _ChPid, _Flow, State) -> State.
+
+drain_confirmed(State) ->
+ rabbit_variable_queue:drain_confirmed(State).
+
+dropwhile(Pred, State) ->
+ rabbit_variable_queue:dropwhile(Pred, State).
+
+fetchwhile(Pred, Fun, Acc, State) ->
+ rabbit_variable_queue:fetchwhile(Pred, Fun, Acc, State).
+
+fetch(AckRequired, State) ->
+ rabbit_variable_queue:fetch(AckRequired, State).
+
+drop(AckRequired, State) ->
+ rabbit_variable_queue:drop(AckRequired, State).
+
+ack(List, State) ->
+ rabbit_variable_queue:ack(List, State).
+
+requeue(AckTags, #vqstate { qi_pending_ack = QPA } = State) ->
+ maybe_delay(QPA),
+ rabbit_variable_queue:requeue(AckTags, State).
+
+ackfold(MsgFun, Acc, State, AckTags) ->
+ rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags).
+
+fold(Fun, Acc, State) ->
+ rabbit_variable_queue:fold(Fun, Acc, State).
+
+len(#vqstate { qi_pending_ack = QPA } = State) ->
+ maybe_delay(QPA),
+ rabbit_variable_queue:len(State).
+
+is_empty(State) -> 0 == len(State).
+
+depth(State) ->
+ rabbit_variable_queue:depth(State).
+
+set_ram_duration_target(DurationTarget, State) ->
+ rabbit_variable_queue:set_ram_duration_target(DurationTarget, State).
+
+ram_duration(State) ->
+ rabbit_variable_queue:ram_duration(State).
+
+needs_timeout(State) ->
+ rabbit_variable_queue:needs_timeout(State).
+
+timeout(State) ->
+ rabbit_variable_queue:timeout(State).
+
+handle_pre_hibernate(State) ->
+ rabbit_variable_queue:handle_pre_hibernate(State).
+
+handle_info(Msg, State) ->
+ rabbit_variable_queue:handle_info(Msg, State).
+
+resume(State) -> rabbit_variable_queue:resume(State).
+
+msg_rates(State) ->
+ rabbit_variable_queue:msg_rates(State).
+
+info(Info, State) ->
+ rabbit_variable_queue:info(Info, State).
+
+invoke(Module, Fun, State) -> rabbit_variable_queue:invoke(Module, Fun, State).
+
+is_duplicate(Msg, State) -> rabbit_variable_queue:is_duplicate(Msg, State).
+
+set_queue_mode(Mode, State) ->
+ rabbit_variable_queue:set_queue_mode(Mode, State).
+
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, State) ->
+ rabbit_variable_queue:zip_msgs_and_acks(Msgs, AckTags, Accumulator, State).
+
+%% Delay
+maybe_delay(QPA) ->
+ case is_timeout_test(gb_trees:values(QPA)) of
+ true -> receive
+ %% The queue received an EXIT message, it's probably the
+ %% node being stopped with "rabbitmqctl stop". Thus, abort
+ %% the wait and requeue the EXIT message.
+ {'EXIT', _, shutdown} = ExitMsg -> self() ! ExitMsg,
+ void
+ after infinity -> void
+ end;
+ _ -> void
+ end.
+
+is_timeout_test([]) -> false;
+is_timeout_test([#msg_status{
+ msg = #basic_message{
+ content = #content{
+ payload_fragments_rev = PFR}}}|Rem]) ->
+ case lists:member(?TIMEOUT_TEST_MSG, PFR) of
+ T = true -> T;
+ _ -> is_timeout_test(Rem)
+ end;
+is_timeout_test([_|Rem]) -> is_timeout_test(Rem).