diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-29 09:04:22 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-29 09:04:22 +0000 |
commit | feb225f1a7a842d565047c1d1a91c74be053b548 (patch) | |
tree | e24739d81ad79bdca74dd97ca40ef8fcbd50e0e6 | |
parent | 3e86486f8505085fb2d7b6a74b260c61951f8c20 (diff) | |
download | rabbitmq-server-feb225f1a7a842d565047c1d1a91c74be053b548.tar.gz |
amqqueue confirms in batches
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 |
1 files changed, 28 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e6e3989f..ca77be1e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -426,14 +426,34 @@ deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. confirm_messages(Guids, State) -> - lists:foldl(fun confirm_message_by_guid/2, State, Guids). + {CMs, State1} = annote_confirms_with_channel(Guids, State), + CMs1 = group_confirms_by_channel(CMs), + [rabbit_channel:confirm(ChPid, Msgs) || {ChPid, Msgs} <- CMs1], + State1. + +annote_confirms_with_channel(Guids, State) -> + lists:foldl(fun(Guid, {CMs, State0 = #q{guid_to_channel = GTC0}}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {[{ChPid, MsgSeqNo} | CMs], + State0#q{guid_to_channel = + dict:erase(Guid, GTC0)}}; + _ -> + {CMs, State0} + end + end, {[], State}, Guids). -confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> - case dict:find(Guid, GTC) of - {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok - end, - State#q{guid_to_channel = dict:erase(Guid, GTC)}. +group_confirms_by_channel([]) -> + []; +group_confirms_by_channel([{Ch, Msg} | CMs]) -> + group_confirms_by_channel(lists:usort(CMs), [{Ch, [Msg]}]). + +group_confirms_by_channel([], Acc) -> + Acc; +group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]); +group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]). record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> State; @@ -466,7 +486,7 @@ attempt_delivery(#delivery{txn = none, NeedsConfirming = Message#basic_message.is_persistent andalso Q#amqqueue.durable, case NeedsConfirming of - false -> rabbit_channel:confirm(ChPid, MsgSeqNo); + false -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, |