diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 14:36:35 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 14:36:35 +0000 |
commit | bbe604746948bb799fdd496ffb9d49b62ca148b2 (patch) | |
tree | 535b825bd9ce66e4c17389cfaaf94d968f5fa8b2 /src/rabbit_mirror_queue_slave.erl | |
parent | 12eea5e00a656f2dda3888ff089f7c4d47acb40c (diff) | |
download | rabbitmq-server-bbe604746948bb799fdd496ffb9d49b62ca148b2.tar.gz |
don't remove rejected message before it is confirmed by the DLQ
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7238b169..a8c2006d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -417,7 +417,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> Acc end end, {gb_trees:empty(), MS}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State #state { msg_id_status = MS1 }. handle_process_result({ok, State}) -> noreply(State); @@ -649,7 +649,7 @@ maybe_enqueue_message( {ok, {confirmed, ChPid}} -> %% BQ has confirmed it but we didn't know what the %% msg_seq_no was at the time. We do now! - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { sender_queues = SQ1, msg_id_status = dict:erase(MsgId, MS) }; @@ -666,7 +666,7 @@ maybe_enqueue_message( msg_id_status = dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } @@ -728,7 +728,7 @@ process_instruction( {MQ2, sets:add_element(MsgId, PendingCh), dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), {MQ2, PendingCh, MS} end; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> |