diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-11-11 12:02:44 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-11-11 12:02:44 +0000 |
commit | c2206e2db98730e6102b5d774e3af3dc8c743b3f (patch) | |
tree | 52113092f174ee4614f756947b03209f121eb7e2 | |
parent | f509871c161571d9e872e66b47662033349e8c00 (diff) | |
download | rabbitmq-server-c2206e2db98730e6102b5d774e3af3dc8c743b3f.tar.gz |
don't confirm messages on requeue
-rw-r--r-- | src/rabbit_variable_queue.erl | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 54cda76f..9b3cc58a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -621,9 +621,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { pending_ack = PA1 })}. ack(AckTags, State) -> - {Guids, State1} = ack(fun msg_store_remove/3, - fun (_AckEntry, State1) -> State1 end, - AckTags, State), + {Guids, State1} = + ack(fun msg_store_remove/3, + fun ({_IsPersistent, Guid, _MsgProps}, State1) -> + remove_confirms(gb_sets:singleton(Guid), State1), + State1 + end, + AckTags, State), {Guids, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, @@ -832,7 +836,8 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. gb_sets_maybe_insert(false, _Val, Set) -> Set; -gb_sets_maybe_insert(true, Val, Set) -> gb_sets:insert(Val, Set). +%% when requeueing, we re-add a guid to the unconfimred set +gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProps) -> @@ -1271,10 +1276,9 @@ ack(MsgStoreFun, Fun, AckTags, State) -> MsgStoreFun(MSCState, IsPersistent, Guids), [Guids | Gs] end, [], GuidsByStore)), - State2 = remove_confirms(gb_sets:from_list(AckdGuids), State1), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {AckdGuids, State2 #vqstate { index_state = IndexState1, + {AckdGuids, State1 #vqstate { index_state = IndexState1, persistent_count = PCount1 }}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS |