diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-22 11:55:35 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-22 11:55:35 +0000 |
commit | 3a29d0d519589d8aba1011d85ae36e93e75a2989 (patch) | |
tree | 0f26cd77ee141e4002ffaa239dc38c540e743c1a /src/rabbit_channel.erl | |
parent | 860e1265688f1d56150b6a980e3460829ae4751e (diff) | |
download | rabbitmq-server-3a29d0d519589d8aba1011d85ae36e93e75a2989.tar.gz |
Substantially improve both performance and correctness: 1) really don't treat presence of MsgSeqNo as a cue we are doing confirms, and 2) get the right dtree function when we receive a MsgSeqNo.
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 52 |
1 files changed, 29 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b862766a..346dd2e1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -633,7 +633,7 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_mandatory(MsgSeqNo, State = #ch{mandatory = Mand}) -> - {_MMsgs, Mand1} = dtree:take_all([MsgSeqNo], Mand), + Mand1 = dtree:drop(MsgSeqNo, Mand), State#ch{mandatory = Mand1}. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -708,16 +708,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(Props, State), check_expiration_header(Props), + DoConfirm = Tx =/= none orelse ConfirmEnabled, {MsgSeqNo, State1} = - case {Tx, ConfirmEnabled orelse Mandatory} of - {none, false} -> {undefined, State}; - {_, _} -> SeqNo = State#ch.publish_seqno, - {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + case DoConfirm orelse Mandatory of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_in(Message, TraceState), - Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), + Delivery = rabbit_basic:delivery( + Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), DQ = {Delivery, QNames}, {noreply, case Tx of @@ -1511,7 +1513,6 @@ notify_limiter(Limiter, Acked) -> end. deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - msg_seq_no = undefined, mandatory = false}, []}, State) -> %% optimisation ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), @@ -1519,6 +1520,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, mandatory = Mandatory, + confirmed = Confirmed, msg_seq_no = MsgSeqNo}, DelQNames}, State = #ch{queue_names = QNames, queue_monitors = QMons}) -> @@ -1542,10 +1544,12 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ false -> dict:store(QPid, QName, QNames0) end, pmon:monitor(QPid, QMons0)} end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), - State1 = process_routing_result(DeliveredQPids, XName, Mandatory, MsgSeqNo, - Message, - State#ch{queue_names = QNames1, - queue_monitors = QMons1}), + State1 = process_routing_confirm( + DeliveredQPids, Confirmed, MsgSeqNo, XName, + process_routing_mandatory( + DeliveredQPids, Mandatory, MsgSeqNo, Message, + State#ch{queue_names = QNames1, + queue_monitors = QMons1})), ?INCR_STATS([{exchange_stats, XName, 1} | [{queue_exchange_stats, {QName, XName}, 1} || QPid <- DeliveredQPids, @@ -1555,22 +1559,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ %% TODO unbreak basic.return stats -process_routing_result(_, _, _, undefined, _Msg, State) -> +process_routing_mandatory(_, false, _MsgSeqNo, _Msg, State) -> State; -process_routing_result([], XName, false, MsgSeqNo, _Msg, State) -> - record_confirms([{MsgSeqNo, XName}], State); -process_routing_result([], XName, true, MsgSeqNo, Msg, State) -> +process_routing_mandatory([], true, _MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), + State; +process_routing_mandatory(QPids, true, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert( + MsgSeqNo, QPids, Msg, State#ch.mandatory)}. + +process_routing_confirm(_, false, _MsgSeqNo, _XName, State) -> + State; +process_routing_confirm([], true, MsgSeqNo, XName, State) -> + exit(bang), record_confirms([{MsgSeqNo, XName}], State); -process_routing_result(QPids, XName, Mandatory, MsgSeqNo, Msg, State) -> - MandatoryTree = case Mandatory of - false -> State#ch.mandatory; - true -> dtree:insert(MsgSeqNo, QPids, Msg, - State#ch.mandatory) - end, +process_routing_confirm(QPids, true, MsgSeqNo, XName, State) -> + exit(bang), State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, - State#ch.unconfirmed), - mandatory = MandatoryTree}. + State#ch.unconfirmed)}. %% process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> %% ok = basic_return(Msg, State, no_route), |