summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl237
1 files changed, 28 insertions, 209 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c6d99deb..ea72de66 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,20 +16,18 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/2, delete_and_terminate/2,
+-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/2, fetch/2, ack/2,
- tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1,
+ dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/3, discard/3,
+ status/1, invoke/3, is_duplicate/2, discard/3,
multiple_routing_keys/0]).
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/6]).
+-export([start_msg_store/2, stop_msg_store/0, init/5]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -239,12 +237,10 @@
ram_ack_index,
index_state,
msg_store_clients,
- on_sync,
durable,
transient_threshold,
async_callback,
- sync_callback,
len,
persistent_count,
@@ -285,10 +281,6 @@
end_seq_id %% end_seq_id is exclusive
}).
--record(tx, { pending_messages, pending_acks }).
-
--record(sync, { acks_persistent, acks_all, pubs, funs }).
-
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
@@ -321,12 +313,6 @@
count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
--type(sync() :: #sync { acks_persistent :: [[seq_id()]],
- acks_all :: [[seq_id()]],
- pubs :: [{message_properties_transformer(),
- [rabbit_types:basic_message()]}],
- funs :: [fun (() -> any())] }).
-
-type(state() :: #vqstate {
q1 :: queue(),
q2 :: bpqueue:bpqueue(),
@@ -339,12 +325,10 @@
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
- on_sync :: sync(),
durable :: boolean(),
transient_threshold :: non_neg_integer(),
async_callback :: async_callback(),
- sync_callback :: sync_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -377,11 +361,6 @@
count = 0,
end_seq_id = Z }).
--define(BLANK_SYNC, #sync { acks_persistent = [],
- acks_all = [],
- pubs = [],
- funs = [] }).
-
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -410,17 +389,17 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(Queue, Recover, AsyncCallback, SyncCallback) ->
- init(Queue, Recover, AsyncCallback, SyncCallback,
+init(Queue, Recover, AsyncCallback) ->
+ init(Queue, Recover, AsyncCallback,
fun (MsgIds, ActionTaken) ->
msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
- AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
+ init(IsDurable, IndexState, 0, [], AsyncCallback,
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -429,7 +408,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
init(#amqqueue { name = QueueName, durable = true }, true,
- AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -450,14 +429,14 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
+ init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
- remove_pending_ack(true, tx_commit_index(State)),
+ remove_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
_ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
@@ -590,59 +569,6 @@ ack(AckTags, State) ->
AckTags, State),
{MsgIds, a(State1)}.
-tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- _ChPid, State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
- Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
- case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps),
- #msg_status { msg_on_disk = true } =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState);
- false -> ok
- end,
- a(State).
-
-tx_ack(Txn, AckTags, State) ->
- Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- State.
-
-tx_rollback(Txn, State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
- #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
- erase_tx(Txn),
- ok = case IsDurable of
- true -> msg_store_remove(MSCState, true,
- persistent_msg_ids(Pubs));
- false -> ok
- end,
- {lists:append(AckTags), a(State)}.
-
-tx_commit(Txn, Fun, MsgPropsFun,
- State = #vqstate { durable = IsDurable,
- async_callback = AsyncCallback,
- sync_callback = SyncCallback,
- msg_store_clients = MSCState }) ->
- #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
- erase_tx(Txn),
- AckTags1 = lists:append(AckTags),
- PersistentMsgIds = persistent_msg_ids(Pubs),
- HasPersistentPubs = PersistentMsgIds =/= [],
- {AckTags1,
- a(case IsDurable andalso HasPersistentPubs of
- true -> MsgStoreCallback =
- fun () -> msg_store_callback(
- PersistentMsgIds, Pubs, AckTags1, Fun,
- MsgPropsFun, AsyncCallback, SyncCallback)
- end,
- ok = msg_store_sync(MSCState, true, PersistentMsgIds,
- fun () -> spawn(MsgStoreCallback) end),
- State;
- false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
- Fun, MsgPropsFun, State)
- end)}.
-
requeue(AckTags, MsgPropsFun, State) ->
MsgPropsFun1 = fun (MsgProps) ->
(MsgPropsFun(MsgProps)) #message_properties {
@@ -748,23 +674,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State = #vqstate { on_sync = OnSync }) ->
- case {OnSync, needs_index_sync(State)} of
- {?BLANK_SYNC, false} ->
- case reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end;
- _ ->
- timed
+needs_timeout(State) ->
+ case needs_index_sync(State) of
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end;
+ true -> timed
end.
timeout(State) ->
- a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
+ a(reduce_memory_use(confirm_commit_index(State))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -774,7 +699,6 @@ status(#vqstate {
len = Len,
pending_ack = PA,
ram_ack_index = RAI,
- on_sync = #sync { funs = From },
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
@@ -791,7 +715,6 @@ status(#vqstate {
{q4 , queue:len(Q4)},
{len , Len},
{pending_acks , dict:size(PA)},
- {outstanding_txns , length(From)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
@@ -803,10 +726,9 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
-invoke(?MODULE, Fun, State) ->
- Fun(?MODULE, State).
+invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
-is_duplicate(_Txn, _Msg, State) -> {false, State}.
+is_duplicate(_Msg, State) -> {false, State}.
discard(_Msg, _ChPid, State) -> State.
@@ -902,11 +824,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) ->
MSCState, IsPersistent,
fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end).
-
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
MSCState, IsPersistent,
@@ -923,20 +840,6 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-lookup_tx(Txn) -> case get({txn, Txn}) of
- undefined -> #tx { pending_messages = [],
- pending_acks = [] };
- V -> V
- end.
-
-store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
-
-erase_tx(Txn) -> erase({txn, Txn}).
-
-persistent_msg_ids(Pubs) ->
- [MsgId || {#basic_message { id = MsgId,
- is_persistent = true }, _MsgProps} <- Pubs].
-
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
@@ -1000,8 +903,8 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms,
- AsyncCallback, SyncCallback, PersistentClient, TransientClient) ->
+init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+ PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1023,12 +926,10 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ram_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
- on_sync = ?BLANK_SYNC,
durable = IsDurable,
transient_threshold = NextSeqId,
async_callback = AsyncCallback,
- sync_callback = SyncCallback,
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1146,88 +1047,6 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
len = Len1,
persistent_count = PCount1 }}.
-msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
- AsyncCallback, SyncCallback) ->
- case SyncCallback(?MODULE,
- fun (?MODULE, StateN) ->
- tx_commit_post_msg_store(true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)
- end) of
- ok -> ok;
- error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback)
- end.
-
-remove_persistent_messages(MsgIds, AsyncCallback) ->
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE,
- undefined, AsyncCallback),
- ok = rabbit_msg_store:remove(MsgIds, PersistentClient),
- rabbit_msg_store:client_delete_and_terminate(PersistentClient).
-
-tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
- State = #vqstate {
- on_sync = OnSync = #sync {
- acks_persistent = SPAcks,
- acks_all = SAcks,
- pubs = SPubs,
- funs = SFuns },
- pending_ack = PA,
- durable = IsDurable }) ->
- PersistentAcks =
- case IsDurable of
- true -> [AckTag || AckTag <- AckTags,
- case dict:fetch(AckTag, PA) of
- #msg_status {} ->
- false;
- {IsPersistent, _MsgId, _MsgProps} ->
- IsPersistent
- end];
- false -> []
- end,
- case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of
- true -> State #vqstate {
- on_sync = #sync {
- acks_persistent = [PersistentAcks | SPAcks],
- acks_all = [AckTags | SAcks],
- pubs = [{MsgPropsFun, Pubs} | SPubs],
- funs = [Fun | SFuns] }};
- false -> State1 = tx_commit_index(
- State #vqstate {
- on_sync = #sync {
- acks_persistent = [],
- acks_all = [AckTags],
- pubs = [{MsgPropsFun, Pubs}],
- funs = [Fun] } }),
- State1 #vqstate { on_sync = OnSync }
- end.
-
-tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- State;
-tx_commit_index(State = #vqstate { on_sync = #sync {
- acks_persistent = SPAcks,
- acks_all = SAcks,
- pubs = SPubs,
- funs = SFuns },
- durable = IsDurable }) ->
- PAcks = lists:append(SPAcks),
- Acks = lists:append(SAcks),
- Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
- {Msg, MsgProps} <- lists:reverse(PubsN)],
- {_MsgIds, State1} = ack(Acks, State),
- {SeqIds, State2 = #vqstate { index_state = IndexState }} =
- lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent },
- MsgProps},
- {SeqIdsAcc, State3}) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State4} =
- publish(Msg, MsgProps, false, IsPersistent1, State3),
- {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4}
- end, {PAcks, State1}, Pubs),
- IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
- [ Fun() || Fun <- lists:reverse(SFuns) ],
- reduce_memory_use(
- State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
-
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
index_state = IndexState,