summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 10:32:39 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 10:32:39 +0100
commit9ee19e49a6da5e72aae9e1683997c5cf31488df9 (patch)
tree49544479716a15780b1efc145e6f0f7b392458fe /src/rabbit_queue_index.erl
parentf2e188bbc4a9d8a3cc147275427adb284d5f0505 (diff)
parent537ee7a8b46852b183c3e2f6b68ad13ade9ee602 (diff)
downloadrabbitmq-server-9ee19e49a6da5e72aae9e1683997c5cf31488df9.tar.gz
pull from default
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl53
1 files changed, 33 insertions, 20 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0b98290c..d4b613ff 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/4, terminate/2, delete_and_terminate/1, publish/4,
+-export([init/5, terminate/2, delete_and_terminate/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -166,7 +166,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries }).
+ max_journal_entries, on_sync, unsynced_guids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -189,15 +189,18 @@
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer(),
- max_journal_entries :: non_neg_integer()
+ max_journal_entries :: non_neg_integer(),
+ on_sync :: fun (([rabbit_guid:guid()]) -> ok),
+ unsynced_guids :: [rabbit_guid:guid()]
}).
-type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
- A}).
+ {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
+ A}).
--spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean())) ->
- {'undefined' | non_neg_integer(), [any()], qistate()}).
+-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(),
+ fun ((rabbit_guid:guid()) -> boolean()),
+ fun (([seq_id()]) -> ok))
+ -> {'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) ->
@@ -222,25 +225,28 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) ->
+init(Name, false, _MsgStoreRecovered, _ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = filelib:is_file(Dir), %% is_file == is file or dir
- {0, [], State};
+ {0, [], State #qistate { on_sync = OnSyncFun,
+ unsynced_guids = []}};
-init(Name, true, MsgStoreRecovered, ContainsCheckFun) ->
+init(Name, true, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
+ State1 = State #qistate { on_sync = OnSyncFun,
+ unsynced_guids = [] },
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
end,
CleanShutdown = detect_clean_shutdown(Dir),
- {Count, State1} =
+ {Count, State2} =
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
- init_clean(RecoveredCounts, State);
- false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
+ init_clean(RecoveredCounts, State1);
+ false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end,
- {Count, Terms, State1}.
+ {Count, Terms, State2}.
terminate(Terms, State) ->
{SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
@@ -260,7 +266,9 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
- maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
+ State2 = State1 #qistate { unsynced_guids =
+ [Guid | State1#qistate.unsynced_guids] },
+ maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -272,7 +280,8 @@ sync([], State) ->
State;
sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+sync(_SeqIds, State = #qistate { journal_handle = JournalHdl,
+ on_sync = OnSyncFun }) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -282,7 +291,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% seqids not being in the journal, provided the transaction isn't
%% emptied (handled above anyway).
ok = file_handle_cache:sync(JournalHdl),
- State.
+ OnSyncFun(State#qistate.unsynced_guids),
+ State#qistate { unsynced_guids = [] }.
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -561,7 +571,9 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount,
maybe_flush_journal(State) ->
State.
-flush_journal(State = #qistate { segments = Segments }) ->
+flush_journal(State = #qistate { segments = Segments,
+ on_sync = OnSyncFun,
+ unsynced_guids = UGs }) ->
Segments1 =
segment_fold(
fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
@@ -576,7 +588,8 @@ flush_journal(State = #qistate { segments = Segments }) ->
{JournalHdl, State1} =
get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
- State1 #qistate { dirty_count = 0 }.
+ OnSyncFun(UGs),
+ State1 #qistate { dirty_count = 0, unsynced_guids = [] }.
append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->