summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-17 10:32:30 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-17 10:32:30 +0000
commit2185b71116ace8425723168cf90ab9e234d451bb (patch)
treeb44f87b1fd274e646d338dbb4130268d3b0f308b
parent9208142f12f1dde74a5aeade8420cab43ef90e41 (diff)
downloadrabbitmq-server-2185b71116ace8425723168cf90ab9e234d451bb.tar.gz
coalesce confirms in channel, for better performance
We accumulate confirms in the channel and only send them when the channel goes idle.
-rw-r--r--src/rabbit_channel.erl30
1 files changed, 22 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5c900b0b..47a721bd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed}).
+ confirm_enabled, publish_seqno, unconfirmed, confirmed}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -186,7 +186,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_trees:empty()},
+ unconfirmed = gb_trees:empty(),
+ confirmed = []},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -283,7 +284,11 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
hibernate};
handle_cast({confirm, MsgSeqNos, From}, State) ->
- {noreply, confirm(MsgSeqNos, From, State), hibernate}.
+ noreply(confirm(MsgSeqNos, From, State)).
+
+handle_info(timeout, State = #ch{confirmed = C}) ->
+ {noreply, send_confirms(lists:append(C), State #ch{confirmed = []}),
+ hibernate};
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{unconfirmed = UC}) ->
@@ -293,9 +298,9 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
{MsgSeqNos, UC1} = remove_queue_unconfirmed(
gb_trees:next(gb_trees:iterator(UC)), QPid,
{[], UC}),
- State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}),
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State1), hibernate}.
+ noreply(queue_blocked(QPid, record_confirms(MsgSeqNos,
+ State#ch{unconfirmed = UC1}))).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -325,11 +330,15 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
+reply(Reply, NewState = #ch{confirmed = []}) ->
+ {reply, Reply, ensure_stats_timer(NewState), hibernate};
reply(Reply, NewState) ->
- {reply, Reply, ensure_stats_timer(NewState), hibernate}.
+ {reply, Reply, ensure_stats_timer(NewState), 0}.
+noreply(NewState = #ch{confirmed = []}) ->
+ {noreply, ensure_stats_timer(NewState), hibernate};
noreply(NewState) ->
- {noreply, ensure_stats_timer(NewState), hibernate}.
+ {noreply, ensure_stats_timer(NewState), 0}.
ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
@@ -474,6 +483,11 @@ remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
remove_queue_unconfirmed(gb_trees:next(Next), QPid,
remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+record_confirms([], State) ->
+ State;
+record_confirms(MsgSeqNos, State = #ch{confirmed = C}) ->
+ State#ch{confirmed = [MsgSeqNos | C]}.
+
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
@@ -485,7 +499,7 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
{value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
end
end, {[], UC}, MsgSeqNos),
- send_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+ record_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
Qs1 = sets:del_element(QPid, Qs),