summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-25 16:08:47 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-25 16:08:47 +0000
commitbdb629a5e6910c1683703d255d1055fe0f402771 (patch)
tree19beabaca231db2cb1c958f3322c826cc434ad41 /src/rabbit_queue_index.erl
parent3c7d26cee18f985ee5166875d9e1a8710ca97946 (diff)
parent84c7a67e489d8291bd52980bfe9bb5254417f841 (diff)
downloadrabbitmq-server-bdb629a5e6910c1683703d255d1055fe0f402771.tar.gz
merge with default
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl52
1 files changed, 33 insertions, 19 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 43dbf9d4..b646f330 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/1, shutdown_terms/1, recover/4,
+-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -174,7 +174,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries }).
+ max_journal_entries, on_sync, unsynced_guids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -195,21 +195,24 @@
})).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
+-type(on_sync_fun() :: fun ((gb_set()) -> ok)).
-type(qistate() :: #qistate { dir :: file:filename(),
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer(),
- max_journal_entries :: non_neg_integer()
+ max_journal_entries :: non_neg_integer(),
+ on_sync :: on_sync_fun(),
+ unsynced_guids :: [rabbit_guid:guid()]
}).
-type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
+ {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
A}).
-type(shutdown_terms() :: [any()]).
--spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()).
+-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
--spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean())) ->
+-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) ->
{'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
@@ -227,8 +230,8 @@
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 ::
- ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}).
+-spec(recover/1 :: ([rabbit_amqqueue:name()]) ->
+ {[[any()]], startup_fun_state()}).
-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -239,10 +242,10 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name) ->
+init(Name, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = filelib:is_file(Dir), %% is_file == is file or dir
- State.
+ State #qistate { on_sync = OnSyncFun }.
shutdown_terms(Name) ->
#qistate { dir = Dir } = blank_state(Name),
@@ -251,13 +254,14 @@ shutdown_terms(Name) ->
{ok, Terms1} -> Terms1
end.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) ->
+recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
+ State1 = State #qistate { on_sync = OnSyncFun },
CleanShutdown = detect_clean_shutdown(Dir),
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
- init_clean(RecoveredCounts, State);
- false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
+ init_clean(RecoveredCounts, State1);
+ false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
terminate(Terms, State) ->
@@ -270,9 +274,13 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unsynced_guids = UnsyncedGuids })
+ when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
- {JournalHdl, State1} = get_journal_handle(State),
+ {JournalHdl, State1} = get_journal_handle(
+ State #qistate {
+ unsynced_guids = [Guid | UnsyncedGuids] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -303,7 +311,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% seqids not being in the journal, provided the transaction isn't
%% emptied (handled above anyway).
ok = file_handle_cache:sync(JournalHdl),
- State.
+ notify_sync(State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -393,7 +401,9 @@ blank_state(QueueName) ->
segments = segments_new(),
journal_handle = undefined,
dirty_count = 0,
- max_journal_entries = MaxJournal }.
+ max_journal_entries = MaxJournal,
+ on_sync = fun (_) -> ok end,
+ unsynced_guids = [] }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -625,7 +635,7 @@ flush_journal(State = #qistate { segments = Segments }) ->
{JournalHdl, State1} =
get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
- State1 #qistate { dirty_count = 0 }.
+ notify_sync(State1 #qistate { dirty_count = 0 }).
append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
@@ -713,6 +723,10 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+ OnSyncFun(gb_sets:from_list(UG)),
+ State #qistate { unsynced_guids = [] }.
+
%%----------------------------------------------------------------------------
%% segment manipulation
%%----------------------------------------------------------------------------