summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-10 12:29:58 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-10 12:29:58 +0000
commitb8dd9736bacab6e918369990e9ba7cd3eb9fd17d (patch)
treeebf822ac0e3edc9275538836e2aa4c7899645922
parentfbdf07599e6f4234cc1b0458bcdb1d4f6f533d36 (diff)
downloadrabbitmq-server-b8dd9736bacab6e918369990e9ba7cd3eb9fd17d.tar.gz
sync queue_index when variable_queue has outstanding confirms and is idle (branch off default)
-rw-r--r--src/rabbit_queue_index.erl10
-rw-r--r--src/rabbit_variable_queue.erl32
2 files changed, 30 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76c0a4ef..3ebf200a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -33,7 +33,7 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
+ publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -297,6 +297,14 @@ deliver(SeqIds, State) ->
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
+%% This is only called when there are outstanding confirms and the
+%% queue is idle.
+sync(State = #qistate { unsynced_guids = [] }) ->
+ State;
+sync(State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
sync([], State) ->
State;
sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 19d7c576..ed0589c2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -809,17 +809,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State),
- Res;
-needs_idle_timeout(_State) ->
- true.
-
-idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
+needs_idle_timeout(State = #vqstate { on_sync = OS, unconfirmed = UC }) ->
+ case {OS, gb_sets:size(UC)} of
+ {?BLANK_SYNC, 0} ->
+ {Res, _State} = reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State),
+ Res;
+ _ ->
+ true
+ end.
+
+idle_timeout(State) ->
+ a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1386,6 +1391,11 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
+confirm_commit_index(State = #vqstate { unconfirmed = [] }) ->
+ State;
+confirm_commit_index(State = #vqstate { index_state = IS }) ->
+ State #vqstate { index_state = rabbit_queue_index:sync(IS) }.
+
remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->