summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Fox <tim@rabbitmq.com>2011-02-16 17:01:43 +0000
committerTim Fox <tim@rabbitmq.com>2011-02-16 17:01:43 +0000
commit0b2b3854a9f2a7480560b5200cf87d797e57119b (patch)
tree73b4778cdd36f357af8079f76f38130d4cd20f82
parent5855919d941d049d56a372d1763112302fbece34 (diff)
downloadrabbitmq-server-0b2b3854a9f2a7480560b5200cf87d797e57119b.tar.gz
Applied Matthias recommendations from comment#2 of bugzilla with the exception of using gb_sets throughout (will contact Alexandru first)
-rw-r--r--src/rabbit_channel.erl86
1 files changed, 46 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f49dbd93..f69a49b1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -34,8 +34,8 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed,
- queue_unconfirmed}).
+ confirm_enabled, publish_seqno, unconfirmed_mq, confirmed,
+ unconfirmed_qm}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -174,9 +174,9 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_trees:empty(),
+ unconfirmed_mq = gb_trees:empty(),
confirmed = [],
- queue_unconfirmed = dict:new()},
+ unconfirmed_qm = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -280,12 +280,16 @@ handle_info(timeout, State) ->
noreply(State);
handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{queue_unconfirmed = QU}) ->
- MsgSeqNos = case dict:find(QPid, QU) of
- {ok, MsgSet} -> gb_sets:to_list(MsgSet);
+ State = #ch{unconfirmed_qm = UQM}) ->
+ MsgSeqNos = case dict:find(QPid, UQM) of
+ {ok, MsgSet} -> gb_sets:to_list(MsgSet);
error -> []
end,
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
+ %% We remove the MsgSeqNos from UQM before calling process_confirms to
+ %% prevent each MsgSeqNo being removed from the set one by one which
+ %% which would be inefficient
+ NewState = State#ch{unconfirmed_qm = dict:erase(QPid, UQM)},
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, NewState),
erase_queue_stats(QPid),
State2 = (case Reason of
normal -> fun record_confirms/2;
@@ -493,35 +497,35 @@ confirm(MsgSeqNos, QPid, State) ->
{MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
record_confirms(MXs, State1).
-process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed = UC, queue_unconfirmed = QU}) ->
- {MXs, UC1, QU1} =
+process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}) ->
+ {MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UC0, _QU} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UC0) of
- none -> Acc;
- {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
- end
- end, {[], UC, QU}, MsgSeqNos),
- {MXs, State#ch{unconfirmed = UC1, queue_unconfirmed = QU1}}.
-
-remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC, QU}, State) ->
+ fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ none -> Acc;
+ {value, XQ} -> confirm_msg(MsgSeqNo, QPid, XQ, Acc, State)
+ end
+ end, {[], UMQ, UQM}, MsgSeqNos),
+ {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
+
+confirm_msg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC, UQM}, State) ->
Qs1 = sets:del_element(QPid, Qs),
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
-
- case dict:find(QPid, QU) of
- {ok, Msgs} -> Msgs1 = gb_sets:delete(MsgSeqNo, Msgs),
- case gb_sets:is_empty(Msgs1) of
- true -> QU1 = dict:erase(QPid, QU);
- false -> QU1 = dict:store(QPid, Msgs1, QU)
- end;
- _ -> QU1 = QU
- end,
-
+ UQM1 =
+ case dict:find(QPid, UQM) of
+ {ok, Msgs} -> Msgs1 = gb_sets:delete(MsgSeqNo, Msgs),
+ case gb_sets:is_empty(Msgs1) of
+ true -> dict:erase(QPid, UQM);
+ false -> dict:store(QPid, Msgs1, UQM)
+ end;
+ error -> UQM
+ end,
case sets:size(Qs1) of
- 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC), QU1};
- _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC), QU1}
+ 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC), UQM1};
+ _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC), UQM1}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -1258,14 +1262,16 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed = UC, queue_unconfirmed = QU} = State,
- UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
- QU1 = lists:foldl(fun (QPid, QU2) ->
+ #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ UMQ1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UMQ),
+ SingletonSet = gb_sets:singleton(MsgSeqNo),
+ UQM1 = lists:foldl(fun (QPid, UQM2) ->
maybe_monitor(QPid),
- dict:update(QPid, fun (Msgs)-> gb_sets:add(MsgSeqNo, Msgs) end,
- gb_sets:singleton(MsgSeqNo), QU2)
- end, QU, QPids),
- State#ch{unconfirmed = UC1, queue_unconfirmed = QU1}.
+ dict:update(QPid, fun (Msgs)-> gb_sets:add(MsgSeqNo,
+ Msgs) end,
+ SingletonSet, UQM2)
+ end, UQM, QPids),
+ State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1301,7 +1307,7 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed_mq = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
CutOff = case gb_trees:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
@@ -1332,7 +1338,7 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+i(messages_unconfirmed, #ch{unconfirmed_mq = UC}) ->
gb_trees:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->