summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-29 09:04:22 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-29 09:04:22 +0000
commitfeb225f1a7a842d565047c1d1a91c74be053b548 (patch)
treee24739d81ad79bdca74dd97ca40ef8fcbd50e0e6
parent3e86486f8505085fb2d7b6a74b260c61951f8c20 (diff)
downloadrabbitmq-server-feb225f1a7a842d565047c1d1a91c74be053b548.tar.gz
amqqueue confirms in batches
-rw-r--r--src/rabbit_amqqueue_process.erl36
1 files changed, 28 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e6e3989f..ca77be1e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -426,14 +426,34 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
confirm_messages(Guids, State) ->
- lists:foldl(fun confirm_message_by_guid/2, State, Guids).
+ {CMs, State1} = annote_confirms_with_channel(Guids, State),
+ CMs1 = group_confirms_by_channel(CMs),
+ [rabbit_channel:confirm(ChPid, Msgs) || {ChPid, Msgs} <- CMs1],
+ State1.
+
+annote_confirms_with_channel(Guids, State) ->
+ lists:foldl(fun(Guid, {CMs, State0 = #q{guid_to_channel = GTC0}}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {[{ChPid, MsgSeqNo} | CMs],
+ State0#q{guid_to_channel =
+ dict:erase(Guid, GTC0)}};
+ _ ->
+ {CMs, State0}
+ end
+ end, {[], State}, Guids).
-confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
- case dict:find(Guid, GTC) of
- {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
- end,
- State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+group_confirms_by_channel([]) ->
+ [];
+group_confirms_by_channel([{Ch, Msg} | CMs]) ->
+ group_confirms_by_channel(lists:usort(CMs), [{Ch, [Msg]}]).
+
+group_confirms_by_channel([], Acc) ->
+ Acc;
+group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
+group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
State;
@@ -466,7 +486,7 @@ attempt_delivery(#delivery{txn = none,
NeedsConfirming = Message#basic_message.is_persistent andalso
Q#amqqueue.durable,
case NeedsConfirming of
- false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
+ false -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,