diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_backing_queue.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_backing_queue.erl | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_backing_queue.erl b/deps/rabbit/src/rabbit_backing_queue.erl new file mode 100644 index 0000000000..4d709e14d0 --- /dev/null +++ b/deps/rabbit/src/rabbit_backing_queue.erl @@ -0,0 +1,264 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_backing_queue). + +-export([info_keys/0]). + +-define(INFO_KEYS, [messages_ram, messages_ready_ram, + messages_unacknowledged_ram, messages_persistent, + message_bytes, message_bytes_ready, + message_bytes_unacknowledged, message_bytes_ram, + message_bytes_persistent, head_message_timestamp, + disk_reads, disk_writes, backing_queue_status, + messages_paged_out, message_bytes_paged_out]). + +%% We can't specify a per-queue ack/state with callback signatures +-type ack() :: any(). +-type state() :: any(). + +-type flow() :: 'flow' | 'noflow'. +-type msg_ids() :: [rabbit_types:msg_id()]. +-type publish() :: {rabbit_types:basic_message(), + rabbit_types:message_properties(), boolean()}. +-type delivered_publish() :: {rabbit_types:basic_message(), + rabbit_types:message_properties()}. +-type fetch_result(Ack) :: + ('empty' | {rabbit_types:basic_message(), boolean(), Ack}). +-type drop_result(Ack) :: + ('empty' | {rabbit_types:msg_id(), Ack}). +-type recovery_terms() :: [term()] | 'non_clean_shutdown'. +-type recovery_info() :: 'new' | recovery_terms(). +-type purged_msg_count() :: non_neg_integer(). +-type async_callback() :: + fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok'). +-type duration() :: ('undefined' | 'infinity' | number()). + +-type msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A). +-type msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean()). + +-type queue_mode() :: atom(). + +%% Called on startup with a vhost and a list of durable queue names on this vhost. +%% The queues aren't being started at this point, but this call allows the +%% backing queue to perform any checking necessary for the consistency +%% of those queues, or initialise any other shared resources. +%% +%% The list of queue recovery terms returned as {ok, Terms} must be given +%% in the same order as the list of queue names supplied. +-callback start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()). + +%% Called to tear down any state/resources for vhost. NB: Implementations should +%% not depend on this function being called on shutdown and instead +%% should hook into the rabbit supervision hierarchy. +-callback stop(rabbit_types:vhost()) -> 'ok'. + +%% Initialise the backing queue and its state. +%% +%% Takes +%% 1. the amqqueue record +%% 2. a term indicating whether the queue is an existing queue that +%% should be recovered or not. When 'new' is given, no recovery is +%% taking place, otherwise a list of recovery terms is given, or +%% the atom 'non_clean_shutdown' if no recovery terms are available. +%% 3. an asynchronous callback which accepts a function of type +%% backing-queue-state to backing-queue-state. This callback +%% function can be safely invoked from any process, which makes it +%% useful for passing messages back into the backing queue, +%% especially as the backing queue does not have control of its own +%% mailbox. +-callback init(amqqueue:amqqueue(), recovery_info(), + async_callback()) -> state(). + +%% Called on queue shutdown when queue isn't being deleted. +-callback terminate(any(), state()) -> state(). + +%% Called when the queue is terminating and needs to delete all its +%% content. +-callback delete_and_terminate(any(), state()) -> state(). + +%% Called to clean up after a crashed queue. In this case we don't +%% have a process and thus a state(), we are just removing on-disk data. +-callback delete_crashed(amqqueue:amqqueue()) -> 'ok'. + +%% Remove all 'fetchable' messages from the queue, i.e. all messages +%% except those that have been fetched already and are pending acks. +-callback purge(state()) -> {purged_msg_count(), state()}. + +%% Remove all messages in the queue which have been fetched and are +%% pending acks. +-callback purge_acks(state()) -> state(). + +%% Publish a message. +-callback publish(rabbit_types:basic_message(), + rabbit_types:message_properties(), boolean(), pid(), flow(), + state()) -> state(). + +%% Like publish/6 but for batches of publishes. +-callback batch_publish([publish()], pid(), flow(), state()) -> state(). + +%% Called for messages which have already been passed straight +%% out to a client. The queue will be empty for these calls +%% (i.e. saves the round trip through the backing queue). +-callback publish_delivered(rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), flow(), + state()) + -> {ack(), state()}. + +%% Like publish_delivered/5 but for batches of publishes. +-callback batch_publish_delivered([delivered_publish()], pid(), flow(), + state()) + -> {[ack()], state()}. + +%% Called to inform the BQ about messages which have reached the +%% queue, but are not going to be further passed to BQ. +-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). + +%% Return ids of messages which have been confirmed since the last +%% invocation of this function (or initialisation). +%% +%% Message ids should only appear in the result of drain_confirmed +%% under the following circumstances: +%% +%% 1. The message appears in a call to publish_delivered/4 and the +%% first argument (ack_required) is false; or +%% 2. The message is fetched from the queue with fetch/2 and the first +%% argument (ack_required) is false; or +%% 3. The message is acked (ack/2 is called for the message); or +%% 4. The message is fully fsync'd to disk in such a way that the +%% recovery of the message is guaranteed in the event of a crash of +%% this rabbit node (excluding hardware failure). +%% +%% In addition to the above conditions, a message id may only appear +%% in the result of drain_confirmed if +%% #message_properties.needs_confirming = true when the msg was +%% published (through whichever means) to the backing queue. +%% +%% It is legal for the same message id to appear in the results of +%% multiple calls to drain_confirmed, which means that the backing +%% queue is not required to keep track of which messages it has +%% already confirmed. The confirm will be issued to the publisher the +%% first time the message id appears in the result of +%% drain_confirmed. All subsequent appearances of that message id will +%% be ignored. +-callback drain_confirmed(state()) -> {msg_ids(), state()}. + +%% Drop messages from the head of the queue while the supplied +%% predicate on message properties returns true. Returns the first +%% message properties for which the predicate returned false, or +%% 'undefined' if the whole backing queue was traversed w/o the +%% predicate ever returning false. +-callback dropwhile(msg_pred(), state()) + -> {rabbit_types:message_properties() | undefined, state()}. + +%% Like dropwhile, except messages are fetched in "require +%% acknowledgement" mode and are passed, together with their ack tag, +%% to the supplied function. The function is also fed an +%% accumulator. The result of fetchwhile is as for dropwhile plus the +%% accumulator. +-callback fetchwhile(msg_pred(), msg_fun(A), A, state()) + -> {rabbit_types:message_properties() | undefined, + A, state()}. + +%% Produce the next message. +-callback fetch(true, state()) -> {fetch_result(ack()), state()}; + (false, state()) -> {fetch_result(undefined), state()}. + +%% Remove the next message. +-callback drop(true, state()) -> {drop_result(ack()), state()}; + (false, state()) -> {drop_result(undefined), state()}. + +%% Acktags supplied are for messages which can now be forgotten +%% about. Must return 1 msg_id per Ack, in the same order as Acks. +-callback ack([ack()], state()) -> {msg_ids(), state()}. + +%% Reinsert messages into the queue which have already been delivered +%% and were pending acknowledgement. +-callback requeue([ack()], state()) -> {msg_ids(), state()}. + +%% Fold over messages by ack tag. The supplied function is called with +%% each message, its ack tag, and an accumulator. +-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}. + +%% Fold over all the messages in a queue and return the accumulated +%% results, leaving the queue undisturbed. +-callback fold(fun((rabbit_types:basic_message(), + rabbit_types:message_properties(), + boolean(), A) -> {('stop' | 'cont'), A}), + A, state()) -> {A, state()}. + +%% How long is my queue? +-callback len(state()) -> non_neg_integer(). + +%% Is my queue empty? +-callback is_empty(state()) -> boolean(). + +%% What's the queue depth, where depth = length + number of pending acks +-callback depth(state()) -> non_neg_integer(). + +%% For the next three functions, the assumption is that you're +%% monitoring something like the ingress and egress rates of the +%% queue. The RAM duration is thus the length of time represented by +%% the messages held in RAM given the current rates. If you want to +%% ignore all of this stuff, then do so, and return 0 in +%% ram_duration/1. + +%% The target is to have no more messages in RAM than indicated by the +%% duration and the current queue rates. +-callback set_ram_duration_target(duration(), state()) -> state(). + +%% Optionally recalculate the duration internally (likely to be just +%% update your internal rates), and report how many seconds the +%% messages in RAM represent given the current rates of the queue. +-callback ram_duration(state()) -> {duration(), state()}. + +%% Should 'timeout' be called as soon as the queue process can manage +%% (either on an empty mailbox, or when a timer fires)? +-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'. + +%% Called (eventually) after needs_timeout returns 'idle' or 'timed'. +%% Note this may be called more than once for each 'idle' or 'timed' +%% returned from needs_timeout +-callback timeout(state()) -> state(). + +%% Called immediately before the queue hibernates. +-callback handle_pre_hibernate(state()) -> state(). + +%% Called when more credit has become available for credit_flow. +-callback resume(state()) -> state(). + +%% Used to help prioritisation in rabbit_amqqueue_process. The rate of +%% inbound messages and outbound messages at the moment. +-callback msg_rates(state()) -> {float(), float()}. + +-callback info(atom(), state()) -> any(). + +%% Passed a function to be invoked with the relevant backing queue's +%% state. Useful for when the backing queue or other components need +%% to pass functions into the backing queue. +-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state(). + +%% Called prior to a publish or publish_delivered call. Allows the BQ +%% to signal that it's already seen this message, (e.g. it was published +%% or discarded previously) specifying whether to drop the message or reject it. +-callback is_duplicate(rabbit_types:basic_message(), state()) + -> {{true, drop} | {true, reject} | boolean(), state()}. + +-callback set_queue_mode(queue_mode(), state()) -> state(). + +-callback zip_msgs_and_acks([delivered_publish()], + [ack()], Acc, state()) + -> Acc. + +%% Called when rabbit_amqqueue_process receives a message via +%% handle_info and it should be processed by the backing +%% queue +-callback handle_info(term(), state()) -> state(). + +-spec info_keys() -> rabbit_types:info_keys(). + +info_keys() -> ?INFO_KEYS. |