Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--  src/rabbit_variable_queue.erl  1433
1 file changed, 1433 insertions, 0 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
new file mode 100644
index 00000000..0f52eee8
--- /dev/null
+++ b/src/rabbit_variable_queue.erl
@@ -0,0 +1,1433 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_variable_queue).
+
+-export([init/3, terminate/1, delete_and_terminate/1,
+ purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
+ tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
+ requeue/2, len/1, is_empty/1,
+ set_ram_duration_target/2, ram_duration/1,
+ needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+ status/1]).
+
+-export([start/1, stop/0]).
+
+%% exported for testing only
+-export([start_msg_store/2, stop_msg_store/0]).
+
+%%----------------------------------------------------------------------------
+%% Definitions:
+
+%% alpha: this is a message where both the message itself, and its
+%% position within the queue are held in RAM
+%%
+%% beta: this is a message where the message itself is only held on
+%% disk, but its position within the queue is held in RAM.
+%%
+%% gamma: this is a message where the message itself is only held on
+%% disk, but its position is both in RAM and on disk.
+%%
+%% delta: this is a collection of messages, represented by a single
+%% term, where the messages and their position are only held on
+%% disk.
+%%
+%% Note that for persistent messages, the message and its position
+%% within the queue are always held on disk, *in addition* to being in
+%% one of the above classifications.
+%%
+%% Also note that within this code, the term gamma never
+%% appears. Instead, gammas are defined by betas that have had their
+%% queue position recorded on disk.
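+%%
+%% For illustration, in terms of the #msg_status {} record defined
+%% below: an alpha has its msg field set to the #basic_message {}
+%% itself; a beta has msg = undefined with msg_on_disk = true and
+%% index_on_disk = false; a gamma differs from a beta only in having
+%% index_on_disk = true. Deltas are not represented per message at
+%% all, only by the single #delta {} term.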
+%%
+%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
+%% many of these steps are frequently skipped. q1 and q4 only hold
+%% alphas, q2 and q3 hold both betas and gammas (as queues of queues,
+%% using the bpqueue module where the block prefix determines whether
+%% they're betas or gammas). When a message arrives, its
+%% classification is determined. It is then added to the rightmost
+%% appropriate queue.
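+%% For example, publish/4 below enqueues a freshly published alpha
+%% onto q4 when q3 is empty (in which case q2 and delta must be empty
+%% too) and onto q1 otherwise.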
+%%
+%% If a new message is determined to be a beta or gamma, q1 is
+%% empty. If a new message is determined to be a delta, q1 and q2 are
+%% empty (and actually q4 too).
+%%
+%% When removing messages from a queue, if q4 is empty then q3 is read
+%% directly. If q3 becomes empty then the next segment's worth of
+%% messages from delta are read into q3, reducing the size of
+%% delta. If the queue is non-empty, either q4 or q3 contains
+%% entries. It is never permitted for delta to hold all the messages
+%% in the queue.
+%%
+%% The duration indicated to us by the memory_monitor is used to
+%% calculate, given our current ingress and egress rates, how many
+%% messages we should hold in RAM. When we need to push alphas to
+%% betas or betas to gammas, we favour writing out messages that are
+%% further from the head of the queue. This minimises writes to disk,
+%% as the messages closer to the tail of the queue stay in the queue
+%% for longer, thus do not need to be replaced as quickly by sending
+%% other messages to disk.
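+%%
+%% For example (illustrative numbers only): with an average ingress
+%% rate of 150 msgs/sec, an average egress rate of 50 msgs/sec and a
+%% duration target of 2 seconds, set_ram_duration_target/2 below sets
+%% target_ram_msg_count to trunc(2 * (150 + 50)) = 400, so the queue
+%% aims to hold no more than 400 messages in RAM as alphas.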
+%%
+%% Whilst messages are pushed to disk and forgotten from RAM as soon
+%% as requested by a new setting of the queue RAM duration, the
+%% inverse is not true: we only load messages back into RAM as
+%% demanded as the queue is read from. Thus only publishes to the
+%% queue will take up available spare capacity.
+%%
+%% When we report our duration to the memory monitor, we calculate
+%% average ingress and egress rates over the last two samples, and
+%% then calculate our duration based on the sum of the ingress and
+%% egress rates. More than two samples could be used, but it's a
+%% balance between responding quickly enough to changes in
+%% producers/consumers versus ignoring temporary blips. The problem
+%% with temporary blips is that with just a few queues, they can have
+%% substantial impact on the calculation of the average duration and
+%% hence cause unnecessary I/O. Another alternative is to increase the
+%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5
+%% seconds. However, that then runs the risk of being too slow to
+%% inform the memory monitor of changes. Thus a 5 second interval,
+%% plus a rolling average over the last two samples seems to work
+%% well in practice.
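+%%
+%% To illustrate the calculation (made-up numbers): with average
+%% ingress and egress rates of 150 and 50 msgs/sec, and ram_msg_count
+%% going from 300 at the previous sample to 500 now, ram_duration/1
+%% below reports (300 + 500) / (2 * (150 + 50)) = 2.0 seconds.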
+%%
+%% The sum of the ingress and egress rates is used because the egress
+%% rate alone is not sufficient. Adding in the ingress rate means that
+%% queues which are being flooded by messages are given more memory,
+%% resulting in them being able to process the messages faster (by
+%% doing less I/O, or at least deferring it), which helps keep their
+%% mailboxes empty and thus keeps the queue as a whole more
+%% responsive. If such a queue also has fast but previously idle
+%% consumers, the consumer can then start to be driven as fast as it
+%% can go, whereas if only egress rate was being used, the incoming
+%% messages may have to be written to disk and then read back in,
+%% resulting in the hard disk being a bottleneck in driving the
+%% consumers. Generally, we want to give Rabbit every chance of
+%% getting rid of messages as fast as possible and remaining
+%% responsive, and using only the egress rate impacts that goal.
+%%
+%% If a queue is full of transient messages, then the transition from
+%% betas to deltas will be potentially very expensive as millions of
+%% entries must be written to disk by the queue_index module. This can
+%% badly stall the queue. In order to avoid this, the proportion of
+%% gammas / (betas+gammas) must not be lower than (betas+gammas) /
+%% (alphas+betas+gammas). As the queue grows or available memory
+%% shrinks, the latter ratio increases, requiring the conversion of
+%% more betas to gammas in order to maintain the invariant. At the
+%% point at which betas and gammas must be converted to deltas, there
+%% should be very few betas remaining, thus the transition is fast (no
+%% work needs to be done for the gamma -> delta transition).
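+%%
+%% To illustrate the invariant (made-up numbers): with 10000 messages
+%% outside delta, of which 2000 are betas+gammas, the ratio
+%% (betas+gammas) / (alphas+betas+gammas) is 0.2, so at least
+%% 0.2 * 2000 = 400 of those 2000 must be gammas, leaving at most
+%% 1600 betas. permitted_ram_index_count/1 below computes exactly
+%% this bound (2000 - 2000*2000/10000 = 1600).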
+%%
+%% The conversion of betas to gammas is done in batches of exactly
+%% ?IO_BATCH_SIZE. This value should not be too small, otherwise the
+%% frequent operations on the queues of q2 and q3 will not be
+%% effectively amortised (switching the direction of queue access
+%% defeats amortisation), nor should it be too big, otherwise
+%% converting a batch stalls the queue for too long. Therefore, it
+%% must be just right. ram_index_count, which is used here, is the
+%% number of betas.
+%%
+%% The conversion from alphas to betas is also chunked, but only to
+%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
+%% any one time. This further smooths the effects of changes to the
+%% target_ram_msg_count and ensures the queue remains responsive
+%% even when there is a large amount of IO work to do. The
+%% idle_timeout callback is utilised to ensure that conversions are
+%% done as promptly as possible whilst ensuring the queue remains
+%% responsive.
+%%
+%% In the queue we keep track of both messages that are pending
+%% delivery and messages that are pending acks. This ensures that
+%% purging (deleting the former) and deletion (deleting the former and
+%% the latter) are both cheap and do not require any scanning through
+%% qi segments.
+%%
+%% Notes on Clean Shutdown
+%% (This documents behaviour in variable_queue, queue_index and
+%% msg_store.)
+%%
+%% In order to try to achieve as fast a start-up as possible, if a
+%% clean shutdown occurs, we try to save our state to disk to reduce
+%% work on startup. In the msg_store this takes the form of the
+%% index_module's state, plus the file_summary ets table, and client
+%% refs. In the VQ, this takes the form of the count of persistent
+%% messages in the queue and references into the msg_stores. The
+%% queue_index adds to these terms the details of its segments and
+%% stores the terms in the queue directory.
+%%
+%% Two message stores are used. One is created for persistent messages
+%% to durable queues that must survive restarts, and the other is used
+%% for all other messages that just happen to need to be written to
+%% disk. On start up we can therefore nuke the transient message
+%% store, and be sure that the messages in the persistent store are
+%% all that we need.
+%%
+%% The references to the msg_stores are there so that the msg_store
+%% knows to only trust its saved state if all of the queues it was
+%% previously talking to come up cleanly. Likewise, the queues
+%% themselves (especially queue_index) skip work in init if all the
+%% queues and msg_store were shut down cleanly. This gives both good speed
+%% improvements and also robustness so that if anything possibly went
+%% wrong in shutdown (or there was subsequent manual tampering), all
+%% messages and queues that can be recovered are recovered, safely.
+%%
+%% To delete transient messages lazily, the variable_queue, on
+%% startup, stores the next_seq_id reported by the queue_index as the
+%% transient_threshold. From that point on, whenever it's reading a
+%% message off disk via the queue_index, if the seq_id is below this
+%% threshold and the message is transient then it drops the message
+%% (the message itself won't exist on disk because it would have been
+%% stored in the transient msg_store which would have had its saved
+%% state nuked on startup). This avoids the expensive operation of
+%% scanning the entire queue on startup in order to delete transient
+%% messages that were only pushed to disk to save memory.
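+%%
+%% For example, if the queue_index reports a next_seq_id of 1000 at
+%% startup then transient_threshold is set to 1000, and any
+%% non-persistent index entry with a seq_id below 1000 that is read
+%% back later is dropped by betas_from_index_entries/3 (it is simply
+%% acked in the queue_index) instead of being turned into a beta.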
+%%
+%%----------------------------------------------------------------------------
+
+-behaviour(rabbit_backing_queue).
+
+-record(vqstate,
+ { q1,
+ q2,
+ delta,
+ q3,
+ q4,
+ next_seq_id,
+ pending_ack,
+ index_state,
+ msg_store_clients,
+ on_sync,
+ durable,
+ transient_threshold,
+
+ len,
+ persistent_count,
+
+ duration_target,
+ target_ram_msg_count,
+ ram_msg_count,
+ ram_msg_count_prev,
+ ram_index_count,
+ out_counter,
+ in_counter,
+ rates
+ }).
+
+-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
+
+-record(msg_status,
+ { seq_id,
+ guid,
+ msg,
+ is_persistent,
+ is_delivered,
+ msg_on_disk,
+ index_on_disk
+ }).
+
+-record(delta,
+ { start_seq_id, %% start_seq_id is inclusive
+ count,
+ end_seq_id %% end_seq_id is exclusive
+ }).
+
+-record(tx, { pending_messages, pending_acks }).
+
+-record(sync, { acks_persistent, acks_all, pubs, funs }).
+
+%% When we discover, on publish, that we should write some indices to
+%% disk for some betas, IO_BATCH_SIZE sets the number of betas that we
+%% must be due to write indices for before we do any work at all. This
+%% is both a minimum and a maximum - we don't write fewer than
+%% IO_BATCH_SIZE indices out in one go, and we don't write more - we
+%% can always come back on the next publish to do more.
+-define(IO_BATCH_SIZE, 64).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
+-type(seq_id() :: non_neg_integer()).
+-type(ack() :: seq_id() | 'blank_ack').
+
+-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
+ ingress :: {timestamp(), non_neg_integer()},
+ avg_egress :: float(),
+ avg_ingress :: float(),
+ timestamp :: timestamp() }).
+
+-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
+ count :: non_neg_integer (),
+ end_seq_id :: non_neg_integer() }).
+
+-type(sync() :: #sync { acks_persistent :: [[seq_id()]],
+ acks_all :: [[seq_id()]],
+ pubs :: [[rabbit_guid:guid()]],
+ funs :: [fun (() -> any())] }).
+
+-type(state() :: #vqstate {
+ q1 :: queue(),
+ q2 :: bpqueue:bpqueue(),
+ delta :: delta(),
+ q3 :: bpqueue:bpqueue(),
+ q4 :: queue(),
+ next_seq_id :: seq_id(),
+ pending_ack :: dict:dictionary(),
+ index_state :: any(),
+ msg_store_clients :: 'undefined' | {{any(), binary()},
+ {any(), binary()}},
+ on_sync :: sync(),
+ durable :: boolean(),
+
+ len :: non_neg_integer(),
+ persistent_count :: non_neg_integer(),
+
+ transient_threshold :: non_neg_integer(),
+ duration_target :: number() | 'infinity',
+ target_ram_msg_count :: non_neg_integer() | 'infinity',
+ ram_msg_count :: non_neg_integer(),
+ ram_msg_count_prev :: non_neg_integer(),
+ ram_index_count :: non_neg_integer(),
+ out_counter :: non_neg_integer(),
+ in_counter :: non_neg_integer(),
+ rates :: rates() }).
+
+-include("rabbit_backing_queue_spec.hrl").
+
+-endif.
+
+-define(BLANK_DELTA, #delta { start_seq_id = undefined,
+ count = 0,
+ end_seq_id = undefined }).
+-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
+ count = 0,
+ end_seq_id = Z }).
+
+-define(BLANK_SYNC, #sync { acks_persistent = [],
+ acks_all = [],
+ pubs = [],
+ funs = [] }).
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+start(DurableQueues) ->
+ {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
+ start_msg_store(
+ [Ref || Terms <- AllTerms,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ StartFunState).
+
+stop() -> stop_msg_store().
+
+start_msg_store(Refs, StartFunState) ->
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
+ undefined, {fun (ok) -> finished end, ok}]),
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
+ Refs, StartFunState]).
+
+stop_msg_store() ->
+ ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
+ ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
+
+init(QueueName, IsDurable, Recover) ->
+ {DeltaCount, Terms, IndexState} =
+ rabbit_queue_index:init(
+ QueueName, Recover,
+ rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ fun (Guid) ->
+ rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
+ end),
+ {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
+
+ {PRef, TRef, Terms1} =
+ case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
+ [] -> {proplists:get_value(persistent_ref, Terms),
+ proplists:get_value(transient_ref, Terms),
+ Terms};
+ _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
+ end,
+ DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount),
+ Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
+ true -> ?BLANK_DELTA;
+ false -> #delta { start_seq_id = LowSeqId,
+ count = DeltaCount1,
+ end_seq_id = NextSeqId }
+ end,
+ Now = now(),
+ PersistentClient =
+ case IsDurable of
+ true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
+ false -> undefined
+ end,
+ TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
+ State = #vqstate {
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ next_seq_id = NextSeqId,
+ pending_ack = dict:new(),
+ index_state = IndexState1,
+ msg_store_clients = {{PersistentClient, PRef},
+ {TransientClient, TRef}},
+ on_sync = ?BLANK_SYNC,
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+
+ duration_target = infinity,
+ target_ram_msg_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_index_count = 0,
+ out_counter = 0,
+ in_counter = 0,
+ rates = #rates { egress = {Now, 0},
+ ingress = {Now, DeltaCount1},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
+ timestamp = Now } },
+ a(maybe_deltas_to_betas(State)).
+
+terminate(State) ->
+ State1 = #vqstate { persistent_count = PCount,
+ index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef},
+ {MSCStateT, TRef}} } =
+ remove_pending_ack(true, tx_commit_index(State)),
+ case MSCStateP of
+ undefined -> ok;
+ _ -> rabbit_msg_store:client_terminate(MSCStateP)
+ end,
+ rabbit_msg_store:client_terminate(MSCStateT),
+ Terms = [{persistent_ref, PRef},
+ {transient_ref, TRef},
+ {persistent_count, PCount}],
+ a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
+ Terms, IndexState),
+ msg_store_clients = undefined }).
+
+%% the only difference between purge and delete is that delete also
+%% needs to delete everything that's been delivered and not ack'd.
+delete_and_terminate(State) ->
+ %% TODO: there is no need to interact with qi at all - which we do
+ %% as part of 'purge' and 'remove_pending_ack', other than
+ %% deleting it.
+ {_PurgeCount, State1} = purge(State),
+ State2 = #vqstate { index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef},
+ {MSCStateT, TRef}} } =
+ remove_pending_ack(false, State1),
+ IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
+ case MSCStateP of
+ undefined -> ok;
+ _ -> rabbit_msg_store:client_delete_and_terminate(
+ MSCStateP, ?PERSISTENT_MSG_STORE, PRef),
+ rabbit_msg_store:client_terminate(MSCStateP)
+ end,
+ rabbit_msg_store:client_delete_and_terminate(
+ MSCStateT, ?TRANSIENT_MSG_STORE, TRef),
+ a(State2 #vqstate { index_state = IndexState1,
+ msg_store_clients = undefined }).
+
+purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
+ %% TODO: when there are no pending acks, which is a common case,
+ %% we could simply wipe the qi instead of issuing delivers and
+ %% acks for all the messages.
+ IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4,
+ IndexState),
+ State1 = #vqstate { q1 = Q1, index_state = IndexState2 } =
+ purge_betas_and_deltas(State #vqstate { q4 = queue:new(),
+ index_state = IndexState1 }),
+ IndexState3 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1,
+ IndexState2),
+ {Len, a(State1 #vqstate { q1 = queue:new(),
+ index_state = IndexState3,
+ len = 0,
+ ram_msg_count = 0,
+ ram_index_count = 0,
+ persistent_count = 0 })}.
+
+publish(Msg, State) ->
+ {_SeqId, State1} = publish(Msg, false, false, State),
+ a(reduce_memory_use(State1)).
+
+publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
+ {blank_ack, a(State)};
+publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
+ State = #vqstate { len = 0,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ pending_ack = PA,
+ durable = IsDurable }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ #msg_status { is_delivered = true },
+ {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
+ PA1 = record_pending_ack(m(MsgStatus1), PA),
+ PCount1 = PCount + one_if(IsPersistent1),
+ {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}.
+
+fetch(AckRequired, State = #vqstate { q4 = Q4,
+ ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ len = Len,
+ persistent_count = PCount,
+ pending_ack = PA }) ->
+ case queue:out(Q4) of
+ {empty, _Q4} ->
+ case fetch_from_q3_to_q4(State) of
+ {empty, State1} = Result -> a(State1), Result;
+ {loaded, State1} -> fetch(AckRequired, State1)
+ end;
+ {{value, MsgStatus = #msg_status {
+ msg = Msg, guid = Guid, seq_id = SeqId,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
+ Q4a} ->
+
+ %% 1. Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ %% 2. Remove from msg_store and queue index, if necessary
+ MsgStore = find_msg_store(IsPersistent),
+ Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end,
+ Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
+ IndexState2 =
+ case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
+ {false, true, false, _} -> Rem(), IndexState1;
+ {false, true, true, _} -> Rem(), Ack();
+ { true, true, true, false} -> Ack();
+ _ -> IndexState1
+ end,
+
+ %% 3. If an ack is required, add something sensible to PA
+ {AckTag, PA1} = case AckRequired of
+ true -> PA2 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, PA),
+ {SeqId, PA2};
+ false -> {blank_ack, PA}
+ end,
+
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ Len1 = Len - 1,
+ {{Msg, IsDelivered, AckTag, Len1},
+ a(State #vqstate { q4 = Q4a,
+ ram_msg_count = RamMsgCount - 1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}
+ end.
+
+ack(AckTags, State) ->
+ a(ack(fun rabbit_msg_store:remove/2,
+ fun (_AckEntry, State1) -> State1 end,
+ AckTags, State)).
+
+tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
+ State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
+ Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ a(case IsPersistent andalso IsDurable of
+ true -> MsgStatus = msg_status(true, undefined, Msg),
+ {#msg_status { msg_on_disk = true }, MSCState1} =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ State #vqstate { msg_store_clients = MSCState1 };
+ false -> State
+ end).
+
+tx_ack(Txn, AckTags, State) ->
+ Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
+ State.
+
+tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
+ #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
+ erase_tx(Txn),
+ ok = case IsDurable of
+ true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE,
+ persistent_guids(Pubs));
+ false -> ok
+ end,
+ {lists:append(AckTags), a(State)}.
+
+tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
+ #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
+ erase_tx(Txn),
+ PubsOrdered = lists:reverse(Pubs),
+ AckTags1 = lists:append(AckTags),
+ PersistentGuids = persistent_guids(PubsOrdered),
+ HasPersistentPubs = PersistentGuids =/= [],
+ {AckTags1,
+ a(case IsDurable andalso HasPersistentPubs of
+ true -> ok = rabbit_msg_store:sync(
+ ?PERSISTENT_MSG_STORE, PersistentGuids,
+ msg_store_callback(PersistentGuids,
+ PubsOrdered, AckTags1, Fun)),
+ State;
+ false -> tx_commit_post_msg_store(
+ HasPersistentPubs, PubsOrdered, AckTags1, Fun, State)
+ end)}.
+
+requeue(AckTags, State) ->
+ a(reduce_memory_use(
+ ack(fun rabbit_msg_store:release/2,
+ fun (#msg_status { msg = Msg }, State1) ->
+ {_SeqId, State2} = publish(Msg, true, false, State1),
+ State2;
+ ({IsPersistent, Guid}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ read_from_msg_store(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, true, true, State2),
+ State3
+ end,
+ AckTags, State))).
+
+len(#vqstate { len = Len }) -> Len.
+
+is_empty(State) -> 0 == len(State).
+
+set_ram_duration_target(DurationTarget,
+ State = #vqstate {
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ target_ram_msg_count = TargetRamMsgCount }) ->
+ Rate = AvgEgressRate + AvgIngressRate,
+ TargetRamMsgCount1 =
+ case DurationTarget of
+ infinity -> infinity;
+ _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
+ end,
+ State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
+ duration_target = DurationTarget },
+ a(case TargetRamMsgCount1 == infinity orelse
+ (TargetRamMsgCount =/= infinity andalso
+ TargetRamMsgCount1 >= TargetRamMsgCount) of
+ true -> State1;
+ false -> reduce_memory_use(State1)
+ end).
+
+ram_duration(State = #vqstate {
+ rates = #rates { egress = Egress,
+ ingress = Ingress,
+ timestamp = Timestamp } = Rates,
+ in_counter = InCount,
+ out_counter = OutCount,
+ ram_msg_count = RamMsgCount,
+ duration_target = DurationTarget,
+ ram_msg_count_prev = RamMsgCountPrev }) ->
+ Now = now(),
+ {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
+ {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
+
+ Duration = %% msgs / (msgs/sec) == sec
+ case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
+ true -> infinity;
+ false -> (RamMsgCountPrev + RamMsgCount) /
+ (2 * (AvgEgressRate + AvgIngressRate))
+ end,
+
+ {Duration, set_ram_duration_target(
+ DurationTarget,
+ State #vqstate {
+ rates = Rates #rates {
+ egress = Egress1,
+ ingress = Ingress1,
+ avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate,
+ timestamp = Now },
+ in_counter = 0,
+ out_counter = 0,
+ ram_msg_count_prev = RamMsgCount })}.
+
+needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
+ {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ State),
+ Res;
+needs_idle_timeout(_State) ->
+ true.
+
+idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
+
+handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+
+status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ len = Len,
+ pending_ack = PA,
+ on_sync = #sync { funs = From },
+ target_ram_msg_count = TargetRamMsgCount,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ next_seq_id = NextSeqId,
+ persistent_count = PersistentCount,
+ rates = #rates {
+ avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate } }) ->
+ [ {q1 , queue:len(Q1)},
+ {q2 , bpqueue:len(Q2)},
+ {delta , Delta},
+ {q3 , bpqueue:len(Q3)},
+ {q4 , queue:len(Q4)},
+ {len , Len},
+ {pending_acks , dict:size(PA)},
+ {outstanding_txns , length(From)},
+ {target_ram_msg_count , TargetRamMsgCount},
+ {ram_msg_count , RamMsgCount},
+ {ram_index_count , RamIndexCount},
+ {next_seq_id , NextSeqId},
+ {persistent_count , PersistentCount},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ingress_rate , AvgIngressRate} ].
+
+%%----------------------------------------------------------------------------
+%% Minor helpers
+%%----------------------------------------------------------------------------
+
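+%% a/1 is an assertion helper through which states returned to the
+%% caller are threaded. It checks the queue invariants described
+%% above: q1 may only be non-empty if q3 is non-empty, q2 may only be
+%% non-empty if delta is non-empty, delta may only be non-empty if q3
+%% is non-empty, the queue is empty iff both q3 and q4 are empty, and
+%% all counters are non-negative.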
+a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ len = Len,
+ persistent_count = PersistentCount,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount }) ->
+ E1 = queue:is_empty(Q1),
+ E2 = bpqueue:is_empty(Q2),
+ ED = Delta#delta.count == 0,
+ E3 = bpqueue:is_empty(Q3),
+ E4 = queue:is_empty(Q4),
+ LZ = Len == 0,
+
+ true = E1 or not E3,
+ true = E2 or not ED,
+ true = ED or not E3,
+ true = LZ == (E3 and E4),
+
+ true = Len >= 0,
+ true = PersistentCount >= 0,
+ true = RamMsgCount >= 0,
+ true = RamIndexCount >= 0,
+
+ State.
+
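+%% m/1 similarly asserts the per-message invariants: a persistent
+%% message must have its index on disk, an index entry on disk
+%% implies the message itself is on disk, and the message may only be
+%% absent from RAM if it is on disk.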
+m(MsgStatus = #msg_status { msg = Msg,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk }) ->
+ true = (not IsPersistent) or IndexOnDisk,
+ true = (not IndexOnDisk) or MsgOnDisk,
+ true = (Msg =/= undefined) or MsgOnDisk,
+
+ MsgStatus.
+
+one_if(true ) -> 1;
+one_if(false) -> 0.
+
+cons_if(true, E, L) -> [E | L];
+cons_if(false, _E, L) -> L.
+
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
+ #msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
+ is_persistent = IsPersistent, is_delivered = false,
+ msg_on_disk = false, index_on_disk = false }.
+
+find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
+find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
+
+with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) ->
+ {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP),
+ {Result, {{MSCStateP1, PRef}, MSCStateT}};
+with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) ->
+ {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
+ {Result, {MSCStateP, {MSCStateT1, TRef}}}.
+
+read_from_msg_store(MSCState, IsPersistent, Guid) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MsgStore, MSCState1) ->
+ rabbit_msg_store:read(MsgStore, Guid, MSCState1)
+ end).
+
+maybe_write_delivered(false, _SeqId, IndexState) ->
+ IndexState;
+maybe_write_delivered(true, SeqId, IndexState) ->
+ rabbit_queue_index:deliver([SeqId], IndexState).
+
+lookup_tx(Txn) -> case get({txn, Txn}) of
+ undefined -> #tx { pending_messages = [],
+ pending_acks = [] };
+ V -> V
+ end.
+
+store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
+
+erase_tx(Txn) -> erase({txn, Txn}).
+
+persistent_guids(Pubs) ->
+ [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs].
+
+betas_from_index_entries(List, TransientThreshold, IndexState) ->
+ {Filtered, Delivers, Acks} =
+ lists:foldr(
+ fun ({Guid, SeqId, IsPersistent, IsDelivered},
+ {Filtered1, Delivers1, Acks1}) ->
+ case SeqId < TransientThreshold andalso not IsPersistent of
+ true -> {Filtered1,
+ cons_if(not IsDelivered, SeqId, Delivers1),
+ [SeqId | Acks1]};
+ false -> {[m(#msg_status { msg = undefined,
+ guid = Guid,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true
+ }) | Filtered1],
+ Delivers1,
+ Acks1}
+ end
+ end, {[], [], []}, List),
+ {bpqueue:from_list([{true, Filtered}]),
+ rabbit_queue_index:ack(Acks,
+ rabbit_queue_index:deliver(Delivers, IndexState))}.
+
+%% the first arg is the older delta
+combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
+ ?BLANK_DELTA;
+combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start,
+ count = Count,
+ end_seq_id = End } = B) ->
+ true = Start + Count =< End, %% ASSERTION
+ B;
+combine_deltas(#delta { start_seq_id = Start,
+ count = Count,
+ end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) ->
+ true = Start + Count =< End, %% ASSERTION
+ A;
+combine_deltas(#delta { start_seq_id = StartLow,
+ count = CountLow,
+ end_seq_id = EndLow },
+ #delta { start_seq_id = StartHigh,
+ count = CountHigh,
+ end_seq_id = EndHigh }) ->
+ Count = CountLow + CountHigh,
+ true = (StartLow =< StartHigh) %% ASSERTIONS
+ andalso ((StartLow + CountLow) =< EndLow)
+ andalso ((StartHigh + CountHigh) =< EndHigh)
+ andalso ((StartLow + Count) =< EndHigh),
+ #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
+
+beta_fold(Fun, Init, Q) ->
+ bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q).
+
+update_rate(Now, Then, Count, {OThen, OCount}) ->
+ %% avg over the current period and the previous
+ {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}.
+
+%%----------------------------------------------------------------------------
+%% Internal major helpers for Public API
+%%----------------------------------------------------------------------------
+
+msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) ->
+ Self = self(),
+ F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
+ Self, fun (StateN) -> tx_commit_post_msg_store(
+ true, Pubs, AckTags, Fun, StateN)
+ end)
+ end,
+ fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
+ fun () -> rabbit_msg_store:remove(
+ ?PERSISTENT_MSG_STORE,
+ PersistentGuids)
+ end, F)
+ end)
+ end.
+
+tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
+ State = #vqstate {
+ on_sync = OnSync = #sync {
+ acks_persistent = SPAcks,
+ acks_all = SAcks,
+ pubs = SPubs,
+ funs = SFuns },
+ pending_ack = PA,
+ durable = IsDurable }) ->
+ PersistentAcks =
+ case IsDurable of
+ true -> [AckTag || AckTag <- AckTags,
+ case dict:fetch(AckTag, PA) of
+ #msg_status {} -> false;
+ {IsPersistent, _Guid} -> IsPersistent
+ end];
+ false -> []
+ end,
+ case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of
+ true -> State #vqstate { on_sync = #sync {
+ acks_persistent = [PersistentAcks | SPAcks],
+ acks_all = [AckTags | SAcks],
+ pubs = [Pubs | SPubs],
+ funs = [Fun | SFuns] }};
+ false -> State1 = tx_commit_index(
+ State #vqstate { on_sync = #sync {
+ acks_persistent = [],
+ acks_all = [AckTags],
+ pubs = [Pubs],
+ funs = [Fun] } }),
+ State1 #vqstate { on_sync = OnSync }
+ end.
+
+tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
+ State;
+tx_commit_index(State = #vqstate { on_sync = #sync {
+ acks_persistent = SPAcks,
+ acks_all = SAcks,
+ pubs = SPubs,
+ funs = SFuns },
+ durable = IsDurable }) ->
+ PAcks = lists:append(SPAcks),
+ Acks = lists:append(SAcks),
+ Pubs = lists:append(lists:reverse(SPubs)),
+ {SeqIds, State1 = #vqstate { index_state = IndexState }} =
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent },
+ {SeqIdsAcc, State2}) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ {SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
+ {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
+ end, {PAcks, ack(Acks, State)}, Pubs),
+ IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
+ [ Fun() || Fun <- lists:reverse(SFuns) ],
+ reduce_memory_use(
+ State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
+
+purge_betas_and_deltas(State = #vqstate { q3 = Q3,
+ index_state = IndexState }) ->
+ case bpqueue:is_empty(Q3) of
+ true -> State;
+ false -> IndexState1 = remove_queue_entries(fun beta_fold/3, Q3,
+ IndexState),
+ purge_betas_and_deltas(
+ maybe_deltas_to_betas(
+ State #vqstate { q3 = bpqueue:new(),
+ index_state = IndexState1 }))
+ end.
+
+remove_queue_entries(Fold, Q, IndexState) ->
+ {GuidsByStore, Delivers, Acks} =
+ Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
+ ok = orddict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ rabbit_queue_index:ack(Acks,
+ rabbit_queue_index:deliver(Delivers, IndexState)).
+
+remove_queue_entries1(
+ #msg_status { guid = Guid, seq_id = SeqId,
+ is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
+ {GuidsByStore, Delivers, Acks}) ->
+ {case MsgOnDisk of
+ true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid,
+ GuidsByStore);
+ false -> GuidsByStore
+ end,
+ cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
+ cons_if(IndexOnDisk, SeqId, Acks)}.
+
+%%----------------------------------------------------------------------------
+%% Internal gubbins for publishing
+%%----------------------------------------------------------------------------
+
+publish(Msg = #basic_message { is_persistent = IsPersistent },
+ IsDelivered, MsgOnDisk,
+ State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
+ #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
+ {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
+ State2 = case bpqueue:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
+ end,
+ PCount1 = PCount + one_if(IsPersistent1),
+ {SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ ram_msg_count = RamMsgCount + 1}}.
+
+maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
+ msg_on_disk = true }, MSCState) ->
+ {MsgStatus, MSCState};
+maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg, guid = Guid,
+ is_persistent = IsPersistent }, MSCState)
+ when Force orelse IsPersistent ->
+ {ok, MSCState1} =
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MsgStore, MSCState2) ->
+ Msg1 = Msg #basic_message {
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)},
+ rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2)
+ end),
+ {MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
+maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
+ {MsgStatus, MSCState}.
+
+maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
+ index_on_disk = true }, IndexState) ->
+ true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
+ {MsgStatus, IndexState};
+maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ guid = Guid, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered }, IndexState)
+ when Force orelse IsPersistent ->
+ true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
+ IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent,
+ IndexState),
+ {MsgStatus #msg_status { index_on_disk = true },
+ maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
+maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
+ {MsgStatus, IndexState}.
+
+maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
+ State = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState }) ->
+ {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(
+ ForceMsg, MsgStatus, MSCState),
+ {MsgStatus2, IndexState1} = maybe_write_index_to_disk(
+ ForceIndex, MsgStatus1, IndexState),
+ {MsgStatus2, State #vqstate { index_state = IndexState1,
+ msg_store_clients = MSCState1 }}.
+
+%%----------------------------------------------------------------------------
+%% Internal gubbins for acks
+%%----------------------------------------------------------------------------
+
+record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
+ AckEntry = case MsgOnDisk of
+ true -> {IsPersistent, Guid};
+ false -> MsgStatus
+ end,
+ dict:store(SeqId, AckEntry, PA).
+
+remove_pending_ack(KeepPersistent,
+ State = #vqstate { pending_ack = PA,
+ index_state = IndexState }) ->
+ {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
+ {[], orddict:new()}, PA),
+ State1 = State #vqstate { pending_ack = dict:new() },
+ case KeepPersistent of
+ true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
+ error -> State1;
+ {ok, Guids} -> ok = rabbit_msg_store:remove(
+ ?TRANSIENT_MSG_STORE, Guids),
+ State1
+ end;
+ false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
+ ok = orddict:fold(
+ fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ State1 #vqstate { index_state = IndexState1 }
+ end.
+
+ack(_MsgStoreFun, _Fun, [], State) ->
+ State;
+ack(MsgStoreFun, Fun, AckTags, State) ->
+ {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
+ persistent_count = PCount }} =
+ lists:foldl(
+ fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
+ {ok, AckEntry} = dict:find(SeqId, PA),
+ {accumulate_ack(SeqId, AckEntry, Acc),
+ Fun(AckEntry, State2 #vqstate {
+ pending_ack = dict:erase(SeqId, PA) })}
+ end, {{[], orddict:new()}, State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
+ ok = orddict:fold(fun (MsgStore, Guids, ok) ->
+ MsgStoreFun(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
+ error -> 0;
+ {ok, Guids} -> length(Guids)
+ end,
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }.
+
+accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
+ msg_on_disk = false,
+ index_on_disk = false }, Acc) ->
+ Acc;
+accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
+ {cons_if(IsPersistent, SeqId, SeqIdsAcc),
+ rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+
+%%----------------------------------------------------------------------------
+%% Phase changes
+%%----------------------------------------------------------------------------
+
+%% Determine whether a reduction in memory use is necessary, and call
+%% functions to perform the required phase changes. The function can
+%% also be used to just do the former, by passing in dummy phase
+%% change functions.
+%%
+%% The function does not report on any needed beta->delta conversions,
+%% though the conversion function for that is called as necessary. The
+%% reason is twofold. Firstly, this is safe because the conversion is
+%% only ever necessary just after a transition to a
+%% target_ram_msg_count of zero or after an incremental alpha->beta
+%% conversion. In the former case the conversion is performed straight
+%% away (i.e. any betas present at the time are converted to deltas),
+%% and in the latter case the need for a conversion is flagged up
+%% anyway. Secondly, this is necessary because we do not have a
+%% precise and cheap predicate for determining whether a beta->delta
+%% conversion is necessary - due to the complexities of retaining up
+%% to one segment's worth of messages in q3 - and thus would risk
+%% perpetually reporting the need for a conversion when no such
+%% conversion is needed. That in turn could cause an infinite loop.
+reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
+ {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
+ State #vqstate.target_ram_msg_count) of
+ 0 -> {false, State};
+ S1 -> {true, AlphaBetaFun(S1, State)}
+ end,
+ case State1 #vqstate.target_ram_msg_count of
+ infinity -> {Reduce, State1};
+ 0 -> {Reduce, BetaDeltaFun(State1)};
+ _ -> case chunk_size(State1 #vqstate.ram_index_count,
+ permitted_ram_index_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end
+ end.
+
+reduce_memory_use(State) ->
+ {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
+ fun limit_ram_index/2,
+ fun push_betas_to_deltas/1,
+ State),
+ State1.
+
+limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3,
+ index_state = IndexState,
+ ram_index_count = RamIndexCount }) ->
+ {Q2a, {Quota1, IndexState1}} = limit_ram_index(
+ fun bpqueue:map_fold_filter_r/4,
+ Q2, {Quota, IndexState}),
+ %% TODO: we shouldn't be writing index entries for messages that
+ %% can never end up in delta due to them residing in the only
+ %% segment held by q3.
+ {Q3a, {Quota2, IndexState2}} = limit_ram_index(
+ fun bpqueue:map_fold_filter_r/4,
+ Q3, {Quota1, IndexState1}),
+ State #vqstate { q2 = Q2a, q3 = Q3a,
+ index_state = IndexState2,
+ ram_index_count = RamIndexCount - (Quota - Quota2) }.
+
+limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) ->
+ {Q, {0, IndexState}};
+limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) ->
+ MapFoldFilterFun(
+ fun erlang:'not'/1,
+ fun (MsgStatus, {0, _IndexStateN}) ->
+ false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
+ stop;
+ (MsgStatus, {N, IndexStateN}) when N > 0 ->
+ false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
+ {MsgStatus1, IndexStateN1} =
+ maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
+ {true, m(MsgStatus1), {N-1, IndexStateN1}}
+ end, {Quota, IndexState}, Q).
+
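+%% permitted_ram_index_count/1 bounds the number of betas (queue
+%% positions held only in RAM). Writing BetaLen for betas+gammas and
+%% Len - DeltaCount for alphas+betas+gammas, the invariant
+%% gammas/BetaLen >= BetaLen/(alphas+betas+gammas) described at the
+%% top of this file rearranges to
+%% betas =< BetaLen - BetaLen*BetaLen/(Len - DeltaCount),
+%% which is the bound returned below.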
+permitted_ram_index_count(#vqstate { len = 0 }) ->
+ infinity;
+permitted_ram_index_count(#vqstate { len = Len,
+ q2 = Q2,
+ q3 = Q3,
+ delta = #delta { count = DeltaCount } }) ->
+ BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
+ BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
+
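+%% chunk_size/2 returns how much of the excess of Current over
+%% Permitted to deal with right now, capped at ?IO_BATCH_SIZE. For
+%% example, chunk_size(1000, 980) returns 20, whereas
+%% chunk_size(1000, 100) returns 64 rather than 900.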
+chunk_size(Current, Permitted)
+ when Permitted =:= infinity orelse Permitted >= Current ->
+ 0;
+chunk_size(Current, Permitted) ->
+ lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
+
+fetch_from_q3_to_q4(State = #vqstate {
+ q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ msg_store_clients = MSCState }) ->
+ case bpqueue:out(Q3) of
+ {empty, _Q3} ->
+ {empty, State};
+ {{value, IndexOnDisk, MsgStatus = #msg_status {
+ msg = undefined, guid = Guid,
+ is_persistent = IsPersistent }}, Q3a} ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ read_from_msg_store(MSCState, IsPersistent, Guid),
+ Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4),
+ RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
+ true = RamIndexCount1 >= 0, %% ASSERTION
+ State1 = State #vqstate { q3 = Q3a,
+ q4 = Q4a,
+ ram_msg_count = RamMsgCount + 1,
+ ram_index_count = RamIndexCount1,
+ msg_store_clients = MSCState1 },
+ State2 =
+ case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
+ {true, true} ->
+ %% q3 is now empty, it wasn't before; delta is
+ %% still empty. So q2 must be empty, and q1
+ %% can now be joined onto q4
+ true = bpqueue:is_empty(Q2), %% ASSERTION
+ State1 #vqstate { q1 = queue:new(),
+ q4 = queue:join(Q4a, Q1) };
+ {true, false} ->
+ maybe_deltas_to_betas(State1);
+ {false, _} ->
+ %% q3 still isn't empty, we've not touched
+ %% delta, so the invariants between q1, q2,
+ %% delta and q3 are maintained
+ State1
+ end,
+ {loaded, State2}
+ end.
+
+maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
+ State;
+maybe_deltas_to_betas(State = #vqstate {
+ q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ index_state = IndexState,
+ target_ram_msg_count = TargetRamMsgCount,
+ transient_threshold = TransientThreshold }) ->
+ case bpqueue:is_empty(Q3) orelse (TargetRamMsgCount /= 0) of
+ false ->
+ State;
+ true ->
+ #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
+ end_seq_id = DeltaSeqIdEnd } = Delta,
+ DeltaSeqId1 =
+ lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
+ {Q3a, IndexState2} = betas_from_index_entries(
+ List, TransientThreshold, IndexState1),
+ State1 = State #vqstate { index_state = IndexState2 },
+ case bpqueue:len(Q3a) of
+ 0 ->
+ %% we ignored every message in the segment due to
+ %% it being transient and below the threshold
+ maybe_deltas_to_betas(
+ State #vqstate {
+ delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
+ Q3aLen ->
+ Q3b = bpqueue:join(Q3, Q3a),
+ case DeltaCount - Q3aLen of
+ 0 ->
+ %% delta is now empty, but it wasn't
+ %% before, so can now join q2 onto q3
+ State1 #vqstate { q2 = bpqueue:new(),
+ delta = ?BLANK_DELTA,
+ q3 = bpqueue:join(Q3b, Q2) };
+ N when N > 0 ->
+ Delta1 = #delta { start_seq_id = DeltaSeqId1,
+ count = N,
+ end_seq_id = DeltaSeqIdEnd },
+ State1 #vqstate { delta = Delta1,
+ q3 = Q3b }
+ end
+ end
+ end.
+
+push_alphas_to_betas(Quota, State) ->
+ { Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
+ {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+ State2.
+
+maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
+ maybe_push_alphas_to_betas(
+ fun queue:out/1,
+ fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
+ Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
+ State1 #vqstate { q1 = Q1a,
+ q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) };
+ (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
+ Q1a, State1 = #vqstate { q2 = Q2 }) ->
+ State1 #vqstate { q1 = Q1a,
+ q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) }
+ end, Quota, Q1, State).
+
+maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
+ maybe_push_alphas_to_betas(
+ fun queue:out_r/1,
+ fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
+ Q4a, State1 = #vqstate { q3 = Q3 }) ->
+ State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ q4 = Q4a }
+ end, Quota, Q4, State).
+
+maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
+ State = #vqstate {
+ ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount })
+ when Quota =:= 0 orelse
+ TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
+ {Quota, State};
+maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
+ case Generator(Q) of
+ {empty, _Q} ->
+ {Quota, State};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1 = #msg_status { msg_on_disk = true,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount }} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }),
+ RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
+ ram_index_count = RamIndexCount1 },
+ maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
+ Consumer(MsgStatus2, Qa, State2))
+ end.
+
+push_betas_to_deltas(State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ index_state = IndexState,
+ ram_index_count = RamIndexCount }) ->
+ {Delta2, Q2a, RamIndexCount2, IndexState2} =
+ push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
+ fun bpqueue:out/1, Q2,
+ RamIndexCount, IndexState),
+ {Delta3, Q3a, RamIndexCount3, IndexState3} =
+ push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
+ fun bpqueue:out_r/1, Q3,
+ RamIndexCount2, IndexState2),
+ Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
+ State #vqstate { q2 = Q2a,
+ delta = Delta4,
+ q3 = Q3a,
+ index_state = IndexState3,
+ ram_index_count = RamIndexCount3 }.
+
+push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) ->
+ case bpqueue:out(Q) of
+ {empty, _Q} ->
+ {?BLANK_DELTA, Q, RamIndexCount, IndexState};
+ {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} ->
+ {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} =
+ bpqueue:out_r(Q),
+ Limit = LimitFun(MinSeqId),
+ case MaxSeqId < Limit of
+ true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState};
+ false -> {Len, Qc, RamIndexCount1, IndexState1} =
+ push_betas_to_deltas(Generator, Limit, Q, 0,
+ RamIndexCount, IndexState),
+ {#delta { start_seq_id = Limit,
+ count = Len,
+ end_seq_id = MaxSeqId + 1 },
+ Qc, RamIndexCount1, IndexState1}
+ end
+ end.
+
+push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
+ case Generator(Q) of
+ {empty, _Q} ->
+ {Count, Q, RamIndexCount, IndexState};
+ {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa}
+ when SeqId < Limit ->
+ {Count, Q, RamIndexCount, IndexState};
+ {{value, IndexOnDisk, MsgStatus}, Qa} ->
+ {RamIndexCount1, IndexState1} =
+ case IndexOnDisk of
+ true -> {RamIndexCount, IndexState};
+ false -> {#msg_status { index_on_disk = true },
+ IndexState2} =
+ maybe_write_index_to_disk(true, MsgStatus,
+ IndexState),
+ {RamIndexCount - 1, IndexState2}
+ end,
+ push_betas_to_deltas(
+ Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
+ end.