diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-01-11 15:15:15 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-01-11 15:15:15 +0000 |
commit | 12f778dfdc9bf99ca45e5e5f36a1d4281f687c24 (patch) | |
tree | f776fca604bc02fae1ff7b93960433c60734d625 /src/rabbit_queue_index.erl | |
parent | 8d1365c898057bec83c45201d99c7ca8d5815e3a (diff) | |
parent | 8b273beb6e40b0647e1ee8e532f57b4220785bd0 (diff) | |
download | rabbitmq-server-12f778dfdc9bf99ca45e5e5f36a1d4281f687c24.tar.gz |
Merge default
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6adcd8b0..9bee84f4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -33,7 +33,7 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, + publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -297,11 +297,12 @@ deliver(SeqIds, State) -> ack(SeqIds, State) -> deliver_or_ack(ack, SeqIds, State). -sync([], State) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +%% This is only called when there are outstanding confirms and the +%% queue is idle. +sync(State = #qistate { unsynced_guids = Guids }) -> + sync_if([] =/= Guids, State). + +sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% the variable queue publishes and acks to the qi, and then %% syncs, all in one operation, there is no possibility of the %% seqids not being in the journal, provided the transaction isn't - %% emptied (handled above anyway). - ok = file_handle_cache:sync(JournalHdl), - notify_sync(State). + %% emptied (handled by sync_if anyway). + sync_if([] =/= SeqIds, State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +sync_if(false, State) -> + State; +sync_if(_Bool, State = #qistate { journal_handle = undefined }) -> + State; +sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + notify_sync(State). + notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> OnSyncFun(gb_sets:from_list(UG)), State #qistate { unsynced_guids = [] }. |