diff options
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 237 |
1 files changed, 28 insertions, 209 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c6d99deb..ea72de66 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,20 +16,18 @@ -module(rabbit_variable_queue). --export([init/4, terminate/2, delete_and_terminate/2, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/2, fetch/2, ack/2, - tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, + dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/3, discard/3, + status/1, invoke/3, is_duplicate/2, discard/3, multiple_routing_keys/0]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([start_msg_store/2, stop_msg_store/0, init/5]). %%---------------------------------------------------------------------------- %% Definitions: @@ -239,12 +237,10 @@ ram_ack_index, index_state, msg_store_clients, - on_sync, durable, transient_threshold, async_callback, - sync_callback, len, persistent_count, @@ -285,10 +281,6 @@ end_seq_id %% end_seq_id is exclusive }). --record(tx, { pending_messages, pending_acks }). - --record(sync, { acks_persistent, acks_all, pubs, funs }). - %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas %% that we must be due to write indices for before we do any work at @@ -321,12 +313,6 @@ count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). --type(sync() :: #sync { acks_persistent :: [[seq_id()]], - acks_all :: [[seq_id()]], - pubs :: [{message_properties_transformer(), - [rabbit_types:basic_message()]}], - funs :: [fun (() -> any())] }). - -type(state() :: #vqstate { q1 :: queue(), q2 :: bpqueue:bpqueue(), @@ -339,12 +325,10 @@ index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - on_sync :: sync(), durable :: boolean(), transient_threshold :: non_neg_integer(), async_callback :: async_callback(), - sync_callback :: sync_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -377,11 +361,6 @@ count = 0, end_seq_id = Z }). --define(BLANK_SYNC, #sync { acks_persistent = [], - acks_all = [], - pubs = [], - funs = [] }). - %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -410,17 +389,17 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(Queue, Recover, AsyncCallback, SyncCallback) -> - init(Queue, Recover, AsyncCallback, SyncCallback, +init(Queue, Recover, AsyncCallback) -> + init(Queue, Recover, AsyncCallback, fun (MsgIds, ActionTaken) -> msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). init(#amqqueue { name = QueueName, durable = IsDurable }, false, - AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, + init(IsDurable, IndexState, 0, [], AsyncCallback, case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -429,7 +408,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); init(#amqqueue { name = QueueName, durable = true }, true, - AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -450,14 +429,14 @@ init(#amqqueue { name = QueueName, durable = true }, true, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, + init(true, IndexState, DeltaCount, Terms1, AsyncCallback, PersistentClient, TransientClient). terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = - remove_pending_ack(true, tx_commit_index(State)), + remove_pending_ack(true, State), PRef = case MSCStateP of undefined -> undefined; _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), @@ -590,59 +569,6 @@ ack(AckTags, State) -> AckTags, State), {MsgIds, a(State1)}. -tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - _ChPid, State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> - Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), - #msg_status { msg_on_disk = true } = - maybe_write_msg_to_disk(false, MsgStatus, MSCState); - false -> ok - end, - a(State). - -tx_ack(Txn, AckTags, State) -> - Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - State. - -tx_rollback(Txn, State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> - #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), - erase_tx(Txn), - ok = case IsDurable of - true -> msg_store_remove(MSCState, true, - persistent_msg_ids(Pubs)); - false -> ok - end, - {lists:append(AckTags), a(State)}. - -tx_commit(Txn, Fun, MsgPropsFun, - State = #vqstate { durable = IsDurable, - async_callback = AsyncCallback, - sync_callback = SyncCallback, - msg_store_clients = MSCState }) -> - #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), - erase_tx(Txn), - AckTags1 = lists:append(AckTags), - PersistentMsgIds = persistent_msg_ids(Pubs), - HasPersistentPubs = PersistentMsgIds =/= [], - {AckTags1, - a(case IsDurable andalso HasPersistentPubs of - true -> MsgStoreCallback = - fun () -> msg_store_callback( - PersistentMsgIds, Pubs, AckTags1, Fun, - MsgPropsFun, AsyncCallback, SyncCallback) - end, - ok = msg_store_sync(MSCState, true, PersistentMsgIds, - fun () -> spawn(MsgStoreCallback) end), - State; - false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, - Fun, MsgPropsFun, State) - end)}. - requeue(AckTags, MsgPropsFun, State) -> MsgPropsFun1 = fun (MsgProps) -> (MsgPropsFun(MsgProps)) #message_properties { @@ -748,23 +674,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_timeout(State = #vqstate { on_sync = OnSync }) -> - case {OnSync, needs_index_sync(State)} of - {?BLANK_SYNC, false} -> - case reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end; - _ -> - timed +needs_timeout(State) -> + case needs_index_sync(State) of + false -> case reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end; + true -> timed end. timeout(State) -> - a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). + a(reduce_memory_use(confirm_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -774,7 +699,6 @@ status(#vqstate { len = Len, pending_ack = PA, ram_ack_index = RAI, - on_sync = #sync { funs = From }, target_ram_count = TargetRamCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, @@ -791,7 +715,6 @@ status(#vqstate { {q4 , queue:len(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, - {outstanding_txns , length(From)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, @@ -803,10 +726,9 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. -invoke(?MODULE, Fun, State) -> - Fun(?MODULE, State). +invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). -is_duplicate(_Txn, _Msg, State) -> {false, State}. +is_duplicate(_Msg, State) -> {false, State}. discard(_Msg, _ChPid, State) -> State. @@ -902,11 +824,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) -> MSCState, IsPersistent, fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). -msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end). - msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( MSCState, IsPersistent, @@ -923,20 +840,6 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -lookup_tx(Txn) -> case get({txn, Txn}) of - undefined -> #tx { pending_messages = [], - pending_acks = [] }; - V -> V - end. - -store_tx(Txn, Tx) -> put({txn, Txn}, Tx). - -erase_tx(Txn) -> erase({txn, Txn}). - -persistent_msg_ids(Pubs) -> - [MsgId || {#basic_message { id = MsgId, - is_persistent = true }, _MsgProps} <- Pubs]. - betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( @@ -1000,8 +903,8 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, - AsyncCallback, SyncCallback, PersistentClient, TransientClient) -> +init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, + PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1023,12 +926,10 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, durable = IsDurable, transient_threshold = NextSeqId, async_callback = AsyncCallback, - sync_callback = SyncCallback, len = DeltaCount1, persistent_count = DeltaCount1, @@ -1146,88 +1047,6 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { len = Len1, persistent_count = PCount1 }}. -msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, - AsyncCallback, SyncCallback) -> - case SyncCallback(?MODULE, - fun (?MODULE, StateN) -> - tx_commit_post_msg_store(true, Pubs, AckTags, - Fun, MsgPropsFun, StateN) - end) of - ok -> ok; - error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback) - end. - -remove_persistent_messages(MsgIds, AsyncCallback) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, - undefined, AsyncCallback), - ok = rabbit_msg_store:remove(MsgIds, PersistentClient), - rabbit_msg_store:client_delete_and_terminate(PersistentClient). - -tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, - State = #vqstate { - on_sync = OnSync = #sync { - acks_persistent = SPAcks, - acks_all = SAcks, - pubs = SPubs, - funs = SFuns }, - pending_ack = PA, - durable = IsDurable }) -> - PersistentAcks = - case IsDurable of - true -> [AckTag || AckTag <- AckTags, - case dict:fetch(AckTag, PA) of - #msg_status {} -> - false; - {IsPersistent, _MsgId, _MsgProps} -> - IsPersistent - end]; - false -> [] - end, - case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of - true -> State #vqstate { - on_sync = #sync { - acks_persistent = [PersistentAcks | SPAcks], - acks_all = [AckTags | SAcks], - pubs = [{MsgPropsFun, Pubs} | SPubs], - funs = [Fun | SFuns] }}; - false -> State1 = tx_commit_index( - State #vqstate { - on_sync = #sync { - acks_persistent = [], - acks_all = [AckTags], - pubs = [{MsgPropsFun, Pubs}], - funs = [Fun] } }), - State1 #vqstate { on_sync = OnSync } - end. - -tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - State; -tx_commit_index(State = #vqstate { on_sync = #sync { - acks_persistent = SPAcks, - acks_all = SAcks, - pubs = SPubs, - funs = SFuns }, - durable = IsDurable }) -> - PAcks = lists:append(SPAcks), - Acks = lists:append(SAcks), - Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), - {Msg, MsgProps} <- lists:reverse(PubsN)], - {_MsgIds, State1} = ack(Acks, State), - {SeqIds, State2 = #vqstate { index_state = IndexState }} = - lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps}, - {SeqIdsAcc, State3}) -> - IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State4} = - publish(Msg, MsgProps, false, IsPersistent1, State3), - {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4} - end, {PAcks, State1}, Pubs), - IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), - [ Fun() || Fun <- lists:reverse(SFuns) ], - reduce_memory_use( - State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). - purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, index_state = IndexState, |