summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 14:36:35 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 14:36:35 +0000
commitbbe604746948bb799fdd496ffb9d49b62ca148b2 (patch)
tree535b825bd9ce66e4c17389cfaaf94d968f5fa8b2 /src/rabbit_mirror_queue_slave.erl
parent12eea5e00a656f2dda3888ff089f7c4d47acb40c (diff)
downloadrabbitmq-server-bbe604746948bb799fdd496ffb9d49b62ca148b2.tar.gz
don't remove rejected message before it is confirmed by the DLQ
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl8
1 files changed, 4 insertions, 4 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7238b169..a8c2006d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -417,7 +417,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
Acc
end
end, {gb_trees:empty(), MS}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State #state { msg_id_status = MS1 }.
handle_process_result({ok, State}) -> noreply(State);
@@ -649,7 +649,7 @@ maybe_enqueue_message(
{ok, {confirmed, ChPid}} ->
%% BQ has confirmed it but we didn't know what the
%% msg_seq_no was at the time. We do now!
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { sender_queues = SQ1,
msg_id_status = dict:erase(MsgId, MS) };
@@ -666,7 +666,7 @@ maybe_enqueue_message(
msg_id_status =
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
@@ -728,7 +728,7 @@ process_instruction(
{MQ2, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
{MQ2, PendingCh, MS}
end;
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->