diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 14:36:35 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 14:36:35 +0000 |
commit | bbe604746948bb799fdd496ffb9d49b62ca148b2 (patch) | |
tree | 535b825bd9ce66e4c17389cfaaf94d968f5fa8b2 /src/rabbit_variable_queue.erl | |
parent | 12eea5e00a656f2dda3888ff089f7c4d47acb40c (diff) | |
download | rabbitmq-server-bbe604746948bb799fdd496ffb9d49b62ca148b2.tar.gz |
don't remove rejected message before it is confirmed by the DLQ
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 54 |
1 files changed, 28 insertions, 26 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 34a28afe..811017d9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/4, fetch/2, ack/4, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, @@ -581,19 +581,18 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, MsgFun, MsgSeqNo, State) -> +dropwhile(Pred, MsgFun, State) -> case queue_out(State) of {empty, State1} -> - {MsgSeqNo, a(State1)}; + a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of true -> - {MsgSeqNo1, State2} = - MsgFun(read_msg_callback(MsgStatus), MsgSeqNo, State1), + State2 = MsgFun(read_msg_callback(MsgStatus), undefined, State1), {_, State3} = internal_fetch(false, MsgStatus, State2), - dropwhile(Pred, MsgFun, MsgSeqNo1, State3); + dropwhile(Pred, MsgFun, State3); false -> - {MsgSeqNo, a(in_r(MsgStatus, State1))} + a(in_r(MsgStatus, State1)) end end. @@ -626,39 +625,42 @@ read_msg_callback1(MsgId, IsPersistent, msg_store_read(MSCState, IsPersistent, MsgId), {Msg, State #vqstate { msg_store_clients = MSCState1 }}. -ack([], _Fun, MsgSeqNo, State) -> - {[], MsgSeqNo, State}; +ack([], _Fun, State) -> + {[], State}; -ack(AckTags, MsgFun, MsgSeqNo, State) -> +ack(AckTags, undefined, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, - {MsgSeqNo2, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - persistent_count = PCount, - ack_out_counter = AckOutCount }}} = + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount, + ack_out_counter = AckOutCount }} = lists:foldl( - fun (SeqId, {Acc, {MsgSeqNo1, State2 = #vqstate{pending_ack = PA}}}) -> - AckEntry = gb_trees:get(SeqId, PA), + fun (SeqId, {Acc, State2}) -> {MsgStatus, State3} = remove_pending_ack(SeqId, State2), - {accumulate_ack(MsgStatus, Acc), - MsgFun(read_msg_callback(AckEntry), MsgSeqNo1, State3)} - end, {accumulate_ack_init(), {MsgSeqNo, State}}, AckTags), + {accumulate_ack(MsgStatus, Acc), State3} + end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( orddict:new(), MsgIdsByStore)), {lists:reverse(AllMsgIds), - MsgSeqNo2, a(State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) })}. + ack_out_counter = AckOutCount + length(AckTags) })}; + +ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> + [begin + AckEntry = gb_trees:get(SeqId, PA), + MsgFun(read_msg_callback(AckEntry), SeqId, State) + end || SeqId <- AckTags], + {[], State}. requeue(AckTags, #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), fun publish_alpha/2, State), |