diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-13 01:11:30 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-13 01:11:30 +0000 |
commit | 3bd08a3f8f02048e2fa0d1d7da09393d7d624305 (patch) | |
tree | 12b8a98916905bd0e5705ba3b29baa142ba69f9d | |
parent | c8c64e8ef1dfd9bf6ac1c4f73e987b874d13912c (diff) | |
download | rabbitmq-server-3bd08a3f8f02048e2fa0d1d7da09393d7d624305.tar.gz |
optimise confirms for unroutable messages; don't do any unnecessary set/dict operations
-rw-r--r-- | src/rabbit_channel.erl | 25 |
1 files changed, 13 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 930e48e6..f4122e95 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -475,13 +475,14 @@ confirm([], _QPid, State) -> State; confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) -> State; -confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> - MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)], - MS = gb_sets:from_list(MsgSeqNos), - QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM), - send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS), - queues_for_msg = QFM1}); +confirm(MsgSeqNos, undefined, State) -> + %% This case is for confirms originating in the channel: it is + %% only triggered by unroutable messages marked mandatory or + %% immediate or messages routed to zero queues. In this case, 1) + %% the unconfirmed set does not contain the message, and 2) the + %% message was not delivered to any queues, so queues_for_msg will + %% not have any relevant entries. + send_confirms(MsgSeqNos, State); confirm(MsgSeqNos, QPid, State) -> {DoneMessages, State1} = lists:foldl( @@ -544,9 +545,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, false -> {undefined, State}; true -> SeqNo = State#ch.publish_seqno, {SeqNo, - State#ch{publish_seqno = SeqNo + 1, - unconfirmed = - gb_sets:add(SeqNo, State#ch.unconfirmed)}} + State#ch{publish_seqno = SeqNo + 1}} end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -1227,10 +1226,12 @@ process_routing_result(routed, [], MsgSeqNo, _, State) -> process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, - State = #ch{queues_for_msg = QFM}) -> + State = #ch{queues_for_msg = QFM, + unconfirmed = UC}) -> QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = QFM1}. + State#ch{queues_for_msg = QFM1, + unconfirmed = gb_sets:add(MsgSeqNo, UC)}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; |