diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-17 10:32:30 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-17 10:32:30 +0000 |
commit | 2185b71116ace8425723168cf90ab9e234d451bb (patch) | |
tree | b44f87b1fd274e646d338dbb4130268d3b0f308b | |
parent | 9208142f12f1dde74a5aeade8420cab43ef90e41 (diff) | |
download | rabbitmq-server-2185b71116ace8425723168cf90ab9e234d451bb.tar.gz |
coalesce confirms in channel, for better performance
We accumulate confirms in the channel and only send them when the
channel goes idle.
-rw-r--r-- | src/rabbit_channel.erl | 30 |
1 files changed, 22 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5c900b0b..47a721bd 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed}). + confirm_enabled, publish_seqno, unconfirmed, confirmed}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -186,7 +186,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_trees:empty()}, + unconfirmed = gb_trees:empty(), + confirmed = []}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -283,7 +284,11 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> hibernate}; handle_cast({confirm, MsgSeqNos, From}, State) -> - {noreply, confirm(MsgSeqNos, From, State), hibernate}. + noreply(confirm(MsgSeqNos, From, State)). + +handle_info(timeout, State = #ch{confirmed = C}) -> + {noreply, send_confirms(lists:append(C), State #ch{confirmed = []}), + hibernate}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{unconfirmed = UC}) -> @@ -293,9 +298,9 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, {MsgSeqNos, UC1} = remove_queue_unconfirmed( gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}), - State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}), erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State1), hibernate}. + noreply(queue_blocked(QPid, record_confirms(MsgSeqNos, + State#ch{unconfirmed = UC1}))). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -325,11 +330,15 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- +reply(Reply, NewState = #ch{confirmed = []}) -> + {reply, Reply, ensure_stats_timer(NewState), hibernate}; reply(Reply, NewState) -> - {reply, Reply, ensure_stats_timer(NewState), hibernate}. + {reply, Reply, ensure_stats_timer(NewState), 0}. +noreply(NewState = #ch{confirmed = []}) -> + {noreply, ensure_stats_timer(NewState), hibernate}; noreply(NewState) -> - {noreply, ensure_stats_timer(NewState), hibernate}. + {noreply, ensure_stats_timer(NewState), 0}. ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), @@ -474,6 +483,11 @@ remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, remove_qmsg(MsgSeqNo, QPid, Qs, Acc)). +record_confirms([], State) -> + State; +record_confirms(MsgSeqNos, State = #ch{confirmed = C}) -> + State#ch{confirmed = [MsgSeqNos | C]}. + confirm([], _QPid, State) -> State; confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> @@ -485,7 +499,7 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc) end end, {[], UC}, MsgSeqNos), - send_confirms(DoneMessages, State#ch{unconfirmed = UC2}). + record_confirms(DoneMessages, State#ch{unconfirmed = UC2}). remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) -> Qs1 = sets:del_element(QPid, Qs), |