summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-02-08 10:53:02 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-02-08 10:53:02 +0000
commit0dd6aa85db597cd239a261a7f2bc45860f5f810a (patch)
tree90aed3133925b7f4986ba8eecf03d7f966cbc468
parent4d77a3cbf93bdf3da5467903b24db514e17afabb (diff)
parente1199f009198360d869c0977545a2ff42a05e5ef (diff)
downloadrabbitmq-server-0dd6aa85db597cd239a261a7f2bc45860f5f810a.tar.gz
Merge bug 25429
-rw-r--r--src/rabbit_queue_index.erl12
-rw-r--r--src/rabbit_variable_queue.erl45
2 files changed, 21 insertions, 36 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 27ed4722..ea70208f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -210,7 +210,7 @@
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/1 :: (qistate()) -> qistate()).
--spec(needs_sync/1 :: (qistate()) -> boolean()).
+-spec(needs_sync/1 :: (qistate()) -> 'confirms' | 'other' | 'false').
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
@@ -305,8 +305,14 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
needs_sync(#qistate { journal_handle = undefined }) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl }) ->
- file_handle_cache:needs_sync(JournalHdl).
+needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
+ case gb_sets:is_empty(UC) of
+ true -> case file_handle_cache:needs_sync(JournalHdl) of
+ true -> other;
+ false -> false
+ end;
+ false -> confirms
+ end.
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index faa1b0b1..5d463f57 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -772,24 +772,18 @@ ram_duration(State = #vqstate {
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->
- case must_sync_index(State) of
- true -> timed;
- false ->
- case rabbit_queue_index:needs_sync(IndexState) of
- true -> idle;
- false -> case TargetRamCount of
- infinity -> false;
- _ -> case
- reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end
- end
- end
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ other -> idle;
+ false when TargetRamCount == infinity -> false;
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end.
timeout(State = #vqstate { index_state = IndexState }) ->
@@ -1337,21 +1331,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
confirmed = gb_sets:union(C, MsgIdSet) }.
-must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- %% If UC is empty then by definition, MIOD and MOD are also empty
- %% and there's nothing that can be pending a sync.
-
- %% If UC is not empty, then we want to find is_empty(UC - MIOD),
- %% but the subtraction can be expensive. Thus instead, we test to
- %% see if UC is a subset of MIOD. This can only be the case if
- %% MIOD == UC, which would indicate that every message in UC is
- %% also in MIOD and is thus _all_ pending on a msg_store sync, not
- %% on a qi sync. Thus the negation of this is sufficient. Because
- %% is_subset is short circuiting, this is more efficient than the
- %% subtraction.
- not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);