diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 22:29:04 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 22:29:04 +0000 |
commit | 8b5f304474d529bce7ad7e0be8e00392baffbf8c (patch) | |
tree | d2dae11bf47d9dcca629ca0a22bb10a857264737 | |
parent | c409185407227ffd5830c03e922b61b1891219a6 (diff) | |
download | rabbitmq-server-8b5f304474d529bce7ad7e0be8e00392baffbf8c.tar.gz |
some inlining, cosmetics, and more explicit sequencing
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 42 |
2 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c65347e4..a698a435 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -505,7 +505,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - send_mandatory(Delivery), + send_mandatory(Delivery), %% must do this before confirms {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9bcded68..1da4b0fd 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -348,9 +348,9 @@ handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); -handle_cast({mandatory_received, MsgSeqNo}, State) -> +handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> %% NB: don't call noreply/1 since we don't want to send confirms. - noreply_coalesce(handle_mandatory(MsgSeqNo, State)); + noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)}); handle_cast({confirm, MsgSeqNos, From}, State) -> %% NB: don't call noreply/1 since we don't want to send confirms. @@ -631,10 +631,6 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), record_confirms(MXs, State#ch{unconfirmed = UC1}). -handle_mandatory(MsgSeqNo, State = #ch{mandatory = Mand}) -> - Mand1 = dtree:drop(MsgSeqNo, Mand), - State#ch{mandatory = Mand1}. - handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? State1 = State#ch{state = running}, @@ -1545,33 +1541,35 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ false -> dict:store(QPid, QName, QNames0) end, pmon:monitor(QPid, QMons0)} end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), - State1 = process_routing_confirm( - DeliveredQPids, Confirm, MsgSeqNo, XName, - process_routing_mandatory( - DeliveredQPids, Mandatory, MsgSeqNo, Message, - State#ch{queue_names = QNames1, - queue_monitors = QMons1})), + State1 = State#ch{queue_names = QNames1, + queue_monitors = QMons1}, + %% NB: the order here is important since basic.returns must be + %% sent before confirms. + State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo, + Message, State1), + State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo, + XName, State2), ?INCR_STATS([{exchange_stats, XName, 1} | [{queue_exchange_stats, {QName, XName}, 1} || QPid <- DeliveredQPids, {ok, QName} <- [dict:find(QPid, QNames1)]]], - publish, State1), - State1. + publish, State3), + State3. -process_routing_mandatory(_, false, _MsgSeqNo, _Msg, State) -> +process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> State; -process_routing_mandatory([], true, _MsgSeqNo, Msg, State) -> +process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), State; -process_routing_mandatory(QPids, true, MsgSeqNo, Msg, State) -> - State#ch{mandatory = dtree:insert( - MsgSeqNo, QPids, Msg, State#ch.mandatory)}. +process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg, + State#ch.mandatory)}. -process_routing_confirm(_, false, _MsgSeqNo, _XName, State) -> +process_routing_confirm(false, _, _MsgSeqNo, _XName, State) -> State; -process_routing_confirm([], true, MsgSeqNo, XName, State) -> +process_routing_confirm(true, [], MsgSeqNo, XName, State) -> record_confirms([{MsgSeqNo, XName}], State); -process_routing_confirm(QPids, true, MsgSeqNo, XName, State) -> +process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, State#ch.unconfirmed)}. |