diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 10:32:39 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 10:32:39 +0100 |
commit | 9ee19e49a6da5e72aae9e1683997c5cf31488df9 (patch) | |
tree | 49544479716a15780b1efc145e6f0f7b392458fe /src/rabbit_queue_index.erl | |
parent | f2e188bbc4a9d8a3cc147275427adb284d5f0505 (diff) | |
parent | 537ee7a8b46852b183c3e2f6b68ad13ade9ee602 (diff) | |
download | rabbitmq-server-9ee19e49a6da5e72aae9e1683997c5cf31488df9.tar.gz |
pull from default
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 53 |
1 files changed, 33 insertions, 20 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0b98290c..d4b613ff 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, +-export([init/5, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -166,7 +166,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries }). + max_journal_entries, on_sync, unsynced_guids }). -record(segment, { num, path, journal_entries, unacked }). @@ -189,15 +189,18 @@ segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer(), - max_journal_entries :: non_neg_integer() + max_journal_entries :: non_neg_integer(), + on_sync :: fun (([rabbit_guid:guid()]) -> ok), + unsynced_guids :: [rabbit_guid:guid()] }). -type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), - A}). + {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), + A}). --spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> - {'undefined' | non_neg_integer(), [any()], qistate()}). +-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(), + fun ((rabbit_guid:guid()) -> boolean()), + fun (([seq_id()]) -> ok)) + -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> @@ -222,25 +225,28 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> +init(Name, false, _MsgStoreRecovered, _ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - {0, [], State}; + {0, [], State #qistate { on_sync = OnSyncFun, + unsynced_guids = []}}; -init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> +init(Name, true, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), + State1 = State #qistate { on_sync = OnSyncFun, + unsynced_guids = [] }, Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 end, CleanShutdown = detect_clean_shutdown(Dir), - {Count, State1} = + {Count, State2} = case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), - init_clean(RecoveredCounts, State); - false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + init_clean(RecoveredCounts, State1); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end, - {Count, Terms, State1}. + {Count, Terms, State2}. terminate(Terms, State) -> {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), @@ -260,7 +266,9 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + State2 = State1 #qistate { unsynced_guids = + [Guid | State1#qistate.unsynced_guids] }, + maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -272,7 +280,8 @@ sync([], State) -> State; sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +sync(_SeqIds, State = #qistate { journal_handle = JournalHdl, + on_sync = OnSyncFun }) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -282,7 +291,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% seqids not being in the journal, provided the transaction isn't %% emptied (handled above anyway). ok = file_handle_cache:sync(JournalHdl), - State. + OnSyncFun(State#qistate.unsynced_guids), + State#qistate { unsynced_guids = [] }. flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -561,7 +571,9 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount, maybe_flush_journal(State) -> State. -flush_journal(State = #qistate { segments = Segments }) -> +flush_journal(State = #qistate { segments = Segments, + on_sync = OnSyncFun, + unsynced_guids = UGs }) -> Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> @@ -576,7 +588,8 @@ flush_journal(State = #qistate { segments = Segments }) -> {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - State1 #qistate { dirty_count = 0 }. + OnSyncFun(UGs), + State1 #qistate { dirty_count = 0, unsynced_guids = [] }. append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> |