summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl64
1 files changed, 39 insertions, 25 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3841b680..ea70208f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -162,7 +162,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_msg_ids }).
+ max_journal_entries, on_sync, unconfirmed }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -190,7 +190,7 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_msg_ids :: gb_set()
+ unconfirmed :: gb_set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -210,7 +210,7 @@
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/1 :: (qistate()) -> qistate()).
--spec(needs_sync/1 :: (qistate()) -> boolean()).
+-spec(needs_sync/1 :: (qistate()) -> 'confirms' | 'other' | 'false').
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
@@ -269,13 +269,16 @@ delete_and_terminate(State) ->
State1.
publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ State = #qistate { unconfirmed = Unconfirmed })
when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- State #qistate {
- unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }),
+ case MsgProps#message_properties.needs_confirming of
+ true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
+ State #qistate { unconfirmed = Unconfirmed1 };
+ false -> State
+ end),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -302,8 +305,14 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
needs_sync(#qistate { journal_handle = undefined }) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl }) ->
- file_handle_cache:needs_sync(JournalHdl).
+needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
+ case gb_sets:is_empty(UC) of
+ true -> case file_handle_cache:needs_sync(JournalHdl) of
+ true -> other;
+ false -> false
+ end;
+ false -> confirms
+ end.
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -398,7 +407,7 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_msg_ids = gb_sets:new() }.
+ unconfirmed = gb_sets:new() }.
clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -607,19 +616,21 @@ add_to_journal(RelSeq, Action,
end};
add_to_journal(RelSeq, Action, JEntries) ->
- Val = case array:get(RelSeq, JEntries) of
- undefined ->
- case Action of
- ?PUB -> {Action, no_del, no_ack};
- del -> {no_pub, del, no_ack};
- ack -> {no_pub, no_del, ack}
- end;
- ({Pub, no_del, no_ack}) when Action == del ->
- {Pub, del, no_ack};
- ({Pub, del, no_ack}) when Action == ack ->
- {Pub, del, ack}
- end,
- array:set(RelSeq, Val, JEntries).
+ case array:get(RelSeq, JEntries) of
+ undefined ->
+ array:set(RelSeq,
+ case Action of
+ ?PUB -> {Action, no_del, no_ack};
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack}
+ end, JEntries);
+ ({Pub, no_del, no_ack}) when Action == del ->
+ array:set(RelSeq, {Pub, del, no_ack}, JEntries);
+ ({no_pub, del, no_ack}) when Action == ack ->
+ array:set(RelSeq, {no_pub, del, ack}, JEntries);
+ ({?PUB, del, no_ack}) when Action == ack ->
+ array:reset(RelSeq, JEntries)
+ end.
maybe_flush_journal(State = #qistate { dirty_count = DCount,
max_journal_entries = MaxJournal })
@@ -732,9 +743,12 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
- OnSyncFun(UG),
- State #qistate { unsynced_msg_ids = gb_sets:new() }.
+notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
+ case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State #qistate { unconfirmed = gb_sets:new() }
+ end.
%%----------------------------------------------------------------------------
%% segment manipulation