diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-24 17:53:37 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-24 17:53:37 +0100 |
commit | 5749a9a6c1f0e1845a07e55aae3a41602f8ab8c2 (patch) | |
tree | e3696cc129f77d9585817528ed99b17bea1037a3 | |
parent | 1250b1ce9cb51af5da445bb781580b6f8621cf43 (diff) | |
parent | ea7ca9c0d32ed23df7296db66a1f8e6da7f99c3d (diff) | |
download | rabbitmq-server-5749a9a6c1f0e1845a07e55aae3a41602f8ab8c2.tar.gz |
Merging bug21673 into bug22896
-rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1a9301c0..06a9f9e7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -510,7 +510,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, end. ack(AckTags, State) -> - a(ack(fun (_AckEntry, State1) -> State1 end, AckTags, State)). + a(ack(remove, fun (_AckEntry, State1) -> State1 end, AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, State = #vqstate { durable = IsDurable, @@ -562,7 +562,8 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> requeue(AckTags, State) -> a(reduce_memory_use( - ack(fun (#msg_status { msg = Msg }, State1) -> + ack(release, + fun (#msg_status { msg = Msg }, State1) -> {_SeqId, State2} = publish(Msg, true, false, State1), State2; ({IsPersistent, Guid}, State1) -> @@ -851,9 +852,10 @@ beta_fold(Fun, Init, Q) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -ack(_Fun, [], State) -> +ack(_ReleaseOrRemove, _Fun, [], State) -> State; -ack(Fun, AckTags, State) -> +ack(ReleaseOrRemove, Fun, AckTags, State) when ReleaseOrRemove =:= remove orelse + ReleaseOrRemove =:= release -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( @@ -871,7 +873,7 @@ ack(Fun, AckTags, State) -> end, {{[], dict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:release(MsgStore, Guids) + rabbit_msg_store:ReleaseOrRemove(MsgStore, Guids) end, ok, GuidsByStore), PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; |