From f9cf13987aecfe7756db2de53bfd89149ad59aab Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 3 Dec 2014 17:27:16 +0000 Subject: Fix confirms --- src/rabbit_queue_index.erl | 75 ++++++++++++++++++++++++++++--------------- src/rabbit_variable_queue.erl | 31 +++++++++++------- test/src/rabbit_tests.erl | 2 +- 3 files changed, 69 insertions(+), 39 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 90729e33..a78dacec 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_index). --export([erase/1, init/2, recover/5, +-export([erase/1, init/3, recover/6, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). @@ -173,10 +173,11 @@ %%---------------------------------------------------------------------------- --record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unconfirmed }). +-record(qistate, {dir, segments, journal_handle, dirty_count, + max_journal_entries, on_sync, on_sync_msg, + unconfirmed, unconfirmed_msg}). --record(segment, { num, path, journal_entries, unacked }). +-record(segment, {num, path, journal_entries, unacked}). -include("rabbit.hrl"). @@ -204,7 +205,9 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unconfirmed :: gb_sets:set() + on_sync_msg :: on_sync_fun(), + unconfirmed :: gb_sets:set(), + unconfirmed_msg :: gb_sets:set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -212,9 +215,11 @@ -type(shutdown_terms() :: [term()] | 'non_clean_shutdown'). -spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok'). --spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). --spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), - contains_predicate(), on_sync_fun()) -> +-spec(init/3 :: (rabbit_amqqueue:name(), + on_sync_fun(), on_sync_fun()) -> qistate()). +-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + contains_predicate(), + on_sync_fun(), on_sync_fun()) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). @@ -253,14 +258,17 @@ erase(Name) -> false -> ok end. -init(Name, OnSyncFun) -> +init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir - State #qistate { on_sync = OnSyncFun }. + State#qistate{on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, + OnSyncFun, OnSyncMsgFun) -> State = blank_state(Name), - State1 = State #qistate { on_sync = OnSyncFun }, + State1 = State #qistate{on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}, CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), @@ -280,7 +288,8 @@ delete_and_terminate(State) -> State1. publish(MsgOrId, SeqId, MsgProps, IsPersistent, - State = #qistate { unconfirmed = Unconfirmed }) -> + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM}) -> MsgId = case MsgOrId of #basic_message{id = Id} -> Id; Id when is_binary(Id) -> Id @@ -288,10 +297,12 @@ publish(MsgOrId, SeqId, MsgProps, IsPersistent, ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( - case MsgProps#message_properties.needs_confirming of - true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed), - State #qistate { unconfirmed = Unconfirmed1 }; - false -> State + case {MsgProps#message_properties.needs_confirming, MsgOrId} of + {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + State#qistate{unconfirmed = UC1}; + {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + State#qistate{unconfirmed_msg = UCM1}; + {false, _} -> State end), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of @@ -317,10 +328,12 @@ sync(State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), notify_sync(State). -needs_sync(#qistate { journal_handle = undefined }) -> +needs_sync(#qistate{journal_handle = undefined}) -> false; -needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) -> - case gb_sets:is_empty(UC) of +needs_sync(#qistate{journal_handle = JournalHdl, + unconfirmed = UC, + unconfirmed_msg = UCM}) -> + case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of true -> case file_handle_cache:needs_sync(JournalHdl) of true -> other; false -> false @@ -425,7 +438,9 @@ blank_state_dir(Dir) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unconfirmed = gb_sets:new() }. + on_sync_msg = fun (_) -> ok end, + unconfirmed = gb_sets:new(), + unconfirmed_msg = gb_sets:new() }. init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -781,11 +796,19 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). -notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) -> - case gb_sets:is_empty(UC) of - true -> State; - false -> OnSyncFun(UC), - State #qistate { unconfirmed = gb_sets:new() } +notify_sync(State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM, + on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}) -> + State1 = case gb_sets:is_empty(UC) of + true -> State; + false -> OnSyncFun(UC), + State#qistate{unconfirmed = gb_sets:new()} + end, + case gb_sets:is_empty(UCM) of + true -> State1; + false -> OnSyncMsgFun(UCM), + State1#qistate{unconfirmed_msg = gb_sets:new()} end. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 21c955db..d63378eb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -28,7 +28,7 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/6]). %%---------------------------------------------------------------------------- %% Definitions: @@ -362,7 +362,7 @@ out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), - msgs_on_disk :: gb_sets:set(), %% TODO fix confirms! + msgs_on_disk :: gb_sets:set(), msg_indices_on_disk :: gb_sets:set(), unconfirmed :: gb_sets:set(), confirmed :: gb_sets:set(), @@ -426,16 +426,19 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(Queue, Recover, AsyncCallback) -> - init(Queue, Recover, AsyncCallback, - fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) - end, - fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). +init(Queue, Recover, Callback) -> + init( + Queue, Recover, Callback, + fun (MsgIds, ActionTaken) -> + msgs_written_to_disk(Callback, MsgIds, ActionTaken) + end, + fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end, + fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end). init(#amqqueue { name = QueueName, durable = IsDurable }, new, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> - IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> + IndexState = rabbit_queue_index:init(QueueName, + MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, @@ -446,7 +449,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), {PersistentClient, ContainsCheckFun} = case IsDurable of @@ -461,7 +464,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - ContainsCheckFun, MsgIdxOnDiskFun), + ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -1479,6 +1482,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> gb_sets:union(MIOD, Confirmed) }) end). +msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). + %%---------------------------------------------------------------------------- %% Internal plumbing for requeue %%---------------------------------------------------------------------------- diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index dcbec8f6..1fc708df 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -2422,7 +2422,7 @@ variable_queue_init(Q, Recover) -> Q, case Recover of true -> non_clean_shutdown; false -> new - end, fun nop/2, fun nop/2, fun nop/1). + end, fun nop/2, fun nop/2, fun nop/1, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). -- cgit v1.2.1