diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-10 12:29:58 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-10 12:29:58 +0000 |
commit | b8dd9736bacab6e918369990e9ba7cd3eb9fd17d (patch) | |
tree | ebf822ac0e3edc9275538836e2aa4c7899645922 | |
parent | fbdf07599e6f4234cc1b0458bcdb1d4f6f533d36 (diff) | |
download | rabbitmq-server-b8dd9736bacab6e918369990e9ba7cd3eb9fd17d.tar.gz |
sync queue_index when variable_queue has outstanding confirms and is idle (branch off default)
-rw-r--r-- | src/rabbit_queue_index.erl | 10 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 32 |
2 files changed, 30 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76c0a4ef..3ebf200a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -33,7 +33,7 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, + publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -297,6 +297,14 @@ deliver(SeqIds, State) -> ack(SeqIds, State) -> deliver_or_ack(ack, SeqIds, State). +%% This is only called when there are outstanding confirms and the +%% queue is idle. +sync(State = #qistate { unsynced_guids = [] }) -> + State; +sync(State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + notify_sync(State). + sync([], State) -> State; sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 19d7c576..ed0589c2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -809,17 +809,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State), - Res; -needs_idle_timeout(_State) -> - true. - -idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). +needs_idle_timeout(State = #vqstate { on_sync = OS, unconfirmed = UC }) -> + case {OS, gb_sets:size(UC)} of + {?BLANK_SYNC, 0} -> + {Res, _State} = reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State), + Res; + _ -> + true + end. + +idle_timeout(State) -> + a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -1386,6 +1391,11 @@ find_persistent_count(LensByStore) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- +confirm_commit_index(State = #vqstate { unconfirmed = [] }) -> + State; +confirm_commit_index(State = #vqstate { index_state = IS }) -> + State #vqstate { index_state = rabbit_queue_index:sync(IS) }. + remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> |