summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-30 21:16:03 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-30 21:16:03 +0000
commit7f6543ca82df40f3710262aed52a3076898310dc (patch)
tree1459123426bb678735938df0163d7c010e412b4c
parentb13b647820669e190ce7c8451d3c85496864c6f7 (diff)
downloadrabbitmq-server-7f6543ca82df40f3710262aed52a3076898310dc.tar.gz
ask qi whether it needs sync'ing, and why
-rw-r--r--src/rabbit_queue_index.erl9
-rw-r--r--src/rabbit_variable_queue.erl40
2 files changed, 17 insertions, 32 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 07117f9a..a055742e 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -210,7 +210,7 @@
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/1 :: (qistate()) -> qistate()).
--spec(needs_sync/1 :: (qistate()) -> boolean()).
+-spec(needs_sync/1 :: (qistate()) -> 'confirms' | boolean()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
@@ -305,8 +305,11 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
needs_sync(#qistate { journal_handle = undefined }) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl }) ->
- file_handle_cache:needs_sync(JournalHdl).
+needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
+ case gb_sets:is_empty(UC) of
+ true -> file_handle_cache:needs_sync(JournalHdl);
+ false -> confirms
+ end.
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1acc9ef0..0be81569 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -755,20 +755,17 @@ ram_duration(State = #vqstate {
ram_ack_count_prev = RamAckCount }}.
needs_timeout(State = #vqstate { index_state = IndexState }) ->
- case must_sync_index(State) of
- true -> timed;
- false ->
- case rabbit_queue_index:needs_sync(IndexState) of
- true -> idle;
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end
- end
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ true -> idle;
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end.
timeout(State = #vqstate { index_state = IndexState }) ->
@@ -1304,21 +1301,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
confirmed = gb_sets:union(C, MsgIdSet) }.
-must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- %% If UC is empty then by definition, MIOD and MOD are also empty
- %% and there's nothing that can be pending a sync.
-
- %% If UC is not empty, then we want to find is_empty(UC - MIOD),
- %% but the subtraction can be expensive. Thus instead, we test to
- %% see if UC is a subset of MIOD. This can only be the case if
- %% MIOD == UC, which would indicate that every message in UC is
- %% also in MIOD and is thus _all_ pending on a msg_store sync, not
- %% on a qi sync. Thus the negation of this is sufficient. Because
- %% is_subset is short circuiting, this is more efficient than the
- %% subtraction.
- not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);