summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-09-29 10:14:36 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-09-29 10:14:36 +0100
commitfa79e2edcab3f749d3b1ce45234d0f0547cc7eca (patch)
tree33c57781dbad071fb12288f45672c71cba689e41
parent3a61cb9d66fabd330f9f9dacc66cef9a7f88e645 (diff)
downloadrabbitmq-server-fa79e2edcab3f749d3b1ce45234d0f0547cc7eca.tar.gz
Only ack messages when necessary
-rw-r--r--src/rabbit_variable_queue.erl21
1 files changed, 11 insertions, 10 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index df8bb312..d11aa639 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1169,7 +1169,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {MsgIdsByStore, _AllMsgIds} =
+ {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1181,7 +1181,7 @@ remove_pending_ack(KeepPersistent,
State1
end;
false -> IndexState1 =
- rabbit_queue_index:ack(dict:fetch_keys(PA), IndexState),
+ rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
@@ -1190,7 +1190,7 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
{[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{MsgIdsByStore, AllMsgIds},
+ {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1205,7 +1205,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ram_ack_index =
gb_trees:delete_any(SeqId, RAI)})}
end, {accumulate_ack_init(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(AckTags, IndexState),
+ IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
@@ -1215,17 +1215,18 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) }}.
-accumulate_ack_init() -> {orddict:new(), []}.
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false,
msg_id = MsgId },
- {MsgIdsByStore, AllMsgIds}) ->
- {MsgIdsByStore, [MsgId | AllMsgIds]};
-accumulate_ack(_SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk},
- {MsgIdsByStore, AllMsgIds}) ->
- {rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+ {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {IndexOnDiskSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
+accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, IndexOnDisk},
+ {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
+ rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
[MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->