summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-13 01:11:30 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-13 01:11:30 +0000
commit3bd08a3f8f02048e2fa0d1d7da09393d7d624305 (patch)
tree12b8a98916905bd0e5705ba3b29baa142ba69f9d
parentc8c64e8ef1dfd9bf6ac1c4f73e987b874d13912c (diff)
downloadrabbitmq-server-3bd08a3f8f02048e2fa0d1d7da09393d7d624305.tar.gz
optimise confirms for unroutable messages; don't do any unnecessary set/dict operations
-rw-r--r--src/rabbit_channel.erl25
1 files changed, 13 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 930e48e6..f4122e95 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -475,13 +475,14 @@ confirm([], _QPid, State) ->
State;
confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
- MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)],
- MS = gb_sets:from_list(MsgSeqNos),
- QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM),
- send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS),
- queues_for_msg = QFM1});
+confirm(MsgSeqNos, undefined, State) ->
+ %% This case is for confirms originating in the channel: it is
+ %% only triggered by unroutable messages marked mandatory or
+ %% immediate or messages routed to zero queues. In this case, 1)
+ %% the unconfirmed set does not contain the message, and 2) the
+ %% message was not delivered to any queues, so queues_for_msg will
+ %% not have any relevant entries.
+ send_confirms(MsgSeqNos, State);
confirm(MsgSeqNos, QPid, State) ->
{DoneMessages, State1} =
lists:foldl(
@@ -544,9 +545,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
{SeqNo,
- State#ch{publish_seqno = SeqNo + 1,
- unconfirmed =
- gb_sets:add(SeqNo, State#ch.unconfirmed)}}
+ State#ch{publish_seqno = SeqNo + 1}}
end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -1227,10 +1226,12 @@ process_routing_result(routed, [], MsgSeqNo, _, State) ->
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _,
- State = #ch{queues_for_msg = QFM}) ->
+ State = #ch{queues_for_msg = QFM,
+ unconfirmed = UC}) ->
QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = QFM1}.
+ State#ch{queues_for_msg = QFM1,
+ unconfirmed = gb_sets:add(MsgSeqNo, UC)}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};