summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-11-11 12:02:44 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-11-11 12:02:44 +0000
commitc2206e2db98730e6102b5d774e3af3dc8c743b3f (patch)
tree52113092f174ee4614f756947b03209f121eb7e2
parentf509871c161571d9e872e66b47662033349e8c00 (diff)
downloadrabbitmq-server-c2206e2db98730e6102b5d774e3af3dc8c743b3f.tar.gz
don't confirm messages on requeue
-rw-r--r--src/rabbit_variable_queue.erl16
1 files changed, 10 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 54cda76f..9b3cc58a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -621,9 +621,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
pending_ack = PA1 })}.
ack(AckTags, State) ->
- {Guids, State1} = ack(fun msg_store_remove/3,
- fun (_AckEntry, State1) -> State1 end,
- AckTags, State),
+ {Guids, State1} =
+ ack(fun msg_store_remove/3,
+ fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
+ remove_confirms(gb_sets:singleton(Guid), State1),
+ State1
+ end,
+ AckTags, State),
{Guids, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
@@ -832,7 +836,8 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:insert(Val, Set).
+%% when requeueing, we re-add a guid to the unconfimred set
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
MsgProps) ->
@@ -1271,10 +1276,9 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
MsgStoreFun(MSCState, IsPersistent, Guids),
[Guids | Gs]
end, [], GuidsByStore)),
- State2 = remove_confirms(gb_sets:from_list(AckdGuids), State1),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {AckdGuids, State2 #vqstate { index_state = IndexState1,
+ {AckdGuids, State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1 }}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS